deepc.py

import math
import numpy as np
import cvxpy as cp
from numpy.typing import NDArray
from typing import Tuple, Callable, List, Optional, Union, Dict
from cvxpy.expressions.expression import Expression
from cvxpy.constraints.constraint import Constraint
from pydeepc.utils import (
    Data,
    split_data,
    low_rank_matrix_approximation,
    OptimizationProblem,
    OptimizationProblemVariables)

class DeePC(object):
    optimization_problem: OptimizationProblem = None
    _SMALL_NUMBER: float = 1e-32

    def __init__(self, data: Data, Tini: int, horizon: int, explained_variance: Optional[float] = None):
        """
        Solves the DeePC optimization problem
        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf

        :param data:                A tuple of input/output data. Data should have shape TxM
                                    where T is the number of samples and M is the number of features
        :param Tini:                Number of samples needed to estimate the initial conditions
        :param horizon:             Horizon length
        :param explained_variance:  Value in (0,1]: the fraction of the variance that the low-rank
                                    approximation of the Hankel matrices should explain.
                                    By default it is None (no low-rank approximation is performed).
        """
        assert explained_variance is None or 0 < explained_variance <= 1, \
            "explained_variance should be in (0,1] or be None"
        self.Tini = Tini
        self.horizon = horizon
        self.explained_variance = explained_variance
        self.update_data(data)
        self.optimization_problem = None

    def update_data(self, data: Data):
        """
        Update the Hankel matrices of DeePC. You need to rebuild the optimization problem
        after calling this function.

        :param data:    A tuple of input/output data. Data should have shape TxM
                        where T is the number of samples and M is the number of features
        """
        assert len(data.u.shape) == 2, \
            "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
        assert len(data.y.shape) == 2, \
            "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
        assert data.y.shape[0] == data.u.shape[0], \
            "Input/output data must have the same length"
        assert data.y.shape[0] - self.Tini - self.horizon + 1 >= 1, \
            f"There is not enough data: this value {data.y.shape[0] - self.Tini - self.horizon + 1} needs to be >= 1"

        Up, Uf, Yp, Yf = split_data(data, self.Tini, self.horizon, self.explained_variance)

        self.Up = Up
        self.Uf = Uf
        self.Yp = Yp
        self.Yf = Yf
        self.M = data.u.shape[1]
        self.P = data.y.shape[1]
        self.T = data.u.shape[0]
        self.optimization_problem = None
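
    # A minimal sketch of what `split_data` is assumed to do (the actual helper
    # lives in pydeepc.utils and may additionally apply the low-rank
    # approximation): it builds column-wise Hankel matrices of depth
    # Tini + horizon and splits them into "past" and "future" blocks, e.g. for
    # the input signal
    #
    #   L = Tini + horizon
    #   Hu = np.hstack([data.u[i:i + L].reshape(-1, 1) for i in range(T - L + 1)])
    #   Up, Uf = Hu[:Tini * M], Hu[Tini * M:]
    #
    # so that the constraint [Up; Yp; Uf; Yf] @ g == [u_ini; y_ini; u; y] in
    # `build_problem` selects trajectories consistent with the recorded data.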

    def build_problem(self,
            build_loss: Callable[[cp.Variable, cp.Variable], Expression],
            build_constraints: Optional[Callable[[cp.Variable, cp.Variable], Optional[List[Constraint]]]] = None,
            lambda_g: float = 0.,
            lambda_y: float = 0.,
            lambda_u: float = 0.,
            lambda_proj: float = 0.) -> OptimizationProblem:
        """
        Builds the DeePC optimization problem
        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf

        For info on the projection (least-square) regularizer, see also
        https://arxiv.org/pdf/2101.01273.pdf

        :param build_loss:          Callback function that takes as input an (input, output) tuple of
                                    variables of shape (TxM), where T is the horizon length and M is
                                    the feature size. The callback should return a scalar value of type Expression.
        :param build_constraints:   Callback function that takes as input an (input, output) tuple of
                                    variables of shape (TxM), where T is the horizon length and M is
                                    the feature size. The callback should return a list of constraints.
        :param lambda_g:            Non-negative scalar. Regularization factor for g. Used for
                                    stochastic/non-linear systems.
        :param lambda_y:            Non-negative scalar. Regularization factor for y_init. Used for
                                    stochastic/non-linear systems.
        :param lambda_u:            Non-negative scalar. Regularization factor for u_init. Used for
                                    stochastic/non-linear systems.
        :param lambda_proj:         Non-negative scalar. Penalizes the deviation of g from the
                                    least-square solution.
        :return:                    Parameters of the optimization problem
        """
        assert build_loss is not None, "Loss function callback cannot be None"
        assert lambda_g >= 0 and lambda_y >= 0, "Regularizers must be non-negative"
        assert lambda_u >= 0, "Regularizer of u_init must be non-negative"
        assert lambda_proj >= 0, "The projection regularizer must be non-negative"

        self.optimization_problem = None

        # Build variables
        uini = cp.Parameter(shape=(self.M * self.Tini), name='u_ini')
        yini = cp.Parameter(shape=(self.P * self.Tini), name='y_ini')
        u = cp.Variable(shape=(self.M * self.horizon), name='u')
        y = cp.Variable(shape=(self.P * self.horizon), name='y')
        g = cp.Variable(shape=(self.T - self.Tini - self.horizon + 1), name='g')
        slack_y = cp.Variable(shape=(self.Tini * self.P), name='slack_y')
        slack_u = cp.Variable(shape=(self.Tini * self.M), name='slack_u')

        Up, Yp, Uf, Yf = self.Up, self.Yp, self.Uf, self.Yf

        if lambda_proj > DeePC._SMALL_NUMBER:
            # Compute the projection matrix (for the least-square solution)
            Zp = np.vstack([Up, Yp, Uf])
            ZpInv = np.linalg.pinv(Zp)
            I = np.eye(self.T - self.Tini - self.horizon + 1)
            # Kernel orthogonal projector: I - pinv(Zp) @ Zp maps g onto the
            # nullspace of Zp, so penalizing its norm pulls g towards the
            # least-square solution
            I_min_P = I - (ZpInv @ Zp)

        A = np.vstack([Up, Yp, Uf, Yf])
        b = cp.hstack([uini + slack_u, yini + slack_y, u, y])

        # Build constraints
        constraints = [A @ g == b]

        if math.isclose(lambda_y, 0):
            constraints.append(cp.norm(slack_y, 2) <= DeePC._SMALL_NUMBER)
        if math.isclose(lambda_u, 0):
            constraints.append(cp.norm(slack_u, 2) <= DeePC._SMALL_NUMBER)

        # u, y = self.Uf @ g, self.Yf @ g
        u = cp.reshape(u, (self.horizon, self.M), order='C')
        y = cp.reshape(y, (self.horizon, self.P), order='C')

        _constraints = build_constraints(u, y) if build_constraints is not None else []
        for idx, constraint in enumerate(_constraints):
            if constraint is None or not isinstance(constraint, Constraint) or not constraint.is_dcp():
                raise Exception(f'Constraint {idx} is not defined or is not convex.')
        constraints.extend(_constraints)

        # Build loss
        _loss = build_loss(u, y)
        if _loss is None or not isinstance(_loss, Expression) or not _loss.is_dcp():
            raise Exception('Loss function is not defined or is not convex!')

        # Add regularizers
        _regularizers = lambda_g * cp.norm(g, p=1) if lambda_g > DeePC._SMALL_NUMBER else 0
        _regularizers += lambda_y * cp.norm(slack_y, p=1) if lambda_y > DeePC._SMALL_NUMBER else 0
        _regularizers += lambda_proj * cp.norm(I_min_P @ g) if lambda_proj > DeePC._SMALL_NUMBER else 0
        _regularizers += lambda_u * cp.norm(slack_u, p=1) if lambda_u > DeePC._SMALL_NUMBER else 0

        problem_loss = _loss + _regularizers

        # Build the problem (it is solved later, in `solve`)
        objective = cp.Minimize(problem_loss)
        try:
            problem = cp.Problem(objective, constraints)
        except cp.SolverError as e:
            raise Exception(f'Error while constructing the DeePC problem. Details: {e}')

        self.optimization_problem = OptimizationProblem(
            variables = OptimizationProblemVariables(
                u_ini = uini, y_ini = yini, u = u, y = y, g = g, slack_y = slack_y, slack_u = slack_u),
            constraints = constraints,
            objective_function = problem_loss,
            problem = problem
        )

        return self.optimization_problem
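
    # A hedged example of the callbacks expected by `build_problem`; `ref` and
    # `u_max` are hypothetical names, not part of this module. Both callbacks
    # receive the reshaped (horizon x features) variables:
    #
    #   def my_loss(u: cp.Variable, y: cp.Variable) -> Expression:
    #       return cp.norm(y - ref, 'fro') + 0.1 * cp.norm(u, 'fro')
    #
    #   def my_constraints(u: cp.Variable, y: cp.Variable) -> List[Constraint]:
    #       return [u >= -u_max, u <= u_max]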

    def solve(
            self,
            data_ini: Data,
            **cvxpy_kwargs
        ) -> Tuple[NDArray[np.float64], Dict[str, Union[float, NDArray[np.float64], OptimizationProblemVariables]]]:
        """
        Solves the DeePC optimization problem
        For more info check alg. 2 in https://arxiv.org/pdf/1811.05890.pdf

        :param data_ini:        A tuple of input/output data used to estimate the initial condition.
                                Data should have shape Tini x M, where Tini is the number of initial
                                samples and M is the number of features
        :param cvxpy_kwargs:    All arguments that need to be passed to the cvxpy solve method.
        :return u_optimal:      Optimal input signal to be applied to the system, of length `horizon`
        :return info:           A dictionary with 3 keys:
                                info['variables']: variables of the optimization problem
                                info['value']: value of the optimization problem
                                info['u_optimal']: the same as the first value returned by this function
        """
        assert len(data_ini.u.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
        assert len(data_ini.y.shape) == 2, "Data needs to be shaped as a TxM matrix (T is the number of samples and M is the number of features)"
        assert data_ini.u.shape[1] == self.M, "Incorrect number of features for the input signal"
        assert data_ini.y.shape[1] == self.P, "Incorrect number of features for the output signal"
        assert data_ini.y.shape[0] == data_ini.u.shape[0], "Input/output data must have the same length"
        assert data_ini.y.shape[0] == self.Tini, f"Invalid data size: {self.Tini} samples are needed to estimate the initial conditions"
        assert self.optimization_problem is not None, "Problem was not built"

        # flatten() is row-major, so the samples are stacked in time order with
        # the features of each time step kept together, matching the row
        # ordering of the Up/Yp Hankel blocks
        uini, yini = data_ini.u[:self.Tini].flatten(), data_ini.y[:self.Tini].flatten()
        self.optimization_problem.variables.u_ini.value = uini
        self.optimization_problem.variables.y_ini.value = yini

        try:
            result = self.optimization_problem.problem.solve(**cvxpy_kwargs)
        except cp.SolverError as e:
            raise Exception(f'Error while solving the DeePC problem. Details: {e}')

        if np.isinf(result):
            raise Exception('Problem is unbounded')

        u_optimal = (self.Uf @ self.optimization_problem.variables.g.value).reshape(self.horizon, self.M)
        info = {
            'value': result,
            'variables': self.optimization_problem.variables,
            'u_optimal': u_optimal
        }

        return u_optimal, info
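

if __name__ == '__main__':
    # Minimal end-to-end sketch, assuming `Data` is the (u, y) named tuple from
    # pydeepc.utils. Random data stands in for a recorded system trajectory;
    # the loss, constraints and solver choice below are illustrative only.
    rng = np.random.default_rng(0)
    T, M, P, Tini, horizon = 100, 1, 1, 4, 10
    data = Data(u=rng.normal(size=(T, M)), y=rng.normal(size=(T, P)))

    deepc = DeePC(data, Tini=Tini, horizon=horizon)
    deepc.build_problem(
        build_loss=lambda u, y: cp.norm(y, 'fro'),
        build_constraints=lambda u, y: [u <= 1., u >= -1.],
        lambda_g=1., lambda_y=1., lambda_u=1.)

    data_ini = Data(u=rng.normal(size=(Tini, M)), y=rng.normal(size=(Tini, P)))
    u_optimal, info = deepc.solve(data_ini, solver=cp.ECOS)
    print(u_optimal)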