123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- import math
- import numpy as np
- import cvxpy as cp
- from numpy.typing import NDArray
- from typing import Tuple, Callable, List, Optional, Union, Dict
- from cvxpy.expressions.expression import Expression
- from cvxpy.constraints.constraint import Constraint
- from pydeepc.utils import (
- Data,
- split_data,
- low_rank_matrix_approximation,
- OptimizationProblem,
- OptimizationProblemVariables)
- class DeePC(object):
- optimization_problem: OptimizationProblem = None
- _SMALL_NUMBER: float = 1e-32
- def __init__(self, data: Data, Tini: int, horizon: int, explained_variance: Optional[float] = None):
- """
- Solves the DeePC optimization problem
- For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
- :param data: A tuple of input/output data. Data should have shape TxM
- where T is the batch size and M is the number of features
- :param Tini: number of samples needed to estimate initial conditions
- :param horizon: horizon length
- :param explained_variance: Regularization term in (0,1] used to approximate the Hankel matrices.
- By default is None (no low-rank approximation is performed).
- """
- assert explained_variance is None or 0 < explained_variance <= 1, "explained_variance should be in (0,1] or be none"
- self.Tini = Tini
- self.horizon = horizon
- self.explained_variance = explained_variance
- self.update_data(data)
- self.optimization_problem = None
- def update_data(self, data: Data):
- """
- Update Hankel matrices of DeePC. You need to rebuild the optimization problem
- after calling this funciton.
- :param data: A tuple of input/output data. Data should have shape TxM
- where T is the batch size and M is the number of features
- """
- assert len(data.u.shape) == 2, \
- "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
- assert len(data.y.shape) == 2, \
- "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
- assert data.y.shape[0] == data.u.shape[0], \
- "Input/output data must have the same length"
- assert data.y.shape[0] - self.Tini - self.horizon + 1 >= 1, \
- f"There is not enough data: this value {data.y.shape[0] - self.Tini - self.horizon + 1} needs to be >= 1"
-
- Up, Uf, Yp, Yf = split_data(data, self.Tini, self.horizon, self.explained_variance)
- self.Up = Up
- self.Uf = Uf
- self.Yp = Yp
- self.Yf = Yf
-
- self.M = data.u.shape[1]
- self.P = data.y.shape[1]
- self.T = data.u.shape[0]
- self.optimization_problem = None
- def build_problem(self,
- build_loss: Callable[[cp.Variable, cp.Variable], Expression],
- build_constraints: Optional[Callable[[cp.Variable, cp.Variable], Optional[List[Constraint]]]] = None,
- lambda_g: float = 0.,
- lambda_y: float = 0.,
- lambda_u: float= 0.,
- lambda_proj: float = 0.) -> OptimizationProblem:
- """
- Builds the DeePC optimization problem
- For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
- For info on the projection (least-square) regularizer, see also
- https://arxiv.org/pdf/2101.01273.pdf
- :param build_loss: Callback function that takes as input an (input,output) tuple of data
- of shape (TxM), where T is the horizon length and M is the feature size
- The callback should return a scalar value of type Expression
- :param build_constraints: Callback function that takes as input an (input,output) tuple of data
- of shape (TxM), where T is the horizon length and M is the feature size
- The callback should return a list of constraints.
- :param lambda_g: non-negative scalar. Regularization factor for g. Used for
- stochastic/non-linear systems.
- :param lambda_y: non-negative scalar. Regularization factor for y_init. Used for
- stochastic/non-linear systems.
- :param lambda_u: non-negative scalar. Regularization factor for u_init. Used for
- stochastic/non-linear systems.
- :param lambda_proj: Positive term that penalizes the least square solution.
- :return: Parameters of the optimization problem
- """
- assert build_loss is not None, "Loss function callback cannot be none"
- assert lambda_g >= 0 and lambda_y >= 0, "Regularizers must be non-negative"
- assert lambda_u >= 0, "Regularizer of u_init must be non-negative"
- assert lambda_proj >= 0, "The projection regularizer must be non-negative"
- self.optimization_problem = False
- # Build variables
- uini = cp.Parameter(shape=(self.M * self.Tini), name='u_ini')
- yini = cp.Parameter(shape=(self.P * self.Tini), name='y_ini')
- u = cp.Variable(shape=(self.M * self.horizon), name='u')
- y = cp.Variable(shape=(self.P * self.horizon), name='y')
- g = cp.Variable(shape=(self.T - self.Tini - self.horizon + 1), name='g')
- slack_y = cp.Variable(shape=(self.Tini * self.P), name='slack_y')
- slack_u = cp.Variable(shape=(self.Tini * self.M), name='slack_u')
- Up, Yp, Uf, Yf = self.Up, self.Yp, self.Uf, self.Yf
- if lambda_proj > DeePC._SMALL_NUMBER:
- # Compute projection matrix (for the least square solution)
- Zp = np.vstack([Up, Yp, Uf])
- ZpInv = np.linalg.pinv(Zp)
- I = np.eye(self.T - self.Tini - self.horizon + 1)
- # Kernel orthogonal projector
- I_min_P = I - (ZpInv@ Zp)
- A = np.vstack([Up, Yp, Uf, Yf])
- b = cp.hstack([uini + slack_u, yini + slack_y, u, y])
- # Build constraints
- constraints = [A @ g == b]
- if math.isclose(lambda_y, 0):
- constraints.append(cp.norm(slack_y, 2) <= DeePC._SMALL_NUMBER)
- if math.isclose(lambda_u, 0):
- constraints.append(cp.norm(slack_u, 2) <= DeePC._SMALL_NUMBER)
- # u, y = self.Uf @ g, self.Yf @ g
- u = cp.reshape(u, (self.horizon, self.M), order = 'C')
- y = cp.reshape(y, (self.horizon, self.P), order = 'C')
- _constraints = build_constraints(u, y) if build_constraints is not None else (None, None)
- for idx, constraint in enumerate(_constraints):
- if constraint is None or not isinstance(constraint, Constraint) or not constraint.is_dcp():
- raise Exception(f'Constraint {idx} is not defined or is not convex.')
- constraints.extend([] if _constraints is None else _constraints)
- # Build loss
- _loss = build_loss(u, y)
-
- if _loss is None or not isinstance(_loss, Expression) or not _loss.is_dcp():
- raise Exception('Loss function is not defined or is not convex!')
- # Add regularizers
- _regularizers = lambda_g * cp.norm(g, p=1) if lambda_g > DeePC._SMALL_NUMBER else 0
- _regularizers += lambda_y * cp.norm(slack_y, p=1) if lambda_y > DeePC._SMALL_NUMBER else 0
- _regularizers += lambda_proj * cp.norm(I_min_P @ g) if lambda_proj > DeePC._SMALL_NUMBER else 0
- _regularizers += lambda_u * cp.norm(slack_u, p=1) if lambda_u > DeePC._SMALL_NUMBER else 0
- problem_loss = _loss + _regularizers
- # Solve problem
- objective = cp.Minimize(problem_loss)
- try:
- problem = cp.Problem(objective, constraints)
- except cp.SolverError as e:
- raise Exception(f'Error while constructing the DeePC problem. Details: {e}')
- self.optimization_problem = OptimizationProblem(
- variables = OptimizationProblemVariables(
- u_ini = uini, y_ini = yini, u = u, y = y, g = g, slack_y = slack_y, slack_u = slack_u),
- constraints = constraints,
- objective_function = problem_loss,
- problem = problem
- )
- return self.optimization_problem
- def solve(
- self,
- data_ini: Data,
- **cvxpy_kwargs
- ) -> Tuple[NDArray[np.float64], Dict[str, Union[float, NDArray[np.float64], OptimizationProblemVariables]]]:
- """
- Solves the DeePC optimization problem
- For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf
- :param data_ini: A tuple of input/output data used to estimate initial condition.
- Data should have shape Tini x M where Tini is the batch size and
- M is the number of features
- :param cvxpy_kwargs: All arguments that need to be passed to the cvxpy solve method.
- :return u_optimal: Optimal input signal to be applied to the system, of length `horizon`
- :return info: A dictionary with 5 keys:
- info['variables']: variables of the optimization problem
- info['value']: value of the optimization problem
- info['u_optimal']: the same as the first value returned by this function
- """
- assert len(data_ini.u.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
- assert len(data_ini.y.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
- assert data_ini.u.shape[1] == self.M, "Incorrect number of features for the input signal"
- assert data_ini.y.shape[1] == self.P, "Incorrect number of features for the output signal"
- assert data_ini.y.shape[0] == data_ini.u.shape[0], "Input/output data must have the same length"
- assert data_ini.y.shape[0] == self.Tini, f"Invalid size"
- assert self.optimization_problem is not None, "Problem was not built"
- # Need to transpose to make sure that time is over the columns, and features over the rows
- uini, yini = data_ini.u[:self.Tini].flatten(), data_ini.y[:self.Tini].flatten()
- self.optimization_problem.variables.u_ini.value = uini
- self.optimization_problem.variables.y_ini.value = yini
- try:
- result = self.optimization_problem.problem.solve(**cvxpy_kwargs)
- except cp.SolverError as e:
- raise Exception(f'Error while solving the DeePC problem. Details: {e}')
- if np.isinf(result):
- raise Exception('Problem is unbounded')
- u_optimal = (self.Uf @ self.optimization_problem.variables.g.value).reshape(self.horizon, self.M)
- info = {
- 'value': result,
- 'variables': self.optimization_problem.variables,
- 'u_optimal': u_optimal
- }
- return u_optimal, info
|