utils.py

import numpy as np
from typing import NamedTuple, Tuple, Optional, List, Union
from cvxpy import Expression, Variable, Problem, Parameter
from cvxpy.constraints.constraint import Constraint
from numpy.typing import NDArray


class OptimizationProblemVariables(NamedTuple):
    """
    Class used to store all the variables used in the optimization
    problem
    """
    u_ini: Union[Variable, Parameter]
    y_ini: Union[Variable, Parameter]
    u: Union[Variable, Parameter]
    y: Union[Variable, Parameter]
    g: Union[Variable, Parameter]
    slack_y: Union[Variable, Parameter]
    slack_u: Union[Variable, Parameter]


class OptimizationProblem(NamedTuple):
    """
    Class used to store the elements of an optimization problem
    :param variables: variables of the optimization problem
    :param constraints: constraints of the problem
    :param objective_function: objective function
    :param problem: optimization problem object
    """
    variables: OptimizationProblemVariables
    constraints: List[Constraint]
    objective_function: Expression
    problem: Problem
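
# A hedged usage sketch (illustrative, not part of the original file): the two
# tuples above only bundle the CVXPY objects that make up one DeePC problem,
# e.g. (dimensions are arbitrary):
#
#   from cvxpy import Minimize, sum_squares
#   v = OptimizationProblemVariables(
#       u_ini=Parameter(2), y_ini=Parameter(2),
#       u=Variable(4), y=Variable(4), g=Variable(8),
#       slack_y=Variable(2), slack_u=Variable(2))
#   objective = sum_squares(v.g)
#   constraints = [v.g >= 0]
#   opt = OptimizationProblem(v, constraints, objective,
#                             Problem(Minimize(objective), constraints))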


class Data(NamedTuple):
    """
    Tuple that contains input/output data
    :param u: input data
    :param y: output data
    """
    u: NDArray[np.float64]
    y: NDArray[np.float64]
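
# A hedged usage sketch (illustrative, not part of the original file): both
# fields follow the TxM convention used below, rows are samples and columns
# are features.
#
#   dataset = Data(u=np.zeros((100, 2)), y=np.zeros((100, 3)))
#   # 100 samples of a system with 2 inputs and 3 outputs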


def create_hankel_matrix(data: NDArray[np.float64], order: int) -> NDArray[np.float64]:
    """
    Creates a Hankel matrix of order L from a given matrix of size TxM,
    where M is the number of features and T is the batch size.
    Note that we need L <= T.
    :param data: A matrix of data (size TxM).
                 T is the batch size and M is the number of features
    :param order: The order of the Hankel matrix (L)
    :return: The Hankel matrix of type np.ndarray
    """
    data = np.array(data)
    assert len(data.shape) == 2, "Data needs to be a matrix"

    T, M = data.shape
    assert T >= order and order > 0, \
        "The number of data points must be at least the order, and the order must be positive"

    # Each column of H stacks `order` consecutive rows of the data,
    # flattened into a single vector of length order * M
    H = np.zeros((order * M, T - order + 1))
    for idx in range(T - order + 1):
        H[:, idx] = data[idx:idx + order, :].flatten()
    return H
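
# A hedged usage sketch (illustrative, not part of the original file): with
# T=4 samples of a single feature (M=1) and order L=2, each column of the
# Hankel matrix is a window of L consecutive samples.
#
#   data = np.arange(4.0).reshape(4, 1)      # T=4, M=1
#   H = create_hankel_matrix(data, order=2)
#   # H == [[0., 1., 2.],
#   #       [1., 2., 3.]]  -> shape (order * M, T - order + 1) = (2, 3)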


def split_data(data: Data, Tini: int, horizon: int, explained_variance: Optional[float] = None) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """
    Utility function used to split the data into past data and future data.
    Constructs the Hankel matrix for the input/output data, and uses the first
    Tini block-rows to create the past data and the last 'horizon' block-rows
    to create the future data.
    For more info check eq. (4) in https://arxiv.org/pdf/1811.05890.pdf
    :param data: A tuple of input/output data. Data should have shape TxM
                 where T is the batch size and M is the number of features
    :param Tini: number of samples needed to estimate initial conditions
    :param horizon: horizon
    :param explained_variance: Regularization term in (0,1] used to approximate the Hankel matrices.
                               Defaults to None (no low-rank approximation is performed).
    :return: Returns Up, Uf, Yp, Yf (see eq. (4) of the original DeePC paper)
    """
    assert Tini >= 1, "Tini cannot be lower than 1"
    assert horizon >= 1, "Horizon cannot be lower than 1"
    assert explained_variance is None or 0 < explained_variance <= 1, \
        "explained_variance should be in (0,1] or be None"

    Mu, My = data.u.shape[1], data.y.shape[1]
    Hu = create_hankel_matrix(data.u, Tini + horizon)
    Hy = create_hankel_matrix(data.y, Tini + horizon)

    if explained_variance is not None:
        Hu = low_rank_matrix_approximation(Hu, explained_var=explained_variance)
        Hy = low_rank_matrix_approximation(Hy, explained_var=explained_variance)

    # Past data: first Tini block-rows; future data: last `horizon` block-rows
    Up, Uf = Hu[:Tini * Mu], Hu[-horizon * Mu:]
    Yp, Yf = Hy[:Tini * My], Hy[-horizon * My:]
    return Up, Uf, Yp, Yf
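
# A hedged usage sketch (illustrative, not part of the original file): with
# Tini=1 and horizon=1 the Hankel matrices have Tini + horizon = 2 block-rows,
# so Up/Yp are their first block-row and Uf/Yf their last one.
#
#   u = np.random.randn(10, 1)   # T=10 input samples, Mu=1
#   y = np.random.randn(10, 1)   # T=10 output samples, My=1
#   Up, Uf, Yp, Yf = split_data(Data(u, y), Tini=1, horizon=1)
#   # Up.shape == Uf.shape == Yp.shape == Yf.shape == (1, 9)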


def low_rank_matrix_approximation(
        X: NDArray[np.float64],
        explained_var: Optional[float] = 0.9,
        rank: Optional[int] = None,
        SVD: Optional[Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]] = None,
        **svd_kwargs) -> NDArray[np.float64]:
    """
    Computes a low-rank approximation of a matrix
    Adapted from https://gist.github.com/thearn/5424219
    :param X: matrix to approximate
    :param explained_var: Value in (0,1] used to compute the rank. Describes how close
                          the low-rank matrix is to the original matrix (in terms of the
                          singular values). Default value: 0.9
    :param rank: rank order. To be used if you want to approximate the matrix by a specific
                 rank. Defaults to None. If not None, it overrides the explained_var
                 parameter.
    :param SVD: If not None, it should be the SVD decomposition (see numpy.linalg.svd) of X
    :param **svd_kwargs: additional parameters to be passed to numpy.linalg.svd
    :return: the low-rank approximation of X
    """
    assert len(X.shape) == 2, "X must be a matrix"
    assert isinstance(rank, (int, float)) or isinstance(explained_var, (int, float)), \
        "You need to specify explained_var or rank!"
    assert explained_var is None or 0 < explained_var <= 1., \
        "explained_var must be in (0,1]"
    assert rank is None or 1 <= rank <= min(X.shape[0], X.shape[1]), \
        "Rank cannot be lower than 1 or greater than min(num_rows, num_cols)"
    assert SVD is None or len(SVD) == 3, "SVD must be a tuple of 3 elements"

    u, s, v = np.linalg.svd(X, full_matrices=False, **svd_kwargs) if SVD is None else SVD

    if rank is None:
        # Smallest rank whose singular values explain the requested fraction of
        # the total variance. argmax returns the first index where the cumulative
        # ratio reaches explained_var, so the rank is that index + 1 (without the
        # +1, a single dominant singular value would yield rank 0).
        s_squared = np.power(s, 2)
        total_var = np.sum(s_squared)
        z = np.cumsum(s_squared) / total_var
        rank = int(np.argmax(np.logical_or(z > explained_var, np.isclose(z, explained_var)))) + 1

    # Keep the top `rank` singular triplets and recompose the matrix
    u_low = u[:, :rank]
    s_low = s[:rank]
    v_low = v[:rank, :]
    X_low = np.dot(u_low * s_low, v_low)
    return X_low
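
# A hedged usage sketch (illustrative, not part of the original file): a
# rank-1 matrix is recovered exactly, since its single nonzero singular value
# already explains all of the variance (the rank selection keeps at least one
# singular value).
#
#   X = np.outer(np.arange(1.0, 4.0), np.arange(1.0, 5.0))  # 3x4, rank 1
#   X_low = low_rank_matrix_approximation(X, explained_var=0.99)
#   # np.allclose(X, X_low) -> True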