From 3dcad07abdfe694fc6fbe480a3cab574e0abf3e9 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Thu, 4 Aug 2022 08:13:16 -0700 Subject: [PATCH 1/7] initial bspline implementation --- control/flatsys/__init__.py | 1 + control/flatsys/basis.py | 5 + control/flatsys/bspline.py | 198 ++++++++++++++++++++++++++++++++++ control/flatsys/flatsys.py | 4 + control/tests/bspline_test.py | 181 +++++++++++++++++++++++++++++++ control/tests/flatsys_test.py | 15 +-- 6 files changed, 398 insertions(+), 6 deletions(-) create mode 100644 control/flatsys/bspline.py create mode 100644 control/tests/bspline_test.py diff --git a/control/flatsys/__init__.py b/control/flatsys/__init__.py index 0926fa81a..6e8eac030 100644 --- a/control/flatsys/__init__.py +++ b/control/flatsys/__init__.py @@ -54,6 +54,7 @@ from .basis import BasisFamily from .poly import PolyFamily from .bezier import BezierFamily +from .bspline import BSplineFamily # Classes from .systraj import SystemTrajectory diff --git a/control/flatsys/basis.py b/control/flatsys/basis.py index 1ea957f52..a0986bd61 100644 --- a/control/flatsys/basis.py +++ b/control/flatsys/basis.py @@ -61,5 +61,10 @@ def __call__(self, i, t): """Evaluate the ith basis function at a point in time""" return self.eval_deriv(i, 0, t) + def eval(self, coeffs, tlist): + return [ + sum([coeffs[i] * self(i, t) for i in range(self.N)]) + for t in tlist] + def eval_deriv(self, i, j, t): raise NotImplementedError("Internal error; improper basis functions") diff --git a/control/flatsys/bspline.py b/control/flatsys/bspline.py new file mode 100644 index 000000000..5e3b98cb5 --- /dev/null +++ b/control/flatsys/bspline.py @@ -0,0 +1,198 @@ +# bspline.py - B-spline basis functions +# RMM, 2 Aug 2022 +# +# This class implements a set of B-spline basis functions that implement a +# piecewise polynomial at a set of breakpoints t0, ..., tn with given orders +# and smoothness. +# + +import numpy as np +from .basis import BasisFamily +from scipy.interpolate import BSpline, splev + +class BSplineFamily(BasisFamily): + """B-spline basis functions. + + This class represents a B-spline basis for piecewise polynomials defined + across a set of breakpoints with given order and smoothness. + + """ + def __init__(self, breakpoints, degree, smoothness=None, vars=1): + """Create a B-spline basis for piecewise smooth polynomials + + Define B-spline polynomials for a set of one or more variables. + B-splines are characterized by a set of intervals separated by break + points. On each interval we have a polynomial of a certain order + and the spline is continuous up to a given smoothness at interior + break points. + + Parameters + ---------- + breakpoints : 1D array or 2D array of float + The breakpoints for the spline(s). + + degree : int or list of ints + For each spline variable, the degree of the polynomial between + break points. If a single number is given and more than one + spline variable is specified, the same order is used for each + spline variable. + + smoothness : int or list of ints + For each spline variable, the smoothness at breakpoints (number + of derivatives that should match). + + vars : int or list of str, option + The number of spline variables or a list of spline variable names. + + """ + # Process the breakpoints for the spline */ + breakpoints = np.array(breakpoints, dtype=float) + if breakpoints.ndim == 2: + raise NotImplementedError( + "breakpoints for each spline variable not yet supported") + elif breakpoints.ndim != 1: + raise ValueError("breakpoints must be convertable to a 1D array") + elif breakpoints.size < 2: + raise ValueError("break point vector must have at least 2 values") + elif np.any(np.diff(breakpoints) <= 0): + raise ValueError("break points must be strictly increasing values") + + # Decide on the number of spline variables + if isinstance(vars, list) and all([isinstance(v, str) for v in vars]): + raise NotImplemented("list of variable names not yet supported") + elif not isinstance(vars, int): + raise TypeError("vars must be an integer or list of strings") + else: + nvars = vars + + # + # Process B-spline parameters (order, smoothness) + # + # B-splines are characterized by a set of intervals separated by + # breakpoints. On each interval we have a polynomial of a certain + # order and the spline is continuous up to a given smoothness at + # breakpoints. The code in this section allows some flexibility in + # the way that all of this information is supplied, including using + # scalar values for parameters (which are then broadcast to each + # output) and inferring values and dimensions from other + # information, when possible. + # + + # Utility function for broadcasting spline params (order, smoothness) + def process_spline_parameters( + values, length, allowed_types, minimum=0, + default=None, name='unknown'): + + # Preprocessing + if values is None and default is None: + return None + elif values is None: + values = default + elif isinstance(values, np.ndarray): + # Convert ndarray to list + values = values.tolist() + + # Figure out what type of object we were passed + if isinstance(values, allowed_types): + # Single number of an allowed type => broadcast to list + values = [values for i in range(length)] + elif all([isinstance(v, allowed_types) for v in values]): + # List of values => make sure it is the right size + if len(values) != length: + raise ValueError(f"length of '{name}' does not match n") + else: + raise ValueError(f"could not parse '{name}' keyword") + + # Check to make sure the values are OK + if values is not None and any([val < minimum for val in values]): + raise ValueError( + f"invalid value for {name}; must be at least {minimum}") + + return values + + # Degree of polynomial + degree = process_spline_parameters( + degree, nvars, (int), name='degree', minimum=1) + + # Smoothness at breakpoints; set default to degree - 1 (max possible) + smoothness = process_spline_parameters( + smoothness, nvars, (int), name='smoothness', minimum=0, + default=[d - 1 for d in degree]) + + # Make sure degree is sufficent for the level of smoothness + if any([degree[i] - smoothness[i] < 1 for i in range(nvars)]): + raise ValueError("degree must be greater than smoothness") + + # Store the parameters and process them in call_ntg() + self.nvars = nvars + self.breakpoints = breakpoints + self.degree = degree + self.smoothness = smoothness + self.nintervals = breakpoints.size - 1 + + # + # Compute parameters for a SciPy BSpline object + # + # To create a B-spline, we need to compute the knot points, keeping + # track of the use of repeated knot points at the initial knot and + # final knot as well as repeated knots at intermediate points + # depending on the desired smoothness. + # + + # Store the coefficients for each output (useful later) + self.coef_offset, self.coef_length, offset = [], [], 0 + for i in range(self.nvars): + # Compute number of coefficients for the piecewise polynomial + ncoefs = (self.degree[i] + 1) * (len(self.breakpoints) - 1) - \ + (self.smoothness[i] + 1) * (len(self.breakpoints) - 2) + + self.coef_offset.append(offset) + self.coef_length.append(ncoefs) + offset += ncoefs + self.N = offset # save the total number of coefficients + + # Create knot points for each spline variable + # TODO: extend to multi-dimensional breakpoints + self.knotpoints = [] + for i in range(self.nvars): + # Allocate space for the knotpoints + self.knotpoints.append(np.empty( + (self.degree[i] + 1) + (len(self.breakpoints) - 2) * \ + (self.degree[i] - self.smoothness[i]) + (self.degree[i] + 1))) + + # Initial knot points + self.knotpoints[i][0:self.degree[i] + 1] = self.breakpoints[0] + offset = self.degree[i] + 1 + + # Interior knot points + nknots = self.degree[i] - self.smoothness[i] + assert nknots > 0 # just in case + for j in range(1, self.breakpoints.size - 1): + self.knotpoints[i][offset:offset+nknots] = self.breakpoints[j] + offset += nknots + + # Final knot point + self.knotpoints[i][offset:offset + self.degree[i] + 1] = \ + self.breakpoints[-1] + + def eval(self, coefs, tlist): + return np.array([ + BSpline(self.knotpoints[i], + coefs[self.coef_offset[i]: + self.coef_offset[i] + self.coef_length[i]], + self.degree[i])(tlist) + for i in range(self.nvars)]) + + # Compute the kth derivative of the ith basis function at time t + def eval_deriv(self, i, k, t, squeeze=True): + """Evaluate the kth derivative of the ith basis function at time t.""" + if self.nvars > 1 or not squeeze: + raise NotImplementedError( + "derivatives of multi-variable splines not yet supported") + + # Create a coefficient vector for this spline + coefs = np.zeros(self.coef_length[0]); coefs[i] = 1 + + # Evaluate the derivative of the spline at the desired point in time + return BSpline(self.knotpoints[0], coefs, + self.degree[0]).derivative(k)(t) diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index c01eb9127..aa701af4b 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -413,7 +413,10 @@ def point_to_point( # # Start by solving the least squares problem + # TODO: add warning if rank is too small alpha, residuals, rank, s = np.linalg.lstsq(M, Z, rcond=None) + if rank < Z.size: + warnings.warn("basis too small; solution may not exist") if cost is not None or trajectory_constraints is not None: # Search over the null space to minimize cost/satisfy constraints @@ -425,6 +428,7 @@ def traj_cost(null_coeffs): coeffs = alpha + N @ null_coeffs # Evaluate the costs at the listed time points + # TODO: store Mt ahead of time, since it doesn't change costval = 0 for t in timepts: M_t = _basis_flag_matrix(sys, basis, zflag_T0, t) diff --git a/control/tests/bspline_test.py b/control/tests/bspline_test.py new file mode 100644 index 000000000..b00f856dc --- /dev/null +++ b/control/tests/bspline_test.py @@ -0,0 +1,181 @@ +"""bspline_test.py - test bsplines and their use in flat system + +RMM, 2 Aug 2022 + +This test suite checks to make sure that the bspline basic functions +supporting differential flat systetms are functioning. It doesn't do +exhaustive testing of operations on flat systems. Separate unit tests +should be created for that purpose. + +""" + +import numpy as np +import pytest +import scipy as sp + +import control as ct +import control.flatsys as fs +import control.optimal as opt + +def test_bspline_basis(): + Tf = 10 + degree = 5 + maxderiv = 4 + bspline = fs.BSplineFamily([0, Tf/3, Tf/2, Tf], degree, maxderiv) + time = np.linspace(0, Tf, 100) + + # Make sure that the knotpoint vector looks right + np.testing.assert_equal( + bspline.knotpoints, + [np.array([0, 0, 0, 0, 0, 0, + Tf/3, Tf/2, + Tf, Tf, Tf, Tf, Tf, Tf])]) + + # Repeat with default smoothness + bspline = fs.BSplineFamily([0, Tf/3, Tf/2, Tf], degree) + np.testing.assert_equal( + bspline.knotpoints, + [np.array([0, 0, 0, 0, 0, 0, + Tf/3, Tf/2, + Tf, Tf, Tf, Tf, Tf, Tf])]) + + # Sum of the B-spline curves should be one + np.testing.assert_almost_equal( + 1, sum([bspline(i, time) for i in range(bspline.N)])) + + # Sum of derivatives should be zero + for k in range(1, maxderiv): + np.testing.assert_almost_equal( + 0, sum([bspline.eval_deriv(i, k, time) + for i in range(0, bspline.N)])) + + # Make sure that the second derivative integrates to the first + time = np.linspace(0, Tf, 1000) + dt = time[1] - time[0] + for i in range(bspline.N): + for j in range(1, maxderiv): + np.testing.assert_allclose( + np.diff(bspline.eval_deriv(i, j-1, time)) / dt, + bspline.eval_deriv(i, j, time)[0:-1], + atol=0.01, rtol=0.01) + + # Exception check + with pytest.raises(IndexError, match="out of bounds"): + bspline.eval_deriv(bspline.N, 0, time) + + +@pytest.mark.parametrize( + "xf, uf, Tf", + [([1, 0], [0], 2), + ([0, 1], [0], 3), + ([1, 1], [1], 4)]) +def test_double_integrator(xf, uf, Tf): + # Define a second order integrator + sys = ct.StateSpace([[-1, 1], [0, -2]], [[0], [1]], [[1, 0]], 0) + flatsys = fs.LinearFlatSystem(sys) + + # Define the basis set + bspline = fs.BSplineFamily([0, Tf/2, Tf], 4, 2) + + x0, u0, = [0, 0], [0] + traj = fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=bspline) + + # Verify that the trajectory computation is correct + x, u = traj.eval([0, Tf]) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + np.testing.assert_array_almost_equal(xf, x[:, 1]) + np.testing.assert_array_almost_equal(uf, u[:, 1]) + + # Simulate the system and make sure we stay close to desired traj + T = np.linspace(0, Tf, 200) + xd, ud = traj.eval(T) + + t, y, x = ct.forced_response(sys, T, ud, x0, return_x=True) + np.testing.assert_array_almost_equal(x, xd, decimal=3) + + # Multi-dimensional splines not yet implemented + bspline = fs.BSplineFamily([0, Tf/3, Tf/2, Tf], [4, 5], [2, 3], vars=2) + with pytest.raises(NotImplementedError, match="not yet supported"): + fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=bspline) + + +# Bicycle model +def vehicle_flat_forward(x, u, params={}): + b = params.get('wheelbase', 3.) # get parameter values + zflag = [np.zeros(3), np.zeros(3)] # list for flag arrays + zflag[0][0] = x[0] # flat outputs + zflag[1][0] = x[1] + zflag[0][1] = u[0] * np.cos(x[2]) # first derivatives + zflag[1][1] = u[0] * np.sin(x[2]) + thdot = (u[0]/b) * np.tan(u[1]) # dtheta/dt + zflag[0][2] = -u[0] * thdot * np.sin(x[2]) # second derivatives + zflag[1][2] = u[0] * thdot * np.cos(x[2]) + return zflag + +def vehicle_flat_reverse(zflag, params={}): + b = params.get('wheelbase', 3.) # get parameter values + x = np.zeros(3); u = np.zeros(2) # vectors to store x, u + x[0] = zflag[0][0] # x position + x[1] = zflag[1][0] # y position + x[2] = np.arctan2(zflag[1][1], zflag[0][1]) # angle + u[0] = zflag[0][1] * np.cos(x[2]) + zflag[1][1] * np.sin(x[2]) + thdot_v = zflag[1][2] * np.cos(x[2]) - zflag[0][2] * np.sin(x[2]) + u[1] = np.arctan2(thdot_v, u[0]**2 / b) + return x, u + +def vehicle_update(t, x, u, params): + b = params.get('wheelbase', 3.) # get parameter values + dx = np.array([ + np.cos(x[2]) * u[0], + np.sin(x[2]) * u[0], + (u[0]/b) * np.tan(u[1]) + ]) + return dx + +def vehicle_output(t, x, u, params): return x + +# Create differentially flat input/output system +vehicle_flat = fs.FlatSystem( + vehicle_flat_forward, vehicle_flat_reverse, vehicle_update, + vehicle_output, inputs=('v', 'delta'), outputs=('x', 'y', 'theta'), + states=('x', 'y', 'theta')) + +def test_kinematic_car(): + # Define the endpoints of the trajectory + x0 = [0., -2., 0.]; u0 = [10., 0.] + xf = [100., 2., 0.]; uf = [10., 0.] + Tf = 10 + + # Set up a basis vector + bspline = fs.BSplineFamily([0, Tf/2, Tf], 5, 3) + + # Find trajectory between initial and final conditions + traj = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=bspline) + + # Verify that the trajectory computation is correct + x, u = traj.eval([0, Tf]) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + np.testing.assert_array_almost_equal(xf, x[:, 1]) + np.testing.assert_array_almost_equal(uf, u[:, 1]) + +def test_bspline_errors(): + # Breakpoints must be a 1D array, in increasing order + with pytest.raises(NotImplementedError, match="not yet supported"): + basis = fs.BSplineFamily([[0, 1, 3], [0, 2, 3]], [3, 3]) + + with pytest.raises(ValueError, + match="breakpoints must be convertable to a 1D array"): + basis = fs.BSplineFamily([[[0, 1], [0, 1]], [[0, 1], [0, 1]]], [3, 3]) + + with pytest.raises(ValueError, match="must have at least 2 values"): + basis = fs.BSplineFamily([10], 2) + + with pytest.raises(ValueError, match="must be strictly increasing"): + basis = fs.BSplineFamily([1, 3, 2], 2) + + # Smoothness can't be more than dimension of splines + basis = fs.BSplineFamily([0, 1], 4, 3) # OK + with pytest.raises(ValueError, match="degree must be greater"): + basis = fs.BSplineFamily([0, 1], 4, 4) # not OK diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index e3584d459..4eb091b14 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -150,7 +150,10 @@ def test_flat_default_output(self, vehicle_flat): resp2 = ct.input_output_response(flatsys, T, u1, x0) np.testing.assert_array_almost_equal(resp1.outputs[0:2], resp2.outputs) - def test_flat_cost_constr(self): + @pytest.mark.parametrize("basis", [ + fs.PolyFamily(8), + fs.BSplineFamily([0, 3, 7, 10], 4, 2)]) + def test_flat_cost_constr(self, basis): # Double integrator system sys = ct.ss([[0, 1], [0, 0]], [[0], [1]], [[1, 0]], 0) flat_sys = fs.LinearFlatSystem(sys) @@ -159,11 +162,11 @@ def test_flat_cost_constr(self): x0 = [1, 0]; u0 = [0] xf = [0, 0]; uf = [0] Tf = 10 - T = np.linspace(0, Tf, 500) + T = np.linspace(0, Tf, 100) # Find trajectory between initial and final conditions traj = fs.point_to_point( - flat_sys, Tf, x0, u0, xf, uf, basis=fs.PolyFamily(8)) + flat_sys, Tf, x0, u0, xf, uf, basis=basis) x, u = traj.eval(T) np.testing.assert_array_almost_equal(x0, x[:, 0]) @@ -178,7 +181,7 @@ def test_flat_cost_constr(self): traj_cost = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, cost=cost_fcn, - basis=fs.PolyFamily(8), + basis=basis, # initial_guess='lstsq', # minimize_kwargs={'method': 'trust-constr'} ) @@ -204,7 +207,7 @@ def test_flat_cost_constr(self): traj_const = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, cost=cost_fcn, - constraints=constraints, basis=fs.PolyFamily(8), + constraints=constraints, basis=basis, ) # Verify that the trajectory computation is correct @@ -224,7 +227,7 @@ def test_flat_cost_constr(self): (sp.optimize.NonlinearConstraint, lambda x, u: x, lb, ub)] traj_nlconst = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, cost=cost_fcn, - constraints=nl_constraints, basis=fs.PolyFamily(8), + constraints=nl_constraints, basis=basis, ) x_nlconst, u_nlconst = traj_nlconst.eval(T) np.testing.assert_almost_equal(x_const, x_nlconst) From 0ab0d0703f22281aee4bf8d7db65bda8effeb372 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Mon, 8 Aug 2022 22:14:47 -0700 Subject: [PATCH 2/7] add support for multiple spline variables; update docstrings --- control/flatsys/basis.py | 43 +++++++++++++++---- control/flatsys/bezier.py | 2 +- control/flatsys/bspline.py | 77 +++++++++++++++++------------------ control/flatsys/flatsys.py | 31 +++++++++----- control/flatsys/poly.py | 2 +- control/flatsys/systraj.py | 4 +- control/tests/bspline_test.py | 50 ++++++++++++++++++++--- 7 files changed, 142 insertions(+), 67 deletions(-) diff --git a/control/flatsys/basis.py b/control/flatsys/basis.py index a0986bd61..ad1d8b2a3 100644 --- a/control/flatsys/basis.py +++ b/control/flatsys/basis.py @@ -47,7 +47,11 @@ class BasisFamily: :math:`z_i^{(q)}(t)` = basis.eval_deriv(self, i, j, t) - Parameters + A basis set can either consist of a single variable that is used for + each flat output (nvars = None) or a different variable for different + flat outputs (nvars > 0). + + Attributes ---------- N : int Order of the basis set. @@ -56,15 +60,38 @@ class BasisFamily: def __init__(self, N): """Create a basis family of order N.""" self.N = N # save number of basis functions + self.nvars = None # default number of variables + self.coef_offset = [0] # coefficient offset for each variable + self.coef_length = [N] # coefficient length for each variable - def __call__(self, i, t): + def __call__(self, i, t, var=None): """Evaluate the ith basis function at a point in time""" - return self.eval_deriv(i, 0, t) + return self.eval_deriv(i, 0, t, var=var) + + def var_ncoefs(self, var): + """Get the number of coefficients for a variable""" + return self.N if self.nvars is None else self.coef_length[var] + + def eval(self, coeffs, tlist, var=None): + if self.nvars is None and var != None: + raise SystemError("multi-variable call to a scalar basis") + + elif self.nvars is None: + # Single variable basis + return [ + sum([coeffs[i] * self(i, t) for i in range(self.N)]) + for t in tlist] - def eval(self, coeffs, tlist): - return [ - sum([coeffs[i] * self(i, t) for i in range(self.N)]) - for t in tlist] + else: + # Multi-variable basis + values = np.empty((self.nvars, tlist.size)) + for j in range(self.nvars): + coef_len = self.var_ncoefs(j) + values[j] = np.array([ + sum([coeffs[i] * self(i, t, var=j) + for i in range(coef_len)]) + for t in tlist]) + return values - def eval_deriv(self, i, j, t): + def eval_deriv(self, i, j, t, var=None): raise NotImplementedError("Internal error; improper basis functions") diff --git a/control/flatsys/bezier.py b/control/flatsys/bezier.py index 7e41c546e..d2ab1f275 100644 --- a/control/flatsys/bezier.py +++ b/control/flatsys/bezier.py @@ -59,7 +59,7 @@ def __init__(self, N, T=1): self.T = T # save end of time interval # Compute the kth derivative of the ith basis function at time t - def eval_deriv(self, i, k, t): + def eval_deriv(self, i, k, t, var=None): """Evaluate the kth derivative of the ith basis function at time t.""" if i >= self.N: raise ValueError("Basis function index too high") diff --git a/control/flatsys/bspline.py b/control/flatsys/bspline.py index 5e3b98cb5..a0e5bf3f0 100644 --- a/control/flatsys/bspline.py +++ b/control/flatsys/bspline.py @@ -17,14 +17,14 @@ class BSplineFamily(BasisFamily): across a set of breakpoints with given order and smoothness. """ - def __init__(self, breakpoints, degree, smoothness=None, vars=1): + def __init__(self, breakpoints, degree, smoothness=None, vars=None): """Create a B-spline basis for piecewise smooth polynomials Define B-spline polynomials for a set of one or more variables. - B-splines are characterized by a set of intervals separated by break - points. On each interval we have a polynomial of a certain order - and the spline is continuous up to a given smoothness at interior - break points. + B-splines are used as a basis for a set of piecewise smooth + polynomials joined at breakpoints. On each interval we have a + polynomial of a given order and the spline is continuous up to a + given smoothness at interior breakpoints. Parameters ---------- @@ -41,8 +41,11 @@ def __init__(self, breakpoints, degree, smoothness=None, vars=1): For each spline variable, the smoothness at breakpoints (number of derivatives that should match). - vars : int or list of str, option - The number of spline variables or a list of spline variable names. + vars : None or int, optional + The number of spline variables. If specified as None (default), + then the spline basis describes a single variable, with no + indexing. If the number of spine variables is > 0, then the + spline basis is index using the `var` keyword. """ # Process the breakpoints for the spline */ @@ -58,17 +61,19 @@ def __init__(self, breakpoints, degree, smoothness=None, vars=1): raise ValueError("break points must be strictly increasing values") # Decide on the number of spline variables - if isinstance(vars, list) and all([isinstance(v, str) for v in vars]): - raise NotImplemented("list of variable names not yet supported") + if vars is None: + nvars = 1 + self.nvars = None # track as single variable elif not isinstance(vars, int): - raise TypeError("vars must be an integer or list of strings") + raise TypeError("vars must be an integer") else: nvars = vars + self.nvars = nvars # # Process B-spline parameters (order, smoothness) # - # B-splines are characterized by a set of intervals separated by + # B-splines are defined on a set of intervals separated by # breakpoints. On each interval we have a polynomial of a certain # order and the spline is continuous up to a given smoothness at # breakpoints. The code in this section allows some flexibility in @@ -99,14 +104,15 @@ def process_spline_parameters( elif all([isinstance(v, allowed_types) for v in values]): # List of values => make sure it is the right size if len(values) != length: - raise ValueError(f"length of '{name}' does not match n") + raise ValueError(f"length of '{name}' does not match" + f" number of variables") else: raise ValueError(f"could not parse '{name}' keyword") # Check to make sure the values are OK if values is not None and any([val < minimum for val in values]): raise ValueError( - f"invalid value for {name}; must be at least {minimum}") + f"invalid value for '{name}'; must be at least {minimum}") return values @@ -123,25 +129,23 @@ def process_spline_parameters( if any([degree[i] - smoothness[i] < 1 for i in range(nvars)]): raise ValueError("degree must be greater than smoothness") - # Store the parameters and process them in call_ntg() - self.nvars = nvars + # Store the parameters for the spline (self.nvars already stored) self.breakpoints = breakpoints self.degree = degree self.smoothness = smoothness - self.nintervals = breakpoints.size - 1 # # Compute parameters for a SciPy BSpline object # - # To create a B-spline, we need to compute the knot points, keeping - # track of the use of repeated knot points at the initial knot and + # To create a B-spline, we need to compute the knotpoints, keeping + # track of the use of repeated knotpoints at the initial knot and # final knot as well as repeated knots at intermediate points # depending on the desired smoothness. # # Store the coefficients for each output (useful later) self.coef_offset, self.coef_length, offset = [], [], 0 - for i in range(self.nvars): + for i in range(nvars): # Compute number of coefficients for the piecewise polynomial ncoefs = (self.degree[i] + 1) * (len(self.breakpoints) - 1) - \ (self.smoothness[i] + 1) * (len(self.breakpoints) - 2) @@ -151,48 +155,43 @@ def process_spline_parameters( offset += ncoefs self.N = offset # save the total number of coefficients - # Create knot points for each spline variable + # Create knotpoints for each spline variable # TODO: extend to multi-dimensional breakpoints self.knotpoints = [] - for i in range(self.nvars): + for i in range(nvars): # Allocate space for the knotpoints self.knotpoints.append(np.empty( (self.degree[i] + 1) + (len(self.breakpoints) - 2) * \ (self.degree[i] - self.smoothness[i]) + (self.degree[i] + 1))) - # Initial knot points + # Initial knotpoints (multiplicity = order) self.knotpoints[i][0:self.degree[i] + 1] = self.breakpoints[0] offset = self.degree[i] + 1 - # Interior knot points + # Interior knotpoints (multiplicity = degree - smoothness) nknots = self.degree[i] - self.smoothness[i] assert nknots > 0 # just in case for j in range(1, self.breakpoints.size - 1): self.knotpoints[i][offset:offset+nknots] = self.breakpoints[j] offset += nknots - # Final knot point + # Final knotpoint (multiplicity = order) self.knotpoints[i][offset:offset + self.degree[i] + 1] = \ self.breakpoints[-1] - def eval(self, coefs, tlist): - return np.array([ - BSpline(self.knotpoints[i], - coefs[self.coef_offset[i]: - self.coef_offset[i] + self.coef_length[i]], - self.degree[i])(tlist) - for i in range(self.nvars)]) - # Compute the kth derivative of the ith basis function at time t - def eval_deriv(self, i, k, t, squeeze=True): + def eval_deriv(self, i, k, t, var=None): """Evaluate the kth derivative of the ith basis function at time t.""" - if self.nvars > 1 or not squeeze: - raise NotImplementedError( - "derivatives of multi-variable splines not yet supported") + if self.nvars is None or (self.nvars == 1 and var is None): + # Use same variable for all requests + var = 0 + elif self.nvars > 1 and var is None: + raise SystemError( + "scalar variable call to multi-variable splines") # Create a coefficient vector for this spline - coefs = np.zeros(self.coef_length[0]); coefs[i] = 1 + coefs = np.zeros(self.coef_length[var]); coefs[i] = 1 # Evaluate the derivative of the spline at the desired point in time - return BSpline(self.knotpoints[0], coefs, - self.degree[0]).derivative(k)(t) + return BSpline(self.knotpoints[var], coefs, + self.degree[var]).derivative(k)(t) diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index aa701af4b..96034158e 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -155,6 +155,7 @@ def __init__(self, if reverse is not None: self.reverse = reverse # Save the length of the flat flag + # TODO: missing def __str__(self): return f"{NonlinearIOSystem.__str__(self)}\n\n" \ @@ -233,17 +234,19 @@ def _basis_flag_matrix(sys, basis, flag, t, params={}): column of the matrix corresponds to a basis function and each row is a derivative, with the derivatives (flag) for each output stacked on top of each other. - +l """ flagshape = [len(f) for f in flag] - M = np.zeros((sum(flagshape), basis.N * sys.ninputs)) + M = np.zeros((sum(flagshape), + sum([basis.var_ncoefs(i) for i in range(sys.ninputs)]))) flag_off = 0 - coeff_off = 0 + coef_off = 0 for i, flag_len in enumerate(flagshape): - for j, k in itertools.product(range(basis.N), range(flag_len)): - M[flag_off + k, coeff_off + j] = basis.eval_deriv(j, k, t) + coef_len = basis.var_ncoefs(i) + for j, k in itertools.product(range(coef_len), range(flag_len)): + M[flag_off + k, coef_off + j] = basis.eval_deriv(j, k, t, var=i) flag_off += flag_len - coeff_off += basis.N + coef_off += coef_len return M @@ -362,11 +365,16 @@ def point_to_point( if basis is None: basis = PolyFamily(2 * (sys.nstates + sys.ninputs)) + # If a multivariable basis was given, make sure the size is correct + if basis.nvars is not None and basis.nvars != sys.ninputs: + raise ValueError("size of basis does not match flat system size") + # Make sure we have enough basis functions to solve the problem - if basis.N * sys.ninputs < 2 * (sys.nstates + sys.ninputs): + ncoefs = sum([basis.var_ncoefs(i) for i in range(sys.ninputs)]) + if ncoefs < 2 * (sys.nstates + sys.ninputs): raise ValueError("basis set is too small") elif (cost is not None or trajectory_constraints is not None) and \ - basis.N * sys.ninputs == 2 * (sys.nstates + sys.ninputs): + ncoefs == 2 * (sys.nstates + sys.ninputs): warnings.warn("minimal basis specified; optimization not possible") cost = None trajectory_constraints = None @@ -531,11 +539,12 @@ def traj_const(null_coeffs): # Store the flag lengths and coefficients # TODO: make this more pythonic - coeff_off = 0 + coef_off = 0 for i in range(sys.ninputs): # Grab the coefficients corresponding to this flat output - systraj.coeffs.append(alpha[coeff_off:coeff_off + basis.N]) - coeff_off += basis.N + coef_len = basis.var_ncoefs(i) + systraj.coeffs.append(alpha[coef_off:coef_off + coef_len]) + coef_off += coef_len # Keep track of the length of the flat flag for this output systraj.flaglen.append(len(zflag_T0[i])) diff --git a/control/flatsys/poly.py b/control/flatsys/poly.py index 08dcfb1c9..bb3677c48 100644 --- a/control/flatsys/poly.py +++ b/control/flatsys/poly.py @@ -55,7 +55,7 @@ def __init__(self, N): super(PolyFamily, self).__init__(N) # Compute the kth derivative of the ith basis function at time t - def eval_deriv(self, i, k, t): + def eval_deriv(self, i, k, t, var=None): """Evaluate the kth derivative of the ith basis function at time t.""" if (i < k): return 0; # higher derivative than power return factorial(i)/factorial(i-k) * np.power(t, i-k) diff --git a/control/flatsys/systraj.py b/control/flatsys/systraj.py index 9d425295b..c9bde6d7a 100644 --- a/control/flatsys/systraj.py +++ b/control/flatsys/systraj.py @@ -106,11 +106,11 @@ def eval(self, tlist): for i in range(self.ninputs): flag_len = self.flaglen[i] zflag.append(np.zeros(flag_len)) - for j in range(self.basis.N): + for j in range(self.basis.var_ncoefs(i)): for k in range(flag_len): #! TODO: rewrite eval_deriv to take in time vector zflag[i][k] += self.coeffs[i][j] * \ - self.basis.eval_deriv(j, k, t) + self.basis.eval_deriv(j, k, t, var=i) # Now copy the states and inputs # TODO: revisit order of list arguments diff --git a/control/tests/bspline_test.py b/control/tests/bspline_test.py index b00f856dc..0ac59094d 100644 --- a/control/tests/bspline_test.py +++ b/control/tests/bspline_test.py @@ -59,6 +59,11 @@ def test_bspline_basis(): bspline.eval_deriv(i, j, time)[0:-1], atol=0.01, rtol=0.01) + # Make sure that ndarrays are processed the same as integer lists + degree = np.array(degree) + bspline2 = fs.BSplineFamily([0, Tf/3, Tf/2, Tf], degree, maxderiv) + np.testing.assert_equal(bspline(0, time), bspline2(0, time)) + # Exception check with pytest.raises(IndexError, match="out of bounds"): bspline.eval_deriv(bspline.N, 0, time) @@ -94,11 +99,6 @@ def test_double_integrator(xf, uf, Tf): t, y, x = ct.forced_response(sys, T, ud, x0, return_x=True) np.testing.assert_array_almost_equal(x, xd, decimal=3) - # Multi-dimensional splines not yet implemented - bspline = fs.BSplineFamily([0, Tf/3, Tf/2, Tf], [4, 5], [2, 3], vars=2) - with pytest.raises(NotImplementedError, match="not yet supported"): - fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=bspline) - # Bicycle model def vehicle_flat_forward(x, u, params={}): @@ -160,6 +160,25 @@ def test_kinematic_car(): np.testing.assert_array_almost_equal(xf, x[:, 1]) np.testing.assert_array_almost_equal(uf, u[:, 1]) +def test_kinematic_car_multivar(): + # Define the endpoints of the trajectory + x0 = [0., -2., 0.]; u0 = [10., 0.] + xf = [100., 2., 0.]; uf = [10., 0.] + Tf = 10 + + # Set up a basis vector + bspline = fs.BSplineFamily([0, Tf/2, Tf], [5, 6], [3, 4], vars=2) + + # Find trajectory between initial and final conditions + traj = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=bspline) + + # Verify that the trajectory computation is correct + x, u = traj.eval([0, Tf]) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + np.testing.assert_array_almost_equal(xf, x[:, 1]) + np.testing.assert_array_almost_equal(uf, u[:, 1]) + def test_bspline_errors(): # Breakpoints must be a 1D array, in increasing order with pytest.raises(NotImplementedError, match="not yet supported"): @@ -179,3 +198,24 @@ def test_bspline_errors(): basis = fs.BSplineFamily([0, 1], 4, 3) # OK with pytest.raises(ValueError, match="degree must be greater"): basis = fs.BSplineFamily([0, 1], 4, 4) # not OK + + # nvars must be an integer + with pytest.raises(TypeError, match="vars must be an integer"): + basis = fs.BSplineFamily([0, 1], 4, 3, vars=['x1', 'x2']) + + # degree, smoothness must match nvars + with pytest.raises(ValueError, match="length of 'degree' does not match"): + basis = fs.BSplineFamily([0, 1], [4, 4, 4], 3, vars=2) + + # degree, smoothness must be list of ints + basis = fs.BSplineFamily([0, 1], [4, 4], 3, vars=2) # OK + with pytest.raises(ValueError, match="could not parse 'degree'"): + basis = fs.BSplineFamily([0, 1], [4, '4'], 3, vars=2) + + # degree must be strictly positive + with pytest.raises(ValueError, match="'degree'; must be at least 1"): + basis = fs.BSplineFamily([0, 1], 0, 1) + + # smoothness must be non-negative + with pytest.raises(ValueError, match="'smoothness'; must be at least 0"): + basis = fs.BSplineFamily([0, 1], 2, -1) From 78f7bbb4361b8c6118e938e6fff7946a23324500 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 12 Aug 2022 20:32:55 -0700 Subject: [PATCH 3/7] initial implementation of solve_flat_ocp --- control/flatsys/__init__.py | 2 +- control/flatsys/flatsys.py | 316 +++++++++++++++++++++++++++++++++- control/tests/flatsys_test.py | 98 +++++++++++ 3 files changed, 411 insertions(+), 5 deletions(-) diff --git a/control/flatsys/__init__.py b/control/flatsys/__init__.py index 6e8eac030..8ead08dce 100644 --- a/control/flatsys/__init__.py +++ b/control/flatsys/__init__.py @@ -62,4 +62,4 @@ from .linflat import LinearFlatSystem # Package functions -from .flatsys import point_to_point +from .flatsys import point_to_point, solve_flat_ocp diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index 96034158e..18b3d7ec6 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -411,7 +411,7 @@ def point_to_point( # Solve for the coefficients of the flat outputs # # At this point, we need to solve the equation M alpha = zflag, where M - # is the matrix constrains for initial and final conditions and zflag = + # is the matrix constraints for initial and final conditions and zflag = # [zflag_T0; zflag_tf]. # # If there are no constraints, then we just need to solve a linear @@ -426,6 +426,11 @@ def point_to_point( if rank < Z.size: warnings.warn("basis too small; solution may not exist") + # Precompute the collocation matrix the defines the flag at timepts + Mt_list = [] + for t in timepts: + Mt_list.append(_basis_flag_matrix(sys, basis, zflag_T0, t)) + if cost is not None or trajectory_constraints is not None: # Search over the null space to minimize cost/satisfy constraints N = sp.linalg.null_space(M) @@ -438,8 +443,8 @@ def traj_cost(null_coeffs): # Evaluate the costs at the listed time points # TODO: store Mt ahead of time, since it doesn't change costval = 0 - for t in timepts: - M_t = _basis_flag_matrix(sys, basis, zflag_T0, t) + for i, t in enumerate(timepts): + M_t = Mt_list[i] # Compute flag at this time point zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) @@ -477,7 +482,7 @@ def traj_const(null_coeffs): values = [] for i, t in enumerate(timepts): # Calculate the states and inputs for the flat output - M_t = _basis_flag_matrix(sys, basis, zflag_T0, t) + M_t = Mt_list[i] # Compute flag at this time point zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) @@ -551,3 +556,306 @@ def traj_const(null_coeffs): # Return a function that computes inputs and states as a function of time return systraj + + +# Solve a point to point trajectory generation problem for a flat system +def solve_flat_ocp( + sys, timepts, x0=0, u0=0, basis=None, trajectory_cost=None, + terminal_cost=None, trajectory_constraints=None, + initial_guess=None, params=None, **kwargs): + """Compute trajectory between an initial and final conditions. + + Compute an optimial trajectory for a differentially flat system starting + from an initial state and input value. + + Parameters + ---------- + flatsys : FlatSystem object + Description of the differentially flat system. This object must + define a function `flatsys.forward()` that takes the system state and + produceds the flag of flat outputs and a system `flatsys.reverse()` + that takes the flag of the flat output and prodes the state and + input. + + timepts : float or 1D array_like + The list of points for evaluating cost and constraints, as well as + the time horizon. If given as a float, indicates the final time for + the trajectory (corresponding to xf) + + x0, u0 : 1D arrays + Define the initial conditions for the system. If either of the + values are given as None, they are replaced by a vector of zeros of + the appropriate dimension. + + basis : :class:`~control.flatsys.BasisFamily` object, optional + The basis functions to use for generating the trajectory. If not + specified, the :class:`~control.flatsys.PolyFamily` basis family + will be used, with the minimal number of elements required to find a + feasible trajectory (twice the number of system states) + + trajectory_cost : callable + Function that returns the integral cost given the current state + and input. Called as `cost(x, u)`. + + terminal_cost : callable + Function that returns the terminal cost given the state and input. + Called as `cost(x, u)`. + + trajectory_constraints : list of tuples, optional + List of constraints that should hold at each point in the time vector. + Each element of the list should consist of a tuple with first element + given by :class:`scipy.optimize.LinearConstraint` or + :class:`scipy.optimize.NonlinearConstraint` and the remaining + elements of the tuple are the arguments that would be passed to those + functions. The following tuples are supported: + + * (LinearConstraint, A, lb, ub): The matrix A is multiplied by stacked + vector of the state and input at each point on the trajectory for + comparison against the upper and lower bounds. + + * (NonlinearConstraint, fun, lb, ub): a user-specific constraint + function `fun(x, u)` is called at each point along the trajectory + and compared against the upper and lower bounds. + + The constraints are applied at each time point along the trajectory. + + minimize_kwargs : str, optional + Pass additional keywords to :func:`scipy.optimize.minimize`. + + Returns + ------- + traj : :class:`~control.flatsys.SystemTrajectory` object + The system trajectory is returned as an object that implements the + `eval()` function, we can be used to compute the value of the state + and input and a given time t. + + Notes + ----- + Additional keyword parameters can be used to fine tune the behavior of + the underlying optimization function. See `minimize_*` keywords in + :func:`OptimalControlProblem` for more information. + + """ + # + # Make sure the problem is one that we can handle + # + x0 = _check_convert_array(x0, [(sys.nstates,), (sys.nstates, 1)], + 'Initial state: ', squeeze=True) + u0 = _check_convert_array(u0, [(sys.ninputs,), (sys.ninputs, 1)], + 'Initial input: ', squeeze=True) + + # Process final time + timepts = np.atleast_1d(timepts) + Tf = timepts[-1] + T0 = timepts[0] if len(timepts) > 1 else T0 + + # Process keyword arguments + if trajectory_constraints is None: + # Backwards compatibility + trajectory_constraints = kwargs.pop('constraints', None) + if trajectory_cost is None: + # Compatibility with point_to_point + trajectory_cost = kwargs.pop('cost', None) + + minimize_kwargs = {} + minimize_kwargs['method'] = kwargs.pop('minimize_method', None) + minimize_kwargs['options'] = kwargs.pop('minimize_options', {}) + minimize_kwargs.update(kwargs.pop('minimize_kwargs', {})) + + if trajectory_cost is None and terminal_cost is None: + raise TypeError("need trajectory and/or terminal cost required") + + if kwargs: + raise TypeError("unrecognized keywords: ", str(kwargs)) + + # + # Determine the basis function set to use and make sure it is big enough + # + + # If no basis set was specified, use a polynomial basis (poor choice...) + if basis is None: + basis = PolyFamily(2 * (sys.nstates + sys.ninputs)) + + # If a multivariable basis was given, make sure the size is correct + if basis.nvars is not None and basis.nvars != sys.ninputs: + raise ValueError("size of basis does not match flat system size") + + # Make sure we have enough basis functions to solve the problem + ncoefs = sum([basis.var_ncoefs(i) for i in range(sys.ninputs)]) + if ncoefs <= sys.nstates + sys.ninputs: + raise ValueError("basis set is too small") + + # Figure out the parameters to use, if any + params = sys.params if params is None else params + + # + # Map the initial and conditions to flat output conditions + # + # We need to compute the output "flag": [z(t), z'(t), z''(t), ...] + # and then evaluate this at the initial and final condition. + # + + zflag_T0 = sys.forward(x0, u0, params) + Z_T0 = np.hstack(zflag_T0) + + # + # Compute the matrix constraints for initial conditions + # + # This computation depends on the basis function we are using. It + # essentially amounts to evaluating the basis functions and their + # derivatives at the initial conditions. + + # Compute the flags for the initial and final states + M_T0 = _basis_flag_matrix(sys, basis, zflag_T0, T0) + + # + # Solve for the coefficients of the flat outputs + # + # At this point, we need to solve the equation M_T0 alpha = zflag_T0. + # + # If there are no additional constraints, then we just need to solve a + # linear system of equations => use least squares. Otherwise, we have a + # nonlinear optimal control problem with equality constraints => use + # scipy.optimize.minimize(). + # + + # Start by solving the least squares problem + alpha, residuals, rank, s = np.linalg.lstsq(M_T0, Z_T0, rcond=None) + if rank < Z_T0.size: + warnings.warn("basis too small; solution may not exist") + + # Precompute the collocation matrix the defines the flag at timepts + # TODO: only compute if we have trajectory cost/constraints + Mt_list = [] + for t in timepts: + Mt_list.append(_basis_flag_matrix(sys, basis, zflag_T0, t)) + + # Search over the null space to minimize cost/satisfy constraints + N = sp.linalg.null_space(M_T0) + + # Define a function to evaluate the cost along a trajectory + def traj_cost(null_coeffs): + # Add this to the existing solution + coeffs = alpha + N @ null_coeffs + costval = 0 + + # Evaluate the trajectory costs at the listed time points + if trajectory_cost is not None: + for i, t in enumerate(timepts[0:-1]): + M_t = Mt_list[i] + + # Compute flag at this time point + zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) + + # Find states and inputs at the time points + x, u = sys.reverse(zflag, params) + + # Evaluate the cost at this time point + # TODO: make use of time interval + costval += trajectory_cost(x, u) + + # Evaluate the terminal_cost + if terminal_cost is not None: + M_t = Mt_list[-1] + zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) + x, u = sys.reverse(zflag, params) + costval += terminal_cost(x, u) + + return costval + + # Process the constraints we were given + traj_constraints = trajectory_constraints + if traj_constraints is None: + traj_constraints = [] + elif isinstance(traj_constraints, tuple): + # TODO: Check to make sure this is really a constraint + traj_constraints = [traj_constraints] + elif not isinstance(traj_constraints, list): + raise TypeError("trajectory constraints must be a list") + + # Process constraints + minimize_constraints = [] + if len(traj_constraints) > 0: + # Set up a nonlinear function to evaluate the constraints + def traj_const(null_coeffs): + # Add this to the existing solution + coeffs = alpha + N @ null_coeffs + + # Evaluate the constraints at the listed time points + values = [] + for i, t in enumerate(timepts): + # Calculate the states and inputs for the flat output + M_t = Mt_list[i] + + # Compute flag at this time point + zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) + + # Find states and inputs at the time points + states, inputs = sys.reverse(zflag, params) + + # Evaluate the constraint function along the trajectory + for type, fun, lb, ub in traj_constraints: + if type == sp.optimize.LinearConstraint: + # `fun` is A matrix associated with polytope... + values.append(fun @ np.hstack([states, inputs])) + elif type == sp.optimize.NonlinearConstraint: + values.append(fun(states, inputs)) + else: + raise TypeError( + "unknown constraint type %s" % type) + return np.array(values).flatten() + + # Store upper and lower bounds + const_lb, const_ub = [], [] + for t in timepts: + for type, fun, lb, ub in traj_constraints: + const_lb.append(lb) + const_ub.append(ub) + const_lb = np.array(const_lb).flatten() + const_ub = np.array(const_ub).flatten() + + # Store the constraint as a nonlinear constraint + minimize_constraints = [sp.optimize.NonlinearConstraint( + traj_const, const_lb, const_ub)] + + # Add initial and terminal constraints + # minimize_constraints += [sp.optimize.LinearConstraint(M, Z, Z)] + + # Process the initial condition + if initial_guess is None: + initial_guess = np.zeros(M_T0.shape[1] - M_T0.shape[0]) + else: + raise NotImplementedError("Initial guess not yet implemented.") + + # Find the optimal solution + res = sp.optimize.minimize( + traj_cost, initial_guess, constraints=minimize_constraints, + **minimize_kwargs) + if res.success: + alpha += N @ res.x + else: + raise RuntimeError( + "Unable to solve optimal control problem\n" + + "scipy.optimize.minimize returned " + res.message) + + # + # Transform the trajectory from flat outputs to states and inputs + # + + # Create a trajectory object to store the result + systraj = SystemTrajectory(sys, basis, params=params) + + # Store the flag lengths and coefficients + # TODO: make this more pythonic + coef_off = 0 + for i in range(sys.ninputs): + # Grab the coefficients corresponding to this flat output + coef_len = basis.var_ncoefs(i) + systraj.coeffs.append(alpha[coef_off:coef_off + coef_len]) + coef_off += coef_len + + # Keep track of the length of the flat flag for this output + systraj.flaglen.append(len(zflag_T0[i])) + + # Return a function that computes inputs and states as a function of time + return systraj diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index 4eb091b14..710841d60 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -233,6 +233,104 @@ def test_flat_cost_constr(self, basis): np.testing.assert_almost_equal(x_const, x_nlconst) np.testing.assert_almost_equal(u_const, u_nlconst) + @pytest.mark.parametrize("basis", [ + # fs.PolyFamily(8), + fs.BSplineFamily([0, 3, 7, 10], 5, 2)]) + def test_flat_solve_ocp(self, basis): + # Double integrator system + sys = ct.ss([[0, 1], [0, 0]], [[0], [1]], [[1, 0]], 0) + flat_sys = fs.LinearFlatSystem(sys) + + # Define the endpoints of the trajectory + x0 = [1, 0]; u0 = [0] + xf = [-1, 0]; uf = [0] + Tf = 10 + T = np.linspace(0, Tf, 100) + + # Find trajectory between initial and final conditions + traj = fs.point_to_point( + flat_sys, Tf, x0, u0, xf, uf, basis=basis) + x, u = traj.eval(T) + + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + np.testing.assert_array_almost_equal(xf, x[:, -1]) + np.testing.assert_array_almost_equal(uf, u[:, -1]) + + # Solve with a terminal cost function + timepts = np.linspace(0, Tf, 10) + terminal_cost = opt.quadratic_cost( + flat_sys, 1e3, 1e3, x0=xf, u0=uf) + + traj_cost = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, + terminal_cost=terminal_cost, basis=basis) + + # Verify that the trajectory computation is correct + x_cost, u_cost = traj_cost.eval(T) + np.testing.assert_array_almost_equal(x0, x_cost[:, 0]) + np.testing.assert_array_almost_equal(u0, u_cost[:, 0]) + np.testing.assert_array_almost_equal(xf, x_cost[:, -1]) + np.testing.assert_array_almost_equal(uf, u_cost[:, -1]) + + # Solve with trajectory and terminal cost functions + trajectory_cost = opt.quadratic_cost(flat_sys, 0, 1, x0=xf, u0=uf) + + traj_cost = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, terminal_cost=terminal_cost, + trajectory_cost=trajectory_cost, basis=basis) + + # Verify that the trajectory computation is correct + x_cost, u_cost = traj_cost.eval(T) + np.testing.assert_array_almost_equal(x0, x_cost[:, 0]) + np.testing.assert_array_almost_equal(u0, u_cost[:, 0]) + + # Make sure we got close on the terminal condition + assert all(np.abs(x_cost[:, -1] - xf) < 0.1) + + # Make sure that we got a different answer than before + assert np.any(np.abs(x - x_cost) > 0.1) + + # Re-solve with constraint on the y deviation + lb, ub = [-2, np.min(x_cost[1])*0.95], [2, 1] + constraints = [opt.state_range_constraint(flat_sys, lb, ub)] + + # Make sure that the previous solution violated at least one constraint + assert np.any(x_cost[0, :] < lb[0]) or np.any(x_cost[0, :] > ub[0]) \ + or np.any(x_cost[1, :] < lb[1]) or np.any(x_cost[1, :] > ub[1]) + + traj_const = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, + terminal_cost=terminal_cost, trajectory_cost=trajectory_cost, + trajectory_constraints=constraints, basis=basis, + ) + + # Verify that the trajectory computation is correct + x_const, u_const = traj_const.eval(timepts) + np.testing.assert_array_almost_equal(x0, x_const[:, 0]) + np.testing.assert_array_almost_equal(u0, u_const[:, 0]) + + # Make sure we got close on the terminal condition + assert all(np.abs(x_cost[:, -1] - xf) < 0.1) + + # Make sure that the solution respects the bounds (with some slop) + for i in range(x_const.shape[0]): + assert np.all(x_const[i] >= lb[i] * 1.02) + assert np.all(x_const[i] <= ub[i] * 1.02) + + # Solve the same problem with a nonlinear constraint type + # Use alternative keywords as well + nl_constraints = [ + (sp.optimize.NonlinearConstraint, lambda x, u: x, lb, ub)] + traj_nlconst = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, + cost=trajectory_cost, terminal_cost=terminal_cost, + constraints=nl_constraints, basis=basis, + ) + x_nlconst, u_nlconst = traj_nlconst.eval(timepts) + np.testing.assert_almost_equal(x_const, x_nlconst) + np.testing.assert_almost_equal(u_const, u_nlconst) + def test_bezier_basis(self): bezier = fs.BezierFamily(4) time = np.linspace(0, 1, 100) From 05084f35c933f4a02112830271b23dd2d3fee198 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Thu, 18 Aug 2022 22:48:54 -0700 Subject: [PATCH 4/7] unit tests, bug fixes, algorithm improvements * add initial_guess functionality to solve_flat_ocp * pre-compute collocation matrices in point_to_point, solve_flat_ocp * updated return values for solve_flat_ocp * add __repr__ for flat basis functions * docstring improvements * additional unit tests + examples --- control/flatsys/basis.py | 4 + control/flatsys/bezier.py | 3 +- control/flatsys/bspline.py | 4 + control/flatsys/flatsys.py | 98 +++++++++------ control/flatsys/poly.py | 6 +- control/tests/flatsys_test.py | 231 +++++++++++++++++++++++++++++----- control/tests/kwargs_test.py | 4 +- examples/kincar-flatsys.py | 63 ++++++++-- 8 files changed, 335 insertions(+), 78 deletions(-) diff --git a/control/flatsys/basis.py b/control/flatsys/basis.py index ad1d8b2a3..baf2b926b 100644 --- a/control/flatsys/basis.py +++ b/control/flatsys/basis.py @@ -64,6 +64,10 @@ def __init__(self, N): self.coef_offset = [0] # coefficient offset for each variable self.coef_length = [N] # coefficient length for each variable + def __repr__(self): + return f'<{self.__class__.__name__}: nvars={self.nvars}, ' + \ + f'N={self.N}>' + def __call__(self, i, t, var=None): """Evaluate the ith basis function at a point in time""" return self.eval_deriv(i, 0, t, var=var) diff --git a/control/flatsys/bezier.py b/control/flatsys/bezier.py index d2ab1f275..241274268 100644 --- a/control/flatsys/bezier.py +++ b/control/flatsys/bezier.py @@ -78,6 +78,7 @@ def eval_deriv(self, i, k, t, var=None): # Return the kth derivative of the ith Bezier basis function return binom(n, i) * sum([ (-1)**(j-i) * - binom(n-i, j-i) * factorial(j)/factorial(j-k) * np.power(u, j-k) + binom(n-i, j-i) * factorial(j)/factorial(j-k) * \ + np.power(u, j-k) / np.power(self.T, k) for j in range(max(i, k), n+1) ]) diff --git a/control/flatsys/bspline.py b/control/flatsys/bspline.py index a0e5bf3f0..12cf49431 100644 --- a/control/flatsys/bspline.py +++ b/control/flatsys/bspline.py @@ -179,6 +179,10 @@ def process_spline_parameters( self.knotpoints[i][offset:offset + self.degree[i] + 1] = \ self.breakpoints[-1] + def __repr__(self): + return f'<{self.__class__.__name__}: nvars={self.nvars}, ' + \ + f'degree={self.degree}, smoothness={self.smoothness}>' + # Compute the kth derivative of the ith basis function at time t def eval_deriv(self, i, k, t, var=None): """Evaluate the kth derivative of the ith basis function at time t.""" diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index 18b3d7ec6..78d7696f9 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -426,24 +426,23 @@ def point_to_point( if rank < Z.size: warnings.warn("basis too small; solution may not exist") - # Precompute the collocation matrix the defines the flag at timepts - Mt_list = [] - for t in timepts: - Mt_list.append(_basis_flag_matrix(sys, basis, zflag_T0, t)) - if cost is not None or trajectory_constraints is not None: # Search over the null space to minimize cost/satisfy constraints N = sp.linalg.null_space(M) + # Precompute the collocation matrix the defines the flag at timepts + Mt_list = [] + for t in timepts[1:-1]: + Mt_list.append(_basis_flag_matrix(sys, basis, zflag_T0, t)) + # Define a function to evaluate the cost along a trajectory def traj_cost(null_coeffs): # Add this to the existing solution coeffs = alpha + N @ null_coeffs # Evaluate the costs at the listed time points - # TODO: store Mt ahead of time, since it doesn't change costval = 0 - for i, t in enumerate(timepts): + for i, t in enumerate(timepts[1:-1]): M_t = Mt_list[i] # Compute flag at this time point @@ -453,7 +452,7 @@ def traj_cost(null_coeffs): x, u = sys.reverse(zflag, params) # Evaluate the cost at this time point - costval += cost(x, u) + costval += cost(x, u) * (timepts[i+1] - timepts[i]) return costval # If no cost given, override with magnitude of the coefficients @@ -480,7 +479,7 @@ def traj_const(null_coeffs): # Evaluate the constraints at the listed time points values = [] - for i, t in enumerate(timepts): + for i, t in enumerate(timepts[1:-1]): # Calculate the states and inputs for the flat output M_t = Mt_list[i] @@ -504,7 +503,7 @@ def traj_const(null_coeffs): # Store upper and lower bounds const_lb, const_ub = [], [] - for t in timepts: + for t in timepts[1:-1]: for type, fun, lb, ub in traj_constraints: const_lb.append(lb) const_ub.append(ub) @@ -515,9 +514,6 @@ def traj_const(null_coeffs): minimize_constraints = [sp.optimize.NonlinearConstraint( traj_const, const_lb, const_ub)] - # Add initial and terminal constraints - # minimize_constraints += [sp.optimize.LinearConstraint(M, Z, Z)] - # Process the initial condition if initial_guess is None: initial_guess = np.zeros(M.shape[1] - M.shape[0]) @@ -528,12 +524,13 @@ def traj_const(null_coeffs): res = sp.optimize.minimize( traj_cost, initial_guess, constraints=minimize_constraints, **minimize_kwargs) - if res.success: - alpha += N @ res.x - else: - raise RuntimeError( - "Unable to solve optimal control problem\n" + - "scipy.optimize.minimize returned " + res.message) + alpha += N @ res.x + + # See if we got an answer + if not res.success: + warnings.warn( + "unable to solve optimal control problem\n" + f"scipy.optimize.minimize: '{res.message}'", UserWarning) # # Transform the trajectory from flat outputs to states and inputs @@ -541,6 +538,11 @@ def traj_const(null_coeffs): # Create a trajectory object to store the result systraj = SystemTrajectory(sys, basis, params=params) + if cost is not None or trajectory_constraints is not None: + # Store the result of the optimization + systraj.cost = res.fun + systraj.success = res.success + systraj.message = res.message # Store the flag lengths and coefficients # TODO: make this more pythonic @@ -560,7 +562,7 @@ def traj_const(null_coeffs): # Solve a point to point trajectory generation problem for a flat system def solve_flat_ocp( - sys, timepts, x0=0, u0=0, basis=None, trajectory_cost=None, + sys, timepts, x0=0, u0=0, trajectory_cost=None, basis=None, terminal_cost=None, trajectory_constraints=None, initial_guess=None, params=None, **kwargs): """Compute trajectory between an initial and final conditions. @@ -619,6 +621,9 @@ def solve_flat_ocp( The constraints are applied at each time point along the trajectory. + initial_guess : 2D array_like, optional + Initial guess for the optimal trajectory of the flat outputs. + minimize_kwargs : str, optional Pass additional keywords to :func:`scipy.optimize.minimize`. @@ -631,9 +636,14 @@ def solve_flat_ocp( Notes ----- - Additional keyword parameters can be used to fine tune the behavior of - the underlying optimization function. See `minimize_*` keywords in - :func:`OptimalControlProblem` for more information. + 1. Additional keyword parameters can be used to fine tune the behavior + of the underlying optimization function. See `minimize_*` keywords + in :func:`OptimalControlProblem` for more information. + + 2. The return data structure includes the following additional attributes: + * success : bool indicating whether the optimization succeeded + * cost : computed cost of the returned trajectory + * message : message returned by optimization if success if False """ # @@ -705,7 +715,7 @@ def solve_flat_ocp( # essentially amounts to evaluating the basis functions and their # derivatives at the initial conditions. - # Compute the flags for the initial and final states + # Compute the flag for the initial state M_T0 = _basis_flag_matrix(sys, basis, zflag_T0, T0) # @@ -752,7 +762,7 @@ def traj_cost(null_coeffs): # Evaluate the cost at this time point # TODO: make use of time interval - costval += trajectory_cost(x, u) + costval += trajectory_cost(x, u) * (timepts[i+1] - timepts[i]) # Evaluate the terminal_cost if terminal_cost is not None: @@ -821,22 +831,37 @@ def traj_const(null_coeffs): # Add initial and terminal constraints # minimize_constraints += [sp.optimize.LinearConstraint(M, Z, Z)] - # Process the initial condition + # Process the initial guess if initial_guess is None: - initial_guess = np.zeros(M_T0.shape[1] - M_T0.shape[0]) + initial_coefs = np.ones(M_T0.shape[1] - M_T0.shape[0]) else: - raise NotImplementedError("Initial guess not yet implemented.") + # Compute the map from coefficients to flat outputs + initial_coefs = [] + for i in range(sys.ninputs): + M_z = np.array([ + basis.eval_deriv(j, 0, timepts, var=i) + for j in range(basis.var_ncoefs(i))]).transpose() + + # Compute the parameters that give the best least squares fit + coefs, _, _, _ = np.linalg.lstsq( + M_z, initial_guess[i], rcond=None) + initial_coefs.append(coefs) + initial_coefs = np.hstack(initial_coefs) + + # Project the parameters onto the independent variables + initial_coefs, _, _, _ = np.linalg.lstsq(N, initial_coefs, rcond=None) # Find the optimal solution res = sp.optimize.minimize( - traj_cost, initial_guess, constraints=minimize_constraints, + traj_cost, initial_coefs, constraints=minimize_constraints, **minimize_kwargs) - if res.success: - alpha += N @ res.x - else: - raise RuntimeError( - "Unable to solve optimal control problem\n" + - "scipy.optimize.minimize returned " + res.message) + alpha += N @ res.x + + # See if we got an answer + if not res.success: + warnings.warn( + "unable to solve optimal control problem\n" + f"scipy.optimize.minimize: '{res.message}'", UserWarning) # # Transform the trajectory from flat outputs to states and inputs @@ -844,6 +869,9 @@ def traj_const(null_coeffs): # Create a trajectory object to store the result systraj = SystemTrajectory(sys, basis, params=params) + systraj.cost = res.fun + systraj.success = res.success + systraj.message = res.message # Store the flag lengths and coefficients # TODO: make this more pythonic diff --git a/control/flatsys/poly.py b/control/flatsys/poly.py index bb3677c48..65d1b500d 100644 --- a/control/flatsys/poly.py +++ b/control/flatsys/poly.py @@ -50,12 +50,14 @@ class PolyFamily(BasisFamily): \phi_i(t) = t^i """ - def __init__(self, N): + def __init__(self, N, T=1.): """Create a polynomial basis of order N.""" super(PolyFamily, self).__init__(N) + self.T = T # Compute the kth derivative of the ith basis function at time t def eval_deriv(self, i, k, t, var=None): """Evaluate the kth derivative of the ith basis function at time t.""" if (i < k): return 0; # higher derivative than power - return factorial(i)/factorial(i-k) * np.power(t, i-k) + return factorial(i)/factorial(i-k) * \ + np.power(t/self.T, i-k) / np.power(self.T, k) diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index 710841d60..00f5827d3 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -11,29 +11,35 @@ import numpy as np import pytest import scipy as sp +import re import control as ct import control.flatsys as fs import control.optimal as opt +# Set tolerances for lower/upper bound tests +atol = 1e-4 +rtol = 1e-4 + class TestFlatSys: """Test differential flat systems""" @pytest.mark.parametrize( - "xf, uf, Tf", - [([1, 0], [0], 2), - ([0, 1], [0], 3), - ([1, 1], [1], 4)]) - def test_double_integrator(self, xf, uf, Tf): + " xf, uf, Tf, basis", + [([1, 0], [0], 2, fs.PolyFamily(6)), + ([0, 1], [0], 3, fs.PolyFamily(6)), + ([0, 1], [0], 3, fs.BezierFamily(6)), + ([0, 1], [0], 3, fs.BSplineFamily([0, 1.5, 3], 4)), + ([1, 1], [1], 4, fs.PolyFamily(6)), + ([1, 1], [1], 4, fs.BezierFamily(6)), + ([1, 1], [1], 4, fs.BSplineFamily([0, 1.5, 3], 4))]) + def test_double_integrator(self, xf, uf, Tf, basis): # Define a second order integrator sys = ct.StateSpace([[-1, 1], [0, -2]], [[0], [1]], [[1, 0]], 0) flatsys = fs.LinearFlatSystem(sys) - # Define the basis set - poly = fs.PolyFamily(6) - x1, u1, = [0, 0], [0] - traj = fs.point_to_point(flatsys, Tf, x1, u1, xf, uf, basis=poly) + traj = fs.point_to_point(flatsys, Tf, x1, u1, xf, uf, basis=basis) # Verify that the trajectory computation is correct x, u = traj.eval([0, Tf]) @@ -92,16 +98,19 @@ def vehicle_output(t, x, u, params): return x vehicle_output, inputs=('v', 'delta'), outputs=('x', 'y', 'theta'), states=('x', 'y', 'theta')) - @pytest.mark.parametrize("poly", [ - fs.PolyFamily(6), fs.PolyFamily(8), fs.BezierFamily(6)]) - def test_kinematic_car(self, vehicle_flat, poly): + @pytest.mark.parametrize("basis", [ + fs.PolyFamily(6), fs.PolyFamily(8), fs.BezierFamily(6), + fs.BSplineFamily([0, 10], 8), + fs.BSplineFamily([0, 5, 10], 4) + ]) + def test_kinematic_car(self, vehicle_flat, basis): # Define the endpoints of the trajectory x0 = [0., -2., 0.]; u0 = [10., 0.] xf = [100., 2., 0.]; uf = [10., 0.] Tf = 10 # Find trajectory between initial and final conditions - traj = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=poly) + traj = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=basis) # Verify that the trajectory computation is correct x, u = traj.eval([0, Tf]) @@ -111,16 +120,97 @@ def test_kinematic_car(self, vehicle_flat, poly): np.testing.assert_array_almost_equal(uf, u[:, 1]) # Simulate the system and make sure we stay close to desired traj + # Note: this can sometimes fail since system is open loop unstable T = np.linspace(0, Tf, 100) xd, ud = traj.eval(T) resp = ct.input_output_response(vehicle_flat, T, ud, x0) - np.testing.assert_array_almost_equal(resp.states, xd, decimal=2) + if not np.allclose(resp.states, xd, atol=1e-2, rtol=1e-2): + pytest.xfail("system is open loop unstable => errors can build") # integrate equations and compare to desired t, y, x = ct.input_output_response( vehicle_flat, T, ud, x0, return_x=True) np.testing.assert_allclose(x, xd, atol=0.01, rtol=0.01) + @pytest.mark.parametrize( + "basis, guess, constraints, method", [ + (fs.PolyFamily(8, T=10), 'prev', None, None), + (fs.BezierFamily(8, T=10), 'linear', None, None), + (fs.BSplineFamily([0, 10], 8), None, None, None), + (fs.BSplineFamily([0, 10], 8), 'prev', None, 'trust-constr'), + (fs.BSplineFamily([0, 10], [6, 8], vars=2), 'prev', None, None), + (fs.BSplineFamily([0, 5, 10], 5), 'linear', None, 'slsqp'), + (fs.BSplineFamily([0, 10], 8), None, ([8, -0.1], [12, 0.1]), None), + (fs.BSplineFamily([0, 5, 10], 5, 3), None, None, None), + ]) + def test_kinematic_car_ocp( + self, vehicle_flat, basis, guess, constraints, method): + + # Define the endpoints of the trajectory + x0 = [0., -2., 0.]; u0 = [10., 0.] + xf = [40., 2., 0.]; uf = [10., 0.] + Tf = 4 + timepts = np.linspace(0, Tf, 10) + + # Find trajectory between initial and final conditions + traj_p2p = fs.point_to_point( + vehicle_flat, Tf, x0, u0, xf, uf, basis=basis) + + # Verify that the trajectory computation is correct + x, u = traj_p2p.eval(timepts) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + np.testing.assert_array_almost_equal(xf, x[:, -1]) + np.testing.assert_array_almost_equal(uf, u[:, -1]) + + # + # Re-solve as optimal control problem + # + + # Define the cost function (mainly penalize steering angle) + traj_cost = opt.quadratic_cost( + vehicle_flat, None, np.diag([0.1, 10]), x0=xf, u0=uf) + + # Set terminal cost to bring us close to xf + terminal_cost = opt.quadratic_cost( + vehicle_flat, 1e3 * np.eye(3), None, x0=xf) + + # Implement terminal constraints if specified + if constraints: + input_constraints = opt.input_range_constraint( + vehicle_flat, *constraints) + else: + input_constraints = None + + # Use a straight line as an initial guess for the trajectory + if guess == 'prev': + initial_guess = traj_p2p.eval(timepts)[0][0:2] + elif guess == 'linear': + initial_guess = np.array( + [x0[i] + (xf[i] - x0[i]) * timepts/Tf for i in (0, 1)]) + else: + initial_guess = None + + # Solve the optimal trajectory + traj_ocp = fs.solve_flat_ocp( + vehicle_flat, timepts, x0, u0, + cost=traj_cost, constraints=input_constraints, + terminal_cost=terminal_cost, basis=basis, + initial_guess=initial_guess, + minimize_kwargs={'method': method}, + ) + xd, ud = traj_ocp.eval(timepts) + if not traj_ocp.success: + # If unsuccessful, make sure the error is just about precision + assert re.match(".*precision loss.*", traj_ocp.message) is not None + + # Make sure the constraints are satisfied + if input_constraints: + _, _, lb, ub = input_constraints + for i in range(ud.shape[0]): + assert all(lb[i] - ud[i] < rtol * abs(lb[i]) + atol) + assert all(ud[i] - ub[i] < rtol * abs(ub[i]) + atol) + def test_flat_default_output(self, vehicle_flat): # Construct a flat system with the default outputs flatsys = fs.FlatSystem( @@ -134,9 +224,9 @@ def test_flat_default_output(self, vehicle_flat): Tf = 10 # Find trajectory between initial and final conditions - poly = fs.PolyFamily(6) - traj1 = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=poly) - traj2 = fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=poly) + basis = fs.PolyFamily(6) + traj1 = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=basis) + traj2 = fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=basis) # Verify that the trajectory computation is correct T = np.linspace(0, Tf, 10) @@ -152,7 +242,9 @@ def test_flat_default_output(self, vehicle_flat): @pytest.mark.parametrize("basis", [ fs.PolyFamily(8), - fs.BSplineFamily([0, 3, 7, 10], 4, 2)]) + fs.BSplineFamily([0, 5, 10], 6), + fs.BSplineFamily([0, 3, 7, 10], 4, 2) + ]) def test_flat_cost_constr(self, basis): # Double integrator system sys = ct.ss([[0, 1], [0, 0]], [[0], [1]], [[1, 0]], 0) @@ -208,19 +300,22 @@ def test_flat_cost_constr(self, basis): traj_const = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, cost=cost_fcn, constraints=constraints, basis=basis, + # minimize_kwargs={'method': 'trust-constr'} ) + assert traj_const.success # Verify that the trajectory computation is correct - x_const, u_const = traj_const.eval(T) + x_cost, u_cost = traj_cost.eval(timepts) # re-eval on timepts + x_const, u_const = traj_const.eval(timepts) np.testing.assert_array_almost_equal(x0, x_const[:, 0]) np.testing.assert_array_almost_equal(u0, u_const[:, 0]) np.testing.assert_array_almost_equal(xf, x_const[:, -1]) np.testing.assert_array_almost_equal(uf, u_const[:, -1]) # Make sure that the solution respects the bounds (with some slop) - for i in range(x_const.shape[0]): - assert np.all(x_const[i] >= lb[i] * 1.02) - assert np.all(x_const[i] <= ub[i] * 1.02) + for i in range(x_const.shape[0]): + assert all(lb[i] - x_const[i] < rtol * abs(lb[i]) + atol) + assert all(x_const[i] - ub[i] < rtol * abs(ub[i]) + atol) # Solve the same problem with a nonlinear constraint type nl_constraints = [ @@ -229,9 +324,9 @@ def test_flat_cost_constr(self, basis): flat_sys, timepts, x0, u0, xf, uf, cost=cost_fcn, constraints=nl_constraints, basis=basis, ) - x_nlconst, u_nlconst = traj_nlconst.eval(T) - np.testing.assert_almost_equal(x_const, x_nlconst) - np.testing.assert_almost_equal(u_const, u_nlconst) + x_nlconst, u_nlconst = traj_nlconst.eval(timepts) + np.testing.assert_almost_equal(x_const, x_nlconst, decimal=2) + np.testing.assert_almost_equal(u_const, u_nlconst, decimal=2) @pytest.mark.parametrize("basis", [ # fs.PolyFamily(8), @@ -315,8 +410,8 @@ def test_flat_solve_ocp(self, basis): # Make sure that the solution respects the bounds (with some slop) for i in range(x_const.shape[0]): - assert np.all(x_const[i] >= lb[i] * 1.02) - assert np.all(x_const[i] <= ub[i] * 1.02) + assert all(lb[i] - x_const[i] < rtol * abs(lb[i]) + atol) + assert all(x_const[i] - ub[i] < rtol * abs(ub[i]) + atol) # Solve the same problem with a nonlinear constraint type # Use alternative keywords as well @@ -456,10 +551,11 @@ def test_point_to_point_errors(self): # Unsolvable optimization constraint = [opt.input_range_constraint(flat_sys, -0.01, 0.01)] - with pytest.raises(RuntimeError, match="Unable to solve optimal"): + with pytest.warns(UserWarning, match="unable to solve"): traj = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, constraints=constraint, basis=fs.PolyFamily(8)) + assert not traj.success # Method arguments, parameters traj_method = fs.point_to_point( @@ -477,6 +573,81 @@ def test_point_to_point_errors(self): traj_method = fs.point_to_point( flat_sys, timepts, x0, u0, xf, uf, solve_ivp_method=None) + def test_solve_flat_ocp_errors(self): + """Test error and warning conditions in point_to_point()""" + # Double integrator system + sys = ct.ss([[0, 1], [0, 0]], [[0], [1]], [[1, 0]], 0) + flat_sys = fs.LinearFlatSystem(sys) + + # Define the endpoints of the trajectory + x0 = [1, 0]; u0 = [0] + xf = [0, 0]; uf = [0] + Tf = 10 + T = np.linspace(0, Tf, 500) + + # Cost function + timepts = np.linspace(0, Tf, 10) + cost_fcn = opt.quadratic_cost( + flat_sys, np.diag([1, 1]), 1, x0=xf, u0=uf) + + # Solving without basis specified should be OK + traj = fs.solve_flat_ocp(flat_sys, timepts, x0, u0, cost_fcn) + x, u = traj.eval(timepts) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + if not traj.success: + # If unsuccessful, make sure the error is just about precision + assert re.match(".* precision loss.*", traj.message) is not None + + x, u = traj.eval(timepts) + np.testing.assert_array_almost_equal(x0, x[:, 0]) + np.testing.assert_array_almost_equal(u0, u[:, 0]) + + # Solving without a cost function generates an error + with pytest.raises(TypeError, match="cost required"): + traj = fs.solve_flat_ocp(flat_sys, timepts, x0, u0) + + # Try to optimize with insufficient degrees of freedom + with pytest.raises(ValueError, match="basis set is too small"): + traj = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost=cost_fcn, + basis=fs.PolyFamily(2)) + + # Solve with the errors in the various input arguments + with pytest.raises(ValueError, match="Initial state: Wrong shape"): + traj = fs.solve_flat_ocp( + flat_sys, timepts, np.zeros(3), u0, cost_fcn) + with pytest.raises(ValueError, match="Initial input: Wrong shape"): + traj = fs.solve_flat_ocp( + flat_sys, timepts, x0, np.zeros(3), cost_fcn) + + # Constraint that isn't a constraint + with pytest.raises(TypeError, match="must be a list"): + traj = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost_fcn, + constraints=np.eye(2), basis=fs.PolyFamily(8)) + + # Unknown constraint type + with pytest.raises(TypeError, match="unknown constraint type"): + traj = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost_fcn, + constraints=[(None, 0, 0, 0)], basis=fs.PolyFamily(8)) + + # Method arguments, parameters + traj_method = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost=cost_fcn, + basis=fs.PolyFamily(6), minimize_method='slsqp') + traj_kwarg = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost=cost_fcn, + basis=fs.PolyFamily(6), minimize_kwargs={'method': 'slsqp'}) + np.testing.assert_allclose( + traj_method.eval(timepts)[0], traj_kwarg.eval(timepts)[0], + atol=1e-5) + + # Unrecognized keywords + with pytest.raises(TypeError, match="unrecognized keyword"): + traj_method = fs.solve_flat_ocp( + flat_sys, timepts, x0, u0, cost_fcn, solve_ivp_method=None) + @pytest.mark.parametrize( "xf, uf, Tf", [([1, 0], [0], 2), @@ -488,10 +659,10 @@ def test_response(self, xf, uf, Tf): flatsys = fs.LinearFlatSystem(sys) # Define the basis set - poly = fs.PolyFamily(6) + basis = fs.PolyFamily(6) x1, u1, = [0, 0], [0] - traj = fs.point_to_point(flatsys, Tf, x1, u1, xf, uf, basis=poly) + traj = fs.point_to_point(flatsys, Tf, x1, u1, xf, uf, basis=basis) # Compute the response the regular way T = np.linspace(0, Tf, 10) diff --git a/control/tests/kwargs_test.py b/control/tests/kwargs_test.py index ada16a46a..598a8ccca 100644 --- a/control/tests/kwargs_test.py +++ b/control/tests/kwargs_test.py @@ -2,7 +2,7 @@ # RMM, 20 Mar 2022 # # Allowing unrecognized keywords to be passed to a function without -# generating and error message can generate annoying bugs, since you +# generating an error message can generate annoying bugs, since you # sometimes think you are telling the function to do something and actually # you have a misspelling or other error and your input is being ignored. # @@ -190,6 +190,8 @@ def test_matplotlib_kwargs(): 'tf2ss' : test_unrecognized_kwargs, 'flatsys.point_to_point': flatsys_test.TestFlatSys.test_point_to_point_errors, + 'flatsys.solve_flat_ocp': + flatsys_test.TestFlatSys.test_solve_flat_ocp_errors, 'FrequencyResponseData.__init__': frd_test.TestFRD.test_unrecognized_keyword, 'InputOutputSystem.__init__': test_unrecognized_kwargs, diff --git a/examples/kincar-flatsys.py b/examples/kincar-flatsys.py index 967bdb310..2ebee3133 100644 --- a/examples/kincar-flatsys.py +++ b/examples/kincar-flatsys.py @@ -74,12 +74,13 @@ def vehicle_update(t, x, u, params): return dx # Plot the trajectory in xy coordinates -def plot_results(t, x, ud): +def plot_results(t, x, ud, rescale=True): plt.subplot(4, 1, 2) plt.plot(x[0], x[1]) plt.xlabel('x [m]') plt.ylabel('y [m]') - plt.axis([x0[0], xf[0], x0[1]-1, xf[1]+1]) + if rescale: + plt.axis([x0[0], xf[0], x0[1]-1, xf[1]+1]) # Time traces of the state and input plt.subplot(2, 4, 5) @@ -94,7 +95,8 @@ def plot_results(t, x, ud): plt.plot(t, ud[0]) plt.xlabel('Time t [sec]') plt.ylabel('v [m/s]') - plt.axis([0, Tf, u0[0] - 1, uf[0] + 1]) + if rescale: + plt.axis([0, Tf, u0[0] - 1, uf[0] + 1]) plt.subplot(2, 4, 8) plt.plot(t, ud[1]) @@ -121,11 +123,11 @@ def plot_results(t, x, ud): poly = fs.PolyFamily(6) # Find a trajectory between the initial condition and the final condition -traj = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=poly) +traj1 = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=poly) # Create the desired trajectory between the initial and final condition T = np.linspace(0, Tf, 500) -xd, ud = traj.eval(T) +xd, ud = traj1.eval(T) # Simulation the open system dynamics with the full input t, y, x = ct.input_output_response( @@ -149,10 +151,10 @@ def plot_results(t, x, ud): vehicle_flat, np.diag([0, 0.1, 0]), np.diag([0.1, 1]), x0=xf, u0=uf) # Solve for an optimal solution -traj = fs.point_to_point( +traj2 = fs.point_to_point( vehicle_flat, timepts, x0, u0, xf, uf, cost=traj_cost, basis=basis, ) -xd, ud = traj.eval(T) +xd, ud = traj2.eval(T) plt.figure(2) plt.suptitle("Lane change with lateral error + steering penalties") @@ -164,15 +166,19 @@ def plot_results(t, x, ud): # Resolve the problem with constraints on the inputs # +# Constraint the input values constraints = [ opt.input_range_constraint(vehicle_flat, [8, -0.1], [12, 0.1]) ] +# TEST: Change the basis to use B-splines +basis = fs.BSplineFamily([0, Tf/2, Tf], 6) + # Solve for an optimal solution -traj = fs.point_to_point( +traj3 = fs.point_to_point( vehicle_flat, timepts, x0, u0, xf, uf, cost=traj_cost, constraints=constraints, basis=basis, ) -xd, ud = traj.eval(T) +xd, ud = traj3.eval(T) plt.figure(3) plt.suptitle("Lane change with penalty + steering constraints") @@ -181,3 +187,42 @@ def plot_results(t, x, ud): # Show the results unless we are running in batch mode if 'PYCONTROL_TEST_EXAMPLES' not in os.environ: plt.show() + + +# +# Approach #4: optimal trajectory, final cost with trajectory constraints +# +# Resolve the problem with constraints on the inputs and also replacing the +# point to point problem with one using a terminal cost to set the final +# state. +# + +# Define the cost function (mainly penalize steering angle) +traj_cost = opt.quadratic_cost( + vehicle_flat, None, np.diag([0.1, 10]), x0=xf, u0=uf) + +# Set terminal cost to bring us close to xf +terminal_cost = opt.quadratic_cost(vehicle_flat, 1e3 * np.eye(3), None, x0=xf) + +# Change the basis to use B-splines +basis = fs.BSplineFamily([0, Tf/2, Tf], [4, 6], vars=2) + +# Use a straight line as an initial guess for the trajectory +initial_guess = np.array( + [x0[i] + (xf[i] - x0[i]) * timepts/Tf for i in (0, 1)]) + +# Solve for an optimal solution +traj4 = fs.solve_flat_ocp( + vehicle_flat, timepts, x0, u0, cost=traj_cost, constraints=constraints, + terminal_cost=terminal_cost, basis=basis, initial_guess=initial_guess, + # minimize_kwargs={'method': 'trust-constr'}, +) +xd, ud = traj4.eval(T) + +plt.figure(4) +plt.suptitle("Lane change with terminal cost + steering constraints") +plot_results(T, xd, ud, rescale=False) # TODO: remove rescale + +# Show the results unless we are running in batch mode +if 'PYCONTROL_TEST_EXAMPLES' not in os.environ: + plt.show() From e65c4f29e1e4a8fa096859a032337fa02f9ee20f Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 19 Aug 2022 22:09:21 -0700 Subject: [PATCH 5/7] additional unit tests for coverage + bug fixes --- control/flatsys/basis.py | 16 ++++++++++--- control/tests/flatsys_test.py | 42 ++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/control/flatsys/basis.py b/control/flatsys/basis.py index baf2b926b..8a3a46af0 100644 --- a/control/flatsys/basis.py +++ b/control/flatsys/basis.py @@ -36,6 +36,8 @@ # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. +import numpy as np + # Basis family class (for use as a base class) class BasisFamily: @@ -86,16 +88,24 @@ def eval(self, coeffs, tlist, var=None): sum([coeffs[i] * self(i, t) for i in range(self.N)]) for t in tlist] - else: - # Multi-variable basis + elif var is None: + # Multi-variable basis with single list of coefficients values = np.empty((self.nvars, tlist.size)) + offset = 0 for j in range(self.nvars): coef_len = self.var_ncoefs(j) values[j] = np.array([ - sum([coeffs[i] * self(i, t, var=j) + sum([coeffs[offset + i] * self(i, t, var=j) for i in range(coef_len)]) for t in tlist]) + offset += coef_len return values + else: + return np.array([ + sum([coeffs[i] * self(i, t, var=var) + for i in range(self.var_ncoefs(var))]) + for t in tlist]) + def eval_deriv(self, i, j, t, var=None): raise NotImplementedError("Internal error; improper basis functions") diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index 00f5827d3..e096c0c73 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -313,7 +313,7 @@ def test_flat_cost_constr(self, basis): np.testing.assert_array_almost_equal(uf, u_const[:, -1]) # Make sure that the solution respects the bounds (with some slop) - for i in range(x_const.shape[0]): + for i in range(x_const.shape[0]): assert all(lb[i] - x_const[i] < rtol * abs(lb[i]) + atol) assert all(x_const[i] - ub[i] < rtol * abs(ub[i]) + atol) @@ -673,3 +673,43 @@ def test_response(self, xf, uf, Tf): np.testing.assert_equal(T, response.time) np.testing.assert_equal(u, response.inputs) np.testing.assert_equal(x, response.states) + + @pytest.mark.parametrize( + "basis", + [fs.PolyFamily(4), + fs.BezierFamily(4), + fs.BSplineFamily([0, 1], 4), + fs.BSplineFamily([0, 1], 4, vars=2), + fs.BSplineFamily([0, 1], [4, 3], [2, 1], vars=2), + ]) + def test_basis_class(self, basis): + timepts = np.linspace(0, 1, 10) + + if basis.nvars is None: + # Evaluate function on basis vectors + for j in range(basis.N): + coefs = np.zeros(basis.N) + coefs[j] = 1 + np.testing.assert_equal( + basis.eval(coefs, timepts), + basis.eval_deriv(j, 0, timepts)) + else: + # Evaluate each variable on basis vectors + for i in range(basis.nvars): + for j in range(basis.var_ncoefs(i)): + coefs = np.zeros(basis.var_ncoefs(i)) + coefs[j] = 1 + np.testing.assert_equal( + basis.eval(coefs, timepts, var=i), + basis.eval_deriv(j, 0, timepts, var=i)) + + # Evaluate multi-variable output + offset = 0 + for i in range(basis.nvars): + for j in range(basis.var_ncoefs(i)): + coefs = np.zeros(basis.N) + coefs[offset] = 1 + np.testing.assert_equal( + basis.eval(coefs, timepts)[i], + basis.eval_deriv(j, 0, timepts, var=i)) + offset += 1 From 2901cbe4e3863083dfbe9a674235ef7cdd01c267 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 20 Aug 2022 09:36:35 -0700 Subject: [PATCH 6/7] updated flatsys documentation --- control/flatsys/__init__.py | 4 +- control/flatsys/basis.py | 2 + control/flatsys/bezier.py | 16 +++++-- control/flatsys/bspline.py | 63 +++++++++++++--------------- control/flatsys/flatsys.py | 11 ++++- control/flatsys/poly.py | 14 +++++-- control/tests/flatsys_test.py | 7 +++- doc/conf.py | 2 +- doc/flatsys.rst | 78 ++++++++++++++++++++++++++++------- 9 files changed, 137 insertions(+), 60 deletions(-) diff --git a/control/flatsys/__init__.py b/control/flatsys/__init__.py index 8ead08dce..157800073 100644 --- a/control/flatsys/__init__.py +++ b/control/flatsys/__init__.py @@ -46,7 +46,9 @@ defined using the :class:`~control.flatsys.BasisFamily` class. The resulting trajectory is return as a :class:`~control.flatsys.SystemTrajectory` object and can be evaluated using the :func:`~control.flatsys.SystemTrajectory.eval` -member function. +member function. Alternatively, the :func:`~control.flatsys.solve_flat_ocp` +function can be used to solve an optimal control problem with trajectory and +final costs or constraints. """ diff --git a/control/flatsys/basis.py b/control/flatsys/basis.py index 8a3a46af0..04abce88a 100644 --- a/control/flatsys/basis.py +++ b/control/flatsys/basis.py @@ -79,6 +79,7 @@ def var_ncoefs(self, var): return self.N if self.nvars is None else self.coef_length[var] def eval(self, coeffs, tlist, var=None): + """Compute function values given the coefficients and time points.""" if self.nvars is None and var != None: raise SystemError("multi-variable call to a scalar basis") @@ -108,4 +109,5 @@ def eval(self, coeffs, tlist, var=None): for t in tlist]) def eval_deriv(self, i, j, t, var=None): + """Evaluate the kth derivative of the ith basis function at time t.""" raise NotImplementedError("Internal error; improper basis functions") diff --git a/control/flatsys/bezier.py b/control/flatsys/bezier.py index 241274268..02b4209a6 100644 --- a/control/flatsys/bezier.py +++ b/control/flatsys/bezier.py @@ -48,15 +48,23 @@ class BezierFamily(BasisFamily): This class represents the family of polynomials of the form .. math:: - \phi_i(t) = \sum_{i=0}^n {n \choose i} - \left( \frac{t}{T_\text{f}} - t \right)^{n-i} - \left( \frac{t}{T_f} \right)^i + \phi_i(t) = \sum_{i=0}^N {N \choose i} + \left( \frac{t}{T} - t \right)^{N-i} + \left( \frac{t}{T} \right)^i + + Parameters + ---------- + N : int + Degree of the Bezier curve. + + T : float + Final time (used for rescaling). """ def __init__(self, N, T=1): """Create a polynomial basis of order N.""" super(BezierFamily, self).__init__(N) - self.T = T # save end of time interval + self.T = float(T) # save end of time interval # Compute the kth derivative of the ith basis function at time t def eval_deriv(self, i, k, t, var=None): diff --git a/control/flatsys/bspline.py b/control/flatsys/bspline.py index 12cf49431..c771beb59 100644 --- a/control/flatsys/bspline.py +++ b/control/flatsys/bspline.py @@ -14,40 +14,35 @@ class BSplineFamily(BasisFamily): """B-spline basis functions. This class represents a B-spline basis for piecewise polynomials defined - across a set of breakpoints with given order and smoothness. + across a set of breakpoints with given degree and smoothness. On each + interval between two breakpoints, we have a polynomial of a given degree + and the spline is continuous up to a given smoothness at interior + breakpoints. + + Parameters + ---------- + breakpoints : 1D array or 2D array of float + The breakpoints for the spline(s). + + degree : int or list of ints + For each spline variable, the degree of the polynomial between + breakpoints. If a single number is given and more than one spline + variable is specified, the same degree is used for each spline + variable. + + smoothness : int or list of ints + For each spline variable, the smoothness at breakpoints (number of + derivatives that should match). + + vars : None or int, optional + The number of spline variables. If specified as None (default), + then the spline basis describes a single variable, with no indexing. + If the number of spine variables is > 0, then the spline basis is + index using the `var` keyword. """ def __init__(self, breakpoints, degree, smoothness=None, vars=None): - """Create a B-spline basis for piecewise smooth polynomials - - Define B-spline polynomials for a set of one or more variables. - B-splines are used as a basis for a set of piecewise smooth - polynomials joined at breakpoints. On each interval we have a - polynomial of a given order and the spline is continuous up to a - given smoothness at interior breakpoints. - - Parameters - ---------- - breakpoints : 1D array or 2D array of float - The breakpoints for the spline(s). - - degree : int or list of ints - For each spline variable, the degree of the polynomial between - break points. If a single number is given and more than one - spline variable is specified, the same order is used for each - spline variable. - - smoothness : int or list of ints - For each spline variable, the smoothness at breakpoints (number - of derivatives that should match). - - vars : None or int, optional - The number of spline variables. If specified as None (default), - then the spline basis describes a single variable, with no - indexing. If the number of spine variables is > 0, then the - spline basis is index using the `var` keyword. - - """ + """Create a B-spline basis for piecewise smooth polynomials.""" # Process the breakpoints for the spline */ breakpoints = np.array(breakpoints, dtype=float) if breakpoints.ndim == 2: @@ -71,11 +66,11 @@ def __init__(self, breakpoints, degree, smoothness=None, vars=None): self.nvars = nvars # - # Process B-spline parameters (order, smoothness) + # Process B-spline parameters (degree, smoothness) # # B-splines are defined on a set of intervals separated by # breakpoints. On each interval we have a polynomial of a certain - # order and the spline is continuous up to a given smoothness at + # degree and the spline is continuous up to a given smoothness at # breakpoints. The code in this section allows some flexibility in # the way that all of this information is supplied, including using # scalar values for parameters (which are then broadcast to each @@ -83,7 +78,7 @@ def __init__(self, breakpoints, degree, smoothness=None, vars=None): # information, when possible. # - # Utility function for broadcasting spline params (order, smoothness) + # Utility function for broadcasting spline params (degree, smoothness) def process_spline_parameters( values, length, allowed_types, minimum=0, default=None, name='unknown'): diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index 78d7696f9..849c41c72 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -61,8 +61,10 @@ class FlatSystem(NonlinearIOSystem): ---------- forward : callable A function to compute the flat flag given the states and input. + reverse : callable A function to compute the states and input given the flat flag. + updfcn : callable, optional Function returning the state update function @@ -73,6 +75,7 @@ class FlatSystem(NonlinearIOSystem): time, and `param` is an optional dict containing the values of parameters used by the function. If not specified, the state space update will be computed using the flat system coordinates. + outfcn : callable Function returning the output at the given state @@ -80,6 +83,7 @@ class FlatSystem(NonlinearIOSystem): where the arguments are the same as for `upfcn`. If not specified, the output will be the flat outputs. + inputs : int, list of str, or None Description of the system inputs. This can be given as an integer count or as a list of strings that name the individual signals. @@ -88,19 +92,24 @@ class FlatSystem(NonlinearIOSystem): this parameter is not given or given as `None`, the relevant quantity will be determined when possible based on other information provided to functions using the system. + outputs : int, list of str, or None Description of the system outputs. Same format as `inputs`. + states : int, list of str, or None Description of the system states. Same format as `inputs`. + dt : None, True or float, optional System timebase. None (default) indicates continuous time, True indicates discrete time with undefined sampling time, positive number is discrete time with specified sampling time. + params : dict, optional Parameter values for the systems. Passed to the evaluation functions for the system as default values, overriding internal defaults. + name : string, optional System name (used for specifying signals) @@ -638,7 +647,7 @@ def solve_flat_ocp( ----- 1. Additional keyword parameters can be used to fine tune the behavior of the underlying optimization function. See `minimize_*` keywords - in :func:`OptimalControlProblem` for more information. + in :func:`~control.optimal.OptimalControlProblem` for more information. 2. The return data structure includes the following additional attributes: * success : bool indicating whether the optimization succeeded diff --git a/control/flatsys/poly.py b/control/flatsys/poly.py index 65d1b500d..bfd8de633 100644 --- a/control/flatsys/poly.py +++ b/control/flatsys/poly.py @@ -47,13 +47,21 @@ class PolyFamily(BasisFamily): This class represents the family of polynomials of the form .. math:: - \phi_i(t) = t^i + \phi_i(t) = \left( \frac{t}{T} \right)^i + + Parameters + ---------- + N : int + Degree of the Bezier curve. + + T : float + Final time (used for rescaling). """ - def __init__(self, N, T=1.): + def __init__(self, N, T=1): """Create a polynomial basis of order N.""" super(PolyFamily, self).__init__(N) - self.T = T + self.T = float(T) # save end of time interval # Compute the kth derivative of the ith basis function at time t def eval_deriv(self, i, k, t, var=None): diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index e096c0c73..665bfd968 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -12,6 +12,7 @@ import pytest import scipy as sp import re +import warnings import control as ct import control.flatsys as fs @@ -590,8 +591,10 @@ def test_solve_flat_ocp_errors(self): cost_fcn = opt.quadratic_cost( flat_sys, np.diag([1, 1]), 1, x0=xf, u0=uf) - # Solving without basis specified should be OK - traj = fs.solve_flat_ocp(flat_sys, timepts, x0, u0, cost_fcn) + # Solving without basis specified should be OK (may generate warning) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + traj = fs.solve_flat_ocp(flat_sys, timepts, x0, u0, cost_fcn) x, u = traj.eval(timepts) np.testing.assert_array_almost_equal(x0, x[:, 0]) if not traj.success: diff --git a/doc/conf.py b/doc/conf.py index 19c2970e1..e2210aeaa 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -87,7 +87,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. diff --git a/doc/flatsys.rst b/doc/flatsys.rst index 7599dd2af..ab8d7bf4c 100644 --- a/doc/flatsys.rst +++ b/doc/flatsys.rst @@ -64,17 +64,17 @@ trajectory of the system. We can parameterize the flat output trajectory using a set of smooth basis functions :math:`\psi_i(t)`: .. math:: - z(t) = \sum_{i=1}^N \alpha_i \psi_i(t), \qquad \alpha_i \in R + z(t) = \sum_{i=1}^N c_i \psi_i(t), \qquad c_i \in R -We seek a set of coefficients :math:`\alpha_i`, :math:`i = 1, \dots, N` such +We seek a set of coefficients :math:`c_i`, :math:`i = 1, \dots, N` such that :math:`z(t)` satisfies the boundary conditions for :math:`x(0)` and :math:`x(T)`. The derivatives of the flat output can be computed in terms of the derivatives of the basis functions: .. math:: - \dot z(t) &= \sum_{i=1}^N \alpha_i \dot \psi_i(t) \\ + \dot z(t) &= \sum_{i=1}^N c_i \dot \psi_i(t) \\ &\,\vdots \\ - \dot z^{(q)}(t) &= \sum_{i=1}^N \alpha_i \psi^{(q)}_i(t). + \dot z^{(q)}(t) &= \sum_{i=1}^N c_i \psi^{(q)}_i(t). We can thus write the conditions on the flat outputs and their derivatives as @@ -90,7 +90,7 @@ derivatives as \vdots & \vdots & & \vdots \\ \psi^{(q)}_1(T) & \psi^{(q)}_2(T) & \dots & \psi^{(q)}_N(T) \\ \end{bmatrix} - \begin{bmatrix} \alpha_1 \\ \vdots \\ \alpha_N \end{bmatrix} = + \begin{bmatrix} c_1 \\ \vdots \\ c_N \end{bmatrix} = \begin{bmatrix} z(0) \\ \dot z(0) \\ \vdots \\ z^{(q)}(0) \\[1ex] z(T) \\ \dot z(T) \\ \vdots \\ z^{(q)}(T) \\ @@ -99,7 +99,7 @@ derivatives as This equation is a *linear* equation of the form .. math:: - M \alpha = \begin{bmatrix} \bar z(0) \\ \bar z(T) \end{bmatrix} + M c = \begin{bmatrix} \bar z(0) \\ \bar z(T) \end{bmatrix} where :math:`\bar z` is called the *flat flag* for the system. Assuming that :math:`M` has a sufficient number of columns and that it is full @@ -139,22 +139,28 @@ For a linear system, a flat system representation can be generated using the For more general systems, the `FlatSystem` object must be created manually:: - sys = control.flatsys.FlatSystem(nstate, ninputs, forward, reverse) + sys = control.flatsys.FlatSystem( + forward, reverse, states=['x1', ..., 'xn'], inputs=['u1', ..., 'um']) In addition to the flat system description, a set of basis functions -:math:`\phi_i(t)` must be chosen. The `FlatBasis` class is used to represent -the basis functions. A polynomial basis function of the form 1, :math:`t`, -:math:`t^2`, ... can be computed using the `PolyBasis` class, which is -initialized by passing the desired order of the polynomial basis set:: +:math:`\phi_i(t)` must be chosen. The `FlatBasis` class is used to +represent the basis functions. A polynomial basis function of the +form 1, :math:`t`, :math:`t^2`, ... can be computed using the +:class:`~control.flatsys.PolyFamily` class, which is initialized by +passing the desired order of the polynomial basis set:: - polybasis = control.flatsys.PolyBasis(N) + basis = control.flatsys.PolyFamily(N) + +Additional basis function families include Bezier curves +(:class:`~control.flatsys.BezierFamily`) and B-splines +(:class:`~control.flatsys.BSplineFamily`). Once the system and basis function have been defined, the :func:`~control.flatsys.point_to_point` function can be used to compute a trajectory between initial and final states and inputs:: traj = control.flatsys.point_to_point( - sys, Tf, x0, u0, xf, uf, basis=polybasis) + sys, Tf, x0, u0, xf, uf, basis=basis) The returned object has class :class:`~control.flatsys.SystemTrajectory` and can be used to compute the state and input trajectory between the initial and @@ -169,6 +175,18 @@ The :func:`~control.flatsys.point_to_point` function also allows the specification of a cost function and/or constraints, in the same format as :func:`~control.optimal.solve_ocp`. +The :func:`~control.flatsys.solve_flat_ocp` function can be used to +solve an optimal control problem without a final state:: + + traj = control.flatsys.solve_flat_ocp( + sys, timepts, x0, u0, cost, basis=basis) + +The `cost` parameter is a function function with call signature +`cost(x, u)` and should return the (incremental) cost at the given +state, and input. It will be evaluated at each point in the `timepts` +vector. The `terminal_cost` parameter can be used to specify a cost +function for the final point in the trajectory. + Example ======= @@ -179,7 +197,9 @@ derived *Feedback Systems* by Astrom and Murray, Example 3.11. .. code-block:: python + import control as ct import control.flatsys as fs + import numpy as np # Function to take states, inputs and return the flat flag def vehicle_flat_forward(x, u, params={}): @@ -228,7 +248,8 @@ derived *Feedback Systems* by Astrom and Murray, Example 3.11. return x, u vehicle_flat = fs.FlatSystem( - 3, 2, forward=vehicle_flat_forward, reverse=vehicle_flat_reverse) + vehicle_flat_forward, vehicle_flat_reverse, + inputs=('v', 'delta'), outputs=('x', 'y'), states=('x', 'y', 'theta')) To find a trajectory from an initial state :math:`x_0` to a final state :math:`x_\text{f}` in time :math:`T_\text{f}` we solve a point-to-point @@ -253,6 +274,33 @@ the endpoints. t = np.linspace(0, Tf, 100) x, u = traj.eval(t) +Alternatively, we can solve an optimal control problem in which we +minimize a cost function along the trajectory as well as a terminal +cost:` + +.. code-block:: python + + # Define the cost along the trajectory: penalize steering angle + traj_cost = ct.optimal.quadratic_cost( + vehicle_flat, None, np.diag([0.1, 10]), u0=uf) + + # Define the terminal cost: penalize distance from the end point + term_cost = ct.optimal.quadratic_cost( + vehicle_flat, np.diag([1e3, 1e3, 1e3]), None, x0=xf) + + # Use a straight line as the initial guess + timepts = np.linspace(0, Tf, 10) + initial_guess = np.array( + [x0[i] + (xf[i] - x0[i]) * timepts/Tf for i in (0, 1)]) + + # Solve the optimal control problem, evaluating cost at timepts + bspline = fs.BSplineFamily([0, Tf/2, Tf], 4) + traj = fs.solve_flat_ocp( + vehicle_flat, timepts, x0, u0, traj_cost, + terminal_cost=term_cost, initial_guess=initial_guess, basis=bspline) + + x, u = traj.eval(t) + Module classes and functions ============================ @@ -262,6 +310,7 @@ Module classes and functions ~control.flatsys.BasisFamily ~control.flatsys.BezierFamily + ~control.flatsys.BSplineFamily ~control.flatsys.FlatSystem ~control.flatsys.LinearFlatSystem ~control.flatsys.PolyFamily @@ -271,3 +320,4 @@ Module classes and functions :toctree: generated/ ~control.flatsys.point_to_point + ~control.flatsys.solve_flat_ocp From 47262f5ceaf5fc5a7d8c6947d9d24c03e3078fd6 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 28 Aug 2022 20:52:47 -0700 Subject: [PATCH 7/7] update coeff handling to allow multi-variable basis --- control/optimal.py | 65 ++++++++++++++++------------------- control/tests/optimal_test.py | 21 ++++++----- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/control/optimal.py b/control/optimal.py index da1bdcb8e..4913cc341 100644 --- a/control/optimal.py +++ b/control/optimal.py @@ -268,17 +268,16 @@ def _cost_function(self, coeffs): start_time = time.process_time() logging.info("_cost_function called at: %g", start_time) - # Retrieve the initial state and reshape the input vector + # Retrieve the saved initial state x = self.x - coeffs = coeffs.reshape((self.system.ninputs, -1)) - # Compute time points (if basis present) + # Compute inputs if self.basis: if self.log: logging.debug("coefficients = " + str(coeffs)) inputs = self._coeffs_to_inputs(coeffs) else: - inputs = coeffs + inputs = coeffs.reshape((self.system.ninputs, -1)) # See if we already have a simulation for this condition if np.array_equal(coeffs, self.last_coeffs) and \ @@ -391,15 +390,14 @@ def _constraint_function(self, coeffs): start_time = time.process_time() logging.info("_constraint_function called at: %g", start_time) - # Retrieve the initial state and reshape the input vector + # Retrieve the initial state x = self.x - coeffs = coeffs.reshape((self.system.ninputs, -1)) - # Compute time points (if basis present) + # Compute input at time points if self.basis: inputs = self._coeffs_to_inputs(coeffs) else: - inputs = coeffs + inputs = coeffs.reshape((self.system.ninputs, -1)) # See if we already have a simulation for this condition if np.array_equal(coeffs, self.last_coeffs) \ @@ -473,15 +471,14 @@ def _eqconst_function(self, coeffs): start_time = time.process_time() logging.info("_eqconst_function called at: %g", start_time) - # Retrieve the initial state and reshape the input vector + # Retrieve the initial state x = self.x - coeffs = coeffs.reshape((self.system.ninputs, -1)) - # Compute time points (if basis present) + # Compute input at time points if self.basis: inputs = self._coeffs_to_inputs(coeffs) else: - inputs = coeffs + inputs = coeffs.reshape((self.system.ninputs, -1)) # See if we already have a simulation for this condition if np.array_equal(coeffs, self.last_coeffs) and \ @@ -609,34 +606,36 @@ def _inputs_to_coeffs(self, inputs): return inputs # Solve least squares problems (M x = b) for coeffs on each input - coeffs = np.zeros((self.system.ninputs, self.basis.N)) + coeffs = [] for i in range(self.system.ninputs): # Set up the matrices to get inputs - M = np.zeros((self.timepts.size, self.basis.N)) + M = np.zeros((self.timepts.size, self.basis.var_ncoefs(i))) b = np.zeros(self.timepts.size) # Evaluate at each time point and for each basis function # TODO: vectorize for j, t in enumerate(self.timepts): - for k in range(self.basis.N): + for k in range(self.basis.var_ncoefs(i)): M[j, k] = self.basis(k, t) - b[j] = inputs[i, j] + b[j] = inputs[i, j] # Solve a least squares problem for the coefficients alpha, residuals, rank, s = np.linalg.lstsq(M, b, rcond=None) - coeffs[i, :] = alpha + coeffs.append(alpha) - return coeffs + return np.hstack(coeffs) # Utility function to convert coefficient vector to input vector def _coeffs_to_inputs(self, coeffs): # TODO: vectorize inputs = np.zeros((self.system.ninputs, self.timepts.size)) - for i, t in enumerate(self.timepts): - for k in range(self.basis.N): - phi_k = self.basis(k, t) - for inp in range(self.system.ninputs): - inputs[inp, i] += coeffs[inp, k] * phi_k + offset = 0 + for i in range(self.system.ninputs): + length = self.basis.var_ncoefs(i) + for j, t in enumerate(self.timepts): + for k in range(length): + inputs[i, j] += coeffs[offset + k] * self.basis(k, t) + offset += length return inputs # @@ -680,7 +679,7 @@ def _print_statistics(self, reset=True): # Compute the optimal trajectory from the current state def compute_trajectory( - self, x, squeeze=None, transpose=None, return_states=None, + self, x, squeeze=None, transpose=None, return_states=True, initial_guess=None, print_summary=True, **kwargs): """Compute the optimal input at state x @@ -689,8 +688,7 @@ def compute_trajectory( x : array-like or number, optional Initial state for the system. return_states : bool, optional - If True, return the values of the state at each time (default = - False). + If True (default), return the values of the state at each time. squeeze : bool, optional If True and if the system has a single output, return the system output as a 1D array rather than a 2D array. If False, return the @@ -837,7 +835,7 @@ class OptimalControlResult(sp.optimize.OptimizeResult): """ def __init__( - self, ocp, res, return_states=False, print_summary=False, + self, ocp, res, return_states=True, print_summary=False, transpose=None, squeeze=None): """Create a OptimalControlResult object""" @@ -848,14 +846,11 @@ def __init__( # Remember the optimal control problem that we solved self.problem = ocp - # Reshape and process the input vector - coeffs = res.x.reshape((ocp.system.ninputs, -1)) - - # Compute time points (if basis present) + # Compute input at time points if ocp.basis: - inputs = ocp._coeffs_to_inputs(coeffs) + inputs = ocp._coeffs_to_inputs(res.x) else: - inputs = coeffs + inputs = res.x.reshape((ocp.system.ninputs, -1)) # See if we got an answer if not res.success: @@ -894,7 +889,7 @@ def __init__( def solve_ocp( sys, horizon, X0, cost, trajectory_constraints=None, terminal_cost=None, terminal_constraints=[], initial_guess=None, basis=None, squeeze=None, - transpose=None, return_states=False, log=False, **kwargs): + transpose=None, return_states=True, log=False, **kwargs): """Compute the solution to an optimal control problem @@ -949,7 +944,7 @@ def solve_ocp( If `True`, turn on logging messages (using Python logging module). return_states : bool, optional - If True, return the values of the state at each time (default = False). + If True, return the values of the state at each time (default = True). squeeze : bool, optional If True and if the system has a single output, return the system diff --git a/control/tests/optimal_test.py b/control/tests/optimal_test.py index 027b55c75..b100e7e14 100644 --- a/control/tests/optimal_test.py +++ b/control/tests/optimal_test.py @@ -300,8 +300,9 @@ def test_terminal_constraints(sys_args): np.testing.assert_almost_equal(res.inputs, u1) # Re-run using a basis function and see if we get the same answer - res = opt.solve_ocp(sys, time, x0, cost, terminal_constraints=final_point, - basis=flat.BezierFamily(8, Tf)) + res = opt.solve_ocp( + sys, time, x0, cost, terminal_constraints=final_point, + basis=flat.BezierFamily(8, Tf)) # Final point doesn't affect cost => don't need to test np.testing.assert_almost_equal( @@ -471,8 +472,12 @@ def test_ocp_argument_errors(): sys, time, x0, cost, terminal_constraints=constraints) -def test_optimal_basis_simple(): - sys = ct.ss2io(ct.ss([[1, 1], [0, 1]], [[1], [0.5]], np.eye(2), 0, 1)) +@pytest.mark.parametrize("basis", [ + flat.PolyFamily(4), flat.PolyFamily(6), + flat.BezierFamily(4), flat.BSplineFamily([0, 4, 8], 6) + ]) +def test_optimal_basis_simple(basis): + sys = ct.ss([[1, 1], [0, 1]], [[1], [0.5]], np.eye(2), 0, 1) # State and input constraints constraints = [ @@ -492,7 +497,7 @@ def test_optimal_basis_simple(): # Basic optimal control problem res1 = opt.solve_ocp( sys, time, x0, cost, constraints, - basis=flat.BezierFamily(4, Tf), return_x=True) + terminal_cost=cost, basis=basis, return_x=True) assert res1.success # Make sure the constraints were satisfied @@ -503,14 +508,14 @@ def test_optimal_basis_simple(): # Pass an initial guess and rerun res2 = opt.solve_ocp( sys, time, x0, cost, constraints, initial_guess=0.99*res1.inputs, - basis=flat.BezierFamily(4, Tf), return_x=True) + terminal_cost=cost, basis=basis, return_x=True) assert res2.success np.testing.assert_allclose(res2.inputs, res1.inputs, atol=0.01, rtol=0.01) # Run with logging turned on for code coverage res3 = opt.solve_ocp( - sys, time, x0, cost, constraints, - basis=flat.BezierFamily(4, Tf), return_x=True, log=True) + sys, time, x0, cost, constraints, terminal_cost=cost, + basis=basis, return_x=True, log=True) assert res3.success np.testing.assert_almost_equal(res3.inputs, res1.inputs, decimal=3)