"""Train a small LSTM on a 1-D sequence and plot its autoregressive forecast.

Pipeline (executed by :func:`main` when the file is run as a script):

1. Build a 1-D sequence and save it to ``random_sequence.csv``.
2. Slice it into sliding windows and train :class:`LSTMModel` to predict the
   value that follows each window.
3. Starting from the first window, feed the model its own predictions to
   forecast the rest of the sequence, saved to
   ``random_sequence_predicted.csv``.
4. Plot the original and the forecast side by side.
"""
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Number of samples in the generated sequence.
sequence_length = 100

# Model / windowing hyper-parameters.
input_size = 1
hidden_size = 32
num_layers = 2
output_size = 1
seq_length = 10

# Number of full-batch optimisation steps.
num_epochs = 100 * 100


class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        """Build the LSTM stack and the linear read-out.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of each LSTM layer's hidden state.
            num_layers: Number of stacked LSTM layers.
            output_size: Number of values produced per input window.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the prediction for each window in ``x``.

        Args:
            x: Tensor laid out batch-first, i.e. ``(batch, time, input_size)``.

        Returns:
            Tensor of shape ``(batch, output_size)``.
        """
        # Omitting the initial state lets nn.LSTM create zero states itself,
        # already on x's device and in x's dtype (the previous hand-built
        # zeros were always float32 and were allocated on CPU first).
        out, _ = self.lstm(x)
        # Only the hidden state of the final time step feeds the read-out.
        return self.fc(out[:, -1, :])


def create_sequences(data, seq_length):
    """Split ``data`` into sliding windows and their next-step targets.

    Args:
        data: Sequence indexed along its first axis.
        seq_length: Number of consecutive samples per input window.

    Returns:
        ``(xs, ys)`` where ``xs[i]`` is ``data[i:i + seq_length]`` and
        ``ys[i]`` is ``data[i + seq_length]``. If ``data`` has no more than
        ``seq_length`` samples, both arrays are empty but keep their
        trailing dimensions.

    Raises:
        ValueError: If ``seq_length`` is negative.
    """
    if seq_length < 0:
        raise ValueError("seq_length must be non-negative")

    data = np.asarray(data)
    n_windows = len(data) - seq_length
    if n_windows <= 0:
        # Keep the window axis so downstream reshaping does not fail on a
        # bare shape-(0,) array.
        tail = data.shape[1:]
        return (
            np.empty((0, seq_length) + tail, dtype=data.dtype),
            np.empty((0,) + tail, dtype=data.dtype),
        )

    xs = np.stack([data[i:i + seq_length] for i in range(n_windows)])
    # The targets are exactly the samples after the first window.
    ys = data[seq_length:].copy()
    return xs, ys


def train_model(model, X, y, num_epochs, lr=0.01, log_every=10):
    """Fit ``model`` to ``(X, y)`` with Adam on MSE loss, full-batch.

    Args:
        model: Module to optimise in place.
        X: Input tensor passed to ``model``.
        y: Target tensor compared against ``model(X)``.
        num_epochs: Number of optimisation steps.
        lr: Adam learning rate.
        log_every: Print the loss every this many epochs.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Training mode only needs to be set once, not on every epoch.
    model.train()
    for epoch in range(1, num_epochs + 1):
        outputs = model(X)
        loss = criterion(outputs, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if epoch % log_every == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Loss: {loss.item():.4f}')


def forecast(model, seed_window, steps):
    """Roll the model forward on its own predictions.

    Args:
        model: Trained model producing one value per window.
        seed_window: Tensor of shape ``(1, window, 1)`` to start from.
        steps: Number of values to generate.

    Returns:
        1-D NumPy array holding the ``steps`` generated values.
    """
    model.eval()
    window = seed_window
    predicted = []
    with torch.no_grad():
        for _ in range(steps):
            output = model(window)
            predicted.append(output.item())
            # Slide the window: drop the oldest step, append the prediction.
            window = torch.cat((window[:, 1:], output.view(1, 1, 1)), dim=1)
    return np.array(predicted)


def plot_results(original, predicted, seq_length):
    """Show the original sequence and the forecast in two scatter plots.

    Args:
        original: The full original sequence.
        predicted: Forecast values; plotted from index ``seq_length`` up to
            ``len(original)``.
        seq_length: Window length, i.e. the index of the first forecast.
    """
    # Imported here so the model and data helpers stay importable on
    # machines without a plotting backend.
    import matplotlib.pyplot as plt

    total = len(original)
    plt.figure(figsize=(12, 6))

    # Original sequence.
    plt.subplot(1, 2, 1)
    plt.scatter(range(total), original, label='Original Sequence', color='blue', s=50)
    plt.title('原始序列')
    plt.xlabel('时间步')
    plt.ylabel('值')
    plt.legend()

    # Forecast sequence.
    plt.subplot(1, 2, 2)
    plt.scatter(range(seq_length, total), predicted, label='Predicted Sequence', color='red', s=50)
    plt.title('预测序列')
    plt.xlabel('时间步')
    plt.ylabel('值')
    plt.legend()

    plt.tight_layout()
    plt.show()


def main():
    """Generate the data, train the model, forecast, save and plot."""
    # Fix the seeds so runs are repeatable.
    np.random.seed(0)
    torch.manual_seed(0)

    # 1. Build the 1-D sequence. A linear ramp is used here; the earlier
    #    variant drew random integers with np.random.randint(0, 200, ...).
    random_sequence = np.linspace(0, 200, sequence_length)
    np.savetxt('random_sequence.csv', random_sequence, delimiter=',')

    # 2./3. Window the data and convert to tensors shaped
    #       (num_windows, seq_length, 1) and (num_windows, 1).
    # NOTE(review): values are fed to the LSTM unscaled (0-200); consider
    # normalising them if training proves unstable.
    X, y = create_sequences(random_sequence, seq_length)
    X = torch.tensor(X, dtype=torch.float32).unsqueeze(2)
    y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

    # 4. Train the LSTM.
    model = LSTMModel(input_size, hidden_size, num_layers, output_size)
    train_model(model, X, y, num_epochs)

    # 5. Forecast, seeded with the first window: shape (1, seq_length, 1).
    predicted = forecast(model, X[0].unsqueeze(0), sequence_length - seq_length)
    np.savetxt('random_sequence_predicted.csv', predicted, delimiter=',')

    # 6. Plot original vs. forecast.
    plot_results(random_sequence, predicted, seq_length)


if __name__ == "__main__":
    main()