test_pytorch_lstm.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import numpy as np
  2. import torch
  3. import torch.nn as nn
  4. import torch.optim as optim
  5. import matplotlib.pyplot as plt
  6. # 设置随机种子以确保结果可重复
  7. np.random.seed(0)
  8. torch.manual_seed(0)
  9. # 1. 生成随机一位序列
  10. sequence_length = 100
  11. # random_sequence = np.random.randint(0, 200, size=sequence_length)
  12. random_sequence = np.linspace(0, 200, sequence_length)
  13. # 将序列保存到文件
  14. np.savetxt('random_sequence.csv', random_sequence, delimiter=',')
  15. # 2. 定义 LSTM 模型
  16. class LSTMModel(nn.Module):
  17. def __init__(self, input_size, hidden_size, num_layers, output_size):
  18. super(LSTMModel, self).__init__()
  19. self.hidden_size = hidden_size
  20. self.num_layers = num_layers
  21. self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
  22. self.fc = nn.Linear(hidden_size, output_size)
  23. def forward(self, x):
  24. h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
  25. c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
  26. out, _ = self.lstm(x, (h0, c0))
  27. out = self.fc(out[:, -1, :])
  28. return out
  29. # 3. 准备数据
  30. def create_sequences(data, seq_length):
  31. xs, ys = [], []
  32. for i in range(len(data) - seq_length):
  33. x = data[i:i+seq_length]
  34. y = data[i+seq_length]
  35. xs.append(x)
  36. ys.append(y)
  37. return np.array(xs), np.array(ys)
  38. # 参数设置
  39. input_size = 1
  40. hidden_size = 32
  41. num_layers = 2
  42. output_size = 1
  43. seq_length = 10
  44. # 创建序列
  45. X, y = create_sequences(random_sequence, seq_length)
  46. # 转换为 PyTorch 张量
  47. X = torch.tensor(X, dtype=torch.float32).unsqueeze(2)
  48. y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
  49. # 4. 训练 LSTM 模型
  50. model = LSTMModel(input_size, hidden_size, num_layers, output_size)
  51. criterion = nn.MSELoss()
  52. optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
  53. num_epochs = 100*100
  54. for epoch in range(num_epochs):
  55. model.train()
  56. outputs = model(X)
  57. loss = criterion(outputs, y)
  58. optimizer.zero_grad()
  59. loss.backward()
  60. optimizer.step()
  61. if (epoch+1) % 10 == 0:
  62. print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
  63. # 5. 生成预测结果
  64. model.eval()
  65. with torch.no_grad():
  66. # 初始化 test_input 为前 seq_length 个序列
  67. test_input = X[0].unsqueeze(0) # 形状 (1, 5, 1)
  68. predicted = []
  69. for _ in range(sequence_length - seq_length):
  70. output = model(test_input)
  71. predicted.append(output.item())
  72. # 更新 test_input: 移除第一个时间步并添加预测的输出
  73. test_input = torch.cat((test_input[:, 1:], output.view(1, 1, 1)), dim=1)
  74. predicted = np.array(predicted)
  75. np.savetxt('random_sequence_predicted.csv', predicted, delimiter=',')
  76. # 6. 使用 matplotlib 绘制对比图
  77. plt.figure(figsize=(12, 6))
  78. # 绘制原始序列
  79. plt.subplot(1, 2, 1)
  80. plt.scatter(range(sequence_length), random_sequence, label='Original Sequence', color='blue', s=50)
  81. plt.title('原始序列')
  82. plt.xlabel('时间步')
  83. plt.ylabel('值')
  84. plt.legend()
  85. # 绘制预测序列
  86. plt.subplot(1, 2, 2)
  87. plt.scatter(range(seq_length, sequence_length), predicted, label='Predicted Sequence', color='red', s=50)
  88. plt.title('预测序列')
  89. plt.xlabel('时间步')
  90. plt.ylabel('值')
  91. plt.legend()
  92. plt.tight_layout()
  93. plt.show()