原创博客,转载请注明出处,谢谢!

代码下载地址:

https://github.com/XiuzeZhou/RUL

参考论文:

D. Chen, W. Hong, and X. Zhou, "Transformer Network for Remaining Useful Life Prediction of Lithium-Ion Batteries", IEEE Access, 2022, vol. 10, pp. 19621-19628.

欢迎引用!

1. 需求

Transformer 在计算机各个领域的热度都快爆表了,锂电池相关方向的文章依旧空白。所以,动手写了篇 Transformer 和 锂电池的文章,希望能给他人带来帮助。

2. 问题定义

随着充放电次数的增加,锂电池的性能逐渐下降。电池的性能可以用容量来表示,故寿命预测 (RUL) 可以定义如下:
\(
{\textit{SOH}}(t)=\frac{C_t}{C_0}\times 100\%,
\)

其中,\(C_0\) 表示额定容量,\(C_t\) 表示 \(t\) 时刻的容量。等到 SOH 降到 70-80% 时,电池可以报废。

我们要做的是用电池的历史数据,比如电流、电压和容量,对电池的下降趋势进行建模。然后,用训练好的模型来预测电池的 RUL,示意图如下所示:

3. 模型介绍

模型总共有四个部分:输入、归一化、降噪和 Transformer:

输入和归一化:输入是有 n 个点的容量序列 \(\mathbf{x}=\left\{x_{1},x_{2},\dots,x_{n} \right\}\),将其归一化到 (0, 1]:
\(
\mathbf{x}'=\frac{\mathbf{x}}{C_0}
\)

\(
\mathbf{x}'=\frac{\mathbf{x}}{C_0}
\)

