Browse source

Add DeePC example and debugging

alex, 5 months ago
commit 2b6e5a32df
6 files changed with 483 additions and 5 deletions
  1. pydeepc/__init__.py (+11 −0)
  2. pydeepc/deepc.py (+232 −0)
  3. pydeepc/utils.py (+144 −0)
  4. setup.py (+16 −0)
  5. test.py (+10 −5)
  6. utils.py (+70 −0)

+ 11 - 0
pydeepc/__init__.py

@@ -0,0 +1,11 @@
+from .deepc import *
+from .utils import *
+
+__author__ = 'Alessio Russo - alessior@kth.se'
+__version__ = '0.3.6'
+__url__ = 'https://github.com/rssalessio/PyDeePC'
+__info__ = {
+    'version': __version__,
+    'author': __author__,
+    'url': __url__
+}

+ 232 - 0
pydeepc/deepc.py

@@ -0,0 +1,232 @@
+import math
+import numpy as np
+import cvxpy as cp
+from numpy.typing import NDArray
+from typing import Tuple, Callable, List, Optional, Union, Dict
+from cvxpy.expressions.expression import Expression
+from cvxpy.constraints.constraint import Constraint
+from pydeepc.utils import (
+    Data,
+    split_data,
+    low_rank_matrix_approximation,
+    OptimizationProblem,
+    OptimizationProblemVariables)
+
+
+class DeePC(object):
+    optimization_problem: Optional[OptimizationProblem] = None
+    _SMALL_NUMBER: float = 1e-32
+
+    def __init__(self, data: Data, Tini: int, horizon: int, explained_variance: Optional[float] = None):
+        """
+        Initializes the DeePC controller.
+        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
+
+        :param data:                A tuple of input/output data. Data should have shape TxM
+                                    where T is the batch size and M is the number of features
+        :param Tini:                number of samples needed to estimate initial conditions
+        :param horizon:             horizon length
+        :param explained_variance:  Regularization term in (0,1] used to approximate the Hankel matrices.
+                                    Defaults to None (no low-rank approximation is performed).
+        """
+        assert explained_variance is None or 0 < explained_variance <= 1, "explained_variance should be in (0,1] or None"
+        self.Tini = Tini
+        self.horizon = horizon
+        self.explained_variance = explained_variance
+        self.update_data(data)
+
+        self.optimization_problem = None
+
+    def update_data(self, data: Data):
+        """
+        Update the Hankel matrices of DeePC. You need to rebuild the optimization
+        problem after calling this function.
+
+        :param data:                A tuple of input/output data. Data should have shape TxM
+                                    where T is the batch size and M is the number of features
+        """
+        assert len(data.u.shape) == 2, \
+            "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
+        assert len(data.y.shape) == 2, \
+            "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
+        assert data.y.shape[0] == data.u.shape[0], \
+            "Input/output data must have the same length"
+        assert data.y.shape[0] - self.Tini - self.horizon + 1 >= 1, \
+            f"Not enough data: T - Tini - horizon + 1 = {data.y.shape[0] - self.Tini - self.horizon + 1} must be >= 1"
+        
+        Up, Uf, Yp, Yf = split_data(data, self.Tini, self.horizon, self.explained_variance)
+
+        self.Up = Up
+        self.Uf = Uf
+        self.Yp = Yp
+        self.Yf = Yf
+        
+        self.M = data.u.shape[1]
+        self.P = data.y.shape[1]
+        self.T = data.u.shape[0]
+
+        self.optimization_problem = None
+
+    def build_problem(self,
+            build_loss: Callable[[cp.Variable, cp.Variable], Expression],
+            build_constraints: Optional[Callable[[cp.Variable, cp.Variable], Optional[List[Constraint]]]] = None,
+            lambda_g: float = 0.,
+            lambda_y: float = 0.,
+            lambda_u: float = 0.,
+            lambda_proj: float = 0.) -> OptimizationProblem:
+        """
+        Builds the DeePC optimization problem
+        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
+
+        For info on the projection (least-square) regularizer, see also
+        https://arxiv.org/pdf/2101.01273.pdf
+
+        :param build_loss:          Callback that takes the (input, output) optimization variables,
+                                    each of shape (TxM) where T is the horizon length and M is the
+                                    feature size, and returns a scalar value of type Expression
+        :param build_constraints:   Callback that takes the (input, output) optimization variables,
+                                    each of shape (TxM) where T is the horizon length and M is the
+                                    feature size, and returns a list of constraints.
+        :param lambda_g:            non-negative scalar. Regularization factor for g. Used for
+                                    stochastic/non-linear systems.
+        :param lambda_y:            non-negative scalar. Regularization factor for y_init. Used for
+                                    stochastic/non-linear systems.
+        :param lambda_u:            non-negative scalar. Regularization factor for u_init. Used for
+                                    stochastic/non-linear systems.
+        :param lambda_proj:         non-negative scalar. Weight of the projection regularizer, which
+                                    penalizes the deviation of g from the least-squares solution.
+        :return:                    Parameters of the optimization problem
+        """
+        assert build_loss is not None, "Loss function callback cannot be None"
+        assert lambda_g >= 0 and lambda_y >= 0, "Regularizers must be non-negative"
+        assert lambda_u >= 0, "Regularizer of u_init must be non-negative"
+        assert lambda_proj >= 0, "The projection regularizer must be non-negative"
+
+        self.optimization_problem = None  # invalidate any previously built problem
+
+        # Build variables
+        uini = cp.Parameter(shape=(self.M * self.Tini), name='u_ini')
+        yini = cp.Parameter(shape=(self.P * self.Tini), name='y_ini')
+        u = cp.Variable(shape=(self.M * self.horizon), name='u')
+        y = cp.Variable(shape=(self.P * self.horizon), name='y')
+        g = cp.Variable(shape=(self.T - self.Tini - self.horizon + 1), name='g')
+        slack_y = cp.Variable(shape=(self.Tini * self.P), name='slack_y')
+        slack_u = cp.Variable(shape=(self.Tini * self.M), name='slack_u')
+
+        Up, Yp, Uf, Yf = self.Up, self.Yp, self.Uf, self.Yf
+
+        if lambda_proj > DeePC._SMALL_NUMBER:
+            # Compute projection matrix (for the least square solution)
+            Zp = np.vstack([Up, Yp, Uf])
+            ZpInv = np.linalg.pinv(Zp)
+            I = np.eye(self.T - self.Tini - self.horizon + 1)
+            # Kernel orthogonal projector
+        I_min_P = I - (ZpInv @ Zp)
+
+        A = np.vstack([Up, Yp, Uf, Yf])
+        b = cp.hstack([uini + slack_u, yini + slack_y, u, y])
+
+        # Build constraints
+        constraints = [A @ g == b]
+
+        if math.isclose(lambda_y, 0):
+            constraints.append(cp.norm(slack_y, 2) <= DeePC._SMALL_NUMBER)
+        if math.isclose(lambda_u, 0):
+            constraints.append(cp.norm(slack_u, 2) <= DeePC._SMALL_NUMBER)
+
+        # u, y = self.Uf @ g, self.Yf @ g
+        u = cp.reshape(u, (self.horizon, self.M), order='C')
+        y = cp.reshape(y, (self.horizon, self.P), order='C')
+
+        _constraints = build_constraints(u, y) if build_constraints is not None else []
+
+        for idx, constraint in enumerate(_constraints):
+            if constraint is None or not isinstance(constraint, Constraint) or not constraint.is_dcp():
+                raise Exception(f'Constraint {idx} is not defined or is not convex.')
+
+        constraints.extend(_constraints)
+
+        # Build loss
+        _loss = build_loss(u, y)
+        
+        if _loss is None or not isinstance(_loss, Expression) or not _loss.is_dcp():
+            raise Exception('Loss function is not defined or is not convex!')
+
+        # Add regularizers
+        _regularizers = lambda_g * cp.norm(g, p=1) if lambda_g > DeePC._SMALL_NUMBER else 0
+        _regularizers += lambda_y * cp.norm(slack_y, p=1) if lambda_y > DeePC._SMALL_NUMBER else 0
+        _regularizers += lambda_proj * cp.norm(I_min_P @ g) if lambda_proj > DeePC._SMALL_NUMBER else 0
+        _regularizers += lambda_u * cp.norm(slack_u, p=1) if lambda_u > DeePC._SMALL_NUMBER else 0
+
+        problem_loss = _loss + _regularizers
+
+        # Solve problem
+        objective = cp.Minimize(problem_loss)
+
+        try:
+            problem = cp.Problem(objective, constraints)
+        except cp.SolverError as e:
+            raise Exception(f'Error while constructing the DeePC problem. Details: {e}')
+
+        self.optimization_problem = OptimizationProblem(
+            variables = OptimizationProblemVariables(
+                u_ini = uini, y_ini = yini, u = u, y = y, g = g, slack_y = slack_y, slack_u = slack_u),
+            constraints = constraints,
+            objective_function = problem_loss,
+            problem = problem
+        )
+
+        return self.optimization_problem
+
+    def solve(
+            self,
+            data_ini: Data,
+            **cvxpy_kwargs
+        ) -> Tuple[NDArray[np.float64], Dict[str, Union[float, NDArray[np.float64], OptimizationProblemVariables]]]:
+        """
+        Solves the DeePC optimization problem
+        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
+
+        :param data_ini:            A tuple of input/output data used to estimate initial condition.
+                                    Data should have shape Tini x M where Tini is the batch size and
+                                    M is the number of features
+        :param cvxpy_kwargs:        All arguments that need to be passed to the cvxpy solve method.
+        :return u_optimal:          Optimal input signal to be applied to the system, of length `horizon`
+        :return info:               A dictionary with 3 keys:
+                                    info['variables']: variables of the optimization problem
+                                    info['value']: value of the optimization problem
+                                    info['u_optimal']: the same as the first value returned by this function
+        """
+        assert len(data_ini.u.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
+        assert len(data_ini.y.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
+        assert data_ini.u.shape[1] == self.M, "Incorrect number of features for the input signal"
+        assert data_ini.y.shape[1] == self.P, "Incorrect number of features for the output signal"
+        assert data_ini.y.shape[0] == data_ini.u.shape[0], "Input/output data must have the same length"
+        assert data_ini.y.shape[0] == self.Tini, f"data_ini must contain exactly Tini={self.Tini} samples"
+        assert self.optimization_problem is not None, "Problem was not built"
+
+        # Flatten the Tini samples row by row so that time runs along the vector,
+        # matching the column layout of the Hankel matrices
+        uini, yini = data_ini.u[:self.Tini].flatten(), data_ini.y[:self.Tini].flatten()
+
+        self.optimization_problem.variables.u_ini.value = uini
+        self.optimization_problem.variables.y_ini.value = yini
+
+        try:
+            result = self.optimization_problem.problem.solve(**cvxpy_kwargs)
+        except cp.SolverError as e:
+            raise Exception(f'Error while solving the DeePC problem. Details: {e}')
+
+        if np.isinf(result):
+            raise Exception('Problem is infeasible or unbounded')
+
+        u_optimal = (self.Uf @ self.optimization_problem.variables.g.value).reshape(self.horizon, self.M)
+        info = {
+            'value': result, 
+            'variables': self.optimization_problem.variables,
+            'u_optimal': u_optimal
+            }
+
+        return u_optimal, info
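
A minimal usage sketch of the DeePC class added above (illustrative only: the
placeholder data, loss, constraints, and solver choice are assumptions, not
part of this commit):

    import numpy as np
    import cvxpy as cp
    from pydeepc import DeePC, Data

    # Placeholder offline data: T = 200 samples, 1 input and 1 output feature
    data = Data(u=np.random.randn(200, 1), y=np.random.randn(200, 1))

    deepc = DeePC(data, Tini=4, horizon=10)
    deepc.build_problem(
        build_loss=lambda u, y: cp.sum_squares(y) + 0.1 * cp.sum_squares(u),
        build_constraints=lambda u, y: [u >= -1, u <= 1],
        lambda_g=1.0, lambda_y=1.0)

    # Estimate the initial condition from the last Tini samples, then solve
    data_ini = Data(u=data.u[-4:], y=data.y[-4:])
    u_optimal, info = deepc.solve(data_ini, solver=cp.ECOS)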

+ 144 - 0
pydeepc/utils.py

@@ -0,0 +1,144 @@
+import numpy as np
+from typing import NamedTuple, Tuple, Optional, List, Union
+from cvxpy import Expression, Variable, Problem, Parameter
+from cvxpy.constraints.constraint import Constraint
+from numpy.typing import NDArray
+
+class OptimizationProblemVariables(NamedTuple):
+    """
+    Class used to store all the variables used in the optimization
+    problem
+    """
+    u_ini: Union[Variable, Parameter]
+    y_ini: Union[Variable, Parameter]
+    u: Union[Variable, Parameter]
+    y: Union[Variable, Parameter]
+    g: Union[Variable, Parameter]
+    slack_y: Union[Variable, Parameter]
+    slack_u: Union[Variable, Parameter]
+
+
+class OptimizationProblem(NamedTuple):
+    """
+    Class used to store the elements of an optimization problem
+    :param problem_variables:   variables of the opt. problem
+    :param constraints:         constraints of the problem
+    :param objective_function:  objective function
+    :param problem:             optimization problem object
+    """
+    variables: OptimizationProblemVariables
+    constraints: List[Constraint]
+    objective_function: Expression
+    problem: Problem
+
+class Data(NamedTuple):
+    """
+    Tuple that contains input/output data
+    :param u: input data
+    :param y: output data
+    """
+    u: NDArray[np.float64]
+    y: NDArray[np.float64]
+
+
+def create_hankel_matrix(data: NDArray[np.float64], order: int) -> NDArray[np.float64]:
+    """
+    Create a Hankel matrix of order L from a given matrix of size TxM,
+    where M is the number of features and T is the batch size.
+    Note that we need L <= T.
+
+    :param data:  A matrix of data (size TxM). 
+                  T is the batch size and M is the number of features
+    :param order: The order of the Hankel matrix (L)
+    :return:      The Hankel matrix of type np.ndarray
+    """
+    data = np.array(data)
+    
+    assert len(data.shape) == 2, "Data needs to be a matrix"
+
+    T, M = data.shape
+    assert T >= order and order > 0, "The order must be positive and at most the number of data points"
+
+    H = np.zeros((order * M, (T - order + 1)))
+    for idx in range(T - order + 1):
+        H[:, idx] = data[idx:idx+order, :].flatten()
+    return H
+
+def split_data(data: Data, Tini: int, horizon: int, explained_variance: Optional[float] = None) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
+    """
+    Utility function used to split the data into past data and future data.
+    Constructs the Hankel matrix for the input/output data, and uses the first
+    Tini rows to create the past data, and the last 'horizon' rows to create
+    the future data.
+    For more info check eq. (4) in https://arxiv.org/pdf/1811.05890.pdf
+
+    :param data:                A tuple of input/output data. Data should have shape TxM
+                                where T is the batch size and M is the number of features
+    :param Tini:                number of samples needed to estimate initial conditions
+    :param horizon:             horizon length
+    :param explained_variance:  Regularization term in (0,1] used to approximate the Hankel matrices.
+                                Defaults to None (no low-rank approximation is performed).
+    :return:                    Returns Up,Uf,Yp,Yf (see eq. (4) of the original DeePC paper)
+    """
+    assert Tini >= 1, "Tini cannot be lower than 1"
+    assert horizon >= 1, "Horizon cannot be lower than 1"
+    assert explained_variance is None or 0 < explained_variance <= 1, "explained_variance should be in (0,1] or None"
+ 
+    Mu, My = data.u.shape[1], data.y.shape[1]
+    Hu = create_hankel_matrix(data.u, Tini + horizon)
+    Hy = create_hankel_matrix(data.y, Tini + horizon)
+
+    if explained_variance is not None:
+        Hu = low_rank_matrix_approximation(Hu, explained_var=explained_variance)
+        Hy = low_rank_matrix_approximation(Hy, explained_var=explained_variance)
+
+    Up, Uf = Hu[:Tini * Mu], Hu[-horizon * Mu:]
+    Yp, Yf = Hy[:Tini * My], Hy[-horizon * My:]
+    
+    return Up, Uf, Yp, Yf
+
+def low_rank_matrix_approximation(
+        X:NDArray[np.float64],
+        explained_var: Optional[float] = 0.9,
+        rank: Optional[int] = None,
+        SVD: Optional[Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]] = None,
+        **svd_kwargs) -> NDArray[np.float64]:
+    """
+    Computes a low-rank approximation of a matrix
+
+    Adapted from https://gist.github.com/thearn/5424219
+
+    :param X:               matrix to approximate
+    :param explained_var:   Value in (0,1] used to compute the rank. Describes how close 
+                            the low rank matrix is to the original matrix (in terms of the
+                            singular values). Default value: 0.9
+    :param rank:            rank order. To be used if you want to approximate the matrix by a specific
+                            rank. Defaults to None. If not None, it overrides the explained_var
+                            parameter.
+    :param SVD:             If not None, the (u, s, v) SVD decomposition of X (see numpy.linalg.svd)
+    :param **svd_kwargs:    additional parameters to be passed to numpy.linalg.svd
+    :return: the low rank approximation of X
+    """
+    assert len(X.shape) == 2, "X must be a matrix"
+    assert isinstance(rank, (int, float)) or isinstance(explained_var, (int, float)), \
+        "You need to specify explained_var or rank!"
+    assert explained_var is None or 0 < explained_var <= 1, \
+        "explained_var must be in (0,1]"
+    assert rank is None or (rank >= 1 and rank <= min(X.shape[0], X.shape[1])), \
+        "Rank cannot be lower than 1 or greater than min(num_rows, num_cols)"
+    assert SVD is None or len(SVD) == 3, "SVD must be a tuple of 3 elements"
+
+    u, s, v = np.linalg.svd(X, full_matrices=False, **svd_kwargs) if SVD is None else SVD
+
+    if rank is None:
+        s_squared = np.power(s, 2)
+        total_var = np.sum(s_squared)
+        z = np.cumsum(s_squared) / total_var
+        # np.argmax returns the first index at which the target explained variance
+        # is reached; the corresponding rank is that index + 1
+        rank = int(np.argmax(np.logical_or(z > explained_var, np.isclose(z, explained_var)))) + 1
+
+    u_low = u[:,:rank]
+    s_low = s[:rank]
+    v_low = v[:rank,:]
+
+    X_low = np.dot(u_low * s_low, v_low)
+    return X_low
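
A small worked example of the two helpers above (illustrative only; the data
values are assumptions):

    import numpy as np
    from pydeepc.utils import create_hankel_matrix, low_rank_matrix_approximation

    data = np.array([[1.], [2.], [3.], [4.]])   # T = 4 samples, M = 1 feature
    H = create_hankel_matrix(data, order=2)
    # Each column of H is one order-long window of the data:
    # [[1. 2. 3.]
    #  [2. 3. 4.]]

    # Keep the smallest rank whose squared singular values account for at
    # least 99% of the total variance of H
    H_low = low_rank_matrix_approximation(H, explained_var=0.99)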

+ 16 - 0
setup.py

@@ -0,0 +1,16 @@
+from setuptools import setup, find_packages
+from os import path
+
+
+setup(name = 'PyDeePC',
+    packages=find_packages(),
+    version = '0.3.6',
+    description = 'Python library for Data-Enabled Predictive Control',
+    url = 'https://github.com/rssalessio/PyDeePC',
+    author = 'Alessio Russo',
+    author_email = 'arusso2@bu.edu',
+    install_requires=['numpy', 'scipy', 'cvxpy', 'matplotlib', 'jupyter', 'ecos'],
+    license='MIT',
+    zip_safe=False,
+    python_requires='>=3.7',
+)

+ 10 - 5
test.py

@@ -5,10 +5,15 @@ from scipy.optimize import minimize
 def objective_function(x):
     return (x[0]-0.5)**2 + (x[1]-0.1)**2
 
-# Initial guess
-initial_guess = np.array([2, 2])
 
-# Optimize with the minimize function
-result = minimize(objective_function, initial_guess)
 
-print(result)
+if __name__ == '__main__':
+    # # Initial guess
+    # initial_guess = np.array([2, 2])
+    # # Optimize with the minimize function
+    # result = minimize(objective_function, initial_guess)
+    # print(result)
+    a = 20
+    b = 18
+    print(f'Hello-{hex(a)}')
+    print('Hello-%x-%4d' % (a, b))   # plain %-formatting; no f-string needed

+ 70 - 0
utils.py

@@ -0,0 +1,70 @@
+import numpy as np
+import scipy.signal as scipysig
+from numpy.typing import NDArray
+from typing import Optional
+from pydeepc import Data
+
+class System(object):
+    """
+    Represents a dynamical system that can be simulated
+    """
+    def __init__(self, sys: scipysig.StateSpace, x0: Optional[NDArray[np.float64]] = None, noise_std: float = 0.5):
+        """
+        :param sys: a linear system
+        :param x0: initial state
+        :param noise_std: Standard deviation of the measurement noise
+        """
+        assert x0 is None or sys.A.shape[0] == len(x0), 'Invalid initial condition'
+        self.sys = sys
+        self.x0 = x0 if x0 is not None else np.zeros(sys.A.shape[0])
+        self.u = None
+        self.y = None
+        self.noise_std = noise_std
+
+    def apply_input(self, u: NDArray[np.float64]) -> Data:
+        """
+        Applies an input signal to the system.
+        
+        :param u: input signal. Needs to be of shape T x M, where T is the batch size and M is the number of features
+
+        :return: tuple that contains the (input,output) of the system
+        """
+        T = len(u)
+        if T > 1:
+            # If u is a signal of length > 1 use dlsim for quicker computation
+            t, y, x0 = scipysig.dlsim(self.sys, u, t = np.arange(T) * self.sys.dt, x0 = self.x0)
+            self.x0 = x0[-1]
+        else:
+            # Single-step update (note: any direct feedthrough term D is ignored)
+            y = self.sys.C @ self.x0
+            self.x0 = self.sys.A @ self.x0.flatten() + self.sys.B @ u.flatten()
+
+        y = y + self.noise_std * np.random.normal(size = T).reshape(T, 1)
+
+        self.u = np.vstack([self.u, u]) if self.u is not None else u
+        self.y = np.vstack([self.y, y]) if self.y is not None else y
+        return Data(u, y)
+
+    def get_last_n_samples(self, n: int) -> Data:
+        """
+        Returns the last n samples
+        :param n: integer value
+        """
+        assert self.u is not None and self.u.shape[0] >= n, 'Not enough samples are available'
+        return Data(self.u[-n:], self.y[-n:])
+
+    def get_all_samples(self) -> Data:
+        """
+        Returns all samples
+        """
+        return Data(self.u, self.y)
+
+    def reset(self, data_ini: Optional[Data] = None, x0: Optional[NDArray[np.float64]] = None):
+        """
+        Reset initial state and collected data
+        """
+        self.u = None if data_ini is None else data_ini.u
+        self.y = None if data_ini is None else data_ini.y
+        self.x0 = x0 if x0 is not None else np.zeros(self.sys.A.shape[0])
+
+
+
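
A usage sketch for the System helper above (illustrative only; the state-space
matrices and noise level are assumptions, not part of this commit):

    import numpy as np
    import scipy.signal as scipysig
    from utils import System

    # A simple stable discrete-time SISO system with sampling time dt = 0.05
    A = np.array([[0.9, 0.1], [0.0, 0.8]])
    B = np.array([[0.0], [1.0]])
    C = np.array([[1.0, 0.0]])
    D = np.array([[0.0]])
    plant = System(scipysig.StateSpace(A, B, C, D, dt=0.05), noise_std=0.01)

    data = plant.apply_input(np.random.randn(50, 1))   # excite with 50 random inputs
    recent = plant.get_last_n_samples(10)              # Data tuple of the last 10 samples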