From 49febce68579a47c655a92756abd77b75f2aecd1 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Tue, 29 Dec 2020 11:03:10 -0800 Subject: [PATCH 01/18] allow naming of linearized system signals --- control/iosys.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 65d6c1228..0686e290a 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -494,7 +494,8 @@ def feedback(self, other=1, sign=-1, params={}): # Return the newly created system return newsys - def linearize(self, x0, u0, t=0, params={}, eps=1e-6): + def linearize(self, x0, u0, t=0, params={}, eps=1e-6, + name=None, copy=False, **kwargs): """Linearize an input/output system at a given state and input. Return the linearization of an input/output system at a given state @@ -547,8 +548,20 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6): D[:, i] = (self._out(t, x0, u0 + du) - H0) / eps # Create the state space system - linsys = StateSpace(A, B, C, D, self.dt, remove_useless=False) - return LinearIOSystem(linsys) + linsys = LinearIOSystem( + StateSpace(A, B, C, D, self.dt, remove_useless=False), + name=name, **kwargs) + + # Set the names the system, inputs, outputs, and states + if copy: + linsys.ninputs, linsys.input_index = self.ninputs, \ + self.input_index.copy() + linsys.noutputs, linsys.output_index = \ + self.noutputs, self.output_index.copy() + linsys.nstates, linsys.state_index = \ + self.nstates, self.state_index.copy() + + return linsys def copy(self, newname=None): """Make a copy of an input/output system.""" @@ -1759,6 +1772,11 @@ def linearize(sys, xeq, ueq=[], t=0, params={}, **kw): params : dict, optional Parameter values for the systems. Passed to the evaluation functions for the system as default values, overriding internal defaults. + copy : bool, Optional + If `copy` is True, copy the names of the input signals, output signals, + and states to the linearized system. + name : string, optional + Set the name of the linearized system. Returns ------- From cd9e75a0038ae91aa37235065424bff038c39416 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Thu, 31 Dec 2020 21:28:30 -0800 Subject: [PATCH 02/18] PEP8 cleanup --- control/iosys.py | 143 ++++++++++++++++++++++++++++------------------- 1 file changed, 87 insertions(+), 56 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 0686e290a..07dacdc5c 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -48,6 +48,7 @@ # Define module default parameter values _iosys_defaults = {} + class InputOutputSystem(object): """A class for representing input/output systems. @@ -75,7 +76,7 @@ class for a set of subclasses that are used to implement specific System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive number is discrete time with specified - sampling time, None indicates unspecified timebase (either + sampling time, None indicates unspecified timebase (either continuous or discrete time). params : dict, optional Parameter values for the systems. Passed to the evaluation functions @@ -95,7 +96,7 @@ class for a set of subclasses that are used to implement specific System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive number is discrete time with specified - sampling time, None indicates unspecified timebase (either + sampling time, None indicates unspecified timebase (either continuous or discrete time). params : dict, optional Parameter values for the systems. Passed to the evaluation functions @@ -118,6 +119,7 @@ class for a set of subclasses that are used to implement specific """ idCounter = 0 + def name_or_default(self, name=None): if name is None: name = "sys[{}]".format(InputOutputSystem.idCounter) @@ -153,15 +155,15 @@ def __init__(self, inputs=None, outputs=None, states=None, params={}, System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive number is discrete time with specified - sampling time, None indicates unspecified timebase (either + sampling time, None indicates unspecified timebase (either continuous or discrete time). params : dict, optional Parameter values for the systems. Passed to the evaluation functions for the system as default values, overriding internal defaults. name : string, optional - System name (used for specifying signals). If unspecified, a generic - name is generated with a unique integer id. + System name (used for specifying signals). If unspecified, a + generic name is generated with a unique integer id. Returns ------- @@ -190,11 +192,14 @@ def __str__(self): """String representation of an input/output system""" str = "System: " + (self.name if self.name else "(None)") + "\n" str += "Inputs (%s): " % self.ninputs - for key in self.input_index: str += key + ", " + for key in self.input_index: + str += key + ", " str += "\nOutputs (%s): " % self.noutputs - for key in self.output_index: str += key + ", " + for key in self.output_index: + str += key + ", " str += "\nStates (%s): " % self.nstates - for key in self.state_index: str += key + ", " + for key in self.state_index: + str += key + ", " return str def __mul__(sys2, sys1): @@ -224,10 +229,11 @@ def __mul__(sys2, sys1): # Make sure timebase are compatible dt = common_timebase(sys1.dt, sys2.dt) - inplist = [(0,i) for i in range(sys1.ninputs)] - outlist = [(1,i) for i in range(sys2.noutputs)] + inplist = [(0, i) for i in range(sys1.ninputs)] + outlist = [(1, i) for i in range(sys2.noutputs)] # Return the series interconnection between the systems - newsys = InterconnectedSystem((sys1, sys2), inplist=inplist, outlist=outlist) + newsys = InterconnectedSystem( + (sys1, sys2), inplist=inplist, outlist=outlist) # Set up the connection map manually newsys.set_connect_map(np.block( @@ -281,10 +287,11 @@ def __add__(sys1, sys2): ninputs = sys1.ninputs noutputs = sys1.noutputs - inplist = [[(0,i),(1,i)] for i in range(ninputs)] - outlist = [[(0,i),(1,i)] for i in range(noutputs)] + inplist = [[(0, i), (1, i)] for i in range(ninputs)] + outlist = [[(0, i), (1, i)] for i in range(noutputs)] # Create a new system to handle the composition - newsys = InterconnectedSystem((sys1, sys2), inplist=inplist, outlist=outlist) + newsys = InterconnectedSystem( + (sys1, sys2), inplist=inplist, outlist=outlist) # Return the newly created system return newsys @@ -303,10 +310,11 @@ def __neg__(sys): if sys.ninputs is None or sys.noutputs is None: raise ValueError("Can't determine number of inputs or outputs") - inplist = [(0,i) for i in range(sys.ninputs)] - outlist = [(0,i,-1) for i in range(sys.noutputs)] + inplist = [(0, i) for i in range(sys.ninputs)] + outlist = [(0, i, -1) for i in range(sys.noutputs)] # Create a new system to hold the negation - newsys = InterconnectedSystem((sys,), dt=sys.dt, inplist=inplist, outlist=outlist) + newsys = InterconnectedSystem( + (sys,), dt=sys.dt, inplist=inplist, outlist=outlist) # Return the newly created system return newsys @@ -476,12 +484,13 @@ def feedback(self, other=1, sign=-1, params={}): # Make sure timebases are compatible dt = common_timebase(self.dt, other.dt) - inplist = [(0,i) for i in range(self.ninputs)] - outlist = [(0,i) for i in range(self.noutputs)] + inplist = [(0, i) for i in range(self.ninputs)] + outlist = [(0, i) for i in range(self.noutputs)] # Return the series interconnection between the systems - newsys = InterconnectedSystem((self, other), inplist=inplist, outlist=outlist, - params=params, dt=dt) + newsys = InterconnectedSystem( + (self, other), inplist=inplist, outlist=outlist, + params=params, dt=dt) # Set up the connecton map manually newsys.set_connect_map(np.block( @@ -514,8 +523,10 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, ninputs = _find_size(self.ninputs, u0) # Convert x0, u0 to arrays, if needed - if np.isscalar(x0): x0 = np.ones((nstates,)) * x0 - if np.isscalar(u0): u0 = np.ones((ninputs,)) * u0 + if np.isscalar(x0): + x0 = np.ones((nstates,)) * x0 + if np.isscalar(u0): + u0 = np.ones((ninputs,)) * u0 # Compute number of outputs by evaluating the output function noutputs = _find_size(self.noutputs, self._out(t, x0, u0)) @@ -566,7 +577,8 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, def copy(self, newname=None): """Make a copy of an input/output system.""" newsys = copy.copy(self) - newsys.name = self.name_or_default("copy of " + self.name if not newname else newname) + newsys.name = self.name_or_default( + "copy of " + self.name if not newname else newname) return newsys @@ -605,15 +617,15 @@ def __init__(self, linsys, inputs=None, outputs=None, states=None, System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive number is discrete time with specified - sampling time, None indicates unspecified timebase (either + sampling time, None indicates unspecified timebase (either continuous or discrete time). params : dict, optional Parameter values for the systems. Passed to the evaluation functions for the system as default values, overriding internal defaults. name : string, optional - System name (used for specifying signals). If unspecified, a generic - name is generated with a unique integer id. + System name (used for specifying signals). If unspecified, a + generic name is generated with a unique integer id. Returns ------- @@ -729,11 +741,11 @@ def __init__(self, updfcn, outfcn=None, inputs=None, outputs=None, * dt = 0: continuous time system (default) * dt > 0: discrete time system with sampling period 'dt' * dt = True: discrete time with unspecified sampling period - * dt = None: no timebase specified + * dt = None: no timebase specified name : string, optional - System name (used for specifying signals). If unspecified, a generic - name is generated with a unique integer id. + System name (used for specifying signals). If unspecified, a + generic name is generated with a unique integer id. Returns ------- @@ -899,23 +911,28 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], * dt = 0: continuous time system (default) * dt > 0: discrete time system with sampling period 'dt' * dt = True: discrete time with unspecified sampling period - * dt = None: no timebase specified + * dt = None: no timebase specified name : string, optional - System name (used for specifying signals). If unspecified, a generic - name is generated with a unique integer id. + System name (used for specifying signals). If unspecified, a + generic name is generated with a unique integer id. """ # Convert input and output names to lists if they aren't already - if not isinstance(inplist, (list, tuple)): inplist = [inplist] - if not isinstance(outlist, (list, tuple)): outlist = [outlist] + if not isinstance(inplist, (list, tuple)): + inplist = [inplist] + if not isinstance(outlist, (list, tuple)): + outlist = [outlist] # Check to make sure all systems are consistent self.syslist = syslist self.syslist_index = {} - nstates = 0; self.state_offset = [] - ninputs = 0; self.input_offset = [] - noutputs = 0; self.output_offset = [] + nstates = 0 + self.state_offset = [] + ninputs = 0 + self.input_offset = [] + noutputs = 0 + self.output_offset = [] sysobj_name_dct = {} sysname_count_dct = {} for sysidx, sys in enumerate(syslist): @@ -943,14 +960,16 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Duplicates are renamed sysname_1, sysname_2, etc. if sys in sysobj_name_dct: sys = sys.copy() - warn("Duplicate object found in system list: %s. Making a copy" % str(sys)) + warn("Duplicate object found in system list: %s. " + "Making a copy" % str(sys)) if sys.name is not None and sys.name in sysname_count_dct: count = sysname_count_dct[sys.name] sysname_count_dct[sys.name] += 1 sysname = sys.name + "_" + str(count) sysobj_name_dct[sys] = sysname self.syslist_index[sysname] = sysidx - warn("Duplicate name found in system list. Renamed to {}".format(sysname)) + warn("Duplicate name found in system list. " + "Renamed to {}".format(sysname)) else: sysname_count_dct[sys.name] = 1 sysobj_name_dct[sys] = sys.name @@ -959,7 +978,8 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], if states is None: states = [] for sys, sysname in sysobj_name_dct.items(): - states += [sysname + '.' + statename for statename in sys.state_index.keys()] + states += [sysname + '.' + + statename for statename in sys.state_index.keys()] # Create the I/O system super(InterconnectedSystem, self).__init__( @@ -989,14 +1009,16 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Convert the input list to a matrix: maps system to subsystems self.input_map = np.zeros((ninputs, self.ninputs)) for index, inpspec in enumerate(inplist): - if isinstance(inpspec, (int, str, tuple)): inpspec = [inpspec] + if isinstance(inpspec, (int, str, tuple)): + inpspec = [inpspec] for spec in inpspec: self.input_map[self._parse_input_spec(spec), index] = 1 # Convert the output list to a matrix: maps subsystems to system self.output_map = np.zeros((self.noutputs, noutputs + ninputs)) for index, outspec in enumerate(outlist): - if isinstance(outspec, (int, str, tuple)): outspec = [outspec] + if isinstance(outspec, (int, str, tuple)): + outspec = [outspec] for spec in outspec: ylist_index, gain = self._parse_output_spec(spec) self.output_map[index, ylist_index] = gain @@ -1041,7 +1063,7 @@ def _rhs(self, t, x, u): # Go through each system and update the right hand side for that system xdot = np.zeros((self.nstates,)) # Array to hold results - state_index = 0; input_index = 0 # Start at the beginning + state_index, input_index = 0, 0 # Start at the beginning for sys in self.syslist: # Update the right hand side for this subsystem if sys.nstates != 0: @@ -1084,7 +1106,7 @@ def _compute_static_io(self, t, x, u): # TODO (later): see if there is a more efficient way to compute cycle_count = len(self.syslist) + 1 while cycle_count > 0: - state_index = 0; input_index = 0; output_index = 0 + state_index, input_index, output_index = 0, 0, 0 for sys in self.syslist: # Compute outputs for each system from current state ysys = sys._out( @@ -1097,8 +1119,8 @@ def _compute_static_io(self, t, x, u): # Store the input in the second part of ylist ylist[noutputs + input_index: - noutputs + input_index + sys.ninputs] = \ - ulist[input_index:input_index + sys.ninputs] + noutputs + input_index + sys.ninputs] = \ + ulist[input_index:input_index + sys.ninputs] # Increment the index pointers state_index += sys.nstates @@ -1229,7 +1251,8 @@ def _parse_signal(self, spec, signame='input', dictname=None): return spec # Figure out the name of the dictionary to use - if dictname is None: dictname = signame + '_index' + if dictname is None: + dictname = signame + '_index' if isinstance(spec, str): # If we got a dotted string, break up into pieces @@ -1415,7 +1438,8 @@ def input_output_response(sys, T, U=0., X0=0, params={}, method='RK45', for i in range(len(T)): u = U[i] if len(U.shape) == 1 else U[:, i] y[:, i] = sys._out(T[i], [], u) - if (squeeze): y = np.squeeze(y) + if squeeze: + y = np.squeeze(y) if return_x: return T, y, [] else: @@ -1495,7 +1519,8 @@ def ivp_rhs(t, x): return sys._rhs(t, x, u(t)) raise TypeError("Can't determine system type") # Get rid of extra dimensions in the output, of desired - if (squeeze): y = np.squeeze(y) + if squeeze: + y = np.squeeze(y) if return_x: return soln.t, y, soln.y @@ -1580,9 +1605,12 @@ def find_eqpt(sys, x0, u0=[], y0=None, t=0, params={}, noutputs = _find_size(sys.noutputs, y0) # Convert x0, u0, y0 to arrays, if needed - if np.isscalar(x0): x0 = np.ones((nstates,)) * x0 - if np.isscalar(u0): u0 = np.ones((ninputs,)) * u0 - if np.isscalar(y0): y0 = np.ones((ninputs,)) * y0 + if np.isscalar(x0): + x0 = np.ones((nstates,)) * x0 + if np.isscalar(u0): + u0 = np.ones((ninputs,)) * u0 + if np.isscalar(y0): + y0 = np.ones((ninputs,)) * y0 # Discrete-time not yet supported if isdtime(sys, strict=True): @@ -1718,7 +1746,8 @@ def rootfun(z): # Compute the update and output maps dx = sys._rhs(t, x, u) - dx0 - if dtime: dx -= x # TODO: check + if dtime: + dx -= x # TODO: check dy = sys._out(t, x, u) - y0 # Map the results into the constrained variables @@ -1736,7 +1765,8 @@ def rootfun(z): z = (x, u, sys._out(t, x, u)) # Return the result based on what the user wants and what we found - if not return_y: z = z[0:2] # Strip y from result if not desired + if not return_y: + z = z[0:2] # Strip y from result if not desired if return_result: # Return whatever we got, along with the result dictionary return z + (result,) @@ -1810,7 +1840,8 @@ def _find_size(sysval, vecval): # Convert a state space system into an input/output system (wrapper) -def ss2io(*args, **kw): return LinearIOSystem(*args, **kw) +def ss2io(*args, **kw): + return LinearIOSystem(*args, **kw) ss2io.__doc__ = LinearIOSystem.__init__.__doc__ From fea1aeace74a12e5efd6b146d3abf98dee9abe1b Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Thu, 31 Dec 2020 21:55:59 -0800 Subject: [PATCH 03/18] append _linearize for system name + unit tests --- control/iosys.py | 12 +++++++--- control/tests/iosys_test.py | 48 ++++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 07dacdc5c..67b618a26 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -551,7 +551,7 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, A[:, i] = (self._rhs(t, x0 + dx, u0) - F0) / eps C[:, i] = (self._out(t, x0 + dx, u0) - H0) / eps - # Perturb each of the input variables and compute linearization + # Perturb each of the input variables and compute linearization for i in range(ninputs): du = np.zeros((ninputs,)) du[i] = eps @@ -565,6 +565,8 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, # Set the names the system, inputs, outputs, and states if copy: + if name is None: + linsys.name = self.name + "_linearized" linsys.ninputs, linsys.input_index = self.ninputs, \ self.input_index.copy() linsys.noutputs, linsys.output_index = \ @@ -1804,9 +1806,13 @@ def linearize(sys, xeq, ueq=[], t=0, params={}, **kw): for the system as default values, overriding internal defaults. copy : bool, Optional If `copy` is True, copy the names of the input signals, output signals, - and states to the linearized system. + and states to the linearized system. If `name` is not specified, + the system name is set to the input system name with the string + '_linearized' appended. name : string, optional - Set the name of the linearized system. + Set the name of the linearized system. If not specified and if `copy` + is `False`, a generic name is generated with a unique + integer id. Returns ------- diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index d96eefd39..5c7faee6c 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -167,7 +167,22 @@ def test_nonlinear_iosys(self, tsys): np.testing.assert_array_almost_equal(lti_t, ios_t) np.testing.assert_allclose(lti_y, ios_y,atol=0.002,rtol=0.) - def test_linearize(self, tsys): + @pytest.fixture + def kincar(self): + # Create a simple nonlinear system to check (kinematic car) + def kincar_update(t, x, u, params): + return np.array([np.cos(x[2]) * u[0], np.sin(x[2]) * u[0], u[1]]) + + def kincar_output(t, x, u, params): + return np.array([x[0], x[1]]) + + return ios.NonlinearIOSystem( + kincar_update, kincar_output, + inputs = ['v', 'phi'], + outputs = ['x', 'y'], + states = ['x', 'y', 'theta']) + + def test_linearize(self, tsys, kincar): # Create a single input/single output linear system linsys = tsys.siso_linsys iosys = ios.LinearIOSystem(linsys) @@ -180,13 +195,7 @@ def test_linearize(self, tsys): np.testing.assert_array_almost_equal(linsys.D, linearized.D) # Create a simple nonlinear system to check (kinematic car) - def kincar_update(t, x, u, params): - return np.array([np.cos(x[2]) * u[0], np.sin(x[2]) * u[0], u[1]]) - - def kincar_output(t, x, u, params): - return np.array([x[0], x[1]]) - - iosys = ios.NonlinearIOSystem(kincar_update, kincar_output) + iosys = kincar linearized = iosys.linearize([0, 0, 0], [0, 0]) np.testing.assert_array_almost_equal(linearized.A, np.zeros((3,3))) np.testing.assert_array_almost_equal( @@ -196,6 +205,29 @@ def kincar_output(t, x, u, params): np.testing.assert_array_almost_equal(linearized.D, np.zeros((2,2))) + def test_linearize_named_signals(self, kincar): + # Full form of the call + linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True, + name='linearized') + assert linearized.name == 'linearized' + assert linearized.find_input('v') == 0 + assert linearized.find_input('phi') == 1 + assert linearized.find_output('x') == 0 + assert linearized.find_output('y') == 1 + assert linearized.find_state('x') == 0 + assert linearized.find_state('y') == 1 + assert linearized.find_state('theta') == 2 + + # If we copy signal names w/out a system name, append '_linearized' + linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True) + assert linearized.name == kincar.name + '_linearized' + + # If copy is False, signal names should not be copied + lin_nocopy = kincar.linearize(0, 0, copy=False) + assert lin_nocopy.find_input('v') is None + assert lin_nocopy.find_output('x') is None + assert lin_nocopy.find_state('x') is None + @noscipy0 def test_connect(self, tsys): # Define a couple of (linear) systems to interconnection From 605ddd3ade0486ed810348249aa4672f1e201b43 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 1 Jan 2021 14:14:05 -0800 Subject: [PATCH 04/18] add keyword to geterate strictly proper rss systems --- control/statesp.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/control/statesp.py b/control/statesp.py index 288b0831d..b41f18c7d 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -1264,7 +1264,7 @@ def _convertToStateSpace(sys, **kw): # TODO: add discrete time option -def _rss_generate(states, inputs, outputs, type): +def _rss_generate(states, inputs, outputs, type, strictly_proper=False): """Generate a random state space. This does the actual random state space generation expected from rss and @@ -1374,7 +1374,7 @@ def _rss_generate(states, inputs, outputs, type): # Apply masks. B = B * Bmask C = C * Cmask - D = D * Dmask + D = D * Dmask if not strictly_proper else zeros(D.shape) return StateSpace(A, B, C, D) @@ -1649,7 +1649,7 @@ def tf2ss(*args): raise ValueError("Needs 1 or 2 arguments; received %i." % len(args)) -def rss(states=1, outputs=1, inputs=1): +def rss(states=1, outputs=1, inputs=1, strictly_proper=False): """ Create a stable *continuous* random state space object. @@ -1661,6 +1661,9 @@ def rss(states=1, outputs=1, inputs=1): Number of system inputs outputs : integer Number of system outputs + strictly_proper : bool, optional + If set to 'True', returns a proper system (no direct term). Default + value is 'False'. Returns ------- @@ -1684,10 +1687,11 @@ def rss(states=1, outputs=1, inputs=1): """ - return _rss_generate(states, inputs, outputs, 'c') + return _rss_generate(states, inputs, outputs, 'c', + strictly_proper=strictly_proper) -def drss(states=1, outputs=1, inputs=1): +def drss(states=1, outputs=1, inputs=1, strictly_proper=False): """ Create a stable *discrete* random state space object. @@ -1722,7 +1726,8 @@ def drss(states=1, outputs=1, inputs=1): """ - return _rss_generate(states, inputs, outputs, 'd') + return _rss_generate(states, inputs, outputs, 'd', + strictly_proper=strictly_proper) def ssdata(sys): From 1f2f01bf7c44f207d26b2d0e1fadbcb6a910cff5 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 1 Jan 2021 14:18:01 -0800 Subject: [PATCH 05/18] update docs, tests for InterconnectedSystem + small refactoring of gain parsing --- control/iosys.py | 162 +++++++++++++++++--------- control/tests/iosys_test.py | 203 +++++++++++++++++++++++++++------ examples/cruise-control.py | 14 +-- examples/steering-gainsched.py | 14 +-- 4 files changed, 289 insertions(+), 104 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 67b618a26..2324cc838 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -214,7 +214,6 @@ def __mul__(sys2, sys1): elif isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): # Special case: maintain linear systems structure new_ss_sys = StateSpace.__mul__(sys2, sys1) - # TODO: set input and output names new_io_sys = LinearIOSystem(new_ss_sys) return new_io_sys @@ -825,60 +824,70 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], Parameters ---------- - syslist : array_like of InputOutputSystems + syslist : list of InputOutputSystems The list of input/output systems to be connected - connections : list of tuple of connection specifications, optional - Description of the internal connections between the subsystems. + connections : list of connections, optional + Description of the internal connections between the subsystems: [connection1, connection2, ...] - Each connection is a tuple that describes an input to one of the - subsystems. The entries are of the form: + Each connection is itself a list that describes an input to one of + the subsystems. The entries are of the form: - (input-spec, output-spec1, output-spec2, ...) + [input-spec, output-spec1, output-spec2, ...] - The input-spec should be a tuple of the form `(subsys_i, inp_j)` + The input-spec can be in a number of different forms. The lowest + level representation is a tuple of the form `(subsys_i, inp_j)` where `subsys_i` is the index into `syslist` and `inp_j` is the index into the input vector for the subsystem. If `subsys_i` has a single input, then the subsystem index `subsys_i` can be listed as the input-spec. If systems and signals are given names, then the form 'sys.sig' or ('sys', 'sig') are also recognized. - Each output-spec should be a tuple of the form `(subsys_i, out_j, - gain)`. The input will be constructed by summing the listed - outputs after multiplying by the gain term. If the gain term is - omitted, it is assumed to be 1. If the system has a single - output, then the subsystem index `subsys_i` can be listed as the - input-spec. If systems and signals are given names, then the form - 'sys.sig', ('sys', 'sig') or ('sys', 'sig', gain) are also - recognized, and the special form '-sys.sig' can be used to specify - a signal with gain -1. + Similarly, each output-spec should describe an output signal from + one of the susystems. The lowest level representation is a tuple + of the form `(subsys_i, out_j, gain)`. The input will be + constructed by summing the listed outputs after multiplying by the + gain term. If the gain term is omitted, it is assumed to be 1. + If the system has a single output, then the subsystem index + `subsys_i` can be listed as the input-spec. If systems and + signals are given names, then the form 'sys.sig', ('sys', 'sig') + or ('sys', 'sig', gain) are also recognized, and the special form + '-sys.sig' can be used to specify a signal with gain -1. If omitted, the connection map (matrix) can be specified using the :func:`~control.InterconnectedSystem.set_connect_map` method. - inplist : List of tuple of input specifications, optional - List of specifications for how the inputs for the overall system + inplist : list of input connections, optional + List of connections for how the inputs for the overall system are mapped to the subsystem inputs. The input specification is - similar to the form defined in the connection specification, except - that connections do not specify an input-spec, since these are - the system inputs. The entries are thus of the form: + similar to the form defined in the connection specification, + except that connections do not specify an input-spec, since these + are the system inputs. The entries for a connection are thus of + the form: - (output-spec1, output-spec2, ...) + [input-spec1, input-spec2, ...] Each system input is added to the input for the listed subsystem. + If the system input connects to only one subsystem input, a single + input specification can be given (without the inner list). If omitted, the input map can be specified using the `set_input_map` method. - outlist : tuple of output specifications, optional - List of specifications for how the outputs for the subsystems are - mapped to overall system outputs. The output specification is the - same as the form defined in the inplist specification - (including the optional gain term). Numbered outputs must be - chosen from the list of subsystem outputs, but named outputs can - also be contained in the list of subsystem inputs. + outlist : list of output connections, optional + List of connections for how the outputs from the subsystems are + mapped to overall system outputs. The output connection + description is the same as the form defined in the inplist + specification (including the optional gain term). Numbered + outputs must be chosen from the list of subsystem outputs, but + named outputs can also be contained in the list of subsystem + inputs. + + If an output connection contains more than one signal + specification, then those signals are added together (multiplying + by the any gain term) to form the system output. If omitted, the output map can be specified using the `set_output_map` method. @@ -896,9 +905,10 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], Description of the system outputs. Same format as `inputs`. states : int, list of str, or None, optional - Description of the system states. Same format as `inputs`, except - the state names will be of the form '.', - for each subsys in syslist and each state_name of each subsys. + Description of the system states. Same format as `inputs`. The + default is `None`, in which case the states will be given names of + the form '.', for each subsys in syslist + and each state_name of each subsys. params : dict, optional Parameter values for the systems. Passed to the evaluation @@ -919,6 +929,29 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], System name (used for specifying signals). If unspecified, a generic name is generated with a unique integer id. + Example + ------- + P = control.LinearIOSystem( + ct.rss(2, 2, 2, strictly_proper=True), name='P') + C = control.LinearIOSystem(control.rss(2, 2, 2), name='C') + S = control.InterconnectedSystem( + [P, C], + connections = [ + ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[0]'], + ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], + inplist = ['C.u[0]', 'C.u[1]'], + outlist = ['P.y[0]', 'P.y[1]'], + ) + + Notes + ----- + It is possible to replace lists in most of arguments with tuples + instead, but strictly speaking the only use of tuples should be in the + specification of an input- or output-signal via the tuple notation + `(subsys_i, signal_j, gain)` (where `gain` is optional). If you get + an unexpected error message about a specification being of the wrong + type, check your use of tuples. + """ # Convert input and output names to lists if they aren't already if not isinstance(inplist, (list, tuple)): @@ -1006,24 +1039,40 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], input_index = self._parse_input_spec(connection[0]) for output_spec in connection[1:]: output_index, gain = self._parse_output_spec(output_spec) - self.connect_map[input_index, output_index] = gain + if self.connect_map[input_index, output_index] != 0: + warn("multiple connections given for input %d" % + input_index + ". Combining with previous entries.") + self.connect_map[input_index, output_index] += gain # Convert the input list to a matrix: maps system to subsystems self.input_map = np.zeros((ninputs, self.ninputs)) for index, inpspec in enumerate(inplist): if isinstance(inpspec, (int, str, tuple)): inpspec = [inpspec] + if not isinstance(inpspec, list): + raise ValueError("specifications in inplist must be of type " + "int, str, tuple or list.") for spec in inpspec: - self.input_map[self._parse_input_spec(spec), index] = 1 + ulist_index = self._parse_input_spec(spec) + if self.input_map[ulist_index, index] != 0: + warn("multiple connections given for input %d" % + index + ". Combining with previous entries.") + self.input_map[ulist_index, index] += 1 # Convert the output list to a matrix: maps subsystems to system self.output_map = np.zeros((self.noutputs, noutputs + ninputs)) for index, outspec in enumerate(outlist): if isinstance(outspec, (int, str, tuple)): outspec = [outspec] + if not isinstance(outspec, list): + raise ValueError("specifications in outlist must be of type " + "int, str, tuple or list.") for spec in outspec: ylist_index, gain = self._parse_output_spec(spec) - self.output_map[index, ylist_index] = gain + if self.output_map[index, ylist_index] != 0: + warn("multiple connections given for output %d" % + index + ". Combining with previous entries.") + self.output_map[index, ylist_index] += gain # Save the parameters for the system self.params = params.copy() @@ -1166,7 +1215,9 @@ def _parse_input_spec(self, spec): """ # Parse the signal that we received - subsys_index, input_index = self._parse_signal(spec, 'input') + subsys_index, input_index, gain = self._parse_signal(spec, 'input') + if gain != 1: + raise ValueError("gain not allowed in spec '%s'." % str(spec)) # Return the index into the input vector list (ylist) return self.input_offset[subsys_index] + input_index @@ -1195,27 +1246,18 @@ def _parse_output_spec(self, spec): the gain to use for that output. """ - gain = 1 # Default gain - - # Check for special forms of the input - if isinstance(spec, tuple) and len(spec) == 3: - gain = spec[2] - spec = spec[:2] - elif isinstance(spec, str) and spec[0] == '-': - gain = -1 - spec = spec[1:] - # Parse the rest of the spec with standard signal parsing routine try: # Start by looking in the set of subsystem outputs - subsys_index, output_index = self._parse_signal(spec, 'output') + subsys_index, output_index, gain = \ + self._parse_signal(spec, 'output') # Return the index into the input vector list (ylist) return self.output_offset[subsys_index] + output_index, gain except ValueError: # Try looking in the set of subsystem *inputs* - subsys_index, input_index = self._parse_signal( + subsys_index, input_index, gain = self._parse_signal( spec, 'input or output', dictname='input_index') # Return the index into the input vector list (ylist) @@ -1240,17 +1282,27 @@ def _parse_signal(self, spec, signame='input', dictname=None): """ import re + gain = 1 # Default gain + + # Check for special forms of the input + if isinstance(spec, tuple) and len(spec) == 3: + gain = spec[2] + spec = spec[:2] + elif isinstance(spec, str) and spec[0] == '-': + gain = -1 + spec = spec[1:] + # Process cases where we are given indices as integers if isinstance(spec, int): - return spec, 0 + return spec, 0, gain elif isinstance(spec, tuple) and len(spec) == 1 \ and isinstance(spec[0], int): - return spec[0], 0 + return spec[0], 0, gain elif isinstance(spec, tuple) and len(spec) == 2 \ and all([isinstance(index, int) for index in spec]): - return spec + return spec + (gain,) # Figure out the name of the dictionary to use if dictname is None: @@ -1276,7 +1328,7 @@ def _parse_signal(self, spec, signame='input', dictname=None): raise ValueError("Couldn't find %s signal '%s.%s'." % (signame, namelist[0], namelist[1])) - return system_index, signal_index + return system_index, signal_index, gain # Handle the ('sys', 'sig'), (i, j), and mixed cases elif isinstance(spec, tuple) and len(spec) == 2 and \ @@ -1289,7 +1341,7 @@ def _parse_signal(self, spec, signame='input', dictname=None): else: system_index = self._find_system(spec[0]) if system_index is None: - raise ValueError("Couldn't find system %s." % spec[0]) + raise ValueError("Couldn't find system '%s'." % spec[0]) if isinstance(spec[1], int): signal_index = spec[1] @@ -1302,7 +1354,7 @@ def _parse_signal(self, spec, signame='input', dictname=None): if signal_index is None: raise ValueError("Couldn't find signal %s.%s." % tuple(spec)) - return system_index, signal_index + return system_index, signal_index, gain else: raise ValueError("Couldn't parse signal reference %s." % str(spec)) diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 5c7faee6c..e70c95dd5 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -239,8 +239,8 @@ def test_connect(self, tsys): # Connect systems in different ways and compare to StateSpace linsys_series = linsys2 * linsys1 iosys_series = ios.InterconnectedSystem( - (iosys1, iosys2), # systems - ((1, 0),), # interconnection (series) + [iosys1, iosys2], # systems + [[1, 0]], # interconnection (series) 0, # input = first system 1 # output = second system ) @@ -259,8 +259,8 @@ def test_connect(self, tsys): linsys2c.dt = 0 # Reset the timebase iosys2c = ios.LinearIOSystem(linsys2c) iosys_series = ios.InterconnectedSystem( - (iosys1, iosys2c), # systems - ((1, 0),), # interconnection (series) + [iosys1, iosys2c], # systems + [[1, 0]], # interconnection (series) 0, # input = first system 1 # output = second system ) @@ -274,9 +274,9 @@ def test_connect(self, tsys): # Feedback interconnection linsys_feedback = ct.feedback(linsys1, linsys2) iosys_feedback = ios.InterconnectedSystem( - (iosys1, iosys2), # systems - ((1, 0), # input of sys2 = output of sys1 - (0, (1, 0, -1))), # input of sys1 = -output of sys2 + [iosys1, iosys2], # systems + [[1, 0], # input of sys2 = output of sys1 + [0, (1, 0, -1)]], # input of sys1 = -output of sys2 0, # input = first system 0 # output = first system ) @@ -286,6 +286,83 @@ def test_connect(self, tsys): np.testing.assert_array_almost_equal(lti_t, ios_t) np.testing.assert_allclose(lti_y, ios_y,atol=0.002,rtol=0.) + @pytest.mark.parametrize( + "connections, inplist, outlist", + [pytest.param([[(1, 0), (0, 0, 1)]], [[(0, 0, 1)]], [[(1, 0, 1)]], + id="full, raw tuple"), + pytest.param([[(1, 0), (0, 0, -1)]], [[(0, 0)]], [[(1, 0, -1)]], + id="full, raw tuple, canceling gains"), + pytest.param([[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], + id="full, raw tuple, no gain"), + pytest.param([[(1, 0), (0, 0)]], [(0, 0)], [(1, 0)], + id="full, raw tuple, no gain, no outer list"), + pytest.param([['sys2.u[0]', 'sys1.y[0]']], ['sys1.u[0]'], + ['sys2.y[0]'], id="named, full"), + pytest.param([['sys2.u[0]', '-sys1.y[0]']], ['sys1.u[0]'], + ['-sys2.y[0]'], id="named, full, caneling gains"), + pytest.param([['sys2.u[0]', 'sys1.y[0]']], 'sys1.u[0]', 'sys2.y[0]', + id="named, full, no list"), + pytest.param([['sys2.u[0]', ('sys1', 'y[0]')]], [(0, 0)], [(1,)], + id="mixed"), + pytest.param([[1, 0]], 0, 1, id="minimal")]) + def test_connect_spec_variants(self, tsys, connections, inplist, outlist): + # Define a couple of (linear) systems to interconnection + linsys1 = tsys.siso_linsys + iosys1 = ios.LinearIOSystem(linsys1, name="sys1") + linsys2 = tsys.siso_linsys + iosys2 = ios.LinearIOSystem(linsys2, name="sys2") + + # Simple series connection + linsys_series = linsys2 * linsys1 + + # Create a simulation run to compare against + T, U = tsys.T, tsys.U + X0 = np.concatenate((tsys.X0, tsys.X0)) + lti_t, lti_y, lti_x = ct.forced_response(linsys_series, T, U, X0) + + # Create the input/output system with different parameter variations + iosys_series = ios.InterconnectedSystem( + [iosys1, iosys2], connections, inplist, outlist) + ios_t, ios_y, ios_x = ios.input_output_response( + iosys_series, T, U, X0, return_x=True) + np.testing.assert_array_almost_equal(lti_t, ios_t) + np.testing.assert_allclose(lti_y, ios_y, atol=0.002, rtol=0.) + + @pytest.mark.parametrize( + "connections, inplist, outlist", + [pytest.param([['sys2.u[0]', 'sys1.y[0]']], + [[('sys1', 'u[0]'), ('sys1', 'u[0]')]], + [('sys2', 'y[0]', 0.5)], id="duplicated input"), + pytest.param([['sys2.u[0]', ('sys1', 'y[0]', 0.5)], + ['sys2.u[0]', ('sys1', 'y[0]', 0.5)]], + 'sys1.u[0]', 'sys2.y[0]', id="duplicated connection"), + pytest.param([['sys2.u[0]', 'sys1.y[0]']], 'sys1.u[0]', + [[('sys2', 'y[0]', 0.5), ('sys2', 'y[0]', 0.5)]], + id="duplicated output")]) + def test_connect_spec_warnings(self, tsys, connections, inplist, outlist): + # Define a couple of (linear) systems to interconnection + linsys1 = tsys.siso_linsys + iosys1 = ios.LinearIOSystem(linsys1, name="sys1") + linsys2 = tsys.siso_linsys + iosys2 = ios.LinearIOSystem(linsys2, name="sys2") + + # Simple series connection + linsys_series = linsys2 * linsys1 + + # Create a simulation run to compare against + T, U = tsys.T, tsys.U + X0 = np.concatenate((tsys.X0, tsys.X0)) + lti_t, lti_y, lti_x = ct.forced_response(linsys_series, T, U, X0) + + # Set up multiple gainst and make sure a warning is generated + with pytest.warns(UserWarning, match="multiple.*Combining"): + iosys_series = ios.InterconnectedSystem( + [iosys1, iosys2], connections, inplist, outlist) + ios_t, ios_y, ios_x = ios.input_output_response( + iosys_series, T, U, X0, return_x=True) + np.testing.assert_array_almost_equal(lti_t, ios_t) + np.testing.assert_allclose(lti_y, ios_y, atol=0.002, rtol=0.) + @noscipy0 def test_static_nonlinearity(self, tsys): # Linear dynamical system @@ -344,9 +421,9 @@ def test_algebraic_loop(self, tsys): # Nonlinear system in feeback loop with LTI system iosys = ios.InterconnectedSystem( - (lnios, nlios), # linear system w/ nonlinear feedback - ((1,), # feedback interconnection (sig to 0) - (0, (1, 0, -1))), + [lnios, nlios], # linear system w/ nonlinear feedback + [[1], # feedback interconnection (sig to 0) + [0, (1, 0, -1)]], 0, # input to linear system 0 # output from linear system ) @@ -356,9 +433,9 @@ def test_algebraic_loop(self, tsys): # Algebraic loop from static nonlinear system in feedback # (error will be due to no states) iosys = ios.InterconnectedSystem( - (nlios1, nlios2), # two copies of a static nonlinear system - ((0, 1), # feedback interconnection - (1, (0, 0, -1))), + [nlios1, nlios2], # two copies of a static nonlinear system + [[0, 1], # feedback interconnection + [1, (0, 0, -1)]], 0, 0 ) args = (iosys, T, U) @@ -370,9 +447,9 @@ def test_algebraic_loop(self, tsys): [[-1, 1], [0, -2]], [[0], [1]], [[1, 0]], [[1]]) lnios = ios.LinearIOSystem(linsys) iosys = ios.InterconnectedSystem( - (nlios, lnios), # linear system w/ nonlinear feedback - ((0, 1), # feedback interconnection - (1, (0, 0, -1))), + [nlios, lnios], # linear system w/ nonlinear feedback + [[0, 1], # feedback interconnection + [1, (0, 0, -1)]], 0, 0 ) args = (iosys, T, U, X0) @@ -758,7 +835,6 @@ def test_params(self, tsys): ios_t, ios_y = ios.input_output_response( iosys, T, U, X0, params={'something':0}) - # Check to make sure results are OK np.testing.assert_array_almost_equal(lti_t, ios_t) np.testing.assert_allclose(lti_y, ios_y,atol=0.002,rtol=0.) @@ -773,13 +849,13 @@ def test_named_signals(self, tsys): np.dot(tsys.mimo_linsys1.C, np.reshape(x, (-1, 1))) \ + np.dot(tsys.mimo_linsys1.D, np.reshape(u, (-1, 1))) ).reshape(-1,), - inputs = ('u[0]', 'u[1]'), - outputs = ('y[0]', 'y[1]'), + inputs = ['u[0]', 'u[1]'], + outputs = ['y[0]', 'y[1]'], states = tsys.mimo_linsys1.states, name = 'sys1') sys2 = ios.LinearIOSystem(tsys.mimo_linsys2, - inputs = ('u[0]', 'u[1]'), - outputs = ('y[0]', 'y[1]'), + inputs = ['u[0]', 'u[1]'], + outputs = ['y[0]', 'y[1]'], name = 'sys2') # Series interconnection (sys1 * sys2) using __mul__ @@ -802,13 +878,30 @@ def test_named_signals(self, tsys): # Series interconnection (sys1 * sys2) using named + mixed signals ios_connect = ios.InterconnectedSystem( + [sys2, sys1], + connections=[ + [('sys1', 'u[0]'), 'sys2.y[0]'], + ['sys1.u[1]', 'sys2.y[1]'] + ], + inplist=['sys2.u[0]', ('sys2', 1)], + outlist=[(1, 'y[0]'), 'sys1.y[1]'] + ) + lin_series = ct.linearize(ios_connect, 0, 0) + np.testing.assert_array_almost_equal(ss_series.A, lin_series.A) + np.testing.assert_array_almost_equal(ss_series.B, lin_series.B) + np.testing.assert_array_almost_equal(ss_series.C, lin_series.C) + np.testing.assert_array_almost_equal(ss_series.D, lin_series.D) + + # Try the same thing using the interconnect function + # Since sys1 is nonlinear, we should get back the same result + ios_connect = ios.interconnect( (sys2, sys1), connections=( - (('sys1', 'u[0]'), 'sys2.y[0]'), - ('sys1.u[1]', 'sys2.y[1]') + [('sys1', 'u[0]'), 'sys2.y[0]'], + ['sys1.u[1]', 'sys2.y[1]'] ), - inplist=('sys2.u[0]', ('sys2', 1)), - outlist=((1, 'y[0]'), 'sys1.y[1]') + inplist=['sys2.u[0]', ('sys2', 1)], + outlist=[(1, 'y[0]'), 'sys1.y[1]'] ) lin_series = ct.linearize(ios_connect, 0, 0) np.testing.assert_array_almost_equal(ss_series.A, lin_series.A) @@ -816,15 +909,33 @@ def test_named_signals(self, tsys): np.testing.assert_array_almost_equal(ss_series.C, lin_series.C) np.testing.assert_array_almost_equal(ss_series.D, lin_series.D) - # Make sure that we can use input signal names as system outputs - ios_connect = ios.InterconnectedSystem( - (sys1, sys2), + # Try the same thing using the interconnect function + # Since sys1 is nonlinear, we should get back the same result + # Note: use a tuple for connections to make sure it works + ios_connect = ios.interconnect( + (sys2, sys1), connections=( - ('sys2.u[0]', 'sys1.y[0]'), ('sys2.u[1]', 'sys1.y[1]'), - ('sys1.u[0]', '-sys2.y[0]'), ('sys1.u[1]', '-sys2.y[1]') + [('sys1', 'u[0]'), 'sys2.y[0]'], + ['sys1.u[1]', 'sys2.y[1]'] ), - inplist=('sys1.u[0]', 'sys1.u[1]'), - outlist=('sys2.u[0]', 'sys2.u[1]') # = sys1.y[0], sys1.y[1] + inplist=['sys2.u[0]', ('sys2', 1)], + outlist=[(1, 'y[0]'), 'sys1.y[1]'] + ) + lin_series = ct.linearize(ios_connect, 0, 0) + np.testing.assert_array_almost_equal(ss_series.A, lin_series.A) + np.testing.assert_array_almost_equal(ss_series.B, lin_series.B) + np.testing.assert_array_almost_equal(ss_series.C, lin_series.C) + np.testing.assert_array_almost_equal(ss_series.D, lin_series.D) + + # Make sure that we can use input signal names as system outputs + ios_connect = ios.InterconnectedSystem( + [sys1, sys2], + connections=[ + ['sys2.u[0]', 'sys1.y[0]'], ['sys2.u[1]', 'sys1.y[1]'], + ['sys1.u[0]', '-sys2.y[0]'], ['sys1.u[1]', '-sys2.y[1]'] + ], + inplist=['sys1.u[0]', 'sys1.u[1]'], + outlist=['sys2.u[0]', 'sys2.u[1]'] # = sys1.y[0], sys1.y[1] ) ss_feedback = ct.feedback(tsys.mimo_linsys1, tsys.mimo_linsys2) lin_feedback = ct.linearize(ios_connect, 0, 0) @@ -1047,6 +1158,29 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_array_equal(io_feedback.C, ss_feedback.C) np.testing.assert_array_equal(io_feedback.D, ss_feedback.D) + def test_docstring_example(self): + P = ct.LinearIOSystem( + ct.rss(2, 2, 2, strictly_proper=True), name='P') + C = ct.LinearIOSystem(ct.rss(2, 2, 2), name='C') + S = ct.InterconnectedSystem( + [C, P], + connections = [ + ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[1]'], + ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], + inplist = ['C.u[0]', 'C.u[1]'], + outlist = ['P.y[0]', 'P.y[1]'], + ) + ss_P = ct.StateSpace(P.linearize(0, 0)) + ss_C = ct.StateSpace(C.linearize(0, 0)) + ss_eye = ct.StateSpace( + [], np.zeros((0, 2)), np.zeros((2, 0)), np.eye(2)) + ss_S = ct.feedback(ss_P * ss_C, ss_eye) + io_S = S.linearize(0, 0) + np.testing.assert_array_almost_equal(io_S.A, ss_S.A) + np.testing.assert_array_almost_equal(io_S.B, ss_S.B) + np.testing.assert_array_almost_equal(io_S.C, ss_S.C) + np.testing.assert_array_almost_equal(io_S.D, ss_S.D) + def test_duplicates(self, tsys): nlios = ios.NonlinearIOSystem(lambda t, x, u, params: x, lambda t, x, u, params: u * u, @@ -1075,7 +1209,7 @@ def test_duplicates(self, tsys): inputs=1, outputs=1, name="sys") with pytest.warns(UserWarning, match="Duplicate name"): - ct.InterconnectedSystem((nlios1, iosys_siso, nlios2), + ct.InterconnectedSystem([nlios1, iosys_siso, nlios2], inputs=0, outputs=0, states=0) # Same system, different names => everything should be OK @@ -1086,7 +1220,7 @@ def test_duplicates(self, tsys): lambda t, x, u, params: u * u, inputs=1, outputs=1, name="nlios2") with pytest.warns(None) as record: - ct.InterconnectedSystem((nlios1, iosys_siso, nlios2), + ct.InterconnectedSystem([nlios1, iosys_siso, nlios2], inputs=0, outputs=0, states=0) if record: pytest.fail("Warning not expected: " + record[0].message) @@ -1141,7 +1275,6 @@ def pvtol_full(t, x, u, params={}): ]) - def secord_update(t, x, u, params={}): """Second order system dynamics""" omega0 = params.get('omega0', 1.) diff --git a/examples/cruise-control.py b/examples/cruise-control.py index 8e59c79c7..505b4071c 100644 --- a/examples/cruise-control.py +++ b/examples/cruise-control.py @@ -141,8 +141,8 @@ def motor_torque(omega, params={}): cruise_tf = ct.InterconnectedSystem( (control_tf, vehicle), name='cruise', connections = ( - ('control.u', '-vehicle.v'), - ('vehicle.u', 'control.y')), + ['control.u', '-vehicle.v'], + ['vehicle.u', 'control.y']), inplist = ('control.u', 'vehicle.gear', 'vehicle.theta'), inputs = ('vref', 'gear', 'theta'), outlist = ('vehicle.v', 'vehicle.u'), @@ -279,8 +279,8 @@ def pi_output(t, x, u, params={}): cruise_pi = ct.InterconnectedSystem( (vehicle, control_pi), name='cruise', connections=( - ('vehicle.u', 'control.u'), - ('control.v', 'vehicle.v')), + ['vehicle.u', 'control.u'], + ['control.v', 'vehicle.v']), inplist=('control.vref', 'vehicle.gear', 'vehicle.theta'), outlist=('control.u', 'vehicle.v'), outputs=['u', 'v']) @@ -404,9 +404,9 @@ def sf_output(t, z, u, params={}): cruise_sf = ct.InterconnectedSystem( (vehicle, control_sf), name='cruise', connections=( - ('vehicle.u', 'control.u'), - ('control.x', 'vehicle.v'), - ('control.y', 'vehicle.v')), + ['vehicle.u', 'control.u'], + ['control.x', 'vehicle.v'], + ['control.y', 'vehicle.v']), inplist=('control.r', 'vehicle.gear', 'vehicle.theta'), outlist=('control.u', 'vehicle.v'), outputs=['u', 'v']) diff --git a/examples/steering-gainsched.py b/examples/steering-gainsched.py index 8f541ead8..7db2d9a73 100644 --- a/examples/steering-gainsched.py +++ b/examples/steering-gainsched.py @@ -143,13 +143,13 @@ def trajgen_output(t, x, u, params): # Interconnections between subsystems connections=( - ('controller.ex', 'trajgen.xd', '-vehicle.x'), - ('controller.ey', 'trajgen.yd', '-vehicle.y'), - ('controller.etheta', 'trajgen.thetad', '-vehicle.theta'), - ('controller.vd', 'trajgen.vd'), - ('controller.phid', 'trajgen.phid'), - ('vehicle.v', 'controller.v'), - ('vehicle.phi', 'controller.phi') + ['controller.ex', 'trajgen.xd', '-vehicle.x'], + ['controller.ey', 'trajgen.yd', '-vehicle.y'], + ['controller.etheta', 'trajgen.thetad', '-vehicle.theta'], + ['controller.vd', 'trajgen.vd'], + ['controller.phid', 'trajgen.phid'], + ['vehicle.v', 'controller.v'], + ['vehicle.phi', 'controller.phi'] ), # System inputs From cb136e820d100304f07a735af79df1b335ec7d13 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 1 Jan 2021 23:57:17 -0800 Subject: [PATCH 06/18] create LinearInterconnectedSystems for interconnections of linear systems --- control/config.py | 6 + control/iosys.py | 467 +++++++++++++++++++++--------------- control/tests/iosys_test.py | 109 ++++++++- 3 files changed, 381 insertions(+), 201 deletions(-) diff --git a/control/config.py b/control/config.py index 4d4512af7..2e2da14fc 100644 --- a/control/config.py +++ b/control/config.py @@ -215,7 +215,13 @@ def use_legacy_defaults(version): if major == 0 and minor < 9: # switched to 'array' as default for state space objects set_defaults('statesp', use_numpy_matrix=True) + # switched to 0 (=continuous) as default timestep set_defaults('control', default_dt=None) + # changed iosys naming conventions + set_defaults('iosys', state_name_delim='.', + duplicate_system_name_prefix='copy of ', + duplicate_system_name_suffix='') + return (major, minor, patch) diff --git a/control/iosys.py b/control/iosys.py index 2324cc838..009e4565f 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -43,10 +43,14 @@ __all__ = ['InputOutputSystem', 'LinearIOSystem', 'NonlinearIOSystem', 'InterconnectedSystem', 'input_output_response', 'find_eqpt', - 'linearize', 'ss2io', 'tf2io'] + 'linearize', 'ss2io', 'tf2io', 'interconnect'] # Define module default parameter values -_iosys_defaults = {} +_iosys_defaults = { + 'iosys.state_name_delim': '_', + 'iosys.duplicate_system_name_prefix': '', + 'iosys.duplicate_system_name_suffix': '$copy' +} class InputOutputSystem(object): @@ -208,15 +212,11 @@ def __mul__(sys2, sys1): if isinstance(sys1, (int, float, np.number)): # TODO: Scale the output raise NotImplemented("Scalar multiplication not yet implemented") + elif isinstance(sys1, np.ndarray): # TODO: Post-multiply by a matrix raise NotImplemented("Matrix multiplication not yet implemented") - elif isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): - # Special case: maintain linear systems structure - new_ss_sys = StateSpace.__mul__(sys2, sys1) - new_io_sys = LinearIOSystem(new_ss_sys) - return new_io_sys elif not isinstance(sys1, InputOutputSystem): raise ValueError("Unknown I/O system object ", sys1) @@ -228,13 +228,13 @@ def __mul__(sys2, sys1): # Make sure timebase are compatible dt = common_timebase(sys1.dt, sys2.dt) + # Create a new system to handle the composition inplist = [(0, i) for i in range(sys1.ninputs)] outlist = [(1, i) for i in range(sys2.noutputs)] - # Return the series interconnection between the systems newsys = InterconnectedSystem( (sys1, sys2), inplist=inplist, outlist=outlist) - # Set up the connection map manually + # Set up the connection map manually newsys.set_connect_map(np.block( [[np.zeros((sys1.ninputs, sys1.noutputs)), np.zeros((sys1.ninputs, sys2.noutputs))], @@ -242,7 +242,12 @@ def __mul__(sys2, sys1): np.zeros((sys2.ninputs, sys2.noutputs))]] )) - # Return the newly created system + # If both systems are linear, create LinearInterconnectedSystem + if isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): + ss_sys = StateSpace.__mul__(sys2, sys1) + return LinearInterconnectedSystem(newsys, ss_sys) + + # Return the newly created InterconnectedSystem return newsys def __rmul__(sys1, sys2): @@ -250,34 +255,31 @@ def __rmul__(sys1, sys2): if isinstance(sys2, (int, float, np.number)): # TODO: Scale the output raise NotImplemented("Scalar multiplication not yet implemented") + elif isinstance(sys2, np.ndarray): # TODO: Post-multiply by a matrix raise NotImplemented("Matrix multiplication not yet implemented") - elif isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): - # Special case: maintain linear systems structure - new_ss_sys = StateSpace.__rmul__(sys1, sys2) - # TODO: set input and output names - new_io_sys = LinearIOSystem(new_ss_sys) - return new_io_sys elif not isinstance(sys2, InputOutputSystem): raise ValueError("Unknown I/O system object ", sys1) + else: - # Both systetms are InputOutputSystems => use __mul__ + # Both systems are InputOutputSystems => use __mul__ return InputOutputSystem.__mul__(sys2, sys1) def __add__(sys1, sys2): """Add two input/output systems (parallel interconnection)""" # TODO: Allow addition of scalars and matrices - if not isinstance(sys2, InputOutputSystem): - raise ValueError("Unknown I/O system object ", sys2) - elif isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): - # Special case: maintain linear systems structure - new_ss_sys = StateSpace.__add__(sys1, sys2) - # TODO: set input and output names - new_io_sys = LinearIOSystem(new_ss_sys) + if isinstance(sys2, (int, float, np.number)): + # TODO: Scale the output + raise NotImplemented("Scalar addition not yet implemented") - return new_io_sys + elif isinstance(sys2, np.ndarray): + # TODO: Post-multiply by a matrix + raise NotImplemented("Matrix addition not yet implemented") + + elif not isinstance(sys2, InputOutputSystem): + raise ValueError("Unknown I/O system object ", sys2) # Make sure number of input and outputs match if sys1.ninputs != sys2.ninputs or sys1.noutputs != sys2.noutputs: @@ -286,26 +288,24 @@ def __add__(sys1, sys2): ninputs = sys1.ninputs noutputs = sys1.noutputs + # Create a new system to handle the composition inplist = [[(0, i), (1, i)] for i in range(ninputs)] outlist = [[(0, i), (1, i)] for i in range(noutputs)] - # Create a new system to handle the composition newsys = InterconnectedSystem( (sys1, sys2), inplist=inplist, outlist=outlist) - # Return the newly created system + # If both systems are linear, create LinearInterconnectedSystem + if isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): + ss_sys = StateSpace.__add__(sys2, sys1) + return LinearInterconnectedSystem(newsys, ss_sys) + + # Return the newly created InterconnectedSystem return newsys # TODO: add __radd__ to allow postaddition by scalars and matrices def __neg__(sys): """Negate an input/output systems (rescale)""" - if isinstance(sys, StateSpace): - # Special case: maintain linear systems structure - new_ss_sys = StateSpace.__neg__(sys) - # TODO: set input and output names - new_io_sys = LinearIOSystem(new_ss_sys) - - return new_io_sys if sys.ninputs is None or sys.noutputs is None: raise ValueError("Can't determine number of inputs or outputs") @@ -315,6 +315,11 @@ def __neg__(sys): newsys = InterconnectedSystem( (sys,), dt=sys.dt, inplist=inplist, outlist=outlist) + # If the system is linear, create LinearInterconnectedSystem + if isinstance(sys, StateSpace): + ss_sys = StateSpace.__neg__(sys) + return LinearInterconnectedSystem(newsys, ss_sys) + # Return the newly created system return newsys @@ -467,11 +472,6 @@ def feedback(self, other=1, sign=-1, params={}): # TODO: add conversion to I/O system when needed if not isinstance(other, InputOutputSystem): raise TypeError("Feedback around I/O system must be I/O system.") - elif isinstance(self, StateSpace) and isinstance(other, StateSpace): - # Special case: maintain linear systems structure - new_ss_sys = StateSpace.feedback(self, other, sign=sign) - # TODO: set input and output names - new_io_sys = LinearIOSystem(new_ss_sys) return new_io_sys @@ -499,6 +499,11 @@ def feedback(self, other=1, sign=-1, params={}): np.zeros((other.ninputs, other.noutputs))]] )) + if isinstance(self, StateSpace) and isinstance(other, StateSpace): + # Special case: maintain linear systems structure + ss_sys = StateSpace.feedback(self, other, sign=sign) + return LinearInterconnectedSystem(newsys, ss_sys) + # Return the newly created system return newsys @@ -577,9 +582,11 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, def copy(self, newname=None): """Make a copy of an input/output system.""" + dup_prefix = config.defaults['iosys.duplicate_system_name_prefix'] + dup_suffix = config.defaults['iosys.duplicate_system_name_suffix'] newsys = copy.copy(self) newsys.name = self.name_or_default( - "copy of " + self.name if not newname else newname) + dup_prefix + self.name + dup_suffix if not newname else newname) return newsys @@ -822,135 +829,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], inputs to other subsystems. The overall system inputs and outputs can be any subset of subsystem inputs and outputs. - Parameters - ---------- - syslist : list of InputOutputSystems - The list of input/output systems to be connected - - connections : list of connections, optional - Description of the internal connections between the subsystems: - - [connection1, connection2, ...] - - Each connection is itself a list that describes an input to one of - the subsystems. The entries are of the form: - - [input-spec, output-spec1, output-spec2, ...] - - The input-spec can be in a number of different forms. The lowest - level representation is a tuple of the form `(subsys_i, inp_j)` - where `subsys_i` is the index into `syslist` and `inp_j` is the - index into the input vector for the subsystem. If `subsys_i` has - a single input, then the subsystem index `subsys_i` can be listed - as the input-spec. If systems and signals are given names, then - the form 'sys.sig' or ('sys', 'sig') are also recognized. - - Similarly, each output-spec should describe an output signal from - one of the susystems. The lowest level representation is a tuple - of the form `(subsys_i, out_j, gain)`. The input will be - constructed by summing the listed outputs after multiplying by the - gain term. If the gain term is omitted, it is assumed to be 1. - If the system has a single output, then the subsystem index - `subsys_i` can be listed as the input-spec. If systems and - signals are given names, then the form 'sys.sig', ('sys', 'sig') - or ('sys', 'sig', gain) are also recognized, and the special form - '-sys.sig' can be used to specify a signal with gain -1. - - If omitted, the connection map (matrix) can be specified using the - :func:`~control.InterconnectedSystem.set_connect_map` method. - - inplist : list of input connections, optional - List of connections for how the inputs for the overall system - are mapped to the subsystem inputs. The input specification is - similar to the form defined in the connection specification, - except that connections do not specify an input-spec, since these - are the system inputs. The entries for a connection are thus of - the form: - - [input-spec1, input-spec2, ...] - - Each system input is added to the input for the listed subsystem. - If the system input connects to only one subsystem input, a single - input specification can be given (without the inner list). - - If omitted, the input map can be specified using the - `set_input_map` method. - - outlist : list of output connections, optional - List of connections for how the outputs from the subsystems are - mapped to overall system outputs. The output connection - description is the same as the form defined in the inplist - specification (including the optional gain term). Numbered - outputs must be chosen from the list of subsystem outputs, but - named outputs can also be contained in the list of subsystem - inputs. - - If an output connection contains more than one signal - specification, then those signals are added together (multiplying - by the any gain term) to form the system output. - - If omitted, the output map can be specified using the - `set_output_map` method. - - inputs : int, list of str or None, optional - Description of the system inputs. This can be given as an integer - count or as a list of strings that name the individual signals. - If an integer count is specified, the names of the signal will be - of the form `s[i]` (where `s` is one of `u`, `y`, or `x`). If - this parameter is not given or given as `None`, the relevant - quantity will be determined when possible based on other - information provided to functions using the system. - - outputs : int, list of str or None, optional - Description of the system outputs. Same format as `inputs`. - - states : int, list of str, or None, optional - Description of the system states. Same format as `inputs`. The - default is `None`, in which case the states will be given names of - the form '.', for each subsys in syslist - and each state_name of each subsys. - - params : dict, optional - Parameter values for the systems. Passed to the evaluation - functions for the system as default values, overriding internal - defaults. - - dt : timebase, optional - The timebase for the system, used to specify whether the system is - operating in continuous or discrete time. It can have the - following values: - - * dt = 0: continuous time system (default) - * dt > 0: discrete time system with sampling period 'dt' - * dt = True: discrete time with unspecified sampling period - * dt = None: no timebase specified - - name : string, optional - System name (used for specifying signals). If unspecified, a - generic name is generated with a unique integer id. - - Example - ------- - P = control.LinearIOSystem( - ct.rss(2, 2, 2, strictly_proper=True), name='P') - C = control.LinearIOSystem(control.rss(2, 2, 2), name='C') - S = control.InterconnectedSystem( - [P, C], - connections = [ - ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[0]'], - ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], - inplist = ['C.u[0]', 'C.u[1]'], - outlist = ['P.y[0]', 'P.y[1]'], - ) - - Notes - ----- - It is possible to replace lists in most of arguments with tuples - instead, but strictly speaking the only use of tuples should be in the - specification of an input- or output-signal via the tuple notation - `(subsys_i, signal_j, gain)` (where `gain` is optional). If you get - an unexpected error message about a specification being of the wrong - type, check your use of tuples. + See :func:`~control.interconnect` for a list of parameters. """ # Convert input and output names to lists if they aren't already @@ -996,7 +875,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], if sys in sysobj_name_dct: sys = sys.copy() warn("Duplicate object found in system list: %s. " - "Making a copy" % str(sys)) + "Making a copy" % str(sys.name)) if sys.name is not None and sys.name in sysname_count_dct: count = sysname_count_dct[sys.name] sysname_count_dct[sys.name] += 1 @@ -1012,8 +891,9 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], if states is None: states = [] + state_name_delim = config.defaults['iosys.state_name_delim'] for sys, sysname in sysobj_name_dct.items(): - states += [sysname + '.' + + states += [sysname + state_name_delim + statename for statename in sys.state_index.keys()] # Create the I/O system @@ -1077,26 +957,6 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Save the parameters for the system self.params = params.copy() - def __add__(self, sys): - # TODO: implement special processing to maintain flat structure - return super(InterconnectedSystem, self).__add__(sys) - - def __radd__(self, sys): - # TODO: implement special processing to maintain flat structure - return super(InterconnectedSystem, self).__radd__(sys) - - def __mul__(self, sys): - # TODO: implement special processing to maintain flat structure - return super(InterconnectedSystem, self).__mul__(sys) - - def __rmul__(self, sys): - # TODO: implement special processing to maintain flat structure - return super(InterconnectedSystem, self).__rmul__(sys) - - def __neg__(self): - # TODO: implement special processing to maintain flat structure - return super(InterconnectedSystem, self).__neg__() - def _update_params(self, params, warning=False): for sys in self.syslist: local = sys.params.copy() # start with system parameters @@ -1424,6 +1284,64 @@ def set_output_map(self, output_map): self.noutputs = output_map.shape[0] +class LinearInterconnectedSystem(InterconnectedSystem, LinearIOSystem): + """Interconnection of a set of linear input/output systems. + + This class is used to implement a system that is an interconnection of + linear input/output systems. It has all of the structure of an + :class:`InterconnectedSystem`, but also maintains the requirement elements + of :class:`LinearIOSystem`, including the :class:`StateSpace` class + structure, allowing it to be passed to functions that expect a + :class:`StateSpace` system. + """ + + def __init__(self, io_sys, ss_sys=None): + if not isinstance(io_sys, InterconnectedSystem): + raise TypeError("First argument must be an interconnected system.") + + # Create the I/O system object + InputOutputSystem.__init__( + self, name=io_sys.name, params=io_sys.params) + + # Copy over the I/O systems attributes + self.syslist = io_sys.syslist + self.ninputs = io_sys.ninputs + self.noutputs = io_sys.noutputs + self.nstates = io_sys.nstates + self.input_index = io_sys.input_index + self.output_index = io_sys.output_index + self.state_index = io_sys.state_index + self.dt = io_sys.dt + + # Copy over the attributes from the interconnected system + self.syslist_index = io_sys.syslist_index + self.state_offset = io_sys.state_offset + self.input_offset = io_sys.input_offset + self.output_offset = io_sys.output_offset + self.connect_map = io_sys.connect_map + self.input_map = io_sys.input_map + self.output_map = io_sys.output_map + self.params = io_sys.params + + # If we didnt' get a state space system, linearize the full system + # TODO: this could be replaced with a direct computation (someday) + if ss_sys is None: + ss_sys = self.linearize(0, 0) + + # Initialize the state space attributes + if isinstance(ss_sys, StateSpace): + # Make sure the dimension match + if io_sys.ninputs != ss_sys.inputs or \ + io_sys.noutputs != ss_sys.outputs or \ + io_sys.nstates != ss_sys.states: + raise ValueError("System dimensions for first and second " + "arguments must match.") + StateSpace.__init__(self, ss_sys, remove_useless=False) + + else: + raise TypeError("Second argument must be a state space system.") + + def input_output_response(sys, T, U=0., X0=0, params={}, method='RK45', return_x=False, squeeze=True): @@ -1898,17 +1816,176 @@ def _find_size(sysval, vecval): # Convert a state space system into an input/output system (wrapper) -def ss2io(*args, **kw): - return LinearIOSystem(*args, **kw) +def ss2io(*args, **kwargs): + return LinearIOSystem(*args, **kwargs) ss2io.__doc__ = LinearIOSystem.__init__.__doc__ # Convert a transfer function into an input/output system (wrapper) -def tf2io(*args, **kw): +def tf2io(*args, **kwargs): """Convert a transfer function into an I/O system""" # TODO: add remaining documentation # Convert the system to a state space system linsys = tf2ss(*args) # Now convert the state space system to an I/O system - return LinearIOSystem(linsys, **kw) + return LinearIOSystem(linsys, **kwargs) + + +# Function to create an interconnected system +def interconnect(syslist, connections=[], inplist=[], outlist=[], + inputs=None, outputs=None, states=None, + params={}, dt=None, name=None): + """Interconnect a set of input/output systems. + + This function creates a new system that is an interconnection of a set of + input/output systems. If all of the input systems are linear I/O systems + (type `LinearIOSystem`) then the resulting system will be a linear + interconnected I/O system (type `LinearInterconnectedSystem`) with the + appropriate inputs, outputs, and states. Otherwise, an interconnected I/O + system (type `InterconnectedSystem`) will be created. + + Parameters + ---------- + syslist : list of InputOutputSystems + The list of input/output systems to be connected + + connections : list of connections, optional + Description of the internal connections between the subsystems: + + [connection1, connection2, ...] + + Each connection is itself a list that describes an input to one of the + subsystems. The entries are of the form: + + [input-spec, output-spec1, output-spec2, ...] + + The input-spec can be in a number of different forms. The lowest + level representation is a tuple of the form `(subsys_i, inp_j)` where + `subsys_i` is the index into `syslist` and `inp_j` is the index into + the input vector for the subsystem. If `subsys_i` has a single input, + then the subsystem index `subsys_i` can be listed as the input-spec. + If systems and signals are given names, then the form 'sys.sig' or + ('sys', 'sig') are also recognized. + + Similarly, each output-spec should describe an output signal from one + of the susystems. The lowest level representation is a tuple of the + form `(subsys_i, out_j, gain)`. The input will be constructed by + summing the listed outputs after multiplying by the gain term. If the + gain term is omitted, it is assumed to be 1. If the system has a + single output, then the subsystem index `subsys_i` can be listed as + the input-spec. If systems and signals are given names, then the form + 'sys.sig', ('sys', 'sig') or ('sys', 'sig', gain) are also recognized, + and the special form '-sys.sig' can be used to specify a signal with + gain -1. + + If omitted, the connection map (matrix) can be specified using the + :func:`~control.InterconnectedSystem.set_connect_map` method. + + inplist : list of input connections, optional + List of connections for how the inputs for the overall system are + mapped to the subsystem inputs. The input specification is similar to + the form defined in the connection specification, except that + connections do not specify an input-spec, since these are the system + inputs. The entries for a connection are thus of the form: + + [input-spec1, input-spec2, ...] + + Each system input is added to the input for the listed subsystem. If + the system input connects to only one subsystem input, a single input + specification can be given (without the inner list). + + If omitted, the input map can be specified using the `set_input_map` + method. + + outlist : list of output connections, optional + List of connections for how the outputs from the subsystems are mapped + to overall system outputs. The output connection description is the + same as the form defined in the inplist specification (including the + optional gain term). Numbered outputs must be chosen from the list of + subsystem outputs, but named outputs can also be contained in the list + of subsystem inputs. + + If an output connection contains more than one signal specification, + then those signals are added together (multiplying by the any gain + term) to form the system output. + + If omitted, the output map can be specified using the `set_output_map` + method. + + inputs : int, list of str or None, optional + Description of the system inputs. This can be given as an integer + count or as a list of strings that name the individual signals. If an + integer count is specified, the names of the signal will be of the + form `s[i]` (where `s` is one of `u`, `y`, or `x`). If this parameter + is not given or given as `None`, the relevant quantity will be + determined when possible based on other information provided to + functions using the system. + + outputs : int, list of str or None, optional + Description of the system outputs. Same format as `inputs`. + + states : int, list of str, or None, optional + Description of the system states. Same format as `inputs`. The + default is `None`, in which case the states will be given names of the + form '.', for each subsys in syslist and each + state_name of each subsys. + + params : dict, optional + Parameter values for the systems. Passed to the evaluation functions + for the system as default values, overriding internal defaults. + + dt : timebase, optional + The timebase for the system, used to specify whether the system is + operating in continuous or discrete time. It can have the following + values: + + * dt = 0: continuous time system (default) + * dt > 0: discrete time system with sampling period 'dt' + * dt = True: discrete time with unspecified sampling period + * dt = None: no timebase specified + + name : string, optional + System name (used for specifying signals). If unspecified, a generic + name is generated with a unique integer id. + + Example + ------- + P = control.LinearIOSystem( + ct.rss(2, 2, 2, strictly_proper=True), name='P') + C = control.LinearIOSystem(control.rss(2, 2, 2), name='C') + S = control.InterconnectedSystem( + [P, C], + connections = [ + ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[0]'], + ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], + inplist = ['C.u[0]', 'C.u[1]'], + outlist = ['P.y[0]', 'P.y[1]'], + ) + + Notes + ----- + It is possible to replace lists in most of arguments with tuples instead, + but strictly speaking the only use of tuples should be in the + specification of an input- or output-signal via the tuple notation + `(subsys_i, signal_j, gain)` (where `gain` is optional). If you get an + unexpected error message about a specification being of the wrong type, + check your use of tuples. + + In addition to its use for general nonlinear I/O systems, the + `interconnect` function allows linear systems to be interconnected using + named signals (compared with the `connect` function, which uses signal + indicies) and to be treated as both a `StateSpace` system as well as an + `InputOutputSystem`. + + """ + newsys = InterconnectedSystem( + syslist, connections=connections, inplist=inplist, outlist=outlist, + inputs=inputs, outputs=outputs, states=states, + params=params, dt=dt, name=name) + + # If all subsystems are linear systems, maintain linear structure + if all([isinstance(sys, LinearIOSystem) for sys in syslist]): + return LinearInterconnectedSystem(newsys, None) + + return newsys diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index e70c95dd5..dea816deb 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -461,10 +461,11 @@ def test_algebraic_loop(self, tsys): def test_summer(self, tsys): # Construct a MIMO system for testing linsys = tsys.mimo_linsys1 - linio = ios.LinearIOSystem(linsys) + linio1 = ios.LinearIOSystem(linsys) + linio2 = ios.LinearIOSystem(linsys) linsys_parallel = linsys + linsys - iosys_parallel = linio + linio + iosys_parallel = linio1 + linio2 # Set up parameters for simulation T = tsys.T @@ -948,6 +949,8 @@ def test_sys_naming_convention(self, tsys): """Enforce generic system names 'sys[i]' to be present when systems are created without explicit names.""" + ct.config.use_legacy_defaults('0.8.4') # changed delims in 0.9.0 + ct.config.use_numpy_matrix(False) # get rid of warning messages ct.InputOutputSystem.idCounter = 0 sys = ct.LinearIOSystem(tsys.mimo_linsys1) @@ -1001,13 +1004,18 @@ def test_sys_naming_convention(self, tsys): with pytest.warns(UserWarning): unnamedsys1 * unnamedsys1 - def test_signals_naming_convention(self, tsys): + ct.config.reset_defaults() # reset defaults + + def test_signals_naming_convention_0_8_4(self, tsys): """Enforce generic names to be present when systems are created without explicit signal names: input: 'u[i]' state: 'x[i]' output: 'y[i]' """ + + ct.config.use_legacy_defaults('0.8.4') # changed delims in 0.9.0 + ct.config.use_numpy_matrix(False) # get rid of warning messages ct.InputOutputSystem.idCounter = 0 sys = ct.LinearIOSystem(tsys.mimo_linsys1) for statename in ["x[0]", "x[1]"]: @@ -1060,6 +1068,8 @@ def test_signals_naming_convention(self, tsys): assert "sys[1].x[0]" in same_name_series.state_index assert "copy of sys[1].x[0]" in same_name_series.state_index + ct.config.reset_defaults() # reset defaults + def test_named_signals_linearize_inconsistent(self, tsys): """Mare sure that providing inputs or outputs not consistent with updfcn or outfcn fail @@ -1109,6 +1119,7 @@ def outfcn(t, x, u, params): def test_lineariosys_statespace(self, tsys): """Make sure that a LinearIOSystem is also a StateSpace object""" iosys_siso = ct.LinearIOSystem(tsys.siso_linsys) + iosys_siso2 = ct.LinearIOSystem(tsys.siso_linsys) assert isinstance(iosys_siso, ct.StateSpace) # Make sure that state space functions work for LinearIOSystems @@ -1122,7 +1133,7 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_array_equal(omega_io, omega_ss) # LinearIOSystem methods should override StateSpace methods - io_mul = iosys_siso * iosys_siso + io_mul = iosys_siso * iosys_siso2 assert isinstance(io_mul, ct.InputOutputSystem) # But also retain linear structure @@ -1136,7 +1147,7 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_array_equal(io_mul.D, ss_series.D) # Make sure that series does the same thing - io_series = ct.series(iosys_siso, iosys_siso) + io_series = ct.series(iosys_siso, iosys_siso2) assert isinstance(io_series, ct.InputOutputSystem) assert isinstance(io_series, ct.StateSpace) np.testing.assert_array_equal(io_series.A, ss_series.A) @@ -1145,7 +1156,7 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_array_equal(io_series.D, ss_series.D) # Test out feedback as well - io_feedback = ct.feedback(iosys_siso, iosys_siso) + io_feedback = ct.feedback(iosys_siso, iosys_siso2) assert isinstance(io_series, ct.InputOutputSystem) # But also retain linear structure @@ -1158,6 +1169,23 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_array_equal(io_feedback.C, ss_feedback.C) np.testing.assert_array_equal(io_feedback.D, ss_feedback.D) + # Make sure series interconnections are done in the right order + ss_sys1 = ct.rss(2, 3, 2) + io_sys1 = ct.ss2io(ss_sys1) + ss_sys2 = ct.rss(2, 2, 3) + io_sys2 = ct.ss2io(ss_sys2) + io_series = io_sys2 * io_sys1 + assert io_series.ninputs == 2 + assert io_series.noutputs == 2 + assert io_series.nstates == 4 + + # While we are at it, check that the state space matrices match + ss_series = ss_sys2 * ss_sys1 + np.testing.assert_array_equal(io_series.A, ss_series.A) + np.testing.assert_array_equal(io_series.B, ss_series.B) + np.testing.assert_array_equal(io_series.C, ss_series.C) + np.testing.assert_array_equal(io_series.D, ss_series.D) + def test_docstring_example(self): P = ct.LinearIOSystem( ct.rss(2, 2, 2, strictly_proper=True), name='P') @@ -1192,12 +1220,14 @@ def test_duplicates(self, tsys): ios_series = nlios * nlios # Nonduplicate objects + ct.config.use_legacy_defaults('0.8.4') # changed delims in 0.9.0 nlios1 = nlios.copy() nlios2 = nlios.copy() with pytest.warns(UserWarning, match="Duplicate name"): ios_series = nlios1 * nlios2 assert "copy of sys_1.x[0]" in ios_series.state_index.keys() assert "copy of sys.x[0]" in ios_series.state_index.keys() + ct.config.reset_defaults() # reset defaults # Duplicate names iosys_siso = ct.LinearIOSystem(tsys.siso_linsys) @@ -1226,6 +1256,73 @@ def test_duplicates(self, tsys): pytest.fail("Warning not expected: " + record[0].message) +def test_linear_interconnection(): + ss_sys1 = ct.rss(2, 2, 2, strictly_proper=True) + ss_sys2 = ct.rss(2, 2, 2) + io_sys1 = ios.LinearIOSystem( + ss_sys1, inputs = ('u[0]', 'u[1]'), + outputs = ('y[0]', 'y[1]'), name = 'sys1') + io_sys2 = ios.LinearIOSystem( + ss_sys2, inputs = ('u[0]', 'u[1]'), + outputs = ('y[0]', 'y[1]'), name = 'sys2') + nl_sys2 = ios.NonlinearIOSystem( + lambda t, x, u, params: np.array( + np.dot(ss_sys2.A, np.reshape(x, (-1, 1))) \ + + np.dot(ss_sys2.B, np.reshape(u, (-1, 1)))).reshape((-1,)), + lambda t, x, u, params: np.array( + np.dot(ss_sys2.C, np.reshape(x, (-1, 1))) \ + + np.dot(ss_sys2.D, np.reshape(u, (-1, 1)))).reshape((-1,)), + states = 2, + inputs = ('u[0]', 'u[1]'), + outputs = ('y[0]', 'y[1]'), + name = 'sys2') + + # Create a "regular" InterconnectedSystem + nl_connect = ios.interconnect( + (io_sys1, nl_sys2), + connections=[ + ['sys1.u[1]', 'sys2.y[0]'], + ['sys2.u[0]', 'sys1.y[1]'] + ], + inplist=[ + ['sys1.u[0]', 'sys1.u[1]'], + ['sys2.u[1]']], + outlist=[ + ['sys1.y[0]', '-sys2.y[0]'], + ['sys2.y[1]'], + ['sys2.u[1]']]) + assert isinstance(nl_connect, ios.InterconnectedSystem) + assert not isinstance(nl_connect, ios.LinearInterconnectedSystem) + + # Now take its linearization + ss_connect = nl_connect.linearize(0, 0) + assert isinstance(ss_connect, ios.LinearIOSystem) + + io_connect = ios.interconnect( + (io_sys1, io_sys2), + connections=[ + ['sys1.u[1]', 'sys2.y[0]'], + ['sys2.u[0]', 'sys1.y[1]'] + ], + inplist=[ + ['sys1.u[0]', 'sys1.u[1]'], + ['sys2.u[1]']], + outlist=[ + ['sys1.y[0]', '-sys2.y[0]'], + ['sys2.y[1]'], + ['sys2.u[1]']]) + assert isinstance(io_connect, ios.InterconnectedSystem) + assert isinstance(io_connect, ios.LinearInterconnectedSystem) + assert isinstance(io_connect, ios.LinearIOSystem) + assert isinstance(io_connect, ct.StateSpace) + + # Finally compare the linearization with the linear system + np.testing.assert_array_almost_equal(io_connect.A, ss_connect.A) + np.testing.assert_array_almost_equal(io_connect.B, ss_connect.B) + np.testing.assert_array_almost_equal(io_connect.C, ss_connect.C) + np.testing.assert_array_almost_equal(io_connect.D, ss_connect.D) + + def predprey(t, x, u, params={}): """Predator prey dynamics""" r = params.get('r', 2) From a155f1b3320cb92034e80dd166d8624117dad17d Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 2 Jan 2021 07:35:23 -0800 Subject: [PATCH 07/18] turn off selected unit tests for scipy-0.19 --- control/tests/iosys_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index dea816deb..34e8bae93 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -204,7 +204,6 @@ def test_linearize(self, tsys, kincar): linearized.C, [[1, 0, 0], [0, 1, 0]]) np.testing.assert_array_almost_equal(linearized.D, np.zeros((2,2))) - def test_linearize_named_signals(self, kincar): # Full form of the call linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True, @@ -286,6 +285,7 @@ def test_connect(self, tsys): np.testing.assert_array_almost_equal(lti_t, ios_t) np.testing.assert_allclose(lti_y, ios_y,atol=0.002,rtol=0.) + @noscipy0 @pytest.mark.parametrize( "connections, inplist, outlist", [pytest.param([[(1, 0), (0, 0, 1)]], [[(0, 0, 1)]], [[(1, 0, 1)]], @@ -328,6 +328,7 @@ def test_connect_spec_variants(self, tsys, connections, inplist, outlist): np.testing.assert_array_almost_equal(lti_t, ios_t) np.testing.assert_allclose(lti_y, ios_y, atol=0.002, rtol=0.) + @noscipy0 @pytest.mark.parametrize( "connections, inplist, outlist", [pytest.param([['sys2.u[0]', 'sys1.y[0]']], From caed48da6ef6aaa89ae9ba139816f44af3856eab Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 2 Jan 2021 14:59:40 -0800 Subject: [PATCH 08/18] updated sphinx documentation + changed to LinearICSystem --- control/bdalg.py | 7 +++ control/iosys.py | 97 +++++++++++++++++++------------------ control/tests/iosys_test.py | 4 +- control/xferfcn.py | 2 +- doc/classes.rst | 7 ++- doc/control.rst | 11 +++-- doc/iosys.rst | 62 ++++++++++++------------ 7 files changed, 99 insertions(+), 91 deletions(-) diff --git a/control/bdalg.py b/control/bdalg.py index a9ba6cd16..f88e8e813 100644 --- a/control/bdalg.py +++ b/control/bdalg.py @@ -326,6 +326,13 @@ def connect(sys, Q, inputv, outputv): >>> Q = [[1, 2], [2, -1]] # negative feedback interconnection >>> sysc = connect(sys, Q, [2], [1, 2]) + Notes + ----- + The :func:`~control.interconnect` function in the + :ref:`input/output systems ` module allows the use + of named signals and provides an alternative method for + interconnecting multiple systems. + """ inputv, outputv, Q = np.asarray(inputv), np.asarray(outputv), np.asarray(Q) # check indices diff --git a/control/iosys.py b/control/iosys.py index 009e4565f..77868692f 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -42,8 +42,8 @@ from . import config __all__ = ['InputOutputSystem', 'LinearIOSystem', 'NonlinearIOSystem', - 'InterconnectedSystem', 'input_output_response', 'find_eqpt', - 'linearize', 'ss2io', 'tf2io', 'interconnect'] + 'InterconnectedSystem', 'LinearICSystem', 'input_output_response', + 'find_eqpt', 'linearize', 'ss2io', 'tf2io', 'interconnect'] # Define module default parameter values _iosys_defaults = { @@ -110,8 +110,8 @@ class for a set of subclasses that are used to implement specific Notes ----- - The `InputOuputSystem` class (and its subclasses) makes use of two special - methods for implementing much of the work of the class: + The :class:`~control.InputOuputSystem` class (and its subclasses) makes + use of two special methods for implementing much of the work of the class: * _rhs(t, x, u): compute the right hand side of the differential or difference equation for the system. This must be specified by the @@ -137,8 +137,8 @@ def __init__(self, inputs=None, outputs=None, states=None, params={}, The InputOutputSystem contructor is used to create an input/output object with the core information required for all input/output systems. Instances of this class are normally created by one of the - input/output subclasses: :class:`~control.LinearIOSystem`, - :class:`~control.NonlinearIOSystem`, + input/output subclasses: :class:`~control.LinearICSystem`, + :class:`~control.LinearIOSystem`, :class:`~control.NonlinearIOSystem`, :class:`~control.InterconnectedSystem`. Parameters @@ -242,10 +242,10 @@ def __mul__(sys2, sys1): np.zeros((sys2.ninputs, sys2.noutputs))]] )) - # If both systems are linear, create LinearInterconnectedSystem + # If both systems are linear, create LinearICSystem if isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): ss_sys = StateSpace.__mul__(sys2, sys1) - return LinearInterconnectedSystem(newsys, ss_sys) + return LinearICSystem(newsys, ss_sys) # Return the newly created InterconnectedSystem return newsys @@ -294,10 +294,10 @@ def __add__(sys1, sys2): newsys = InterconnectedSystem( (sys1, sys2), inplist=inplist, outlist=outlist) - # If both systems are linear, create LinearInterconnectedSystem + # If both systems are linear, create LinearICSystem if isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): ss_sys = StateSpace.__add__(sys2, sys1) - return LinearInterconnectedSystem(newsys, ss_sys) + return LinearICSystem(newsys, ss_sys) # Return the newly created InterconnectedSystem return newsys @@ -315,10 +315,10 @@ def __neg__(sys): newsys = InterconnectedSystem( (sys,), dt=sys.dt, inplist=inplist, outlist=outlist) - # If the system is linear, create LinearInterconnectedSystem + # If the system is linear, create LinearICSystem if isinstance(sys, StateSpace): ss_sys = StateSpace.__neg__(sys) - return LinearInterconnectedSystem(newsys, ss_sys) + return LinearICSystem(newsys, ss_sys) # Return the newly created system return newsys @@ -502,7 +502,7 @@ def feedback(self, other=1, sign=-1, params={}): if isinstance(self, StateSpace) and isinstance(other, StateSpace): # Special case: maintain linear systems structure ss_sys = StateSpace.feedback(self, other, sign=sign) - return LinearInterconnectedSystem(newsys, ss_sys) + return LinearICSystem(newsys, ss_sys) # Return the newly created system return newsys @@ -697,10 +697,10 @@ def __init__(self, updfcn, outfcn=None, inputs=None, outputs=None, name=None, **kwargs): """Create a nonlinear I/O system given update and output functions. - Creates an `InputOutputSystem` for a nonlinear system by specifying a - state update function and an output function. The new system can be a - continuous or discrete time system (Note: discrete-time systems not - yet supported by most function.) + Creates an :class:`~control.InputOutputSystem` for a nonlinear system + by specifying a state update function and an output function. The new + system can be a continuous or discrete time system (Note: + discrete-time systems not yet supported by most function.) Parameters ---------- @@ -1284,15 +1284,16 @@ def set_output_map(self, output_map): self.noutputs = output_map.shape[0] -class LinearInterconnectedSystem(InterconnectedSystem, LinearIOSystem): +class LinearICSystem(InterconnectedSystem, LinearIOSystem): """Interconnection of a set of linear input/output systems. This class is used to implement a system that is an interconnection of linear input/output systems. It has all of the structure of an - :class:`InterconnectedSystem`, but also maintains the requirement elements - of :class:`LinearIOSystem`, including the :class:`StateSpace` class - structure, allowing it to be passed to functions that expect a - :class:`StateSpace` system. + :class:`~control.InterconnectedSystem`, but also maintains the requirement + elements of :class:`~control.LinearIOSystem`, including the + :class:`StateSpace` class structure, allowing it to be passed to functions + that expect a :class:`StateSpace` system. + """ def __init__(self, io_sys, ss_sys=None): @@ -1755,7 +1756,7 @@ def linearize(sys, xeq, ueq=[], t=0, params={}, **kw): """Linearize an input/output system at a given state and input. This function computes the linearization of an input/output system at a - given state and input value and returns a :class:`control.StateSpace` + given state and input value and returns a :class:`~control.StateSpace` object. The eavaluation point need not be an equilibrium point. Parameters @@ -1840,10 +1841,11 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], This function creates a new system that is an interconnection of a set of input/output systems. If all of the input systems are linear I/O systems - (type `LinearIOSystem`) then the resulting system will be a linear - interconnected I/O system (type `LinearInterconnectedSystem`) with the - appropriate inputs, outputs, and states. Otherwise, an interconnected I/O - system (type `InterconnectedSystem`) will be created. + (type :class:`~control.LinearIOSystem`) then the resulting system will be + a linear interconnected I/O system (type :class:`~control.LinearICSystem`) + with the appropriate inputs, outputs, and states. Otherwise, an + interconnected I/O system (type :class:`~control.InterconnectedSystem`) + will be created. Parameters ---------- @@ -1895,8 +1897,8 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], the system input connects to only one subsystem input, a single input specification can be given (without the inner list). - If omitted, the input map can be specified using the `set_input_map` - method. + If omitted, the input map can be specified using the + :func:`~control.InterconnectedSystem.set_input_map` method. outlist : list of output connections, optional List of connections for how the outputs from the subsystems are mapped @@ -1910,8 +1912,8 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], then those signals are added together (multiplying by the any gain term) to form the system output. - If omitted, the output map can be specified using the `set_output_map` - method. + If omitted, the output map can be specified using the + :func:`~control.InterconnectedSystem.set_output_map` method. inputs : int, list of str or None, optional Description of the system inputs. This can be given as an integer @@ -1951,17 +1953,17 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], Example ------- - P = control.LinearIOSystem( - ct.rss(2, 2, 2, strictly_proper=True), name='P') - C = control.LinearIOSystem(control.rss(2, 2, 2), name='C') - S = control.InterconnectedSystem( - [P, C], - connections = [ - ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[0]'], - ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], - inplist = ['C.u[0]', 'C.u[1]'], - outlist = ['P.y[0]', 'P.y[1]'], - ) + >>> P = control.LinearIOSystem( + >>> ct.rss(2, 2, 2, strictly_proper=True), name='P') + >>> C = control.LinearIOSystem(control.rss(2, 2, 2), name='C') + >>> S = control.InterconnectedSystem( + >>> [P, C], + >>> connections = [ + >>> ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[0]'], + >>> ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], + >>> inplist = ['C.u[0]', 'C.u[1]'], + >>> outlist = ['P.y[0]', 'P.y[1]'], + >>> ) Notes ----- @@ -1973,10 +1975,11 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], check your use of tuples. In addition to its use for general nonlinear I/O systems, the - `interconnect` function allows linear systems to be interconnected using - named signals (compared with the `connect` function, which uses signal - indicies) and to be treated as both a `StateSpace` system as well as an - `InputOutputSystem`. + :func:`~control.interconnect` function allows linear systems to be + interconnected using named signals (compared with the + :func:`~control.connect` function, which uses signal indicies) and to be + treated as both a :class:`~control.StateSpace` system as well as an + :class:`~control.InputOutputSystem`. """ newsys = InterconnectedSystem( @@ -1986,6 +1989,6 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], # If all subsystems are linear systems, maintain linear structure if all([isinstance(sys, LinearIOSystem) for sys in syslist]): - return LinearInterconnectedSystem(newsys, None) + return LinearICSystem(newsys, None) return newsys diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 34e8bae93..5ab91a8f3 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -1293,7 +1293,7 @@ def test_linear_interconnection(): ['sys2.y[1]'], ['sys2.u[1]']]) assert isinstance(nl_connect, ios.InterconnectedSystem) - assert not isinstance(nl_connect, ios.LinearInterconnectedSystem) + assert not isinstance(nl_connect, ios.LinearICSystem) # Now take its linearization ss_connect = nl_connect.linearize(0, 0) @@ -1313,7 +1313,7 @@ def test_linear_interconnection(): ['sys2.y[1]'], ['sys2.u[1]']]) assert isinstance(io_connect, ios.InterconnectedSystem) - assert isinstance(io_connect, ios.LinearInterconnectedSystem) + assert isinstance(io_connect, ios.LinearICSystem) assert isinstance(io_connect, ios.LinearIOSystem) assert isinstance(io_connect, ct.StateSpace) diff --git a/control/xferfcn.py b/control/xferfcn.py index 93743deb1..9a70e36b6 100644 --- a/control/xferfcn.py +++ b/control/xferfcn.py @@ -809,7 +809,7 @@ def returnScipySignalLTI(self, strict=True): continuous (0) or discrete (True or > 0). False: if `tfobject.dt` is None, continuous time - :class:`scipy.signal.lti`objects are returned + :class:`scipy.signal.lti` objects are returned Returns ------- diff --git a/doc/classes.rst b/doc/classes.rst index 0981843ca..b948f23aa 100644 --- a/doc/classes.rst +++ b/doc/classes.rst @@ -16,18 +16,17 @@ these directly. TransferFunction StateSpace FrequencyResponseData - ~iosys.InputOutputSystem + InputOutputSystem Input/output system subclasses ============================== -.. currentmodule:: control.iosys - Input/output systems are accessed primarily via a set of subclasses that allow for linear, nonlinear, and interconnected elements: .. autosummary:: :toctree: generated/ + InterconnectedSystem + LinearICSystem LinearIOSystem NonlinearIOSystem - InterconnectedSystem diff --git a/doc/control.rst b/doc/control.rst index d44de3f04..a3423e379 100644 --- a/doc/control.rst +++ b/doc/control.rst @@ -139,11 +139,12 @@ Nonlinear system support .. autosummary:: :toctree: generated/ - ~iosys.find_eqpt - ~iosys.linearize - ~iosys.input_output_response - ~iosys.ss2io - ~iosys.tf2io + find_eqpt + interconnect + linearize + input_output_response + ss2io + tf2io flatsys.point_to_point .. _utility-and-conversions: diff --git a/doc/iosys.rst b/doc/iosys.rst index 0353a01d7..b2ac752af 100644 --- a/doc/iosys.rst +++ b/doc/iosys.rst @@ -4,10 +4,6 @@ Input/output systems ******************** -.. automodule:: control.iosys - :no-members: - :no-inherited-members: - Module usage ============ @@ -40,16 +36,16 @@ equation) and and output function (computes the outputs from the state):: io_sys = NonlinearIOSystem(updfcn, outfcn, inputs=M, outputs=P, states=N) More complex input/output systems can be constructed by using the -:class:`~control.InterconnectedSystem` class, which allows a collection of -input/output subsystems to be combined with internal connections between the -subsystems and a set of overall system inputs and outputs that link to the -subsystems:: +:func:`~control.interconnect` function, which allows a collection of +input/output subsystems to be combined with internal connections +between the subsystems and a set of overall system inputs and outputs +that link to the subsystems:: - steering = ct.InterconnectedSystem( - (plant, controller), name='system', - connections=(('controller.e', '-plant.y')), - inplist=('controller.e'), inputs='r', - outlist=('plant.y'), outputs='y') + steering = ct.interconnect( + [plant, controller], name='system', + connections=[['controller.e', '-plant.y']], + inplist=['controller.e'], inputs='r', + outlist=['plant.y'], outputs='y') Interconnected systems can also be created using block diagram manipulations such as the :func:`~control.series`, :func:`~control.parallel`, and @@ -160,19 +156,19 @@ The input to the controller is `u`, consisting of the vector of hare and lynx populations followed by the desired lynx population. To connect the controller to the predatory-prey model, we create an -`InterconnectedSystem`: +:class:`~control.InterconnectedSystem`: .. code-block:: python - io_closed = control.InterconnectedSystem( - (io_predprey, io_controller), # systems - connections=( - ('predprey.u', 'control.y[0]'), - ('control.u1', 'predprey.H'), - ('control.u2', 'predprey.L') - ), - inplist=('control.Ld'), - outlist=('predprey.H', 'predprey.L', 'control.y[0]') + io_closed = control.interconnect( + [io_predprey, io_controller], # systems + connections=[ + ['predprey.u', 'control.y[0]'], + ['control.u1', 'predprey.H'], + ['control.u2', 'predprey.L'] + ], + inplist=['control.Ld'], + outlist=['predprey.H', 'predprey.L', 'control.y[0]'] ) Finally, we simulate the closed loop system: @@ -200,18 +196,20 @@ Input/output system classes --------------------------- .. autosummary:: - InputOutputSystem - InterconnectedSystem - LinearIOSystem - NonlinearIOSystem + ~control.InputOutputSystem + ~control.InterconnectedSystem + ~control.LinearICSystem + ~control.LinearIOSystem + ~control.NonlinearIOSystem Input/output system functions ----------------------------- .. autosummary:: - find_eqpt - linearize - input_output_response - ss2io - tf2io + ~control.find_eqpt + ~control.linearize + ~control.input_output_response + ~control.interconnect + ~control.ss2io + ~control.tf2io From 841584fc04f1419b0b72b39b48434dafc8cec87a Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 2 Jan 2021 19:29:05 -0800 Subject: [PATCH 09/18] allow default linearized system name to be customized --- control/config.py | 4 +++- control/iosys.py | 28 ++++++++++++++++++++++------ control/tests/iosys_test.py | 5 +++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/control/config.py b/control/config.py index 2e2da14fc..b4950ae5e 100644 --- a/control/config.py +++ b/control/config.py @@ -222,6 +222,8 @@ def use_legacy_defaults(version): # changed iosys naming conventions set_defaults('iosys', state_name_delim='.', duplicate_system_name_prefix='copy of ', - duplicate_system_name_suffix='') + duplicate_system_name_suffix='', + linearized_system_name_prefix='', + linearized_system_name_suffix='_linearized') return (major, minor, patch) diff --git a/control/iosys.py b/control/iosys.py index 77868692f..94b8234c6 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -49,7 +49,9 @@ _iosys_defaults = { 'iosys.state_name_delim': '_', 'iosys.duplicate_system_name_prefix': '', - 'iosys.duplicate_system_name_suffix': '$copy' + 'iosys.duplicate_system_name_suffix': '$copy', + 'iosys.linearized_system_name_prefix': '', + 'iosys.linearized_system_name_suffix': '$linearized' } @@ -570,7 +572,10 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, # Set the names the system, inputs, outputs, and states if copy: if name is None: - linsys.name = self.name + "_linearized" + linsys.name = \ + config.defaults['iosys.linearized_system_name_prefix'] + \ + self.name + \ + config.defaults['iosys.linearized_system_name_suffix'] linsys.ninputs, linsys.input_index = self.ninputs, \ self.input_index.copy() linsys.noutputs, linsys.output_index = \ @@ -1781,9 +1786,13 @@ def linearize(sys, xeq, ueq=[], t=0, params={}, **kw): the system name is set to the input system name with the string '_linearized' appended. name : string, optional - Set the name of the linearized system. If not specified and if `copy` - is `False`, a generic name is generated with a unique - integer id. + Set the name of the linearized system. If not specified and + if `copy` is `False`, a generic name is generated + with a unique integer id. If `copy` is `True`, the new system + name is determined by adding the prefix and suffix strings in + config.defaults['iosys.linearized_system_name_prefix'] and + config.defaults['iosys.linearized_system_name_suffix'], with the + default being to add the suffix '$linearized'. Returns ------- @@ -1967,6 +1976,13 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], Notes ----- + If a system is duplicated in the list of systems to be connected, + a warning is generated a copy of the system is created with the + name of the new system determined by adding the prefix and suffix + strings in config.defaults['iosys.linearized_system_name_prefix'] + and config.defaults['iosys.linearized_system_name_suffix'], with the + default being to add the suffix '$copy'$ to the system name. + It is possible to replace lists in most of arguments with tuples instead, but strictly speaking the only use of tuples should be in the specification of an input- or output-signal via the tuple notation @@ -1977,7 +1993,7 @@ def interconnect(syslist, connections=[], inplist=[], outlist=[], In addition to its use for general nonlinear I/O systems, the :func:`~control.interconnect` function allows linear systems to be interconnected using named signals (compared with the - :func:`~control.connect` function, which uses signal indicies) and to be + :func:`~control.connect` function, which uses signal indices) and to be treated as both a :class:`~control.StateSpace` system as well as an :class:`~control.InputOutputSystem`. diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 5ab91a8f3..64bac53d8 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -218,8 +218,13 @@ def test_linearize_named_signals(self, kincar): assert linearized.find_state('theta') == 2 # If we copy signal names w/out a system name, append '_linearized' + ct.use_legacy_defaults('0.8.4') linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True) assert linearized.name == kincar.name + '_linearized' + ct.use_legacy_defaults('0.9.0') + linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True) + assert linearized.name == kincar.name + '$linearized' + ct.reset_defaults() # If copy is False, signal names should not be copied lin_nocopy = kincar.linearize(0, 0, copy=False) From b8184618440dfd4d5171a61642d7f33bc1adaf8b Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 2 Jan 2021 21:56:26 -0800 Subject: [PATCH 10/18] fix bug in iosys unit test (reset_defaults) --- control/tests/iosys_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 64bac53d8..291ce868e 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -219,8 +219,11 @@ def test_linearize_named_signals(self, kincar): # If we copy signal names w/out a system name, append '_linearized' ct.use_legacy_defaults('0.8.4') + ct.config.use_numpy_matrix(False) # get rid of warning messages linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True) assert linearized.name == kincar.name + '_linearized' + + ct.reset_defaults() ct.use_legacy_defaults('0.9.0') linearized = kincar.linearize([0, 0, 0], [0, 0], copy=True) assert linearized.name == kincar.name + '$linearized' From 213ae491fe85824847c61860cf592f8ac97a1311 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 13:20:40 -0800 Subject: [PATCH 11/18] set array_priority=11 for TransferFunction, to match StateSpace --- control/tests/xferfcn_test.py | 19 +++++++++++++++++++ control/xferfcn.py | 3 +++ 2 files changed, 22 insertions(+) diff --git a/control/tests/xferfcn_test.py b/control/tests/xferfcn_test.py index c0b5e227f..570492495 100644 --- a/control/tests/xferfcn_test.py +++ b/control/tests/xferfcn_test.py @@ -5,7 +5,9 @@ import numpy as np import pytest +import operator +import control as ct from control.statesp import StateSpace, _convertToStateSpace, rss from control.xferfcn import TransferFunction, _convert_to_transfer_function, \ ss2tf @@ -1015,3 +1017,20 @@ def test_returnScipySignalLTI_error(self, mimotf): mimotf.returnScipySignalLTI() with pytest.raises(ValueError): mimotf.returnScipySignalLTI(strict=True) + +@pytest.mark.parametrize( + "op", + [pytest.param(getattr(operator, s), id=s) for s in ('add', 'sub', 'mul')]) +@pytest.mark.parametrize( + "tf, arr", + [# pytest.param(ct.tf([1], [0.5, 1]), np.array(2.), id="0D scalar"), + # pytest.param(ct.tf([1], [0.5, 1]), np.array([2.]), id="1D scalar"), + pytest.param(ct.tf([1], [0.5, 1]), np.array([[2.]]), id="2D scalar")]) +def test_xferfcn_ndarray_precedence(op, tf, arr): + # Apply the operator to the transfer function and array + result = op(tf, arr) + assert isinstance(result, ct.TransferFunction) + + # Apply the operator to the array and transfer function + result = op(arr, tf) + assert isinstance(result, ct.TransferFunction) diff --git a/control/xferfcn.py b/control/xferfcn.py index 9a70e36b6..584424bfb 100644 --- a/control/xferfcn.py +++ b/control/xferfcn.py @@ -114,6 +114,9 @@ class TransferFunction(LTI): >>> G = (s + 1)/(s**2 + 2*s + 1) """ + # Give TransferFunction._rmul_() priority for ndarray * TransferFunction + __array_priority__ = 11 # override ndarray and matrix types + def __init__(self, *args, **kwargs): """TransferFunction(num, den[, dt]) From 8d30e76402fe6e080f388ef4d9024b3125db9d05 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 13:24:06 -0800 Subject: [PATCH 12/18] update _convert_to_transfer_function to allow 0D and 1D arrays --- control/tests/bdalg_test.py | 2 +- control/tests/xferfcn_test.py | 4 ++-- control/xferfcn.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/control/tests/bdalg_test.py b/control/tests/bdalg_test.py index fc5f78f91..433a584cc 100644 --- a/control/tests/bdalg_test.py +++ b/control/tests/bdalg_test.py @@ -255,7 +255,7 @@ def test_feedback_args(self, tsys): ctrl.feedback(*args) # If second argument is not LTI or convertable, generate an exception - args = (tsys.sys1, np.array([1])) + args = (tsys.sys1, 'hello world') with pytest.raises(TypeError): ctrl.feedback(*args) diff --git a/control/tests/xferfcn_test.py b/control/tests/xferfcn_test.py index 570492495..890bbe178 100644 --- a/control/tests/xferfcn_test.py +++ b/control/tests/xferfcn_test.py @@ -1023,8 +1023,8 @@ def test_returnScipySignalLTI_error(self, mimotf): [pytest.param(getattr(operator, s), id=s) for s in ('add', 'sub', 'mul')]) @pytest.mark.parametrize( "tf, arr", - [# pytest.param(ct.tf([1], [0.5, 1]), np.array(2.), id="0D scalar"), - # pytest.param(ct.tf([1], [0.5, 1]), np.array([2.]), id="1D scalar"), + [pytest.param(ct.tf([1], [0.5, 1]), np.array(2.), id="0D scalar"), + pytest.param(ct.tf([1], [0.5, 1]), np.array([2.]), id="1D scalar"), pytest.param(ct.tf([1], [0.5, 1]), np.array([[2.]]), id="2D scalar")]) def test_xferfcn_ndarray_precedence(op, tf, arr): # Apply the operator to the transfer function and array diff --git a/control/xferfcn.py b/control/xferfcn.py index 584424bfb..5dc12c30a 100644 --- a/control/xferfcn.py +++ b/control/xferfcn.py @@ -1314,7 +1314,7 @@ def _convert_to_transfer_function(sys, **kw): # If this is array-like, try to create a constant feedthrough try: - D = array(sys) + D = array(sys, ndmin=2) outputs, inputs = D.shape num = [[[D[i, j]] for j in range(inputs)] for i in range(outputs)] den = [[[1] for j in range(inputs)] for i in range(outputs)] From c51b161da95ddfaade5aeb8b5b8f56cf44554fd3 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 13:40:29 -0800 Subject: [PATCH 13/18] rename _convertToX to _convert_to_x + statesp/ndarray unit tests --- control/bdalg.py | 4 ++-- control/dtime.py | 2 +- control/frdata.py | 18 +++++++++--------- control/statesp.py | 32 ++++++++++++++++---------------- control/tests/frd_test.py | 8 ++++---- control/tests/statesp_test.py | 32 ++++++++++++++++++++++++++------ control/tests/xferfcn_test.py | 4 ++-- control/timeresp.py | 10 +++++----- 8 files changed, 65 insertions(+), 45 deletions(-) diff --git a/control/bdalg.py b/control/bdalg.py index f88e8e813..e00dcfa3c 100644 --- a/control/bdalg.py +++ b/control/bdalg.py @@ -242,9 +242,9 @@ def feedback(sys1, sys2=1, sign=-1): if isinstance(sys2, tf.TransferFunction): sys1 = tf._convert_to_transfer_function(sys1) elif isinstance(sys2, ss.StateSpace): - sys1 = ss._convertToStateSpace(sys1) + sys1 = ss._convert_to_statespace(sys1) elif isinstance(sys2, frd.FRD): - sys1 = frd._convertToFRD(sys1, sys2.omega) + sys1 = frd._convert_to_FRD(sys1, sys2.omega) else: # sys2 is a scalar. sys1 = tf._convert_to_transfer_function(sys1) sys2 = tf._convert_to_transfer_function(sys2) diff --git a/control/dtime.py b/control/dtime.py index 725bcde47..8c0fe53e9 100644 --- a/control/dtime.py +++ b/control/dtime.py @@ -47,7 +47,7 @@ """ from .lti import isctime -from .statesp import StateSpace, _convertToStateSpace +from .statesp import StateSpace __all__ = ['sample_system', 'c2d'] diff --git a/control/frdata.py b/control/frdata.py index 8ca9dfd9d..8310cb34b 100644 --- a/control/frdata.py +++ b/control/frdata.py @@ -203,7 +203,7 @@ def __add__(self, other): # Convert the second argument to a frequency response function. # or re-base the frd to the current omega (if needed) - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) # Check that the input-output sizes are consistent. if self.inputs != other.inputs: @@ -238,7 +238,7 @@ def __mul__(self, other): return FRD(self.fresp * other, self.omega, smooth=(self.ifunc is not None)) else: - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) # Check that the input-output sizes are consistent. if self.inputs != other.outputs: @@ -265,7 +265,7 @@ def __rmul__(self, other): return FRD(self.fresp * other, self.omega, smooth=(self.ifunc is not None)) else: - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) # Check that the input-output sizes are consistent. if self.outputs != other.inputs: @@ -293,7 +293,7 @@ def __truediv__(self, other): return FRD(self.fresp * (1/other), self.omega, smooth=(self.ifunc is not None)) else: - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) if (self.inputs > 1 or self.outputs > 1 or other.inputs > 1 or other.outputs > 1): @@ -316,7 +316,7 @@ def __rtruediv__(self, other): return FRD(other / self.fresp, self.omega, smooth=(self.ifunc is not None)) else: - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) if (self.inputs > 1 or self.outputs > 1 or other.inputs > 1 or other.outputs > 1): @@ -450,7 +450,7 @@ def freqresp(self, omega): def feedback(self, other=1, sign=-1): """Feedback interconnection between two FRD objects.""" - other = _convertToFRD(other, omega=self.omega) + other = _convert_to_FRD(other, omega=self.omega) if (self.outputs != other.inputs or self.inputs != other.outputs): raise ValueError( @@ -486,7 +486,7 @@ def feedback(self, other=1, sign=-1): FRD = FrequencyResponseData -def _convertToFRD(sys, omega, inputs=1, outputs=1): +def _convert_to_FRD(sys, omega, inputs=1, outputs=1): """Convert a system to frequency response data form (if needed). If sys is already an frd, and its frequency range matches or @@ -496,8 +496,8 @@ def _convertToFRD(sys, omega, inputs=1, outputs=1): scalar, then the number of inputs and outputs can be specified manually, as in: - >>> frd = _convertToFRD(3., omega) # Assumes inputs = outputs = 1 - >>> frd = _convertToFRD(1., omegs, inputs=3, outputs=2) + >>> frd = _convert_to_FRD(3., omega) # Assumes inputs = outputs = 1 + >>> frd = _convert_to_FRD(1., omegs, inputs=3, outputs=2) In the latter example, sys's matrix transfer function is [[1., 1., 1.] [1., 1., 1.]]. diff --git a/control/statesp.py b/control/statesp.py index b41f18c7d..a37ab788e 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -10,7 +10,7 @@ # Python 3 compatibility (needs to go here) from __future__ import print_function -from __future__ import division # for _convertToStateSpace +from __future__ import division # for _convert_to_statespace """Copyright (c) 2010 by California Institute of Technology All rights reserved. @@ -527,7 +527,7 @@ def __add__(self, other): D = self.D + other dt = self.dt else: - other = _convertToStateSpace(other) + other = _convert_to_statespace(other) # Check to make sure the dimensions are OK if ((self.inputs != other.inputs) or @@ -577,7 +577,7 @@ def __mul__(self, other): D = self.D * other dt = self.dt else: - other = _convertToStateSpace(other) + other = _convert_to_statespace(other) # Check to make sure the dimensions are OK if self.inputs != other.outputs: @@ -614,7 +614,7 @@ def __rmul__(self, other): # is lti, and convertible? if isinstance(other, LTI): - return _convertToStateSpace(other) * self + return _convert_to_statespace(other) * self # try to treat this as a matrix try: @@ -814,7 +814,7 @@ def zero(self): def feedback(self, other=1, sign=-1): """Feedback interconnection between two LTI systems.""" - other = _convertToStateSpace(other) + other = _convert_to_statespace(other) # Check to make sure the dimensions are OK if (self.inputs != other.outputs) or (self.outputs != other.inputs): @@ -882,7 +882,7 @@ def lft(self, other, nu=-1, ny=-1): Dimension of (plant) control input. """ - other = _convertToStateSpace(other) + other = _convert_to_statespace(other) # maximal values for nu, ny if ny == -1: ny = min(other.inputs, self.outputs) @@ -1036,7 +1036,7 @@ def append(self, other): The second model is converted to state-space if necessary, inputs and outputs are appended and their order is preserved""" if not isinstance(other, StateSpace): - other = _convertToStateSpace(other) + other = _convert_to_statespace(other) self.dt = common_timebase(self.dt, other.dt) @@ -1161,7 +1161,7 @@ def is_static_gain(self): # TODO: add discrete time check -def _convertToStateSpace(sys, **kw): +def _convert_to_statespace(sys, **kw): """Convert a system to state space form (if needed). If sys is already a state space, then it is returned. If sys is a @@ -1169,8 +1169,8 @@ def _convertToStateSpace(sys, **kw): returned. If sys is a scalar, then the number of inputs and outputs can be specified manually, as in: - >>> sys = _convertToStateSpace(3.) # Assumes inputs = outputs = 1 - >>> sys = _convertToStateSpace(1., inputs=3, outputs=2) + >>> sys = _convert_to_statespace(3.) # Assumes inputs = outputs = 1 + >>> sys = _convert_to_statespace(1., inputs=3, outputs=2) In the latter example, A = B = C = 0 and D = [[1., 1., 1.] [1., 1., 1.]]. @@ -1180,7 +1180,7 @@ def _convertToStateSpace(sys, **kw): if isinstance(sys, StateSpace): if len(kw): - raise TypeError("If sys is a StateSpace, _convertToStateSpace " + raise TypeError("If sys is a StateSpace, _convert_to_statespace " "cannot take keywords.") # Already a state space system; just return it @@ -1196,7 +1196,7 @@ def _convertToStateSpace(sys, **kw): from slycot import td04ad if len(kw): raise TypeError("If sys is a TransferFunction, " - "_convertToStateSpace cannot take keywords.") + "_convert_to_statespace cannot take keywords.") # Change the numerator and denominator arrays so that the transfer # function matrix has a common denominator. @@ -1258,7 +1258,7 @@ def _convertToStateSpace(sys, **kw): return StateSpace([], [], [], D) except Exception as e: print("Failure to assume argument is matrix-like in" - " _convertToStateSpace, result %s" % e) + " _convert_to_statespace, result %s" % e) raise TypeError("Can't convert given type to StateSpace system.") @@ -1637,14 +1637,14 @@ def tf2ss(*args): from .xferfcn import TransferFunction if len(args) == 2 or len(args) == 3: # Assume we were given the num, den - return _convertToStateSpace(TransferFunction(*args)) + return _convert_to_statespace(TransferFunction(*args)) elif len(args) == 1: sys = args[0] if not isinstance(sys, TransferFunction): raise TypeError("tf2ss(sys): sys must be a TransferFunction " "object.") - return _convertToStateSpace(sys) + return _convert_to_statespace(sys) else: raise ValueError("Needs 1 or 2 arguments; received %i." % len(args)) @@ -1744,5 +1744,5 @@ def ssdata(sys): (A, B, C, D): list of matrices State space data for the system """ - ss = _convertToStateSpace(sys) + ss = _convert_to_statespace(sys) return ss.A, ss.B, ss.C, ss.D diff --git a/control/tests/frd_test.py b/control/tests/frd_test.py index 18f2f17b1..a193adb73 100644 --- a/control/tests/frd_test.py +++ b/control/tests/frd_test.py @@ -12,7 +12,7 @@ import control as ct from control.statesp import StateSpace from control.xferfcn import TransferFunction -from control.frdata import FRD, _convertToFRD, FrequencyResponseData +from control.frdata import FRD, _convert_to_FRD, FrequencyResponseData from control import bdalg, evalfr, freqplot from control.tests.conftest import slycotonly @@ -174,9 +174,9 @@ def testFeedback2(self): def testAuto(self): omega = np.logspace(-1, 2, 10) - f1 = _convertToFRD(1, omega) - f2 = _convertToFRD(np.array([[1, 0], [0.1, -1]]), omega) - f2 = _convertToFRD([[1, 0], [0.1, -1]], omega) + f1 = _convert_to_FRD(1, omega) + f2 = _convert_to_FRD(np.array([[1, 0], [0.1, -1]]), omega) + f2 = _convert_to_FRD([[1, 0], [0.1, -1]], omega) f1, f2 # reference to avoid pyflakes error def testNyquist(self): diff --git a/control/tests/statesp_test.py b/control/tests/statesp_test.py index 02fac1776..f570b2f72 100644 --- a/control/tests/statesp_test.py +++ b/control/tests/statesp_test.py @@ -9,14 +9,16 @@ import numpy as np import pytest +import operator from numpy.linalg import solve from scipy.linalg import block_diag, eigvals +import control as ct from control.config import defaults from control.dtime import sample_system from control.lti import evalfr -from control.statesp import (StateSpace, _convertToStateSpace, drss, rss, ss, - tf2ss, _statesp_defaults) +from control.statesp import (StateSpace, _convert_to_statespace, drss, + rss, ss, tf2ss, _statesp_defaults) from control.tests.conftest import ismatarrayout, slycotonly from control.xferfcn import TransferFunction, ss2tf @@ -224,7 +226,7 @@ def test_pole(self, sys322): def test_zero_empty(self): """Test to make sure zero() works with no zeros in system.""" - sys = _convertToStateSpace(TransferFunction([1], [1, 2, 1])) + sys = _convert_to_statespace(TransferFunction([1], [1, 2, 1])) np.testing.assert_array_equal(sys.zero(), np.array([])) @slycotonly @@ -455,7 +457,7 @@ def test_append_tf(self): s = TransferFunction([1, 0], [1]) h = 1 / (s + 1) / (s + 2) sys1 = StateSpace(A1, B1, C1, D1) - sys2 = _convertToStateSpace(h) + sys2 = _convert_to_statespace(h) sys3c = sys1.append(sys2) np.testing.assert_array_almost_equal(sys1.A, sys3c.A[:3, :3]) np.testing.assert_array_almost_equal(sys1.B, sys3c.B[:3, :2]) @@ -624,10 +626,10 @@ def test_empty(self): assert 0 == g1.outputs def test_matrix_to_state_space(self): - """_convertToStateSpace(matrix) gives ss([],[],[],D)""" + """_convert_to_statespace(matrix) gives ss([],[],[],D)""" with pytest.deprecated_call(): D = np.matrix([[1, 2, 3], [4, 5, 6]]) - g = _convertToStateSpace(D) + g = _convert_to_statespace(D) np.testing.assert_array_equal(np.empty((0, 0)), g.A) np.testing.assert_array_equal(np.empty((0, D.shape[1])), g.B) @@ -928,3 +930,21 @@ def test_latex_repr(gmats, ref, repr_type, num_format, editsdefaults): refkey = "{}_{}".format(refkey_n[num_format], refkey_r[repr_type]) assert g._repr_latex_() == ref[refkey] +@pytest.mark.parametrize( + "op", + [pytest.param(getattr(operator, s), id=s) for s in ('add', 'sub', 'mul')]) +@pytest.mark.parametrize( + "tf, arr", + [pytest.param(ct.tf([1], [0.5, 1]), np.array(2.), id="0D scalar"), + pytest.param(ct.tf([1], [0.5, 1]), np.array([2.]), id="1D scalar"), + pytest.param(ct.tf([1], [0.5, 1]), np.array([[2.]]), id="2D scalar")]) +def test_xferfcn_ndarray_precedence(op, tf, arr): + # Apply the operator to the transfer function and array + ss = ct.tf2ss(tf) + result = op(ss, arr) + assert isinstance(result, ct.StateSpace) + + # Apply the operator to the array and transfer function + ss = ct.tf2ss(tf) + result = op(arr, ss) + assert isinstance(result, ct.StateSpace) diff --git a/control/tests/xferfcn_test.py b/control/tests/xferfcn_test.py index 890bbe178..0f4424590 100644 --- a/control/tests/xferfcn_test.py +++ b/control/tests/xferfcn_test.py @@ -8,7 +8,7 @@ import operator import control as ct -from control.statesp import StateSpace, _convertToStateSpace, rss +from control.statesp import StateSpace, _convert_to_statespace, rss from control.xferfcn import TransferFunction, _convert_to_transfer_function, \ ss2tf from control.lti import evalfr @@ -704,7 +704,7 @@ def test_state_space_conversion_mimo(self): h = (b0 + b1*s + b2*s**2)/(a0 + a1*s + a2*s**2 + a3*s**3) H = TransferFunction([[h.num[0][0]], [(h*s).num[0][0]]], [[h.den[0][0]], [h.den[0][0]]]) - sys = _convertToStateSpace(H) + sys = _convert_to_statespace(H) H2 = _convert_to_transfer_function(sys) np.testing.assert_array_almost_equal(H.num[0][0], H2.num[0][0]) np.testing.assert_array_almost_equal(H.den[0][0], H2.den[0][0]) diff --git a/control/timeresp.py b/control/timeresp.py index f4f293bdf..a5cc245bf 100644 --- a/control/timeresp.py +++ b/control/timeresp.py @@ -79,7 +79,7 @@ atleast_1d) import warnings from .lti import LTI # base class of StateSpace, TransferFunction -from .statesp import _convertToStateSpace, _mimo2simo, _mimo2siso, ssdata +from .statesp import _convert_to_statespace, _mimo2simo, _mimo2siso, ssdata from .lti import isdtime, isctime __all__ = ['forced_response', 'step_response', 'step_info', 'initial_response', @@ -271,7 +271,7 @@ def forced_response(sys, T=None, U=0., X0=0., transpose=False, if not isinstance(sys, LTI): raise TypeError('Parameter ``sys``: must be a ``LTI`` object. ' '(For example ``StateSpace`` or ``TransferFunction``)') - sys = _convertToStateSpace(sys) + sys = _convert_to_statespace(sys) A, B, C, D = np.asarray(sys.A), np.asarray(sys.B), np.asarray(sys.C), \ np.asarray(sys.D) # d_type = A.dtype @@ -436,7 +436,7 @@ def _get_ss_simo(sys, input=None, output=None): If input is not specified, select first input and issue warning """ - sys_ss = _convertToStateSpace(sys) + sys_ss = _convert_to_statespace(sys) if sys_ss.issiso(): return sys_ss warn = False @@ -891,7 +891,7 @@ def _ideal_tfinal_and_dt(sys, is_step=True): dt = sys.dt if isdtime(sys, strict=True) else default_dt elif isdtime(sys, strict=True): dt = sys.dt - A = _convertToStateSpace(sys).A + A = _convert_to_statespace(sys).A tfinal = default_tfinal p = eigvals(A) # Array Masks @@ -931,7 +931,7 @@ def _ideal_tfinal_and_dt(sys, is_step=True): if p_int.size > 0: tfinal = tfinal * 5 else: # cont time - sys_ss = _convertToStateSpace(sys) + sys_ss = _convert_to_statespace(sys) # Improve conditioning via balancing and zeroing tiny entries # See for [[1,2,0], [9,1,0.01], [1,2,10*np.pi]] before/after balance b, (sca, perm) = matrix_balance(sys_ss.A, separate=True) From 766943b88879f14e5e2acd76fe7891e816a380cf Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 16:41:42 -0800 Subject: [PATCH 14/18] fix converstion exceptions to be TypeError --- control/iosys.py | 6 +++--- control/statesp.py | 7 ++----- control/xferfcn.py | 7 ++----- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 94b8234c6..efce73e49 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -220,7 +220,7 @@ def __mul__(sys2, sys1): raise NotImplemented("Matrix multiplication not yet implemented") elif not isinstance(sys1, InputOutputSystem): - raise ValueError("Unknown I/O system object ", sys1) + raise TypeError("Unknown I/O system object ", sys1) # Make sure systems can be interconnected if sys1.noutputs != sys2.ninputs: @@ -263,7 +263,7 @@ def __rmul__(sys1, sys2): raise NotImplemented("Matrix multiplication not yet implemented") elif not isinstance(sys2, InputOutputSystem): - raise ValueError("Unknown I/O system object ", sys1) + raise TypeError("Unknown I/O system object ", sys1) else: # Both systems are InputOutputSystems => use __mul__ @@ -281,7 +281,7 @@ def __add__(sys1, sys2): raise NotImplemented("Matrix addition not yet implemented") elif not isinstance(sys2, InputOutputSystem): - raise ValueError("Unknown I/O system object ", sys2) + raise TypeError("Unknown I/O system object ", sys2) # Make sure number of input and outputs match if sys1.ninputs != sys2.ninputs or sys1.noutputs != sys2.noutputs: diff --git a/control/statesp.py b/control/statesp.py index a37ab788e..b899666d1 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -1256,11 +1256,8 @@ def _convert_to_statespace(sys, **kw): try: D = _ssmatrix(sys) return StateSpace([], [], [], D) - except Exception as e: - print("Failure to assume argument is matrix-like in" - " _convert_to_statespace, result %s" % e) - - raise TypeError("Can't convert given type to StateSpace system.") + except: + raise TypeError("Can't convert given type to StateSpace system.") # TODO: add discrete time option diff --git a/control/xferfcn.py b/control/xferfcn.py index 5dc12c30a..2886c7ee7 100644 --- a/control/xferfcn.py +++ b/control/xferfcn.py @@ -1319,11 +1319,8 @@ def _convert_to_transfer_function(sys, **kw): num = [[[D[i, j]] for j in range(inputs)] for i in range(outputs)] den = [[[1] for j in range(inputs)] for i in range(outputs)] return TransferFunction(num, den) - except Exception as e: - print("Failure to assume argument is matrix-like in" - " _convertToTransferFunction, result %s" % e) - - raise TypeError("Can't convert given type to TransferFunction system.") + except: + raise TypeError("Can't convert given type to TransferFunction system.") def tf(*args, **kwargs): From 535638811b49b438fd95d446e8e49b4832cc2c93 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 21:19:25 -0800 Subject: [PATCH 15/18] add unit tests for checking type converstions --- control/tests/type_conversion_test.py | 119 ++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 control/tests/type_conversion_test.py diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py new file mode 100644 index 000000000..d574263a3 --- /dev/null +++ b/control/tests/type_conversion_test.py @@ -0,0 +1,119 @@ +# type_conversion_test.py - test type conversions +# RMM, 3 Jan 2021 +# +# This set of tests looks at how various classes are converted when using +# algebraic operations. See GitHub issue #459 for some discussion on what the +# desired combinations should be. + +import control as ct +import numpy as np +import operator +import pytest + +@pytest.fixture() +def sys_dict(): + sdict = {} + sdict['ss'] = ct.ss([[-1]], [[1]], [[1]], [[0]]) + sdict['tf'] = ct.tf([1],[0.5, 1]) + sdict['frd'] = ct.frd([10+0j, 9 + 1j, 8 + 2j], [1,2,3]) + sdict['lio'] = ct.LinearIOSystem(ct.ss([[-1]], [[5]], [[5]], [[0]])) + sdict['ios'] = ct.NonlinearIOSystem( + sdict['lio']._rhs, sdict['lio']._out, 1, 1, 1) + sdict['arr'] = np.array([[2.0]]) + sdict['flt'] = 3. + return sdict + +type_dict = { + 'ss': ct.StateSpace, 'tf': ct.TransferFunction, + 'frd': ct.FrequencyResponseData, 'lio': ct.LinearICSystem, + 'ios': ct.InterconnectedSystem, 'arr': np.ndarray, 'flt': float} + +# +# Table of expected conversions +# +# This table describes all of the conversions that are supposed to +# happen for various system combinations. This is written out this way +# to make it easy to read, but this is converted below into a list of +# specific tests that can be iterated over. +# +# Items marked as 'E' should generate an exception. +# +# Items starting with 'x' currently generate an expected exception but +# should eventually generate a useful result (when everything is +# implemented properly). +# +# Note 1: some of the entries below are currently converted to to lower level +# types than needed. In particular, LinearIOSystems should combine with +# StateSpace and TransferFunctions in a way that preserves I/O system +# structure when possible. +# +# Note 2: eventually the operator entry for this table can be pulled out and +# tested as a separate parameterized variable (since all operators should +# return consistent values). + +rtype_list = ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt'] +conversion_table = [ + # op left ss tf frd lio ios arr flt + ('add', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), + ('add', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), + ('add', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'xrd']), + ('add', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), + ('add', 'ios', ['xos', 'xos', 'E', 'ios', 'ios', 'xos', 'xos']), + ('add', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), + ('add', 'flt', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'flt']), + + # op left ss tf frd lio ios arr flt + ('sub', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), + ('sub', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), + ('sub', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'xrd']), + ('sub', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), + ('sub', 'ios', ['xos', 'xio', 'E', 'ios', 'xos' 'xos', 'xos']), + ('sub', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), + ('sub', 'flt', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'flt']), + + # op left ss tf frd lio ios arr flt + ('mul', 'ss', ['ss', 'ss', 'xrd', 'xio', 'xos', 'ss', 'ss' ]), + ('mul', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), + ('mul', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'frd']), + ('mul', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), + ('mul', 'ios', ['xos', 'xos', 'E', 'ios', 'ios', 'xos', 'xos']), + ('mul', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), + ('mul', 'flt', ['ss', 'tf', 'frd', 'xio', 'xos', 'arr', 'flt']), + + # op left ss tf frd lio ios arr flt + ('truediv', 'ss', ['xs', 'tf', 'xrd', 'xio', 'xos', 'xs', 'xs' ]), + ('truediv', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), + ('truediv', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'frd']), + ('truediv', 'lio', ['xio', 'tf', 'xrd', 'xio', 'xio', 'xio', 'xio']), + ('truediv', 'ios', ['xos', 'xos', 'E', 'xos', 'xos' 'xos', 'xos']), + ('truediv', 'arr', ['xs', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), + ('truediv', 'flt', ['xs', 'tf', 'frd', 'xio', 'xos', 'arr', 'flt'])] + +# Now create list of the tests we actually want to run +test_matrix = [] +for i, (opname, ltype, expected_list) in enumerate(conversion_table): + for rtype, expected in zip(rtype_list, expected_list): + # Add this to the list of tests to run + test_matrix.append([opname, ltype, rtype, expected]) + +@pytest.mark.parametrize("opname, ltype, rtype, expected", test_matrix) +def test_xferfcn_ndarray_precedence(opname, ltype, rtype, expected, sys_dict): + op = getattr(operator, opname) + leftsys = sys_dict[ltype] + rightsys = sys_dict[rtype] + + # Get rid of warnings for InputOutputSystem objects by making a copy + if isinstance(leftsys, ct.InputOutputSystem) and leftsys == rightsys: + rightsys = leftsys.copy() + + # Make sure we get the right result + if expected == 'E' or expected[0] == 'x': + # Exception expected + with pytest.raises(TypeError): + op(leftsys, rightsys) + else: + # Operation should work and return the given type + result = op(leftsys, rightsys) + + # Print out what we are testing in case something goes wrong + assert isinstance(result, type_dict[expected]) From 08b1c2a0fd5511957afd4317ea3b3c71a11e1769 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 3 Jan 2021 21:38:17 -0800 Subject: [PATCH 16/18] update LinearIOSystem.__rmul__ for Python2/Python3 consistency --- control/iosys.py | 14 +++++++++----- control/tests/type_conversion_test.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index efce73e49..913e8d471 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -254,7 +254,11 @@ def __mul__(sys2, sys1): def __rmul__(sys1, sys2): """Pre-multiply an input/output systems by a scalar/matrix""" - if isinstance(sys2, (int, float, np.number)): + if isinstance(sys2, InputOutputSystem): + # Both systems are InputOutputSystems => use __mul__ + return InputOutputSystem.__mul__(sys2, sys1) + + elif isinstance(sys2, (int, float, np.number)): # TODO: Scale the output raise NotImplemented("Scalar multiplication not yet implemented") @@ -262,12 +266,12 @@ def __rmul__(sys1, sys2): # TODO: Post-multiply by a matrix raise NotImplemented("Matrix multiplication not yet implemented") - elif not isinstance(sys2, InputOutputSystem): - raise TypeError("Unknown I/O system object ", sys1) + elif isinstance(sys2, StateSpace): + # TODO: Should eventuall preserve LinearIOSystem structure + return StateSpace.__mul__(sys2, sys1) else: - # Both systems are InputOutputSystems => use __mul__ - return InputOutputSystem.__mul__(sys2, sys1) + raise TypeError("Unknown I/O system object ", sys1) def __add__(sys1, sys2): """Add two input/output systems (parallel interconnection)""" diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index d574263a3..44c6618d8 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -72,7 +72,7 @@ def sys_dict(): ('sub', 'flt', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'flt']), # op left ss tf frd lio ios arr flt - ('mul', 'ss', ['ss', 'ss', 'xrd', 'xio', 'xos', 'ss', 'ss' ]), + ('mul', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), ('mul', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), ('mul', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'frd']), ('mul', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), From 4ca0dac3e421fa8892fe3858307e7c8e210f1824 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Mon, 4 Jan 2021 22:38:40 -0800 Subject: [PATCH 17/18] add (skipped) function for desired binary operator conversions --- control/tests/type_conversion_test.py | 72 ++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index 44c6618d8..72a02e00f 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -15,6 +15,7 @@ def sys_dict(): sdict = {} sdict['ss'] = ct.ss([[-1]], [[1]], [[1]], [[0]]) sdict['tf'] = ct.tf([1],[0.5, 1]) + sdict['tfx'] = ct.tf([1, 1],[1]) # non-proper transfer function sdict['frd'] = ct.frd([10+0j, 9 + 1j, 8 + 2j], [1,2,3]) sdict['lio'] = ct.LinearIOSystem(ct.ss([[-1]], [[5]], [[5]], [[0]])) sdict['ios'] = ct.NonlinearIOSystem( @@ -29,7 +30,7 @@ def sys_dict(): 'ios': ct.InterconnectedSystem, 'arr': np.ndarray, 'flt': float} # -# Table of expected conversions +# Current table of expected conversions # # This table describes all of the conversions that are supposed to # happen for various system combinations. This is written out this way @@ -50,6 +51,10 @@ def sys_dict(): # Note 2: eventually the operator entry for this table can be pulled out and # tested as a separate parameterized variable (since all operators should # return consistent values). +# +# Note 3: this table documents the current state, but not actually the desired +# state. See bottom of the file for the (eventual) desired behavior. +# rtype_list = ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt'] conversion_table = [ @@ -97,7 +102,7 @@ def sys_dict(): test_matrix.append([opname, ltype, rtype, expected]) @pytest.mark.parametrize("opname, ltype, rtype, expected", test_matrix) -def test_xferfcn_ndarray_precedence(opname, ltype, rtype, expected, sys_dict): +def test_operator_type_conversion(opname, ltype, rtype, expected, sys_dict): op = getattr(operator, opname) leftsys = sys_dict[ltype] rightsys = sys_dict[rtype] @@ -117,3 +122,66 @@ def test_xferfcn_ndarray_precedence(opname, ltype, rtype, expected, sys_dict): # Print out what we are testing in case something goes wrong assert isinstance(result, type_dict[expected]) + +# +# Updated table that describes desired outputs for all operators +# +# General rules (subject to change) +# +# * For LTI/LTI, keep the type of the left operand whenever possible. This +# prioritizes the first operand, but we need to watch out for non-proper +# transfer functions (in which case TransferFunction should be returned) +# +# * For FRD/LTI, convert LTI to FRD by evaluating the LTI transfer function +# at the FRD frequencies (can't got the other way since we can't convert +# an FRD object to state space/transfer function). +# +# * For IOS/LTI, convert to IOS. In the case of a linear I/O system (LIO), +# this will preserve the linear structure since the LTI system will +# be converted to state space. +# +# * When combining state space or transfer with linear I/O systems, the +# * output should be of type Linear IO system, since that maintains the +# * underlying state space attributes. +# +# Note: tfx = non-proper transfer function, order(num) > order(den) +# + +type_list = ['ss', 'tf', 'tfx', 'frd', 'lio', 'ios', 'arr', 'flt'] +conversion_table = [ + # L / R ['ss', 'tf', 'tfx', 'frd', 'lio', 'ios', 'arr', 'flt'] + ('ss', ['ss', 'ss', 'tf' 'frd', 'lio', 'ios', 'ss', 'ss' ]), + ('tf', ['tf', 'tf', 'tf' 'frd', 'lio', 'ios', 'tf', 'tf' ]), + ('tfx', ['tf', 'tf', 'tf', 'frd', 'E', 'E', 'tf', 'tf' ]), + ('frd', ['frd', 'frd', 'frd', 'frd', 'E', 'E', 'frd', 'frd']), + ('lio', ['lio', 'lio', 'E', 'E', 'lio', 'ios', 'lio', 'lio']), + ('ios', ['ios', 'ios', 'E', 'E', 'ios', 'ios', 'ios', 'ios']), + ('arr', ['ss', 'tf', 'tf' 'frd', 'lio', 'ios', 'arr', 'arr']), + ('flt', ['ss', 'tf', 'tf' 'frd', 'lio', 'ios', 'arr', 'flt'])] + +@pytest.mark.skip(reason="future test; conversions not yet fully implemented") +# @pytest.mark.parametrize("opname", ['add', 'sub', 'mul', 'truediv']) +# @pytest.mark.parametrize("ltype", type_list) +# @pytest.mark.parametrize("rtype", type_list) +def test_binary_op_type_conversions(opname, ltype, rtype, sys_dict): + op = getattr(operator, opname) + leftsys = sys_dict[ltype] + rightsys = sys_dict[rtype] + expected = \ + conversion_table[type_list.index(ltype)][1][type_list.index(rtype)] + + # Get rid of warnings for InputOutputSystem objects by making a copy + if isinstance(leftsys, ct.InputOutputSystem) and leftsys == rightsys: + rightsys = leftsys.copy() + + # Make sure we get the right result + if expected == 'E' or expected[0] == 'x': + # Exception expected + with pytest.raises(TypeError): + op(leftsys, rightsys) + else: + # Operation should work and return the given type + result = op(leftsys, rightsys) + + # Print out what we are testing in case something goes wrong + assert isinstance(result, type_dict[expected]) From 80006e931cc141089a3a7a6b549a859db7efd14c Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Tue, 5 Jan 2021 07:37:33 -0800 Subject: [PATCH 18/18] Update control/tests/type_conversion_test.py Co-authored-by: Ben Greiner --- control/tests/type_conversion_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index 72a02e00f..3f51c2bbc 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -149,7 +149,7 @@ def test_operator_type_conversion(opname, ltype, rtype, expected, sys_dict): type_list = ['ss', 'tf', 'tfx', 'frd', 'lio', 'ios', 'arr', 'flt'] conversion_table = [ - # L / R ['ss', 'tf', 'tfx', 'frd', 'lio', 'ios', 'arr', 'flt'] + # L \ R ['ss', 'tf', 'tfx', 'frd', 'lio', 'ios', 'arr', 'flt'] ('ss', ['ss', 'ss', 'tf' 'frd', 'lio', 'ios', 'ss', 'ss' ]), ('tf', ['tf', 'tf', 'tf' 'frd', 'lio', 'ios', 'tf', 'tf' ]), ('tfx', ['tf', 'tf', 'tf', 'frd', 'E', 'E', 'tf', 'tf' ]),