降噪:原始的记录数据中,总是有不少的噪声数据。为了模型的稳定性,我们最好将这些数据先进行去噪。我们采用降噪自编码通过对原始数据进行重组达到降噪的目的。如果\(\widetilde{\mathbf{x}}_t\)是\(\mathbf{x}'_t\)加高斯噪声后的结果,那么降噪自编码公式如下:
\(
\mathbf{z}=a\left({W}^T\widetilde{\mathbf{x}}_t+{b} \right)
\)

\(
\widehat{\mathbf{x}}_t=f'\left({W}'\bf{z} + {b}'\right)
\)

该模块的损失函数为:
\(
\mathcal L_d=\frac{1}{n}\sum_{t=1}^{n}
\ell(\widetilde{{x}}_t-\widehat{{x}}_t) +
\lambda \left ( \left \| {W} \right \|^2_{F} + \left \| {W'} \right \|^2_{F} \right)
\)

\(
\mathcal L_d=\frac{1}{n}\sum_{t=1}^{n}
\ell(\widetilde{{x}}_t-\widehat{{x}}_t) +
\lambda \left ( \left \| {W} \right \|^2_{F} + \left \| {W'} \right \|^2_{F} \right)
\)

Transformer:该模块主要是通过从降噪后的数据中学到序列的下降趋势。关于 Transformer 的相关介绍网上有很多,这里只做简单介绍。

首先,我们用 cosine 函数来定义序列的位置信息:
\(
PE(t, 2k) =sin(t/10000^{2k/m})
\)

\(
PE(t, 2k+1)=cos(t/10000^{2k/m})
\)

然后,用多头注意力来计算特征之间的关系:
\(
\textit{MultiHead}\left({H}^{l-1}\right) = [head_1;head_2;\cdots;head_h]{W}^{O}
\)

\(
head_i = \textit{Attention}\left({H}^{l-1}{W}^{l}_{Q},{H}^{l-1}{W}^{l}_{K},{H}^{l-1}{W}^{l}_{V} \right)
\) \(
\textit{Attention}\left({Q},{K},{V} \right) = \textit{softmax} \left(\frac{{Q}{K}^T}{\sqrt{d_h} } \right){V}
\)

接着,我们将多头注意力的输出

\(
{H}^{l} = \textit{FFN}\left( \textit{MultiHead}\left( {H}^{l-1} \right) \right)
\) \(
\textit{FFN}\left({x} \right) = \textit{ReLU}\left({x}{W}_1 + {b}_1 \right){W}_2 + {b}_2
\)

预测:
\(
\widehat{x}_t=f\left({W}_p{H}^h + {b}_p\right)
\)

损失函数:
\(
\mathcal L=\sum_{t=T+1}^{n}\left(x_t-\widehat{x}_{t} \right)^2+\alpha\sum_{i=1}^{n}
\ell(\widetilde{\mathbf{x}}_i-\widehat{\mathbf{x}}_i)+\lambda\Omega \left(\Theta \right)
\)

4. 实验

4.1 数据集

我们主要用了两个锂电池寿命预测最常用的数据集:NASA 和 CALCE 数据集。NASA 是美国宇航局 NASA 埃姆斯研究中心提供的锂电池老化实验数据,CALCE 是马里兰大学高级生命周期工程中心的电池循环测试数据集。它们都包含四条电池的充放电记录。有关数据集的处理和提取,可以查看我的其他博客(NASA 锂电池数据集CALCE 锂电池数据集)。

4.2 代码讲解

4.2.1 降噪自编码

该模块主要用于降噪,来自传感器的数据常常带有不少噪声。这些噪声会对网络的训练进行干扰。所以,训练前最好能进行降噪处理。

class Autoencoder(nn.Module):
    def __init__(self, input_size=16, hidden_dim=8, noise_level=0.01):
        super(Autoencoder, self).__init__()
        self.input_size, self.hidden_dim, self.noise_level = input_size, hidden_dim, noise_level
        self.fc1 = nn.Linear(self.input_size, self.hidden_dim)
        self.fc2 = nn.Linear(self.hidden_dim, self.input_size)

    def encoder(self, x):
        x = self.fc1(x)
        h1 = F.relu(x)
        return h1

    def mask(self, x):
        corrupted_x = x + self.noise_level * torch.randn_like(x)
        return corrupted_x

    def decoder(self, x):
        h2 = self.fc2(x)
        return h2

    def forward(self, x):
        out = self.mask(x)
        encode = self.encoder(out)
        decode = self.decoder(encode)
        return encode, decode

4.2.2 主网络

整个网络模块总共有两个部分:降噪和 Transformer 网络。

class Net(nn.Module):
    def __init__(self, feature_size=16, hidden_dim=32, num_layers=1, nhead=8,
                 dropout=0.0, noise_level=0.01, is_autoencoder=False):
        super(Net, self).__init__()
        self.is_autoencoder = is_autoencoder
        self.auto_hidden = int(feature_size/2)
        input_size = self.auto_hidden if is_autoencoder else feature_size
        self.pos = PositionalEncoding(d_model=input_size, max_len=input_size)
        encoder_layers = nn.TransformerEncoderLayer(d_model=input_size, nhead=nhead,
                                                    dim_feedforward=hidden_dim, dropout=dropout)
        self.cell = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.linear = nn.Linear(input_size, 1)

        if self.is_autoencoder:
            self.autoencoder = Autoencoder(input_size=feature_size,
                                           hidden_dim=self.auto_hidden, noise_level=noise_level)

    def forward(self, x): 
        batch_size, feature_num, feature_size  = x.shape 
        if self.is_autoencoder:
            encode, decode = self.autoencoder(x.reshape(batch_size, -1))# batch_size*seq_len
            out = encode.reshape(batch_size, -1, self.auto_hidden)
        else:
            decode = x.reshape(batch_size, -1)
            out = x
        out = self.pos(out)
        out = out.reshape(1, batch_size, -1) # (1, batch_size, feature_size)
        out = self.cell(out)  
        out = out.reshape(batch_size, -1) # (batch_size, hidden_dim)
        out = self.linear(out)            # out shape: (batch_size, 1)

        return out, decode

4.2.3 训练函数

def train(lr=0.01, feature_size=8, hidden_dim=32, num_layers=1, nhead=8, 
          weight_decay=0.0, EPOCH=1000, seed=0, is_autoencoder=True, alpha=0.0, 
          noise_level=0.0, dropout=0.0, metric='re', is_load_weights=True):
    score_list, result_list = [], []
    setup_seed(seed)
    for i in range(4):
        name = Battery_list[i]
        window_size = feature_size
        train_x, train_y, train_data, test_data = get_train_test(Battery, name, 
                                                                 window_size)
        # print('sample size: {}'.format(train_size))

        model = Net(feature_size=feature_size, hidden_dim=hidden_dim, 
                    num_layers=num_layers, nhead=nhead, dropout=dropout,
                    is_autoencoder=is_autoencoder, noise_level=noise_level)
        model = model.to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, 
                                     weight_decay=weight_decay)
        criterion = nn.MSELoss()

        '''
        # save ramdom data for repetition
        if torch.__version__.split('+')[0] >= '1.6.0':
            torch.save(model.state_dict(), 'model_NASA'+str(seed)+'.pth')
        else:
            torch.save(model.state_dict(), 
            'model_CALCE.pth', _use_new_zipfile_serialization=False)        
        '''
        # load the random data generated by my device
        if is_load_weights: 
            if torch.__version__.split('+')[0] >= '1.6.0':
                model.load_state_dict(torch.load('model_NASA.pth')) 
            else:
                model.load_state_dict(torch.load('model_NASA_1.5.0.pth'))

        test_x = train_data.copy()
        loss_list, y_ = [0], []
        rmse, re = 1, 1
        score_, score = [1],[1]
        for epoch in range(EPOCH):
            X = np.reshape(train_x/Rated_Capacity,(-1, 1, 
                                                   feature_size)).astype(np.float32)
            #(batch_size, seq_len, input_size)
            y = np.reshape(train_y[:,-1]/Rated_Capacity,(-1,1)).astype(np.float32)
            # shape 为 (batch_size, 1)

            X, y = torch.from_numpy(X).to(device), torch.from_numpy(y).to(device)
            output, decode = model(X)
            output = output.reshape(-1, 1)
            loss = criterion(output, y) + alpha * criterion(
                decode, X.reshape(-1, feature_size))
            optimizer.zero_grad()     # clear gradients for this training step
            loss.backward()           # backpropagation, compute gradients
            optimizer.step()          # apply gradients

            if (epoch + 1)%10 == 0:
                test_x = train_data.copy()
                point_list = []
                while (len(test_x) - len(train_data)) < len(test_data):
                    x = np.reshape(np.array(test_x[-feature_size:])/Rated_Capacity,
                                   (-1, 1, feature_size)).astype(np.float32)
                    x = torch.from_numpy(x).to(device) 
                    # (batch_size,feature_size=1,input_size)
                    pred, _ = model(x)      # pred shape (batch_size=1, feature_size=1)
                    next_point = pred.data.cpu().numpy()[0,0] * Rated_Capacity
                    test_x.append(next_point)     
                    point_list.append(next_point) 
                    # Saves the predicted value of the last point in the output sequence
                y_.append(point_list)       # Save all the predicted values
                loss_list.append(loss)
                rmse = evaluation(y_test=test_data, y_predict=y_[-1])
                re = relative_error(
                    y_test=test_data, y_predict=y_[-1], threshold=Rated_Capacity*0.7)
            if metric == 're':
                score = [re]
            elif metric == 'rmse':
                score = [rmse]
            else:
                score = [re, rmse]
            if (loss < 1e-3) and (score_[0] < score[0]):
                break
            score_ = score.copy()

        score_list.append(score_)
        result_list.append(y_[-1])
    return score_list, result_list

4.2.4 主函数

如下参数是我的电脑用网格搜索法找到比较优的参数。如果你的电脑结果和我不一样,可以加载我最优结果时电脑生成的随机权重。当然,最好是用 4.4.5 网格搜索法重新训练模型获取最有参数。

Rated_Capacity = 2.0
window_size = 16
feature_size = window_size
is_autoencoder = True
dropout = 0.0
EPOCH = 2000
nhead = 8
hidden_dim = 16
num_layers = 1
lr = 0.01    # learning rate
weight_decay = 0.0
noise_level = 0.0
alpha = 1e-5
is_load_weights = True
metric = 're'
seed = 0

SCORE = []
print('seed:{}'.format(seed))
score_list, _ = train(lr=lr, feature_size=feature_size, hidden_dim=hidden_dim,
                      num_layers=num_layers, nhead=nhead, weight_decay=weight_decay,
                      EPOCH=EPOCH, seed=seed, dropout=dropout, 
                      is_autoencoder=is_autoencoder, alpha=alpha, 
                      noise_level=noise_level, metric=metric,
                      is_load_weights=is_load_weights)
print(np.array(score_list))
for s in score_list:
    SCORE.append(s)
print('------------------------------------------------------------------')
print(metric + ' mean: {:<6.4f}'.format(np.mean(np.array(SCORE))))

4.2.5 网格搜索法

网格搜索法重新训练模型获取最有参数。

Rated_Capacity = 2.0
window_size = 16
feature_size = window_size
is_autoencoder = True
dropout = 0.0
EPOCH = 2000
nhead = 8
is_load_weights = False

weight_decay = 0.0
noise_level = 0.0
alpha = 0.0
metric = 're'

states = {}
for lr in [1e-3, 1e-2]:
    for num_layers in [1, 2]:
        for hidden_dim in [16, 32]:
            for alpha in [1e-5, 1e-4]:
                show_str = 'lr={}, num_layers={}, hidden_dim={}, alpha={}'.format(
                lr, num_layers, hidden_dim)
                print(show_str)
                SCORE = []
                for seed in range(5):
                    print('seed:{}'.format(seed))
                    score_list, _ = train(lr=lr, feature_size=feature_size, 
                                      hidden_dim=hidden_dim, num_layers=num_layers, 
                                      nhead=nhead, weight_decay=weight_decay, 
                                      EPOCH=EPOCH, seed=seed, dropout=dropout, 
                                      is_autoencoder=is_autoencoder, alpha=alpha, 
                                      noise_level=noise_level, metric=metric, 
                                      is_load_weights=is_load_weights)
                    print(np.array(score_list))
                    print(metric + ': {:<6.4f}'.format(np.mean(np.array(score_list))))
                    print('----------------------------------------------------------------')
                    for s in score_list:
                        SCORE.append(s)

                print(metric + ' mean: {:<6.4f}'.format(np.mean(np.array(SCORE))))
                states[show_str] = np.mean(np.array(SCORE))
                print('===================================================================')

min_key = min(states, key = states.get)
print('optimal parameters: {}, result: {}'.format(min_key, states[min_key]))

更多内容

1. NASA 锂电池数据集,基于 Python 的锂电池寿命预测: https://snailwish.com/395/

2. CALCE 锂电池数据集,基于 Python 数据处理: https://snailwish.com/437/

3. NASA 锂电池数据集,基于 python 的 MLP 锂电池寿命预测: https://snailwish.com/427/

4. NASA 锂电池数据集,基于 python 的 MLP 锂电池寿命预测: https://snailwish.com/464/

5. NASA 和 CALCE 锂电池数据集,基于 Pytorch 的 RNN、LSTM、GRU 寿命预测: https://snailwish.com/497/

6. 锂电池研究之七——基于 Pytorch 的高斯函数拟合时间序列数据: https://snailwish.com/576/