Lithium Battery Research, Part 6: Transformer-Based Lithium Battery RUL Prediction with PyTorch
This is an original post; please credit the source when reposting. Thanks!
Code download:
https://github.com/XiuzeZhou/RUL
Reference paper:
D. Chen, W. Hong, and X. Zhou, "Transformer Network for Remaining Useful Life Prediction of Lithium-Ion Batteries," IEEE Access, vol. 10, pp. 19621-19628, 2022.
Citations are welcome!
1. Motivation
The Transformer's popularity is off the charts in almost every area of computing, yet there was still nothing applying it to lithium batteries. So I wrote a paper combining the Transformer with lithium battery remaining useful life (RUL) prediction, in the hope that it will be useful to others.
2. Problem Definition
As charge/discharge cycles accumulate, a lithium battery's performance gradually degrades. The battery's performance can be characterized by its capacity, so degradation is tracked through the state of health (SOH), defined as:
\(
{\textit{SOH}}(t)=\frac{C_t}{C_0}\times 100\%,
\)
where \(C_0\) is the rated capacity and \(C_t\) is the capacity at cycle \(t\). Once the SOH drops to 70-80%, the battery is considered to have reached end-of-life (EOL).
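For example, a battery rated at 2.0 Ah whose measured capacity has fallen to 1.5 Ah has an SOH of 75% and is near end-of-life. A minimal sketch with hypothetical capacity values:

import numpy as np

# Hypothetical capacity readings (Ah) over charge/discharge cycles
capacities = np.array([2.00, 1.95, 1.88, 1.76, 1.62, 1.45, 1.31])
rated_capacity = 2.0

soh = capacities / rated_capacity * 100   # SOH in percent
eol = np.where(soh <= 70)[0]              # cycles at or below the 70% threshold
print('SOH (%):', soh)
print('First EOL cycle index:', eol[0] if len(eol) else 'not reached')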
What we do is model the battery's degradation trend from its historical data, such as current, voltage, and capacity, and then use the trained model to predict the battery's RUL, as the schematic below illustrates:
3. Model
The model consists of four parts: input, normalization, denoising, and a Transformer.
Input and normalization: the input is a capacity sequence of \(n\) points, \(\mathbf{x}=\left\{x_{1},x_{2},\dots,x_{n} \right\}\), normalized into (0, 1]:
\(
\mathbf{x}'=\frac{\mathbf{x}}{C_0}
\)
Denoising: raw recorded data always contains a good deal of noise. For the stability of the model, it is best to denoise the data first. We use a denoising autoencoder, which reconstructs the original data from a corrupted version. If \(\widetilde{\mathbf{x}}_t\) is \(\mathbf{x}'_t\) with Gaussian noise added, the denoising autoencoder computes:
\(
\mathbf{z}=a\left({W}^T\widetilde{\mathbf{x}}_t+{b}\right),\qquad
\widehat{\mathbf{x}}_t=f'\left({W}'\mathbf{z}+{b}'\right)
\)
The loss function of this module is:
\(
\mathcal L_d=\frac{1}{n}\sum_{t=1}^{n}\ell\left(\widetilde{{x}}_t-\widehat{{x}}_t\right)+\lambda \left(\left \| {W} \right \|^2_{F}+\left \| {W'} \right \|^2_{F}\right)
\)
Transformer: this module learns the downward trend of the sequence from the denoised data. Plenty of Transformer tutorials are available online, so only a brief description is given here.
First, we encode each position in the sequence with sine and cosine functions:
\(
PE(t, 2k)=\sin\left(t/10000^{2k/m}\right),\qquad
PE(t, 2k+1)=\cos\left(t/10000^{2k/m}\right)
\)
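The PositionalEncoding class used by Net in Section 4.2.2 is not listed in this post; below is a minimal sketch of this standard sinusoidal form (my own reconstruction; the repository version may differ in detail):

import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)   # (max_len, d_model); d_model is m above
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float()
                             * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)   # even dimensions: sin
        pe[:, 1::2] = torch.cos(position * div_term)   # odd dimensions: cos
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model); add the encoding of each position
        return x + self.pe[:x.size(1)]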
Then, multi-head attention computes the relationships among features:
\(
\textit{MultiHead}\left({H}^{l-1}\right) = [head_1;head_2;\cdots;head_h]{W}^{O}
\)
\(
head_i = \textit{Attention}\left({H}^{l-1}{W}^{l}_{Q},\,{H}^{l-1}{W}^{l}_{K},\,{H}^{l-1}{W}^{l}_{V} \right)
\)
\(
\textit{Attention}\left({Q},{K},{V} \right) = \textit{softmax} \left(\frac{{Q}{K}^T}{\sqrt{d_h}} \right){V}
\)
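In the code below, nn.TransformerEncoderLayer implements multi-head attention internally; for intuition, here is a minimal single-head sketch of the attention formula (not the implementation used in the repository):

import math
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(Q, K, V):
    # Q, K, V: (batch_size, seq_len, d_h)
    d_h = Q.size(-1)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_h)  # (batch, seq, seq)
    weights = F.softmax(scores, dim=-1)   # attention weights over the sequence
    return torch.matmul(weights, V)       # weighted sum of the values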
Next, the output of the multi-head attention is passed through a position-wise feed-forward network (FFN) to obtain the layer output:
\(
{H}^{l} = \textit{FFN}\left( \textit{MultiHead}\left( {H}^{l-1} \right) \right)
\)
\(
\textit{FFN}\left({x} \right) = \textit{ReLU}\left({x}{W}_1 + {b}_1 \right){W}_2 + {b}_2
\)
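nn.TransformerEncoderLayer already contains this sublayer; its hidden width is set by dim_feedforward, which the code below ties to hidden_dim. A minimal standalone sketch for intuition:

import torch
import torch.nn as nn

class FFN(nn.Module):
    def __init__(self, d_model, dim_feedforward):
        super(FFN, self).__init__()
        self.w1 = nn.Linear(d_model, dim_feedforward)
        self.w2 = nn.Linear(dim_feedforward, d_model)

    def forward(self, x):
        return self.w2(torch.relu(self.w1(x)))   # ReLU(x W1 + b1) W2 + b2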
Prediction: finally, the representation from the last layer is mapped to the predicted capacity:
\(
\widehat{x}_t=f\left({W}_p{H}^h + {b}_p\right)
\)
The overall loss function combines the prediction error, the reconstruction error of the denoising module, and a regularization term:
\(
\mathcal L=\sum_{t=T+1}^{n}\left(x_t-\widehat{x}_{t} \right)^2+\alpha\sum_{i=1}^{n}
\ell(\widetilde{\mathbf{x}}_i-\widehat{\mathbf{x}}_i)+\lambda\Omega \left(\Theta \right)
\)
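In the training function in Section 4.2.3, the first two terms appear directly as code, with the optimizer's weight_decay standing in for \(\lambda\Omega(\Theta)\):

# prediction error + alpha * DAE reconstruction error (from Section 4.2.3)
loss = criterion(output, y) + alpha * criterion(decode, X.reshape(-1, feature_size))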
4. Experiments
4.1 Datasets
We use the two datasets most commonly used for lithium battery RUL prediction: NASA and CALCE. The NASA dataset contains lithium battery aging experiments from the NASA Ames Research Center; the CALCE dataset contains battery cycling tests from the Center for Advanced Life Cycle Engineering at the University of Maryland. Each includes charge/discharge records for four batteries. For details on processing and extracting the datasets, see my other posts (NASA lithium battery dataset, CALCE lithium battery dataset).
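The get_train_test helper called by the training function in Section 4.2.3 is defined in the repository rather than in this post. A rough sketch of the sliding-window idea behind it (my reconstruction; the return values follow the usage below, but the split protocol and details may differ):

import numpy as np

def build_sequences(series, window_size):
    # Slide a window over the capacity series: each window of window_size
    # points is an input; the window shifted by one point is the target.
    x, y = [], []
    for i in range(len(series) - window_size):
        x.append(series[i:i + window_size])
        y.append(series[i + 1:i + window_size + 1])
    return np.array(x), np.array(y)

def get_train_test(data_dict, name, window_size=8, train_ratio=0.5):
    # Hypothetical split: the first part of battery `name` provides
    # training windows; the remainder is held out for RUL evaluation.
    series = list(data_dict[name])
    split = int(len(series) * train_ratio)
    train_data, test_data = series[:split], series[split:]
    train_x, train_y = build_sequences(train_data, window_size)
    return train_x, train_y, train_data, test_data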
4.2 Code Walkthrough
4.2.1 Denoising Autoencoder
This module performs denoising: data from sensors often carries considerable noise, which interferes with network training, so it is best to denoise before training.
import torch
import torch.nn as nn
import torch.nn.functional as F

class Autoencoder(nn.Module):
    def __init__(self, input_size=16, hidden_dim=8, noise_level=0.01):
        super(Autoencoder, self).__init__()
        self.input_size, self.hidden_dim, self.noise_level = input_size, hidden_dim, noise_level
        self.fc1 = nn.Linear(self.input_size, self.hidden_dim)   # encoder
        self.fc2 = nn.Linear(self.hidden_dim, self.input_size)   # decoder

    def encoder(self, x):
        x = self.fc1(x)
        h1 = F.relu(x)
        return h1

    def mask(self, x):
        # corrupt the input with Gaussian noise
        corrupted_x = x + self.noise_level * torch.randn_like(x)
        return corrupted_x

    def decoder(self, x):
        h2 = self.fc2(x)
        return h2

    def forward(self, x):
        out = self.mask(x)              # add noise
        encode = self.encoder(out)      # compress to hidden_dim features
        decode = self.decoder(encode)   # reconstruct the clean input
        return encode, decode
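A quick shape check with random input:

ae = Autoencoder(input_size=16, hidden_dim=8, noise_level=0.01)
x = torch.randn(4, 16)             # batch of 4 windows, 16 points each
encode, decode = ae(x)
print(encode.shape, decode.shape)  # torch.Size([4, 8]) torch.Size([4, 16])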
4.2.2 Main Network
The full network has two parts: the denoising autoencoder and the Transformer.
class Net(nn.Module):
    def __init__(self, feature_size=16, hidden_dim=32, num_layers=1, nhead=8,
                 dropout=0.0, noise_level=0.01, is_autoencoder=False):
        super(Net, self).__init__()
        self.is_autoencoder = is_autoencoder
        self.auto_hidden = int(feature_size / 2)
        input_size = self.auto_hidden if is_autoencoder else feature_size
        # sinusoidal positional encoding (see the sketch in Section 3)
        self.pos = PositionalEncoding(d_model=input_size, max_len=input_size)
        encoder_layers = nn.TransformerEncoderLayer(d_model=input_size, nhead=nhead,
                                                    dim_feedforward=hidden_dim, dropout=dropout)
        self.cell = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.linear = nn.Linear(input_size, 1)   # regression head
        if self.is_autoencoder:
            self.autoencoder = Autoencoder(input_size=feature_size,
                                           hidden_dim=self.auto_hidden, noise_level=noise_level)

    def forward(self, x):
        batch_size, feature_num, feature_size = x.shape
        if self.is_autoencoder:
            # flatten each window to (batch_size, seq_len) before denoising
            encode, decode = self.autoencoder(x.reshape(batch_size, -1))
            out = encode.reshape(batch_size, -1, self.auto_hidden)
        else:
            decode = x.reshape(batch_size, -1)
            out = x
        out = self.pos(out)                   # add positional encoding
        out = out.reshape(1, batch_size, -1)  # (seq_len=1, batch_size, input_size)
        out = self.cell(out)
        out = out.reshape(batch_size, -1)     # (batch_size, input_size)
        out = self.linear(out)                # (batch_size, 1)
        return out, decode
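Another quick shape check; with is_autoencoder=True and feature_size=16, each window is compressed to 8 features before the Transformer:

net = Net(feature_size=16, hidden_dim=32, num_layers=1, nhead=8,
          is_autoencoder=True)
x = torch.randn(4, 1, 16)       # (batch_size, feature_num=1, feature_size)
out, decode = net(x)
print(out.shape, decode.shape)  # torch.Size([4, 1]) torch.Size([4, 16])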
4.2.3 Training Function
def train(lr=0.01, feature_size=8, hidden_dim=32, num_layers=1, nhead=8,
          weight_decay=0.0, EPOCH=1000, seed=0, is_autoencoder=True, alpha=0.0,
          noise_level=0.0, dropout=0.0, metric='re', is_load_weights=True):
    score_list, result_list = [], []
    setup_seed(seed)
    for i in range(4):
        name = Battery_list[i]
        window_size = feature_size
        train_x, train_y, train_data, test_data = get_train_test(Battery, name,
                                                                 window_size)
        model = Net(feature_size=feature_size, hidden_dim=hidden_dim,
                    num_layers=num_layers, nhead=nhead, dropout=dropout,
                    is_autoencoder=is_autoencoder, noise_level=noise_level)
        model = model.to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr,
                                     weight_decay=weight_decay)
        criterion = nn.MSELoss()
        '''
        # save the random initial weights for reproducibility
        if tuple(map(int, torch.__version__.split('+')[0].split('.')[:2])) >= (1, 6):
            torch.save(model.state_dict(), 'model_NASA'+str(seed)+'.pth')
        else:
            torch.save(model.state_dict(), 'model_NASA'+str(seed)+'.pth',
                       _use_new_zipfile_serialization=False)
        '''
        # load the random initial weights generated on my machine
        if is_load_weights:
            if tuple(map(int, torch.__version__.split('+')[0].split('.')[:2])) >= (1, 6):
                model.load_state_dict(torch.load('model_NASA.pth'))
            else:
                model.load_state_dict(torch.load('model_NASA_1.5.0.pth'))
        test_x = train_data.copy()
        loss_list, y_ = [0], []
        rmse, re = 1, 1
        score_, score = [1], [1]
        for epoch in range(EPOCH):
            X = np.reshape(train_x / Rated_Capacity,
                           (-1, 1, feature_size)).astype(np.float32)
            # X shape: (batch_size, seq_len=1, input_size)
            y = np.reshape(train_y[:, -1] / Rated_Capacity,
                           (-1, 1)).astype(np.float32)
            # y shape: (batch_size, 1)
            X, y = torch.from_numpy(X).to(device), torch.from_numpy(y).to(device)
            output, decode = model(X)
            output = output.reshape(-1, 1)
            # prediction error + alpha * DAE reconstruction error
            loss = criterion(output, y) + alpha * criterion(
                decode, X.reshape(-1, feature_size))
            optimizer.zero_grad()   # clear gradients for this training step
            loss.backward()         # backpropagation, compute gradients
            optimizer.step()        # apply gradients
            if (epoch + 1) % 10 == 0:
                # roll the model forward from the end of the training data,
                # feeding each prediction back in as input
                test_x = train_data.copy()
                point_list = []
                while (len(test_x) - len(train_data)) < len(test_data):
                    x = np.reshape(np.array(test_x[-feature_size:]) / Rated_Capacity,
                                   (-1, 1, feature_size)).astype(np.float32)
                    x = torch.from_numpy(x).to(device)
                    # x shape: (batch_size=1, seq_len=1, input_size)
                    pred, _ = model(x)   # pred shape: (batch_size=1, 1)
                    next_point = pred.data.cpu().numpy()[0, 0] * Rated_Capacity
                    test_x.append(next_point)      # the prediction becomes input
                    point_list.append(next_point)  # save the predicted point
                y_.append(point_list)              # save the predicted sequence
                loss_list.append(loss.item())
                rmse = evaluation(y_test=test_data, y_predict=y_[-1])
                re = relative_error(y_test=test_data, y_predict=y_[-1],
                                    threshold=Rated_Capacity * 0.7)
                if metric == 're':
                    score = [re]
                elif metric == 'rmse':
                    score = [rmse]
                else:
                    score = [re, rmse]
                # stop once the loss is small and the score stops improving
                if (loss < 1e-3) and (score_[0] < score[0]):
                    break
                score_ = score.copy()
        score_list.append(score_)
        result_list.append(y_[-1])
    return score_list, result_list
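The helpers setup_seed, evaluation, and relative_error used above also come from the repository. Rough sketches of what they compute (my reconstruction; the exact definitions may differ):

import random
import numpy as np
import torch

def setup_seed(seed):
    # fix all random sources for reproducibility
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

def evaluation(y_test, y_predict):
    # root mean squared error over the predicted capacity sequence
    y_test, y_predict = np.array(y_test), np.array(y_predict)
    n = min(len(y_test), len(y_predict))
    return np.sqrt(np.mean((y_test[:n] - y_predict[:n]) ** 2))

def relative_error(y_test, y_predict, threshold):
    # relative error between the true and predicted EOL cycle, i.e. the
    # first cycle whose capacity drops to `threshold` (70% of rated capacity)
    true_eol = next((i for i, c in enumerate(y_test) if c <= threshold), len(y_test))
    pred_eol = next((i for i, c in enumerate(y_predict) if c <= threshold), len(y_predict))
    return abs(true_eol - pred_eol) / max(true_eol, 1)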
4.2.4 Main Function
The parameters below are the relatively good ones that grid search found on my machine. If your results differ from mine, you can load the random initial weights my machine generated for its best run. Better still, retrain the model with the grid search in 4.2.5 to find the optimal parameters for your own setup.
Rated_Capacity = 2.0
window_size = 16
feature_size = window_size
is_autoencoder = True
dropout = 0.0
EPOCH = 2000
nhead = 8
hidden_dim = 16
num_layers = 1
lr = 0.01 # learning rate
weight_decay = 0.0
noise_level = 0.0
alpha = 1e-5
is_load_weights = True
metric = 're'
seed = 0
SCORE = []
print('seed:{}'.format(seed))
score_list, _ = train(lr=lr, feature_size=feature_size, hidden_dim=hidden_dim,
                      num_layers=num_layers, nhead=nhead, weight_decay=weight_decay,
                      EPOCH=EPOCH, seed=seed, dropout=dropout,
                      is_autoencoder=is_autoencoder, alpha=alpha,
                      noise_level=noise_level, metric=metric,
                      is_load_weights=is_load_weights)
print(np.array(score_list))
for s in score_list:
    SCORE.append(s)
print('------------------------------------------------------------------')
print(metric + ' mean: {:<6.4f}'.format(np.mean(np.array(SCORE))))
4.2.5 Grid Search
Retrain the model with grid search to find the optimal parameters.
Rated_Capacity = 2.0
window_size = 16
feature_size = window_size
is_autoencoder = True
dropout = 0.0
EPOCH = 2000
nhead = 8
is_load_weights = False
weight_decay = 0.0
noise_level = 0.0
metric = 're'

states = {}
for lr in [1e-3, 1e-2]:
    for num_layers in [1, 2]:
        for hidden_dim in [16, 32]:
            for alpha in [1e-5, 1e-4]:
                show_str = 'lr={}, num_layers={}, hidden_dim={}, alpha={}'.format(
                    lr, num_layers, hidden_dim, alpha)
                print(show_str)
                SCORE = []
                for seed in range(5):
                    print('seed:{}'.format(seed))
                    score_list, _ = train(lr=lr, feature_size=feature_size,
                                          hidden_dim=hidden_dim, num_layers=num_layers,
                                          nhead=nhead, weight_decay=weight_decay,
                                          EPOCH=EPOCH, seed=seed, dropout=dropout,
                                          is_autoencoder=is_autoencoder, alpha=alpha,
                                          noise_level=noise_level, metric=metric,
                                          is_load_weights=is_load_weights)
                    print(np.array(score_list))
                    print(metric + ': {:<6.4f}'.format(np.mean(np.array(score_list))))
                    print('----------------------------------------------------------------')
                    for s in score_list:
                        SCORE.append(s)
                print(metric + ' mean: {:<6.4f}'.format(np.mean(np.array(SCORE))))
                states[show_str] = np.mean(np.array(SCORE))
                print('===================================================================')

min_key = min(states, key=states.get)
print('optimal parameters: {}, result: {}'.format(min_key, states[min_key]))
More Posts
1. NASA lithium battery dataset: lithium battery RUL prediction with Python: https://snailwish.com/395/
2. CALCE lithium battery dataset: data processing with Python: https://snailwish.com/437/
3. NASA lithium battery dataset: MLP-based lithium battery RUL prediction with Python: https://snailwish.com/427/
4. NASA lithium battery dataset: MLP-based lithium battery RUL prediction with Python: https://snailwish.com/464/
5. NASA and CALCE lithium battery datasets: RNN, LSTM, and GRU RUL prediction with PyTorch: https://snailwish.com/497/
6. Lithium Battery Research, Part 7: fitting time-series data with Gaussian functions in PyTorch: https://snailwish.com/576/