"""Train a small LSTM on a one-dimensional series and plot its roll-out.

The script builds a 100-point series, trains an LSTM to predict the next
value from the previous 10, generates an autoregressive forecast, saves both
series to CSV files, and shows them side by side with Matplotlib.
"""
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
# Seed NumPy and PyTorch so that repeated runs give the same results.
np.random.seed(0)
torch.manual_seed(0)

# 1. Build the one-dimensional series.
sequence_length = 100
# NOTE: despite the variable name, the series is a deterministic linear ramp
# of `sequence_length` evenly spaced points from 0 to 200. Use
# np.random.randint(0, 200, size=sequence_length) for a genuinely random one.
random_sequence = np.linspace(0, 200, sequence_length)

# Save the series to a CSV file.
np.savetxt('random_sequence.csv', random_sequence, delimiter=',')
# 2. Define the LSTM model
class LSTMModel(nn.Module):
    """Stacked LSTM whose last time-step output feeds a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per input sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out as (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one prediction per sequence in the batch.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Zero initial hidden/cell states. new_zeros allocates them directly
        # with x's device and dtype, so the model also works when it has been
        # moved to another device or cast to another floating-point type.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output of the final time step is passed to the linear layer.
        return self.fc(out[:, -1, :])
# 3. Prepare the data
def create_sequences(data, seq_length):
    """Split a series into sliding input windows and next-step targets.

    Window ``i`` is ``data[i:i + seq_length]`` and its target is
    ``data[i + seq_length]``.

    Args:
        data: Sequence or array indexed along its first axis by time step.
        seq_length: Number of consecutive steps in each input window.

    Returns:
        A pair ``(xs, ys)`` of NumPy arrays. ``xs`` has shape
        ``(len(data) - seq_length, seq_length, ...)`` and ``ys`` has shape
        ``(len(data) - seq_length, ...)``. When ``data`` is too short to
        form a single window, both arrays are empty but keep those shapes.

    Raises:
        ValueError: If ``seq_length`` is negative.
    """
    if seq_length < 0:
        raise ValueError(f"seq_length must be non-negative, got {seq_length}")

    data = np.asarray(data)
    num_windows = len(data) - seq_length
    if num_windows <= 0:
        # Keep the window and feature axes so callers can still reshape the
        # result instead of receiving a bare shape-(0,) array.
        tail = data.shape[1:]
        return (
            np.empty((0, seq_length) + tail, dtype=data.dtype),
            np.empty((0,) + tail, dtype=data.dtype),
        )

    xs = np.stack([data[i:i + seq_length] for i in range(num_windows)])
    # The targets are simply every element from index seq_length onwards.
    ys = data[seq_length:].copy()
    return xs, ys
# Hyperparameters
input_size = 1    # one feature per time step
hidden_size = 32
num_layers = 2
output_size = 1   # one predicted value per window
seq_length = 10   # number of past steps in each input window

# Build the sliding windows and their next-step targets.
X, y = create_sequences(random_sequence, seq_length)

# Convert to float32 PyTorch tensors. The extra axis added to each gives
# X the layout (num_windows, seq_length, 1) and y the layout (num_windows, 1).
X = torch.tensor(X, dtype=torch.float32).unsqueeze(2)
y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
# 4. Train the LSTM model
model = LSTMModel(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
num_epochs = 100 * 100

# Training mode does not change between epochs, so set it once up front.
model.train()
for epoch in range(num_epochs):
    # Full-batch step: every window is fed through the model at once.
    outputs = model(X)
    loss = criterion(outputs, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Report progress every 10 epochs.
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
# 5. Generate predictions
model.eval()
predicted = []
with torch.no_grad():
    # Start from the first training window; shape (1, seq_length, 1).
    test_input = X[0].unsqueeze(0)
    for _ in range(sequence_length - seq_length):
        output = model(test_input)
        predicted.append(output.item())
        # Slide the window: drop the oldest time step and append the model's
        # own prediction, so each later step is forecast from earlier outputs.
        test_input = torch.cat((test_input[:, 1:], output.view(1, 1, 1)), dim=1)

predicted = np.array(predicted)
np.savetxt('random_sequence_predicted.csv', predicted, delimiter=',')
# 6. Plot the original and predicted series side by side with matplotlib
# NOTE(review): the titles and axis labels are Chinese text; confirm that the
# active Matplotlib font provides CJK glyphs, otherwise they may not render.
plt.figure(figsize=(12, 6))

# Left panel: the original series.
plt.subplot(1, 2, 1)
plt.scatter(range(sequence_length), random_sequence, label='Original Sequence', color='blue', s=50)
plt.title('原始序列')
plt.xlabel('时间步')
plt.ylabel('值')
plt.legend()

# Right panel: the predicted series, plotted from time step seq_length onward.
plt.subplot(1, 2, 2)
plt.scatter(range(seq_length, sequence_length), predicted, label='Predicted Sequence', color='red', s=50)
plt.title('预测序列')
plt.xlabel('时间步')
plt.ylabel('值')
plt.legend()

plt.tight_layout()
plt.show()