Sfoglia il codice sorgente

添加pytorch测试代码

alex 4 mesi fa
parent
commit
e1378e7a54
4 ha cambiato i file con 168 aggiunte e 7 eliminazioni
  1. 1 0
      .gitignore
  2. 6 7
      example_siso_pulley_deepc.py
  3. 68 0
      test_pytorch.py
  4. 93 0
      test_pytorch_cuda.py

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+/data/*

+ 6 - 7
example_siso_pulley_deepc.py

@@ -20,7 +20,6 @@ import warnings
 warnings.simplefilter(action='ignore', category=FutureWarning)
 warnings.simplefilter(action='ignore', category=UserWarning)
 
-#
 
 # Define the loss function for DeePC
 def loss_callback(u: cp.Variable, y: cp.Variable) -> Expression:
@@ -86,15 +85,15 @@ for T in T_list:
     i=i+1
     print(f'Simulating with {T} initial samples...count={i}')
     sys.reset()
-    # Generate initial data and initialize DeePC
+    # Generate initial data and initialize DeePC   data : u->1000*1 y-> 1000*1
     data = sys.apply_input(u = np.random.normal(size=T).reshape((T, 1)))
     data_train = data
 
-
+    #Uf 10*987 Up 4*987 Yf 10*987 Yp 4*987  Tini=4  horizon=10
     deepc = DeePC(data, Tini = T_INI, horizon = HORIZON)
     # deepc = DeePC(data, Tini=T_INI, horizon=HORIZON, explained_variance=.9999)
 
-    # Create initial data
+    # Create initial data u:4*1 y:4*1
     data_ini = Data(u = np.zeros((T_INI, 1)), y = np.zeros((T_INI, 1)))
     sys.reset(data_ini = data_ini)
 
@@ -106,14 +105,14 @@ for T in T_list:
         lambda_u = LAMBDA_U_REGULARIZER)
 
     for idx in range(EXPERIMENT_HORIZON // s):  #整除
-        # Solve DeePC
+        # Solve DeePC  u_optimal:10*1
         u_optimal, info = deepc.solve(data_ini = data_ini, warm_start=True, solver=cp.ECOS)
 
 
-        # Apply optimal control input
+        # Apply optimal control input  计算出来控制量u 取第1:s行,所有列数据
         _ = sys.apply_input(u = u_optimal[:s, :])
 
-        # Fetch last T_INI samples
+        # Fetch last T_INI samples 计算出来的u:4*1     y4*1
         data_ini = sys.get_last_n_samples(T_INI)
 
         if idx % 10 == 1 :

+ 68 - 0
test_pytorch.py

@@ -0,0 +1,68 @@
+
+import torch
+import torch.nn as nn
+import numpy as np
+import matplotlib.pyplot as plt
+
+
+print(torch.__version__)  # pytorch版本
+print(torch.version.cuda)  # cuda版本
+print(torch.cuda.is_available())  # 查看cuda是否可用
+
+#
+# 使用GPU or CPU
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+
+
+# 生成一些随机的训练数据
+np.random.seed(42)
+x = np.random.rand(1000, 1)
+y = 2 * x + 1 + 0.5 * np.random.randn(1000, 1)
+
+# 将数据转换为张量
+x_tensor = torch.from_numpy(x).float()
+y_tensor = torch.from_numpy(y).float()
+
+# 定义线性回归模型
+class LinearRegressionModel(nn.Module):
+    def __init__(self):
+        super(LinearRegressionModel, self).__init__()
+        self.linear = nn.Linear(1, 1)
+
+    def forward(self, x):
+        return self.linear(x)
+
+# 创建模型实例
+model = LinearRegressionModel()
+
+# 定义损失函数和优化器
+criterion = nn.MSELoss()
+optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
+
+# 训练模型
+num_epochs = 1000
+for epoch in range(num_epochs):
+    # 前向传播
+    outputs = model(x_tensor)
+    loss = criterion(outputs, y_tensor)
+
+    # 反向传播和优化
+    optimizer.zero_grad()
+    loss.backward()
+    optimizer.step()
+
+    if (epoch + 1) % 10 == 0:
+        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}')
+
+# 进行预测
+with torch.no_grad():
+    predicted = model(x_tensor)
+
+# 绘制原始数据和预测结果
+plt.scatter(x, y, label='Original Data')
+plt.plot(x, predicted.numpy(), color='red', label='Predicted Line')
+plt.xlabel('x')
+plt.ylabel('y')
+plt.legend()
+plt.show()

+ 93 - 0
test_pytorch_cuda.py

@@ -0,0 +1,93 @@
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torchvision import datasets, transforms
+import matplotlib.pyplot as plt
+
+# 检查是否有可用的GPU
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+# 定义数据预处理步骤
+transform = transforms.Compose([
+    transforms.ToTensor(),
+    transforms.Normalize((0.1307,), (0.3081,))
+])
+
+# 加载MNIST训练数据集
+train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
+train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
+
+# 加载MNIST测试数据集
+test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
+test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)
+
+# 定义卷积神经网络模型
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = nn.functional.relu(nn.functional.max_pool2d(self.conv1(x), 2))
+        x = nn.functional.relu(nn.functional.max_pool2d(self.conv2(x), 2))
+        x = x.view(-1, 320)
+        x = nn.functional.relu(self.fc1(x))
+        x = self.fc2(x)
+        return x
+
+# 创建模型实例并将其移动到设备(GPU或CPU)
+model = Net()
+model.to(device)
+
+# 定义损失函数和优化器
+criterion = nn.CrossEntropyLoss()
+optimizer = optim.SGD(model.parameters(), lr=0.01)
+
+# 训练模型
+def train_model():
+    model.train()
+    for epoch in range(20):
+        running_loss = 0.0
+        for batch_idx, (data, target) in enumerate(train_loader):
+            data, target = data.to(device), target.to(device)
+
+            optimizer.zero_grad()
+
+            output = model(data)
+            loss = criterion(output, target)
+            loss.backward()
+            optimizer.step()
+
+            running_loss += loss.item()
+            if batch_idx % 100 == 0:
+                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
+                    epoch + 1, batch_idx * 64, len(train_loader.dataset),
+                    100. * batch_idx / len(train_loader), running_loss / (batch_idx + 1)))
+
+        print('====> Epoch: {} Average Loss: {:.4f}'.format(epoch + 1, running_loss / len(train_loader)))
+
+# 测试模型
+def test_model():
+    model.eval()
+    test_loss = 0
+    correct = 0
+    with torch.no_grad():
+        for data, target in test_loader:
+            data, target = data.to(device), target.to(device)
+
+            output = model(data)
+            test_loss += criterion(output, target).item()
+            pred = output.argmax(dim=1, keepdim=True)
+            correct += pred.eq(target.view_as(pred)).sum().item()
+
+    test_loss /= len(test_loader)
+    accuracy = correct / len(test_loader.dataset)
+
+    print('Test set: Average loss: {:.4f}, Accuracy: {:.2f}%'.format(test_loss, 100 * accuracy))
+
+if __name__ == "__main__":
+    train_model()
+    test_model()