diff --git a/control/iosys.py b/control/iosys.py index 2881c5d64..9b48c9ad2 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -54,10 +54,9 @@ class InputOutputSystem(NamedIOSystem): """A class for representing input/output systems. The InputOutputSystem class allows (possibly nonlinear) input/output - systems to be represented in Python. It is intended as a parent - class for a set of subclasses that are used to implement specific - structures and operations for different types of input/output - dynamical systems. + systems to be represented in Python. It is used as a parent class for + a set of subclasses that are used to implement specific structures and + operations for different types of input/output dynamical systems. Parameters ---------- @@ -65,14 +64,16 @@ class for a set of subclasses that are used to implement specific Description of the system inputs. This can be given as an integer count or a list of strings that name the individual signals. If an integer count is specified, the names of the signal will be of the - form `s[i]` (where `s` is one of `u`, `y`, or `x`). If this parameter - is not given or given as `None`, the relevant quantity will be - determined when possible based on other information provided to - functions using the system. + form `s[i]` (where `s` is given by the `input_prefix` parameter and + has default value 'u'). If this parameter is not given or given as + `None`, the relevant quantity will be determined when possible + based on other information provided to functions using the system. outputs : int, list of str, or None - Description of the system outputs. Same format as `inputs`. + Description of the system outputs. Same format as `inputs`, with + the prefix given by output_prefix (defaults to 'y'). states : int, list of str, or None - Description of the system states. Same format as `inputs`. + Description of the system states. Same format as `inputs`, with + the prefix given by state_prefix (defaults to 'x'). dt : None, True or float, optional System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive @@ -103,6 +104,15 @@ class for a set of subclasses that are used to implement specific name : string, optional System name (used for specifying signals) + Other Parameters + ---------------- + input_prefix : string, optional + Set the prefix for input signals. Default = 'u'. + output_prefix : string, optional + Set the prefix for output signals. Default = 'y'. + state_prefix : string, optional + Set the prefix for state signals. Default = 'x'. + Notes ----- The :class:`~control.InputOuputSystem` class (and its subclasses) makes @@ -132,14 +142,13 @@ def __init__(self, params=None, **kwargs): """ # Store the system name, inputs, outputs, and states - name, inputs, outputs, states, dt = _process_namedio_keywords( - kwargs, end=True) + name, inputs, outputs, states, dt = _process_namedio_keywords(kwargs) # Initialize the data structure # Note: don't use super() to override LinearIOSystem/StateSpace MRO NamedIOSystem.__init__( self, inputs=inputs, outputs=outputs, - states=states, name=name, dt=dt) + states=states, name=name, dt=dt, **kwargs) # default parameters self.params = {} if params is None else params.copy() @@ -607,19 +616,13 @@ class LinearIOSystem(InputOutputSystem, StateSpace): Parameters ---------- linsys : StateSpace or TransferFunction - LTI system to be converted + LTI system to be converted. inputs : int, list of str or None, optional - Description of the system inputs. This can be given as an integer - count or as a list of strings that name the individual signals. If an - integer count is specified, the names of the signal will be of the - form `s[i]` (where `s` is one of `u`, `y`, or `x`). If this parameter - is not given or given as `None`, the relevant quantity will be - determined when possible based on other information provided to - functions using the system. + New system input labels (defaults to linsys input labels). outputs : int, list of str or None, optional - Description of the system outputs. Same format as `inputs`. + New system output labels (defaults to linsys output labels). states : int, list of str, or None, optional - Description of the system states. Same format as `inputs`. + New system input labels (defaults to linsys output labels). dt : None, True or float, optional System timebase. 0 (default) indicates continuous time, True indicates discrete time with unspecified sampling time, positive number is @@ -640,6 +643,10 @@ class LinearIOSystem(InputOutputSystem, StateSpace): A, B, C, D See :class:`~control.StateSpace` for inherited attributes. + See Also + -------- + InputOutputSystem : Input/output system class. + """ def __init__(self, linsys, **kwargs): """Create an I/O system from a state space linear system. @@ -659,13 +666,13 @@ def __init__(self, linsys, **kwargs): # Process keyword arguments name, inputs, outputs, states, dt = _process_namedio_keywords( - kwargs, linsys, end=True) + kwargs, linsys) # Create the I/O system object # Note: don't use super() to override StateSpace MRO InputOutputSystem.__init__( self, inputs=inputs, outputs=outputs, states=states, - params=None, dt=dt, name=name) + params=None, dt=dt, name=name, **kwargs) # Initalize additional state space variables StateSpace.__init__( @@ -775,6 +782,10 @@ class NonlinearIOSystem(InputOutputSystem): functions for the system as default values, overriding internal defaults. + See Also + -------- + InputOutputSystem : Input/output system class. + """ def __init__(self, updfcn, outfcn=None, params=None, **kwargs): """Create a nonlinear I/O system given update and output functions.""" @@ -880,27 +891,25 @@ class InterconnectedSystem(InputOutputSystem): whose inputs and outputs are connected via a connection map. The overall system inputs and outputs are subsets of the subsystem inputs and outputs. - See :func:`~control.interconnect` for a list of parameters. + The function :func:`~control.interconnect` should be used to create an + interconnected I/O system since it performs additional argument + processing and checking. """ def __init__(self, syslist, connections=None, inplist=None, outlist=None, params=None, warn_duplicate=None, **kwargs): """Create an I/O system from a list of systems + connection info.""" # Convert input and output names to lists if they aren't already - if inplist is not None and not isinstance(inplist, (list, tuple)): + if inplist is not None and not isinstance(inplist, list): inplist = [inplist] - if outlist is not None and not isinstance(outlist, (list, tuple)): + if outlist is not None and not isinstance(outlist, list): outlist = [outlist] # Check if dt argument was given; if not, pull from systems dt = kwargs.pop('dt', None) # Process keyword arguments (except dt) - defaults = { - 'inputs': len(inplist or []), - 'outputs': len(outlist or [])} - name, inputs, outputs, states, _ = _process_namedio_keywords( - kwargs, defaults, end=True) + name, inputs, outputs, states, _ = _process_namedio_keywords(kwargs) # Initialize the system list and index self.syslist = list(syslist) # insure modifications can be made @@ -990,22 +999,36 @@ def __init__(self, syslist, connections=None, inplist=None, outlist=None, f"construction of state labels failed; found: " f"{len(states)} labels; expecting {nstates}") + # Figure out what the inputs and outputs are + if inputs is None and inplist is not None: + inputs = len(inplist) + + if outputs is None and outlist is not None: + outputs = len(outlist) + # Create the I/O system # Note: don't use super() to override LinearICSystem/StateSpace MRO InputOutputSystem.__init__( self, inputs=inputs, outputs=outputs, - states=states, params=params, dt=dt, name=name) + states=states, params=params, dt=dt, name=name, **kwargs) # Convert the list of interconnections to a connection map (matrix) self.connect_map = np.zeros((ninputs, noutputs)) for connection in connections or []: - input_index = self._parse_input_spec(connection[0]) + input_indices = self._parse_input_spec(connection[0]) for output_spec in connection[1:]: - output_index, gain = self._parse_output_spec(output_spec) - if self.connect_map[input_index, output_index] != 0: - warn("multiple connections given for input %d" % - input_index + ". Combining with previous entries.") - self.connect_map[input_index, output_index] += gain + output_indices, gain = self._parse_output_spec(output_spec) + if len(output_indices) != len(input_indices): + raise ValueError( + f"inconsistent number of signals in connecting" + f" '{output_spec}' to '{connection[0]}'") + + for input_index, output_index in zip( + input_indices, output_indices): + if self.connect_map[input_index, output_index] != 0: + warn("multiple connections given for input %d" % + input_index + ". Combining with previous entries.") + self.connect_map[input_index, output_index] += gain # Convert the input list to a matrix: maps system to subsystems self.input_map = np.zeros((ninputs, self.ninputs)) @@ -1016,11 +1039,12 @@ def __init__(self, syslist, connections=None, inplist=None, outlist=None, raise ValueError("specifications in inplist must be of type " "int, str, tuple or list.") for spec in inpspec: - ulist_index = self._parse_input_spec(spec) - if self.input_map[ulist_index, index] != 0: - warn("multiple connections given for input %d" % - index + ". Combining with previous entries.") - self.input_map[ulist_index, index] += 1 + ulist_indices = self._parse_input_spec(spec) + for j, ulist_index in enumerate(ulist_indices): + if self.input_map[ulist_index, index] != 0: + warn("multiple connections given for input %d" % + index + ". Combining with previous entries.") + self.input_map[ulist_index, index + j] += 1 # Convert the output list to a matrix: maps subsystems to system self.output_map = np.zeros((self.noutputs, noutputs + ninputs)) @@ -1031,14 +1055,12 @@ def __init__(self, syslist, connections=None, inplist=None, outlist=None, raise ValueError("specifications in outlist must be of type " "int, str, tuple or list.") for spec in outspec: - ylist_index, gain = self._parse_output_spec(spec) - if self.output_map[index, ylist_index] != 0: - warn("multiple connections given for output %d" % - index + ". Combining with previous entries.") - self.output_map[index, ylist_index] += gain - - # Save the parameters for the system - self.params = {} if params is None else params.copy() + ylist_indices, gain = self._parse_output_spec(spec) + for j, ylist_index in enumerate(ylist_indices): + if self.output_map[index, ylist_index] != 0: + warn("multiple connections given for output %d" % + index + ". Combining with previous entries.") + self.output_map[index + j, ylist_index] += gain def _update_params(self, params, warning=False): for sys in self.syslist: @@ -1142,166 +1164,35 @@ def _compute_static_io(self, t, x, u): return ulist, ylist def _parse_input_spec(self, spec): - """Parse an input specification and returns the index - - This function parses a specification of an input of an interconnected - system component and returns the index of that input in the internal - input vector. Input specifications are of one of the following forms: - - i first input for the ith system - (i,) first input for the ith system - (i, j) jth input for the ith system - 'sys.sig' signal 'sig' in subsys 'sys' - ('sys', 'sig') signal 'sig' in subsys 'sys' - - The function returns an index into the input vector array and - the gain to use for that input. - - """ + """Parse an input specification and returns the indices.""" # Parse the signal that we received - subsys_index, input_index, gain = self._parse_signal(spec, 'input') + subsys_index, input_indices, gain = _parse_spec( + self.syslist, spec, 'input') if gain != 1: raise ValueError("gain not allowed in spec '%s'." % str(spec)) - # Return the index into the input vector list (ylist) - return self.input_offset[subsys_index] + input_index + # Return the indices into the input vector list (ylist) + return [self.input_offset[subsys_index] + i for i in input_indices] def _parse_output_spec(self, spec): - """Parse an output specification and returns the index and gain - - This function parses a specification of an output of an - interconnected system component and returns the index of that - output in the internal output vector (ylist). Output specifications - are of one of the following forms: - - i first output for the ith system - (i,) first output for the ith system - (i, j) jth output for the ith system - (i, j, gain) jth output for the ith system with gain - 'sys.sig' signal 'sig' in subsys 'sys' - '-sys.sig' signal 'sig' in subsys 'sys' with gain -1 - ('sys', 'sig', gain) signal 'sig' in subsys 'sys' with gain - - If the gain is not specified, it is taken to be 1. Numbered outputs - must be chosen from the list of subsystem outputs, but named outputs - can also be contained in the list of subsystem inputs. - - The function returns an index into the output vector array and - the gain to use for that output. - - """ + """Parse an output specification and returns the indices and gain.""" # Parse the rest of the spec with standard signal parsing routine try: # Start by looking in the set of subsystem outputs - subsys_index, output_index, gain = \ - self._parse_signal(spec, 'output') - - # Return the index into the input vector list (ylist) - return self.output_offset[subsys_index] + output_index, gain + subsys_index, output_indices, gain = \ + _parse_spec(self.syslist, spec, 'output') + output_offset = self.output_offset[subsys_index] except ValueError: # Try looking in the set of subsystem *inputs* - subsys_index, input_index, gain = self._parse_signal( - spec, 'input or output', dictname='input_index') + subsys_index, output_indices, gain = _parse_spec( + self.syslist, spec, 'input or output', dictname='input_index') # Return the index into the input vector list (ylist) - noutputs = sum(sys.noutputs for sys in self.syslist) - return noutputs + \ - self.input_offset[subsys_index] + input_index, gain - - def _parse_signal(self, spec, signame='input', dictname=None): - """Parse a signal specification, returning system and signal index. - - Signal specifications are of one of the following forms: - - i system_index = i, signal_index = 0 - (i,) system_index = i, signal_index = 0 - (i, j) system_index = i, signal_index = j - 'sys.sig' signal 'sig' in subsys 'sys' - ('sys', 'sig') signal 'sig' in subsys 'sys' - ('sys', j) signal_index j in subsys 'sys' - - The function returns an index into the input vector array and - the gain to use for that input. - """ - import re - - gain = 1 # Default gain - - # Check for special forms of the input - if isinstance(spec, tuple) and len(spec) == 3: - gain = spec[2] - spec = spec[:2] - elif isinstance(spec, str) and spec[0] == '-': - gain = -1 - spec = spec[1:] - - # Process cases where we are given indices as integers - if isinstance(spec, int): - return spec, 0, gain - - elif isinstance(spec, tuple) and len(spec) == 1 \ - and isinstance(spec[0], int): - return spec[0], 0, gain + output_offset = sum(sys.noutputs for sys in self.syslist) + \ + self.input_offset[subsys_index] - elif isinstance(spec, tuple) and len(spec) == 2 \ - and all([isinstance(index, int) for index in spec]): - return spec + (gain,) - - # Figure out the name of the dictionary to use - if dictname is None: - dictname = signame + '_index' - - if isinstance(spec, str): - # If we got a dotted string, break up into pieces - namelist = re.split(r'\.', spec) - - # For now, only allow signal level of system name - # TODO: expand to allow nested signal names - if len(namelist) != 2: - raise ValueError("Couldn't parse %s signal reference '%s'." - % (signame, spec)) - - system_index = self._find_system(namelist[0]) - if system_index is None: - raise ValueError("Couldn't find system '%s'." % namelist[0]) - - signal_index = self.syslist[system_index]._find_signal( - namelist[1], getattr(self.syslist[system_index], dictname)) - if signal_index is None: - raise ValueError("Couldn't find %s signal '%s.%s'." % - (signame, namelist[0], namelist[1])) - - return system_index, signal_index, gain - - # Handle the ('sys', 'sig'), (i, j), and mixed cases - elif isinstance(spec, tuple) and len(spec) == 2 and \ - isinstance(spec[0], (str, int)) and \ - isinstance(spec[1], (str, int)): - if isinstance(spec[0], int): - system_index = spec[0] - if system_index < 0 or system_index > len(self.syslist): - system_index = None - else: - system_index = self._find_system(spec[0]) - if system_index is None: - raise ValueError("Couldn't find system '%s'." % spec[0]) - - if isinstance(spec[1], int): - signal_index = spec[1] - # TODO (later): check against max length of appropriate list? - if signal_index < 0: - system_index = None - else: - signal_index = self.syslist[system_index]._find_signal( - spec[1], getattr(self.syslist[system_index], dictname)) - if signal_index is None: - raise ValueError("Couldn't find signal %s.%s." % tuple(spec)) - - return system_index, signal_index, gain - - else: - raise ValueError("Couldn't parse signal reference %s." % str(spec)) + return [output_offset + i for i in output_indices], gain def _find_system(self, name): return self.syslist_index.get(name, None) @@ -1348,7 +1239,9 @@ def set_output_map(self, output_map): ---------- output_map : 2D array Specify the matrix that will be used to multiply the vector of - subsystem outputs to obtain the vector of system outputs. + subsystem outputs concatenated with subsystem inputs to obtain + the vector of system outputs. + """ # Figure out the number of internal inputs and outputs ninputs = sum(sys.ninputs for sys in self.syslist) @@ -1486,8 +1379,10 @@ def check_unused_signals( f"{ignore_input} in subsystems") ignore_input_map.update(ignore_idxs) else: - ignore_input_map[self._parse_signal( - ignore_input, 'input')[:2]] = ignore_input + isys, isigs = _parse_spec( + self.syslist, ignore_input, 'input')[:2] + for isig in isigs: + ignore_input_map[(isys, isig)] = ignore_input # (osys, osig) -> signal-spec ignore_output_map = {} @@ -1499,8 +1394,10 @@ def check_unused_signals( f"{ignore_output} in subsystems") ignore_output_map.update(ignore_found) else: - ignore_output_map[self._parse_signal( - ignore_output, 'output')[:2]] = ignore_output + osys, osigs = _parse_spec( + self.syslist, ignore_output, 'output')[:2] + for osig in osigs: + ignore_output_map[(osys, osig)] = ignore_output dropped_inputs = set(unused_inputs) - set(ignore_input_map) dropped_outputs = set(unused_outputs) - set(ignore_output_map) @@ -2458,13 +2355,13 @@ def rss(states=1, outputs=1, inputs=1, strictly_proper=False, **kwargs): Returns ------- - sys : StateSpace - The randomly created linear system + sys : LinearIOSystem + The randomly created linear system. Raises ------ ValueError - if any input is not a positive integer + if any input is not a positive integer. Notes ----- @@ -2477,8 +2374,7 @@ def rss(states=1, outputs=1, inputs=1, strictly_proper=False, **kwargs): """ # Process keyword arguments kwargs.update({'states': states, 'outputs': outputs, 'inputs': inputs}) - name, inputs, outputs, states, dt = _process_namedio_keywords( - kwargs, end=True) + name, inputs, outputs, states, dt = _process_namedio_keywords(kwargs) # Figure out the size of the sytem nstates, _ = _process_signal_list(states) @@ -2490,7 +2386,8 @@ def rss(states=1, outputs=1, inputs=1, strictly_proper=False, **kwargs): strictly_proper=strictly_proper) return LinearIOSystem( - sys, name=name, states=states, inputs=inputs, outputs=outputs, dt=dt) + sys, name=name, states=states, inputs=inputs, outputs=outputs, dt=dt, + **kwargs) def drss(*args, **kwargs): @@ -2621,7 +2518,7 @@ def tf2io(*args, **kwargs): def interconnect( syslist, connections=None, inplist=None, outlist=None, params=None, check_unused=True, add_unused=False, ignore_inputs=None, - ignore_outputs=None, warn_duplicate=None, **kwargs): + ignore_outputs=None, warn_duplicate=None, debug=False, **kwargs): """Interconnect a set of input/output systems. This function creates a new system that is an interconnection of a set of @@ -2648,23 +2545,28 @@ def interconnect( [input-spec, output-spec1, output-spec2, ...] The input-spec can be in a number of different forms. The lowest - level representation is a tuple of the form `(subsys_i, inp_j)` where - `subsys_i` is the index into `syslist` and `inp_j` is the index into - the input vector for the subsystem. If `subsys_i` has a single input, - then the subsystem index `subsys_i` can be listed as the input-spec. - If systems and signals are given names, then the form 'sys.sig' or - ('sys', 'sig') are also recognized. - - Similarly, each output-spec should describe an output signal from one - of the subsystems. The lowest level representation is a tuple of the - form `(subsys_i, out_j, gain)`. The input will be constructed by - summing the listed outputs after multiplying by the gain term. If the - gain term is omitted, it is assumed to be 1. If the system has a - single output, then the subsystem index `subsys_i` can be listed as - the input-spec. If systems and signals are given names, then the form - 'sys.sig', ('sys', 'sig') or ('sys', 'sig', gain) are also recognized, - and the special form '-sys.sig' can be used to specify a signal with - gain -1. + level representation is a tuple of the form `(subsys_i, inp_j)` + where `subsys_i` is the index into `syslist` and `inp_j` is the + index into the input vector for the subsystem. If the signal index + is omitted, then all subsystem inputs are used. If systems and + signals are given names, then the forms 'sys.sig' or ('sys', 'sig') + are also recognized. Finally, for multivariable systems the signal + index can be given as a list, for example '(subsys_i, [inp_j1, ..., + inp_jn])'; as a slice, for example, 'sys.sig[i:j]'; or as a base + name `sys.sig` (which matches `sys.sig[i]`). + + Similarly, each output-spec should describe an output signal from + one of the subsystems. The lowest level representation is a tuple + of the form `(subsys_i, out_j, gain)`. The input will be + constructed by summing the listed outputs after multiplying by the + gain term. If the gain term is omitted, it is assumed to be 1. If + the subsystem index `subsys_i` is omitted, then all outputs of the + subsystem are used. If systems and signals are given names, then + the form 'sys.sig', ('sys', 'sig') or ('sys', 'sig', gain) are also + recognized, and the special form '-sys.sig' can be used to specify + a signal with gain -1. Lists, slices, and base namess can also be + used, as long as the number of elements for each output spec + mataches the input spec. If omitted, the `interconnect` function will attempt to create the interconnection map by connecting all signals with the same base names @@ -2688,20 +2590,20 @@ def interconnect( [input-spec1, input-spec2, ...] - Each system input is added to the input for the listed subsystem. If - the system input connects to only one subsystem input, a single input - specification can be given (without the inner list). + Each system input is added to the input for the listed subsystem. + If the system input connects to a subsystem with a single input, a + single input specification can be given (without the inner list). If omitted the `input` parameter will be used to identify the list of input signals to the overall system. outlist : list of output connections, optional - List of connections for how the outputs from the subsystems are mapped - to overall system outputs. The output connection description is the - same as the form defined in the inplist specification (including the - optional gain term). Numbered outputs must be chosen from the list of - subsystem outputs, but named outputs can also be contained in the list - of subsystem inputs. + List of connections for how the outputs from the subsystems are + mapped to overall system outputs. The output connection + description is the same as the form defined in the inplist + specification (including the optional gain term). Numbered outputs + must be chosen from the list of subsystem outputs, but named + outputs can also be contained in the list of subsystem inputs. If an output connection contains more than one signal specification, then those signals are added together (multiplying by the any gain @@ -2781,6 +2683,10 @@ def interconnect( systems that have non-generic names. If `False`, warnings are not generated and if `True` then warnings are always generated. + debug : bool, default=False + Print out information about how signals are being processed that + may be useful in understanding why something is not working. + Examples -------- @@ -2788,16 +2694,30 @@ def interconnect( >>> C = ct.rss(2, 2, 2, name='C') >>> T = ct.interconnect( ... [P, C], - ... connections = [ + ... connections=[ ... ['P.u[0]', 'C.y[0]'], ['P.u[1]', 'C.y[1]'], ... ['C.u[0]', '-P.y[0]'], ['C.u[1]', '-P.y[1]']], - ... inplist = ['C.u[0]', 'C.u[1]'], - ... outlist = ['P.y[0]', 'P.y[1]'], + ... inplist=['C.u[0]', 'C.u[1]'], + ... outlist=['P.y[0]', 'P.y[1]'], ... ) - For a SISO system, this example can be simplified by using the - :func:`~control.summing_block` function and the ability to automatically - interconnect signals with the same names: + This expression can be simplified using either slice notation or + just signal basenames: + + >>> T = ct.interconnect( + ... [P, C], connections=[['P.u[:]', 'C.y[:]'], ['C.u', '-P.y']], + ... inplist='C.u', outlist='P.y[:]') + + or further simplified by omitting the input and output signal + specifications (since all inputs and outputs are used): + + >>> T = ct.interconnect( + ... [P, C], connections=[['P', 'C'], ['C', '-P']], + ... inplist=['C'], outlist=['P']) + + A feedback system can also be constructed using the + :func:`~control.summing_block` function and the ability to + automatically interconnect signals with the same names: >>> P = ct.tf(1, [1, 0], inputs='u', outputs='y') >>> C = ct.tf(10, [1, 1], inputs='e', outputs='u') @@ -2811,14 +2731,22 @@ def interconnect( name of the new system determined by adding the prefix and suffix strings in config.defaults['namedio.linearized_system_name_prefix'] and config.defaults['namedio.linearized_system_name_suffix'], with the - default being to add the suffix '$copy'$ to the system name. + default being to add the suffix '$copy' to the system name. - It is possible to replace lists in most of arguments with tuples instead, - but strictly speaking the only use of tuples should be in the + In addition to explicit lists of system signals, it is possible to + lists vectors of signals, using one of the following forms:: + + (subsys, [i1, ..., iN], gain) signals with indices i1, ..., in + 'sysname.signal[i:j]' range of signal names, i through j-1 + 'sysname.signal[:]' all signals with given prefix + + While in many Python functions tuples can be used in place of lists, + for the interconnect() function the only use of tuples should be in the specification of an input- or output-signal via the tuple notation `(subsys_i, signal_j, gain)` (where `gain` is optional). If you get an - unexpected error message about a specification being of the wrong type, - check your use of tuples. + unexpected error message about a specification being of the wrong type + or not being found, check to make sure you are not using a tuple where + you should be using a list. In addition to its use for general nonlinear I/O systems, the :func:`~control.interconnect` function allows linear systems to be @@ -2832,8 +2760,7 @@ def interconnect( """ dt = kwargs.pop('dt', None) # by pass normal 'dt' processing - name, inputs, outputs, states, _ = _process_namedio_keywords( - kwargs, end=True) + name, inputs, outputs, states, _ = _process_namedio_keywords(kwargs) if not check_unused and (ignore_inputs or ignore_outputs): raise ValueError('check_unused is False, but either ' @@ -2865,72 +2792,245 @@ def interconnect( # Use an empty connections list connections = [] + elif isinstance(connections, list) and \ + all([isinstance(cnxn, (str, tuple)) for cnxn in connections]): + # Special case where there is a single connection + connections = [connections] + # If inplist/outlist is not present, try using inputs/outputs instead + inplist_none, outlist_none = False, False if inplist is None: - inplist = list(inputs or []) + inplist = inputs or [] + inplist_none = True # use to rewrite inputs below if outlist is None: - outlist = list(outputs or []) + outlist = outputs or [] + outlist_none = True # use to rewrite outputs below + + # Define a local debugging function + dprint = lambda s: None if not debug else print(s) + + # + # Pre-process connecton list + # + # Support for various "vector" forms of specifications is handled here, + # by expanding any specifications that refer to more than one signal. + # This includes signal lists such as ('sysname', ['sig1', 'sig2', ...]) + # as well as slice-based specifications such as 'sysname.signal[i:j]'. + # + dprint(f"Pre-processing connections:") + new_connections = [] + for connection in connections: + dprint(f" parsing {connection=}") + if not isinstance(connection, list): + raise ValueError( + f"invalid connection {connection}: should be a list") + # Parse and expand the input specification + input_spec = _parse_spec(syslist, connection[0], 'input') + input_spec_list = [input_spec] + + # Parse and expand the output specifications + output_specs_list = [[]] * len(input_spec_list) + for spec in connection[1:]: + output_spec = _parse_spec(syslist, spec, 'output') + output_specs_list[0].append(output_spec) + + # Create the new connection entry + for input_spec, output_specs in zip(input_spec_list, output_specs_list): + new_connection = [input_spec] + output_specs + dprint(f" adding {new_connection=}") + new_connections.append(new_connection) + connections = new_connections - # Process input list - if not isinstance(inplist, (list, tuple)): + # + # Pre-process input connections list + # + # Similar to the connections list, we now handle "vector" forms of + # specifications in the inplist parameter. This needs to be handled + # here because the InterconnectedSystem constructor assumes that the + # number of elements in `inplist` will match the number of inputs for + # the interconnected system. + # + # If inplist_none is True then inplist is a copy of inputs and so we + # also have to be careful that if we encounter any multivariable + # signals, we need to update the input list. + # + dprint(f"Pre-processing input connections: {inplist}") + if not isinstance(inplist, list): + dprint(f" converting inplist to list") inplist = [inplist] - new_inplist = [] - for signal in inplist: - # Create an empty connection and append to inplist - connection = [] + new_inplist, new_inputs = [], [] if inplist_none else inputs - # Check for signal names without a system name - if isinstance(signal, str) and len(signal.split('.')) == 1: - # Get the signal name - signal_name = signal[1:] if signal[0] == '-' else signal - sign = '-' if signal[0] == '-' else "" + # Go through the list of inputs and process each one + for iinp, connection in enumerate(inplist): + # Check for system name or signal names without a system name + if isinstance(connection, str) and len(connection.split('.')) == 1: + # Create an empty connections list to store matching connections + new_connections = [] - # Look for the signal name as a system input - for sys in syslist: - if signal_name in sys.input_labels: - connection.append(sign + sys.name + "." + signal_name) + # Get the signal/system name + sname = connection[1:] if connection[0] == '-' else connection + gain = -1 if connection[0] == '-' else 1 - # Make sure we found the name - if len(connection) == 0: - raise ValueError("could not find signal %s" % signal_name) - else: - new_inplist.append(connection) + # Look for the signal name as a system input + found_system, found_signal = False, False + for isys, sys in enumerate(syslist): + # Look for matching signals (returns None if no matches + indices = sys._find_signals(sname, sys.input_index) + + # See what types of matches we found + if sname == sys.name: + # System name matches => use all inputs + for isig in range(sys.ninputs): + dprint(f" adding input {(isys, isig, gain)}") + new_inplist.append((isys, isig, gain)) + found_system = True + elif indices: + # Signal name matches => store new connections + new_connection = [] + for isig in indices: + dprint(f" collecting input {(isys, isig, gain)}") + new_connection.append((isys, isig, gain)) + + if len(new_connections) == 0: + # First time we have seen this signal => initalize + for cnx in new_connection: + new_connections.append([cnx]) + if inplist_none: + # See if we need to rewrite the inputs + if len(new_connection) != 1: + new_inputs += [ + sys.input_labels[i] for i in indices] + else: + new_inputs.append(inputs[iinp]) + else: + # Additional signal match found =. add to the list + for i, cnx in enumerate(new_connection): + new_connections[i].append(cnx) + found_signal = True + + if found_system and found_signal: + raise ValueError( + f"signal '{sname}' is both signal and system name") + elif found_signal: + dprint(f" adding inputs {new_connections}") + new_inplist += new_connections + elif not found_system: + raise ValueError("could not find signal %s" % sname) else: - new_inplist.append(signal) - inplist = new_inplist + # Regular signal specification + if not isinstance(connection, list): + dprint(f" converting item to list") + connection = [connection] + for spec in connection: + isys, indices, gain = _parse_spec(syslist, spec, 'input') + for isig in indices: + dprint(f" adding input {(isys, isig, gain)}") + new_inplist.append((isys, isig, gain)) + inplist, inputs = new_inplist, new_inputs + dprint(f" {inplist=}\n {inputs=}") - # Process output list - if not isinstance(outlist, (list, tuple)): + # + # Pre-process output list + # + # This is similar to the processing of the input list, but we need to + # additionally take into account the fact that you can list subsystem + # inputs as system outputs. + # + dprint(f"Pre-processing output connections: {outlist}") + if not isinstance(outlist, list): + dprint(f" converting outlist to list") outlist = [outlist] - new_outlist = [] - for signal in outlist: - # Create an empty connection and append to inplist - connection = [] + new_outlist, new_outputs = [], [] if outlist_none else outputs + for iout, connection in enumerate(outlist): + # Create an empty connection list + new_connections = [] - # Check for signal names without a system name - if isinstance(signal, str) and len(signal.split('.')) == 1: - # Get the signal name - signal_name = signal[1:] if signal[0] == '-' else signal - sign = '-' if signal[0] == '-' else "" + # Check for system name or signal names without a system name + if isinstance(connection, str) and len(connection.split('.')) == 1: + # Get the signal/system name + sname = connection[1:] if connection[0] == '-' else connection + gain = -1 if connection[0] == '-' else 1 # Look for the signal name as a system output - for sys in syslist: - if signal_name in sys.output_index.keys(): - connection.append(sign + sys.name + "." + signal_name) - - # Make sure we found the name - if len(connection) == 0: - raise ValueError("could not find signal %s" % signal_name) - else: - new_outlist.append(connection) + found_system, found_signal = False, False + for osys, sys in enumerate(syslist): + indices = sys._find_signals(sname, sys.output_index) + if sname == sys.name: + # Use all outputs + for osig in range(sys.noutputs): + dprint(f" adding output {(osys, osig, gain)}") + new_outlist.append((osys, osig, gain)) + found_system = True + elif indices: + new_connection = [] + for osig in indices: + dprint(f" collecting output {(osys, osig, gain)}") + new_connection.append((osys, osig, gain)) + if len(new_connections) == 0: + for cnx in new_connection: + new_connections.append([cnx]) + if outlist_none: + # See if we need to rewrite the outputs + if len(new_connection) != 1: + new_outputs += [ + sys.output_labels[i] for i in indices] + else: + new_outputs.append(outputs[iout]) + else: + # Additional signal match found =. add to the list + for i, cnx in enumerate(new_connection): + new_connections[i].append(cnx) + found_signal = True + + if found_system and found_signal: + raise ValueError( + f"signal '{sname}' is both signal and system name") + elif found_signal: + dprint(f" adding outputs {new_connections}") + new_outlist += new_connections + elif not found_system: + raise ValueError("could not find signal %s" % sname) else: - new_outlist.append(signal) - outlist = new_outlist + # Regular signal specification + if not isinstance(connection, list): + dprint(f" converting item to list") + connection = [connection] + for spec in connection: + try: + # First trying looking in the output signals + osys, indices, gain = _parse_spec(syslist, spec, 'output') + for osig in indices: + dprint(f" adding output {(osys, osig, gain)}") + new_outlist.append((osys, osig, gain)) + except ValueError: + # If not, see if we can find it in inputs + isys, indices, gain = _parse_spec( + syslist, spec, 'input or output', + dictname='input_index') + for isig in indices: + # Use string form to allow searching input list + dprint(f" adding input {(isys, isig, gain)}") + new_outlist.append( + (syslist[isys].name, + syslist[isys].input_labels[isig], gain)) + outlist, outputs = new_outlist, new_outputs + dprint(f" {outlist=}\n {outputs=}") + + # Make sure inputs and outputs match inplist outlist, if specified + if inputs and ( + isinstance(inputs, (list, tuple)) and len(inputs) != len(inplist) + or isinstance(inputs, int) and inputs != len(inplist)): + raise ValueError("`inputs` incompatible with `inplist`") + if outputs and ( + isinstance(outputs, (list, tuple)) and len(outputs) != len(outlist) + or isinstance(outputs, int) and outputs != len(outlist)): + raise ValueError("`outputs` incompatible with `outlist`") newsys = InterconnectedSystem( syslist, connections=connections, inplist=inplist, outlist=outlist, inputs=inputs, outputs=outputs, states=states, - params=params, dt=dt, name=name, warn_duplicate=warn_duplicate) + params=params, dt=dt, name=name, warn_duplicate=warn_duplicate, + **kwargs) # See if we should add any signals if add_unused: @@ -2950,7 +3050,8 @@ def interconnect( newsys = InterconnectedSystem( syslist, connections=connections, inplist=inplist, outlist=outlist, inputs=inputs, outputs=outputs, states=states, - params=params, dt=dt, name=name, warn_duplicate=warn_duplicate) + params=params, dt=dt, name=name, warn_duplicate=warn_duplicate, + **kwargs) # check for implicitly dropped signals if check_unused: @@ -3088,3 +3189,110 @@ def _parse_list(signals, signame='input', prefix='u'): # Create a LinearIOSystem return LinearIOSystem( ss_sys, inputs=input_names, outputs=output_names, name=name) + + +# +# Utility function for parsing input/output specifications +# +# This function can be used to convert various forms of signal +# specifications used in the interconnect() function and the +# InterconnectedSystem class into a list of signals. Signal specifications +# are of one of the following forms (where 'n' is the number of signals in +# the named dictionary): +# +# i system_index = i, signal_list = [0, ..., n] +# (i,) system_index = i, signal_list = [0, ..., n] +# (i, j) system_index = i, signal_list = [j] +# (i, [j1, ..., jn]) system_index = i, signal_list = [j1, ..., jn] +# 'sys' system_index = i, signal_list = [0, ..., n] +# 'sys.sig' signal 'sig' in subsys 'sys' +# ('sys', 'sig') signal 'sig' in subsys 'sys' +# 'sys.sig[...]' signals 'sig[...]' (slice) in subsys 'sys' +# ('sys', j) signal_index j in subsys 'sys' +# ('sys', 'sig[...]') signals 'sig[...]' (slice) in subsys 'sys' +# +# This function returns the subsystem index, a list of indices for the +# system signals, and the gain to use for that set of signals. +# +import re + +def _parse_spec(syslist, spec, signame, dictname=None): + """Parse a signal specification, returning system and signal index.""" + + # Parse the signal spec into a system, signal, and gain spec + if isinstance(spec, int): + system_spec, signal_spec, gain = spec, None, None + elif isinstance(spec, str): + # If we got a dotted string, break up into pieces + namelist = re.split(r'\.', spec) + system_spec, gain = namelist[0], None + signal_spec = None if len(namelist) < 2 else namelist[1] + if len(namelist) > 2: + # TODO: expand to allow nested signal names + raise ValueError(f"couldn't parse signal reference '{spec}'") + elif isinstance(spec, tuple) and len(spec) <= 3: + system_spec = spec[0] + signal_spec = None if len(spec) < 2 else spec[1] + gain = None if len(spec) < 3 else spec[2] + else: + raise ValueError(f"unrecognized signal spec format '{spec}'") + + # Determine the gain + check_sign = lambda spec: isinstance(spec, str) and spec[0] == '-' + if (check_sign(system_spec) and gain is not None) or \ + (check_sign(signal_spec) and gain is not None) or \ + (check_sign(system_spec) and check_sign(signal_spec)): + # Gain is specified multiple times + raise ValueError(f"gain specified multiple times '{spec}'") + elif check_sign(system_spec): + gain = -1 + system_spec = system_spec[1:] + elif check_sign(signal_spec): + gain = -1 + signal_spec = signal_spec[1:] + elif gain is None: + gain = 1 + + # Figure out the subsystem index + if isinstance(system_spec, int): + system_index = system_spec + elif isinstance(system_spec, str): + syslist_index = {sys.name: i for i, sys in enumerate(syslist)} + system_index = syslist_index.get(system_spec, None) + if system_index is None: + raise ValueError(f"couldn't find system '{system_spec}'") + else: + raise ValueError(f"unknown system spec '{system_spec}'") + + # Make sure the system index is valid + if system_index < 0 or system_index >= len(syslist): + ValueError(f"system index '{system_index}' is out of range") + + # Figure out the name of the dictionary to use for signal names + dictname = signame + '_index' if dictname is None else dictname + signal_dict = getattr(syslist[system_index], dictname) + nsignals = len(signal_dict) + + # Figure out the signal indices + if signal_spec is None: + # No indices given => use the entire range of signals + signal_indices = list(range(nsignals)) + elif isinstance(signal_spec, int): + # Single index given + signal_indices = [signal_spec] + elif isinstance(signal_spec, list) and \ + all([isinstance(index, int) for index in signal_spec]): + # Simple list of integer indices + signal_indices = signal_spec + else: + signal_indices = syslist[system_index]._find_signals( + signal_spec, signal_dict) + if signal_indices is None: + raise ValueError(f"couldn't find {signame} signal '{spec}'") + + # Make sure the signal indices are valid + for index in signal_indices: + if index < 0 or index >= nsignals: + ValueError(f"signal index '{index}' is out of range") + + return system_index, signal_indices, gain diff --git a/control/namedio.py b/control/namedio.py index c0d5f11d5..a37155f09 100644 --- a/control/namedio.py +++ b/control/namedio.py @@ -8,6 +8,7 @@ import numpy as np from copy import deepcopy from warnings import warn +import re from . import config __all__ = ['issiso', 'timebase', 'common_timebase', 'timebaseEqual', @@ -22,6 +23,8 @@ 'namedio.linearized_system_name_suffix': '$linearized', 'namedio.sampled_system_name_prefix': '', 'namedio.sampled_system_name_suffix': '$sampled', + 'namedio.indexed_system_name_prefix': '', + 'namedio.indexed_system_name_suffix': '$indexed', 'namedio.converted_system_name_prefix': '', 'namedio.converted_system_name_suffix': '$converted', } @@ -29,15 +32,16 @@ class NamedIOSystem(object): def __init__( - self, name=None, inputs=None, outputs=None, states=None, **kwargs): + self, name=None, inputs=None, outputs=None, states=None, + input_prefix='u', output_prefix='y', state_prefix='x', **kwargs): # system name self.name = self._name_or_default(name) # Parse and store the number of inputs and outputs - self.set_inputs(inputs) - self.set_outputs(outputs) - self.set_states(states) + self.set_inputs(inputs, prefix=input_prefix) + self.set_outputs(outputs, prefix=output_prefix) + self.set_states(states, prefix=state_prefix) # Process timebase: if not given use default, but allow None as value self.dt = _process_dt_keyword(kwargs) @@ -56,6 +60,9 @@ def _name_or_default(self, name=None, prefix_suffix_name=None): if name is None: name = "sys[{}]".format(NamedIOSystem._idCounter) NamedIOSystem._idCounter += 1 + elif re.match(r".*\..*", name): + raise ValueError(f"invalid system name '{name}' ('.' not allowed)") + prefix = "" if prefix_suffix_name is None else config.defaults[ 'namedio.' + prefix_suffix_name + '_system_name_prefix'] suffix = "" if prefix_suffix_name is None else config.defaults[ @@ -64,7 +71,6 @@ def _name_or_default(self, name=None, prefix_suffix_name=None): # Check if system name is generic def _generic_name_check(self): - import re return re.match(r'^sys\[\d*\]$', self.name) is not None # @@ -106,6 +112,39 @@ def __str__(self): def _find_signal(self, name, sigdict): return sigdict.get(name, None) + # Find a list of signals by name, index, or pattern + def _find_signals(self, name_list, sigdict): + if not isinstance(name_list, (list, tuple)): + name_list = [name_list] + + index_list = [] + for name in name_list: + # Look for signal ranges (slice-like or base name) + ms = re.match(r'([\w$]+)\[([\d]*):([\d]*)\]$', name) # slice + mb = re.match(r'([\w$]+)$', name) # base + if ms: + base = ms.group(1) + start = None if ms.group(2) == '' else int(ms.group(2)) + stop = None if ms.group(3) == '' else int(ms.group(3)) + for var in sigdict: + # Find variables that match + msig = re.match(r'([\w$]+)\[([\d]+)\]$', var) + if msig and msig.group(1) == base and \ + (start is None or int(msig.group(2)) >= start) and \ + (stop is None or int(msig.group(2)) < stop): + index_list.append(sigdict.get(var)) + elif mb and sigdict.get(name, None) is None: + # Try to use name as a base name + for var in sigdict: + msig = re.match(name + r'\[([\d]+)\]$', var) + if msig: + index_list.append(sigdict.get(var)) + else: + index_list.append(sigdict.get(name, None)) + + return None if len(index_list) == 0 or \ + any([idx is None for idx in index_list]) else index_list + def _copy_names(self, sys, prefix="", suffix="", prefix_suffix_name=None): """copy the signal and system name of sys. Name is given as a keyword in case a specific name (e.g. append 'linearized') is desired. """ @@ -151,7 +190,6 @@ def copy(self, name=None, use_prefix_suffix=True): return newsys def set_inputs(self, inputs, prefix='u'): - """Set the number/names of the system inputs. Parameters @@ -175,6 +213,10 @@ def find_input(self, name): """Find the index for an input given its name (`None` if not found)""" return self.input_index.get(name, None) + def find_inputs(self, name_list): + """Return list of indices matching input spec (`None` if not found)""" + return self._find_signals(name_list, self.input_index) + # Property for getting and setting list of input signals input_labels = property( lambda self: list(self.input_index.keys()), # getter @@ -204,6 +246,10 @@ def find_output(self, name): """Find the index for an output given its name (`None` if not found)""" return self.output_index.get(name, None) + def find_outputs(self, name_list): + """Return list of indices matching output spec (`None` if not found)""" + return self._find_signals(name_list, self.output_index) + # Property for getting and setting list of output signals output_labels = property( lambda self: list(self.output_index.keys()), # getter @@ -227,12 +273,16 @@ def set_states(self, states, prefix='x'): """ self.nstates, self.state_index = \ - _process_signal_list(states, prefix=prefix) + _process_signal_list(states, prefix=prefix, allow_dot=True) def find_state(self, name): """Find the index for a state given its name (`None` if not found)""" return self.state_index.get(name, None) + def find_states(self, name_list): + """Return list of indices matching state spec (`None` if not found)""" + return self._find_signals(name_list, self.state_index) + # Property for getting and setting list of state signals state_labels = property( lambda self: list(self.state_index.keys()), # getter @@ -578,7 +628,7 @@ def _process_dt_keyword(keywords, defaults={}, static=False): # Utility function to parse a list of signals -def _process_signal_list(signals, prefix='s'): +def _process_signal_list(signals, prefix='s', allow_dot=False): if signals is None: # No information provided; try and make it up later return None, {} @@ -589,10 +639,17 @@ def _process_signal_list(signals, prefix='s'): elif isinstance(signals, str): # Single string given => single signal with given name + if not allow_dot and re.match(r".*\..*", signals): + raise ValueError( + f"invalid signal name '{signals}' ('.' not allowed)") return 1, {signals: 0} elif all(isinstance(s, str) for s in signals): # Use the list of strings as the signal names + for signal in signals: + if not allow_dot and re.match(r".*\..*", signal): + raise ValueError( + f"invalid signal name '{signal}' ('.' not allowed)") return len(signals), {signals[i]: i for i in range(len(signals))} else: diff --git a/control/statesp.py b/control/statesp.py index d3d1ab1d0..0dbc37c8b 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -1294,10 +1294,15 @@ def __getitem__(self, indices): """Array style access""" if len(indices) != 2: raise IOError('must provide indices of length 2 for state space') - i = indices[0] - j = indices[1] - return StateSpace(self.A, self.B[:, j], self.C[i, :], - self.D[i, j], self.dt) + outdx = indices[0] if isinstance(indices[0], list) else [indices[0]] + inpdx = indices[1] if isinstance(indices[1], list) else [indices[1]] + sysname = config.defaults['namedio.indexed_system_name_prefix'] + \ + self.name + config.defaults['namedio.indexed_system_name_suffix'] + return StateSpace( + self.A, self.B[:, inpdx], self.C[outdx, :], self.D[outdx, inpdx], + self.dt, name=sysname, + inputs=[self.input_labels[i] for i in list(inpdx)], + outputs=[self.output_labels[i] for i in list(outdx)]) def sample(self, Ts, method='zoh', alpha=None, prewarp_frequency=None, name=None, copy_names=True, **kwargs): diff --git a/control/tests/config_test.py b/control/tests/config_test.py index c36f67280..15229139e 100644 --- a/control/tests/config_test.py +++ b/control/tests/config_test.py @@ -301,3 +301,18 @@ def test_get_param_last(self): assert ct.config._get_param( 'config', 'second', kwargs, pop=True, last=True) == 2 + + def test_system_indexing(self): + # Default renaming + sys = ct.TransferFunction( + [ [ [1], [2], [3]], [ [3], [4], [5]] ], + [ [[1, 2], [1, 3], [1, 4]], [[1, 4], [1, 5], [1, 6]] ], 0.5) + sys1 = sys[1:, 1:] + assert sys1.name == sys.name + '$indexed' + + # Reset the format + ct.config.set_defaults( + 'namedio', indexed_system_name_prefix='PRE', + indexed_system_name_suffix='POST') + sys2 = sys[1:, 1:] + assert sys2.name == 'PRE' + sys.name + 'POST' diff --git a/control/tests/interconnect_test.py b/control/tests/interconnect_test.py index cf59c8c13..b301d3c26 100644 --- a/control/tests/interconnect_test.py +++ b/control/tests/interconnect_test.py @@ -56,30 +56,43 @@ def test_summation_exceptions(): sumblk = ct.summing_junction('u', 'y', dimension=False) -def test_interconnect_implicit(): +@pytest.mark.parametrize("dim", [1, 3]) +def test_interconnect_implicit(dim): """Test the use of implicit connections in interconnect()""" import random + if dim != 1 and not ct.slycot_check(): + pytest.xfail("slycot not installed") + # System definition - P = ct.ss2io( - ct.rss(2, 1, 1, strictly_proper=True), - inputs='u', outputs='y', name='P') - kp = ct.tf(random.uniform(1, 10), [1]) - ki = ct.tf(random.uniform(1, 10), [1, 0]) - C = ct.tf2io(kp + ki, inputs='e', outputs='u', name='C') + P = ct.ss2io(ct.rss(2, dim, dim, strictly_proper=True), name='P') + + # Controller defintion: PI in each input/output pair + kp = ct.tf(np.ones((dim, dim, 1)), np.ones((dim, dim, 1))) \ + * random.uniform(1, 10) + ki = random.uniform(1, 10) + num, den = np.zeros((dim, dim, 1)), np.ones((dim, dim, 2)) + for i, j in zip(range(dim), range(dim)): + num[i, j] = ki + den[i, j] = np.array([1, 0]) + ki = ct.tf(num, den) + C = ct.tf2io(kp + ki, name='C', + inputs=[f'e[{i}]' for i in range(dim)], + outputs=[f'u[{i}]' for i in range(dim)]) # same but static C2 - C2 = ct.tf(random.uniform(1, 10), 1, - inputs='e', outputs='u', name='C2') + C2 = ct.tf2io(kp * random.uniform(1, 10), name='C2', + inputs=[f'e[{i}]' for i in range(dim)], + outputs=[f'u[{i}]' for i in range(dim)]) # Block diagram computation - Tss = ct.feedback(P * C, 1) - Tss2 = ct.feedback(P * C2, 1) + Tss = ct.feedback(P * C, np.eye(dim)) + Tss2 = ct.feedback(P * C2, np.eye(dim)) # Construct the interconnection explicitly Tio_exp = ct.interconnect( (C, P), - connections = [['P.u', 'C.u'], ['C.e', '-P.y']], + connections=[['P.u', 'C.u'], ['C.e', '-P.y']], inplist='C.e', outlist='P.y') # Compare to bdalg computation @@ -89,9 +102,10 @@ def test_interconnect_implicit(): np.testing.assert_almost_equal(Tio_exp.D, Tss.D) # Construct the interconnection via a summing junction - sumblk = ct.summing_junction(inputs=['r', '-y'], output='e', name="sum") + sumblk = ct.summing_junction( + inputs=['r', '-y'], output='e', dimension=dim, name="sum") Tio_sum = ct.interconnect( - (C, P, sumblk), inplist=['r'], outlist=['y']) + [C, P, sumblk], inplist=['r'], outlist=['y'], debug=True) np.testing.assert_almost_equal(Tio_sum.A, Tss.A) np.testing.assert_almost_equal(Tio_sum.B, Tss.B) @@ -100,7 +114,7 @@ def test_interconnect_implicit(): # test whether signal names work for static system C2 Tio_sum2 = ct.interconnect( - [C2, P, sumblk], inputs='r', outputs='y') + [C2, P, sumblk], inplist='r', outlist='y') np.testing.assert_almost_equal(Tio_sum2.A, Tss2.A) np.testing.assert_almost_equal(Tio_sum2.B, Tss2.B) @@ -109,33 +123,26 @@ def test_interconnect_implicit(): # Setting connections to False should lead to an empty connection map empty = ct.interconnect( - (C, P, sumblk), connections=False, inplist=['r'], outlist=['y']) - np.testing.assert_allclose(empty.connect_map, np.zeros((4, 3))) - - # Implicit summation across repeated signals - kp_io = ct.tf2io(kp, inputs='e', outputs='u', name='kp') - ki_io = ct.tf2io(ki, inputs='e', outputs='u', name='ki') + [C, P, sumblk], connections=False, inplist=['r'], outlist=['y']) + np.testing.assert_allclose(empty.connect_map, np.zeros((4*dim, 3*dim))) + + # Implicit summation across repeated signals (using updated labels) + kp_io = ct.tf2io( + kp, inputs=dim, input_prefix='e', + outputs=dim, output_prefix='u', name='kp') + ki_io = ct.tf2io( + ki, inputs=dim, input_prefix='e', + outputs=dim, output_prefix='u', name='ki') Tio_sum = ct.interconnect( - (kp_io, ki_io, P, sumblk), inplist=['r'], outlist=['y']) + [kp_io, ki_io, P, sumblk], inplist=['r'], outlist=['y']) np.testing.assert_almost_equal(Tio_sum.A, Tss.A) np.testing.assert_almost_equal(Tio_sum.B, Tss.B) np.testing.assert_almost_equal(Tio_sum.C, Tss.C) np.testing.assert_almost_equal(Tio_sum.D, Tss.D) - # TODO: interconnect a MIMO system using implicit connections - # P = control.ss2io( - # control.rss(2, 2, 2, strictly_proper=True), - # input_prefix='u', output_prefix='y', name='P') - # C = control.ss2io( - # control.rss(2, 2, 2), - # input_prefix='e', output_prefix='u', name='C') - # sumblk = control.summing_junction( - # inputs=['r', '-y'], output='e', dimension=2) - # S = control.interconnect([P, C, sumblk], inplist='r', outlist='y') - # Make sure that repeated inplist/outlist names work pi_io = ct.interconnect( - (kp_io, ki_io), inplist=['e'], outlist=['u']) + [kp_io, ki_io], inplist=['e'], outlist=['u']) pi_ss = ct.tf2ss(kp + ki) np.testing.assert_almost_equal(pi_io.A, pi_ss.A) np.testing.assert_almost_equal(pi_io.B, pi_ss.B) @@ -144,7 +151,7 @@ def test_interconnect_implicit(): # Default input and output lists, along with singular versions Tio_sum = ct.interconnect( - (kp_io, ki_io, P, sumblk), input='r', output='y') + [kp_io, ki_io, P, sumblk], input='r', output='y', debug=True) np.testing.assert_almost_equal(Tio_sum.A, Tss.A) np.testing.assert_almost_equal(Tio_sum.B, Tss.B) np.testing.assert_almost_equal(Tio_sum.C, Tss.C) @@ -233,18 +240,24 @@ def test_string_inputoutput(): P2 = ct.rss(2, 1, 1) P2_iosys = ct.LinearIOSystem(P2, inputs='y1', outputs='y2') - P_s1 = ct.interconnect([P1_iosys, P2_iosys], inputs='u1', outputs=['y2']) + P_s1 = ct.interconnect( + [P1_iosys, P2_iosys], inputs='u1', outputs=['y2'], debug=True) assert P_s1.input_index == {'u1' : 0} + assert P_s1.output_index == {'y2' : 0} P_s2 = ct.interconnect([P1_iosys, P2_iosys], input='u1', outputs=['y2']) assert P_s2.input_index == {'u1' : 0} + assert P_s2.output_index == {'y2' : 0} P_s1 = ct.interconnect([P1_iosys, P2_iosys], inputs=['u1'], outputs='y2') + assert P_s1.input_index == {'u1' : 0} assert P_s1.output_index == {'y2' : 0} P_s2 = ct.interconnect([P1_iosys, P2_iosys], inputs=['u1'], output='y2') + assert P_s2.input_index == {'u1' : 0} assert P_s2.output_index == {'y2' : 0} + def test_linear_interconnect(): tf_ctrl = ct.tf(1, (10.1, 1), inputs='e', outputs='u', name='ctrl') tf_plant = ct.tf(1, (10.1, 1), inputs='u', outputs='y', name='plant') @@ -297,3 +310,229 @@ def test_linear_interconnect(): inplist=['sum.r'], inputs='r', outlist=['plant.y'], outputs='y') assert clsys.syslist[0].name == 'ctrl' + +@pytest.mark.parametrize( + "connections, inplist, outlist, inputs, outputs", [ + pytest.param( + [['sys2', 'sys1']], 'sys1', 'sys2', None, None, + id="sysname only, no i/o args"), + pytest.param( + [['sys2', 'sys1']], 'sys1', 'sys2', 3, 3, + id="i/o signal counts"), + pytest.param( + [[('sys2', [0, 1, 2]), ('sys1', [0, 1, 2])]], + [('sys1', [0, 1, 2])], [('sys2', [0, 1, 2])], + 3, 3, + id="signal lists, i/o counts"), + pytest.param( + [['sys2.u[0:3]', 'sys1.y[:]']], + 'sys1.u[:]', ['sys2.y[0:3]'], None, None, + id="signal slices"), + pytest.param( + ['sys2.u', 'sys1.y'], 'sys1.u', 'sys2.y', None, None, + id="signal basenames"), + pytest.param( + [[('sys2', [0, 1, 2]), ('sys1', [0, 1, 2])]], + [('sys1', [0, 1, 2])], [('sys2', [0, 1, 2])], + None, None, + id="signal lists, no i/o counts"), + pytest.param( + [[(1, ['u[0]', 'u[1]', 'u[2]']), (0, ['y[0]', 'y[1]', 'y[2]'])]], + [('sys1', [0, 1, 2])], [('sys2', [0, 1, 2])], + 3, ['y1', 'y2', 'y3'], + id="mixed specs"), + pytest.param( + [[f'sys2.u[{i}]', f'sys1.y[{i}]'] for i in range(3)], + [f'sys1.u[{i}]' for i in range(3)], + [f'sys2.y[{i}]' for i in range(3)], + [f'u[{i}]' for i in range(3)], [f'y[{i}]' for i in range(3)], + id="full enumeration"), +]) +def test_interconnect_series(connections, inplist, outlist, inputs, outputs): + # Create an interconnected system for testing + sys1 = ct.rss(4, 3, 3, name='sys1') + sys2 = ct.rss(4, 3, 3, name='sys2') + series = sys2 * sys1 + + # Simple series interconnection + icsys = ct.interconnect( + [sys1, sys2], connections=connections, + inplist=inplist, outlist=outlist, inputs=inputs, outputs=outputs + ) + np.testing.assert_allclose(icsys.A, series.A) + np.testing.assert_allclose(icsys.B, series.B) + np.testing.assert_allclose(icsys.C, series.C) + np.testing.assert_allclose(icsys.D, series.D) + + +@pytest.mark.parametrize( + "connections, inplist, outlist", [ + pytest.param( + [['P', 'C'], ['C', '-P']], 'C', 'P', + id="sysname only, no i/o args"), + pytest.param( + [['P.u', 'C.y'], ['C.u', '-P.y']], 'C.u', 'P.y', + id="sysname only, no i/o args"), + pytest.param( + [['P.u[:]', 'C.y[0:2]'], + [('C', 'u'), ('P', ['y[0]', 'y[1]'], -1)]], + ['C.u[0]', 'C.u[1]'], ('P', [0, 1]), + id="mixed cases"), +]) +def test_interconnect_feedback(connections, inplist, outlist): + # Create an interconnected system for testing + P = ct.rss(4, 2, 2, name='P', strictly_proper=True) + C = ct.rss(4, 2, 2, name='C') + feedback = ct.feedback(P * C, np.eye(2)) + + # Simple feedback interconnection + icsys = ct.interconnect( + [C, P], connections=connections, + inplist=inplist, outlist=outlist + ) + np.testing.assert_allclose(icsys.A, feedback.A) + np.testing.assert_allclose(icsys.B, feedback.B) + np.testing.assert_allclose(icsys.C, feedback.C) + np.testing.assert_allclose(icsys.D, feedback.D) + + +@pytest.mark.parametrize( + "pinputs, poutputs, connections, inplist, outlist", [ + pytest.param( + ['w[0]', 'w[1]', 'u[0]', 'u[1]'], # pinputs + ['z[0]', 'z[1]', 'y[0]', 'y[1]'], # poutputs + [[('P', [2, 3]), ('C', [0, 1])], [('C', [0, 1]), ('P', [2, 3], -1)]], + [('C', [0, 1]), ('P', [0, 1])], # inplist + [('P', [0, 1, 2, 3]), ('C', [0, 1])], # outlist + id="signal indices"), + pytest.param( + ['w[0]', 'w[1]', 'u[0]', 'u[1]'], # pinputs + ['z[0]', 'z[1]', 'y[0]', 'y[1]'], # poutputs + [[('P', [2, 3]), ('C', [0, 1])], [('C', [0, 1]), ('P', [2, 3], -1)]], + ['C', ('P', [0, 1])], ['P', 'C'], # inplist, outlist + id="signal indices, when needed"), + pytest.param( + 4, 4, # default I/O names + [['P.u[2:4]', 'C.y[:]'], ['C.u', '-P.y[2:]']], + ['C', 'P.u[:2]'], ['P.y[:]', 'P.u[2:]'], # inplist, outlist + id="signal slices"), + pytest.param( + ['w[0]', 'w[1]', 'u[0]', 'u[1]'], # pinputs + ['z[0]', 'z[1]', 'y[0]', 'y[1]'], # poutputs + [['P.u', 'C.y'], ['C.u', '-P.y']], # connections + ['C.u', 'P.w'], ['P.z', 'P.y', 'C.y'], # inplist, outlist + id="basename, control output"), + pytest.param( + ['w[0]', 'w[1]', 'u[0]', 'u[1]'], # pinputs + ['z[0]', 'z[1]', 'y[0]', 'y[1]'], # poutputs + [['P.u', 'C.y'], ['C.u', '-P.y']], # connections + ['C.u', 'P.w'], ['P.z', 'P.y', 'P.u'], # inplist, outlist + id="basename, process input"), +]) +def test_interconnect_partial_feedback( + pinputs, poutputs, connections, inplist, outlist): + P = ct.rss( + states=6, name='P', strictly_proper=True, + inputs=pinputs, outputs=poutputs) + C = ct.rss(4, 2, 2, name='C') + + # Low level feedback connection (feedback around "lower" process I/O) + partial = ct.interconnect( + [C, P], + connections=[ + [(1, 2), (0, 0)], [(1, 3), (0, 1)], + [(0, 0), (1, 2, -1)], [(0, 1), (1, 3, -1)]], + inplist=[(0, 0), (0, 1), (1, 0), (1, 1)], # C.u, P.w + outlist=[(1, 0), (1, 1), (1, 2), (1, 3), + (0, 0), (0, 1)], # P.z, P.y, C.y + ) + + # High level feedback conections + icsys = ct.interconnect( + [C, P], connections=connections, + inplist=inplist, outlist=outlist + ) + np.testing.assert_allclose(icsys.A, partial.A) + np.testing.assert_allclose(icsys.B, partial.B) + np.testing.assert_allclose(icsys.C, partial.C) + np.testing.assert_allclose(icsys.D, partial.D) + + +def test_interconnect_doctest(): + P = ct.rss( + states=6, name='P', strictly_proper=True, + inputs=['u[0]', 'u[1]', 'v[0]', 'v[1]'], + outputs=['y[0]', 'y[1]', 'z[0]', 'z[1]']) + C = ct.rss(4, 2, 2, name='C', input_prefix='e', output_prefix='u') + sumblk = ct.summing_junction( + inputs=['r', '-y'], outputs='e', dimension=2, name='sum') + + clsys1 = ct.interconnect( + [C, P, sumblk], + connections=[ + ['P.u[0]', 'C.u[0]'], ['P.u[1]', 'C.u[1]'], + ['C.e[0]', 'sum.e[0]'], ['C.e[1]', 'sum.e[1]'], + ['sum.y[0]', 'P.y[0]'], ['sum.y[1]', 'P.y[1]'], + ], + inplist=['sum.r[0]', 'sum.r[1]', 'P.v[0]', 'P.v[1]'], + outlist=['P.y[0]', 'P.y[1]', 'P.z[0]', 'P.z[1]', 'C.u[0]', 'C.u[1]'] + ) + + clsys2 = ct.interconnect( + [C, P, sumblk], + connections=[ + ['P.u[0:2]', 'C.u[0:2]'], + ['C.e[0:2]', 'sum.e[0:2]'], + ['sum.y[0:2]', 'P.y[0:2]'] + ], + inplist=['sum.r[0:2]', 'P.v[0:2]'], + outlist=['P.y[0:2]', 'P.z[0:2]', 'C.u[0:2]'] + ) + np.testing.assert_equal(clsys2.A, clsys1.A) + np.testing.assert_equal(clsys2.B, clsys1.B) + np.testing.assert_equal(clsys2.C, clsys1.C) + np.testing.assert_equal(clsys2.D, clsys1.D) + + clsys3 = ct.interconnect( + [C, P, sumblk], + connections=[['P.u', 'C.u'], ['C.e', 'sum.e'], ['sum.y', 'P.y']], + inplist=['sum.r', 'P.v'], outlist=['P.y', 'P.z', 'C.u'] + ) + np.testing.assert_equal(clsys3.A, clsys1.A) + np.testing.assert_equal(clsys3.B, clsys1.B) + np.testing.assert_equal(clsys3.C, clsys1.C) + np.testing.assert_equal(clsys3.D, clsys1.D) + + clsys4 = ct.interconnect( + [C, P, sumblk], + connections=[['P.u', 'C'], ['C', 'sum'], ['sum.y', 'P.y']], + inplist=['sum.r', 'P.v'], outlist=['P', 'C.u'] + ) + np.testing.assert_equal(clsys4.A, clsys1.A) + np.testing.assert_equal(clsys4.B, clsys1.B) + np.testing.assert_equal(clsys4.C, clsys1.C) + np.testing.assert_equal(clsys4.D, clsys1.D) + + clsys5 = ct.interconnect( + [C, P, sumblk], + inplist=['sum.r', 'P.v'], outlist=['P', 'C.u'] + ) + np.testing.assert_equal(clsys5.A, clsys1.A) + np.testing.assert_equal(clsys5.B, clsys1.B) + np.testing.assert_equal(clsys5.C, clsys1.C) + np.testing.assert_equal(clsys5.D, clsys1.D) + + +def test_interconnect_rewrite(): + sys = ct.rss( + states=2, name='sys', strictly_proper=True, + inputs=['u[0]', 'u[1]', 'v[0]', 'v[1]', 'w[0]', 'w[1]'], + outputs=['y[0]', 'y[1]', 'z[0]', 'z[1]', 'z[2]']) + + # Create an input/output system w/out inplist, outlist + icsys = ct.interconnect( + [sys], connections=[['sys.v', 'sys.y']], + inputs=['u', 'w'], + outputs=['y', 'z']) + + assert icsys.input_labels == ['u[0]', 'u[1]', 'w[0]', 'w[1]'] diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 59338fc62..4012770ba 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -1789,7 +1789,7 @@ def test_interconnect_add_unused(): # Try a normal interconnection G1 = ct.interconnect( - [P, S, C], inputs=['r', 'u2'], outputs=['y1', 'y2']) + [P, S, C], inputs=['r', 'u2'], outputs=['y1', 'y2'], debug=True) # Same system, but using add_unused G2 = ct.interconnect( diff --git a/control/tests/namedio_test.py b/control/tests/namedio_test.py index cf30b94aa..5968dc484 100644 --- a/control/tests/namedio_test.py +++ b/control/tests/namedio_test.py @@ -293,3 +293,45 @@ def test_duplicate_sysname(): sys = ct.rss(4, 1, 1, name='sys') with pytest.warns(UserWarning, match="duplicate object found"): res = sys * sys + + +# Finding signals +def test_find_signals(): + sys = ct.rss( + states=['x[1]', 'x[2]', 'x[3]', 'x[4]', 'x4', 'x5'], + inputs=['u[0]', 'u[1]', 'u[2]', 'v[0]', 'v[1]'], + outputs=['y[0]', 'y[1]', 'y[2]', 'z[0]', 'z1'], + name='sys') + + # States + assert sys.find_states('x[1]') == [0] + assert sys.find_states('x') == [0, 1, 2, 3] + assert sys.find_states('x4') == [4] + assert sys.find_states(['x4', 'x5']) == [4, 5] + assert sys.find_states(['x', 'x5']) == [0, 1, 2, 3, 5] + assert sys.find_states(['x[2:]']) == [1, 2, 3] + + # Inputs + assert sys.find_inputs('u[1]') == [1] + assert sys.find_inputs('u') == [0, 1, 2] + assert sys.find_inputs('v') == [3, 4] + assert sys.find_inputs(['u', 'v']) == [0, 1, 2, 3, 4] + assert sys.find_inputs(['u[1:]', 'v']) == [1, 2, 3, 4] + assert sys.find_inputs(['u', 'v[:1]']) == [0, 1, 2, 3] + + # Outputs + assert sys.find_outputs('y[1]') == [1] + assert sys.find_outputs('y') == [0, 1, 2] + assert sys.find_outputs('z') == [3] + assert sys.find_outputs(['y', 'z']) == [0, 1, 2, 3] + assert sys.find_outputs(['y[1:]', 'z']) == [1, 2, 3] + assert sys.find_outputs(['y', 'z[:1]']) == [0, 1, 2, 3] + + +# Invalid signal names +def test_invalid_signal_names(): + with pytest.raises(ValueError, match="invalid signal name"): + sys = ct.rss(4, inputs="input.signal", outputs=1) + + with pytest.raises(ValueError, match="invalid system name"): + sys = ct.rss(4, inputs=1, outputs=1, name="system.subsys") diff --git a/control/tests/statesp_test.py b/control/tests/statesp_test.py index fa837f30d..1182674c1 100644 --- a/control/tests/statesp_test.py +++ b/control/tests/statesp_test.py @@ -479,22 +479,27 @@ def test_append_tf(self): def test_array_access_ss(self): - sys1 = StateSpace([[1., 2.], [3., 4.]], - [[5., 6.], [6., 8.]], - [[9., 10.], [11., 12.]], - [[13., 14.], [15., 16.]], 1) - - sys1_11 = sys1[0, 1] - np.testing.assert_array_almost_equal(sys1_11.A, + sys1 = StateSpace( + [[1., 2.], [3., 4.]], + [[5., 6.], [6., 8.]], + [[9., 10.], [11., 12.]], + [[13., 14.], [15., 16.]], 1, + inputs=['u0', 'u1'], outputs=['y0', 'y1']) + + sys1_01 = sys1[0, 1] + np.testing.assert_array_almost_equal(sys1_01.A, sys1.A) - np.testing.assert_array_almost_equal(sys1_11.B, + np.testing.assert_array_almost_equal(sys1_01.B, sys1.B[:, 1:2]) - np.testing.assert_array_almost_equal(sys1_11.C, + np.testing.assert_array_almost_equal(sys1_01.C, sys1.C[0:1, :]) - np.testing.assert_array_almost_equal(sys1_11.D, + np.testing.assert_array_almost_equal(sys1_01.D, sys1.D[0, 1]) - assert sys1.dt == sys1_11.dt + assert sys1.dt == sys1_01.dt + assert sys1_01.input_labels == ['u1'] + assert sys1_01.output_labels == ['y0'] + assert sys1_01.name == sys1.name + "$indexed" def test_dc_gain_cont(self): """Test DC gain for continuous-time state-space systems.""" @@ -831,7 +836,7 @@ def test_error_u_dynamics_mimo(self, u, sys222): sys222.dynamics(0, (1, 1), u) with pytest.raises(ValueError): sys222.output(0, (1, 1), u) - + def test_sample_named_signals(self): sysc = ct.StateSpace(1.1, 1, 1, 1, inputs='u', outputs='y', states='a') @@ -859,14 +864,14 @@ def test_sample_named_signals(self): assert sysd_newnames.find_output('x') == 0 assert sysd_newnames.find_output('y') is None assert sysd_newnames.find_state('b') == 0 - assert sysd_newnames.find_state('a') is None + assert sysd_newnames.find_state('a') is None # test just one name sysd_newnames = sysc.sample(0.1, inputs='v') assert sysd_newnames.find_input('v') == 0 assert sysd_newnames.find_input('u') is None assert sysd_newnames.find_output('y') == 0 assert sysd_newnames.find_output('x') is None - + class TestRss: """These are tests for the proper functionality of statesp.rss.""" diff --git a/control/tests/xferfcn_test.py b/control/tests/xferfcn_test.py index 7d561e770..b999acd95 100644 --- a/control/tests/xferfcn_test.py +++ b/control/tests/xferfcn_test.py @@ -392,12 +392,20 @@ def test_pow(self): def test_slice(self): sys = TransferFunction( [ [ [1], [2], [3]], [ [3], [4], [5]] ], - [ [[1, 2], [1, 3], [1, 4]], [[1, 4], [1, 5], [1, 6]] ]) + [ [[1, 2], [1, 3], [1, 4]], [[1, 4], [1, 5], [1, 6]] ], + inputs=['u0', 'u1', 'u2'], outputs=['y0', 'y1'], name='sys') + sys1 = sys[1:, 1:] assert (sys1.ninputs, sys1.noutputs) == (2, 1) + assert sys1.input_labels == ['u1', 'u2'] + assert sys1.output_labels == ['y1'] + assert sys1.name == 'sys$indexed' sys2 = sys[:2, :2] assert (sys2.ninputs, sys2.noutputs) == (2, 2) + assert sys2.input_labels == ['u0', 'u1'] + assert sys2.output_labels == ['y0', 'y1'] + assert sys2.name == 'sys$indexed' sys = TransferFunction( [ [ [1], [2], [3]], [ [3], [4], [5]] ], @@ -405,6 +413,9 @@ def test_slice(self): sys1 = sys[1:, 1:] assert (sys1.ninputs, sys1.noutputs) == (2, 1) assert sys1.dt == 0.5 + assert sys1.input_labels == ['u[1]', 'u[2]'] + assert sys1.output_labels == ['y[1]'] + assert sys1.name == sys.name + '$indexed' def test__isstatic(self): numstatic = 1.1 diff --git a/control/xferfcn.py b/control/xferfcn.py index 60a40eca0..8715fe0b6 100644 --- a/control/xferfcn.py +++ b/control/xferfcn.py @@ -791,8 +791,7 @@ def __getitem__(self, key): if stop2 is None: stop2 = len(self.num[0]) - num = [] - den = [] + num, den = [], [] for i in range(start1, stop1, step1): num_i = [] den_i = [] @@ -801,10 +800,17 @@ def __getitem__(self, key): den_i.append(self.den[i][j]) num.append(num_i) den.append(den_i) - if self.isctime(): - return TransferFunction(num, den) - else: - return TransferFunction(num, den, self.dt) + + # Save the label names + outputs = [self.output_labels[i] for i in range(start1, stop1, step1)] + inputs = [self.input_labels[j] for j in range(start2, stop2, step2)] + + # Create the system name + sysname = config.defaults['namedio.indexed_system_name_prefix'] + \ + self.name + config.defaults['namedio.indexed_system_name_suffix'] + + return TransferFunction( + num, den, self.dt, inputs=inputs, outputs=outputs, name=sysname) def freqresp(self, omega): """(deprecated) Evaluate transfer function at complex frequencies. diff --git a/doc/iosys.rst b/doc/iosys.rst index 0f6a80b4d..dddcb00c9 100644 --- a/doc/iosys.rst +++ b/doc/iosys.rst @@ -13,7 +13,8 @@ The dynamics of the system can be in continuous or discrete time. To simulate an input/output system, use the :func:`~control.input_output_response` function:: - t, y = ct.input_output_response(io_sys, T, U, X0, params) + resp = ct.input_output_response(io_sys, T, U, X0, params) + t, y, x = resp.time, resp.outputs, resp.states An input/output system can be linearized around an equilibrium point to obtain a :class:`~control.StateSpace` linear system. Use the @@ -256,6 +257,119 @@ of the interconnected system) is not found, but inputs and outputs of individual systems that are not connected to other systems are left unconnected (so be careful!). +Advanced specification of signal names +-------------------------------------- + +In addition to manual specification of signal names and automatic +connection of signals with the same name, the +:func:`~control.interconnect` has a variety of other mechanisms +available for specifying signal names. The following forms are +recognized for the `connections`, `inplist`, and `outlist` +parameters:: + + (subsys, index, gain) tuple form with integer indices + ('sysname', 'signal', gain) tuple form with name lookup + 'sysname.signal[i]' string form (gain = 1) + '-sysname.signal[i]' set gain to -1 + (subsys, [i1, ..., iN], gain) signals with indices i1, ..., in + 'sysname.signal[i:j]' range of signal names, i through j-1 + 'sysname' all input or outputs of system + 'signal' all matching signals (in any subsystem) + +For tuple forms, mixed specifications using integer indices and +strings are possible. + +For the index range form `sysname.signal[i:j]`, if either `i` or `j` +is not specified, then it defaults to the minimum or maximum value of +the signal range. Note that despite the similarity to slice notation, +negative indices and step specifications are not supported. + +Using these various forms can simplfy the specification of +interconnections. For example, consider a process with inputs 'u' and +'v', each of dimension 2, and two outputs 'w' and 'y', each of +dimension 2:: + + P = ct.rss( + states=6, name='P', strictly_proper=True, + inputs=['u[0]', 'u[1]', 'v[0]', 'v[1]'], + outputs=['y[0]', 'y[1]', 'z[0]', 'z[1]']) + +Suppose we construct a controller with 2 inputs and 2 outputs that +takes the (2-dimensional) error `e` and outputs and control signal `u`:: + + C = ct.rss(4, 2, 2, name='C', input_prefix='e', output_prefix='u') + +Finally, we include a summing block that will take the difference between +the reference input `r` and the measured output `y`:: + + sumblk = ct.summing_junction( + inputs=['r', '-y'], outputs='e', dimension=2, name='sum') + +The closed loop system should close the loop around the process +outputs `y` and inputs `u`, leaving the process inputs `v` and outputs +'w', as well as the reference input `r`. We would like the output of +the closed loop system to consist of all system outputs `y` and `z`, +as well as the controller input `u`. + +This collection of systems can be combined in a variety of ways. The +most explict would specify every signal:: + + clsys1 = ct.interconnect( + [C, P, sumblk], + connections=[ + ['P.u[0]', 'C.u[0]'], ['P.u[1]', 'C.u[1]'], + ['C.e[0]', 'sum.e[0]'], ['C.e[1]', 'sum.e[1]'], + ['sum.y[0]', 'P.y[0]'], ['sum.y[1]', 'P.y[1]'], + ], + inplist=['sum.r[0]', 'sum.r[1]', 'P.v[0]', 'P.v[1]'], + outlist=['P.y[0]', 'P.y[1]', 'P.z[0]', 'P.z[1]', 'C.u[0]', 'C.u[1]'] + ) + +This connections can be simplified using signal ranges:: + + clsys2 = ct.interconnect( + [C, P, sumblk], + connections=[ + ['P.u[0:2]', 'C.u[0:2]'], + ['C.e[0:2]', 'sum.e[0:2]'], + ['sum.y[0:2]', 'P.y[0:2]'] + ], + inplist=['sum.r[0:2]', 'P.v[0:2]'], + outlist=['P.y[0:2]', 'P.z[0:2]', 'C.u[0:2]'] + ) + +An even simpler form can be used by omitting the range specification +when all signals with the same prefix are used:: + + clsys3 = ct.interconnect( + [C, P, sumblk], + connections=[['P.u', 'C.u'], ['C.e', 'sum.e'], ['sum.y', 'P.y']], + inplist=['sum.r', 'P.v'], outlist=['P.y', 'P.z', 'C.u'] + ) + +A further simplification is possible when all of the inputs or outputs +of an individual system are used in a given specification:: + + clsys4 = ct.interconnect( + [C, P, sumblk], + connections=[['P.u', 'C'], ['C', 'sum'], ['sum.y', 'P.y']], + inplist=['sum.r', 'P.v'], outlist=['P', 'C.u'] + ) + +And finally, since we have named the signals throughout the system in +a consistent way, we could let :func:`ct.interconnect` do all of the +work:: + + clsys5 = ct.interconnect( + [C, P, sumblk], inplist=['sum.r', 'P.v'], outlist=['P', 'C.u'] + ) + +Various other simplifications are possible, but it can sometimes be +complicated to debug error message when things go wrong. Setting +`debug=True` when calling :func:`~control.interconnect` prints out +information about how the arguments are processed that may be helpful +in understanding what is going wrong. + Automated creation of state feedback systems -------------------------------------------- diff --git a/examples/cruise-control.py b/examples/cruise-control.py index 8c654477b..08439b1a4 100644 --- a/examples/cruise-control.py +++ b/examples/cruise-control.py @@ -140,13 +140,13 @@ def motor_torque(omega, params={}): # Outputs: v (vehicle velocity) cruise_tf = ct.InterconnectedSystem( (control_tf, vehicle), name='cruise', - connections=( + connections=[ ['control.u', '-vehicle.v'], - ['vehicle.u', 'control.y']), - inplist=('control.u', 'vehicle.gear', 'vehicle.theta'), - inputs=('vref', 'gear', 'theta'), - outlist=('vehicle.v', 'vehicle.u'), - outputs=('v', 'u')) + ['vehicle.u', 'control.y']], + inplist=['control.u', 'vehicle.gear', 'vehicle.theta'], + inputs=['vref', 'gear', 'theta'], + outlist=['vehicle.v', 'vehicle.u'], + outputs=['v', 'u']) # Define the time and input vectors T = np.linspace(0, 25, 101) @@ -280,11 +280,11 @@ def pi_output(t, x, u, params={}): # Create the closed loop system cruise_pi = ct.InterconnectedSystem( (vehicle, control_pi), name='cruise', - connections=( + connections=[ ['vehicle.u', 'control.u'], - ['control.v', 'vehicle.v']), - inplist=('control.vref', 'vehicle.gear', 'vehicle.theta'), - outlist=('control.u', 'vehicle.v'), outputs=['u', 'v']) + ['control.v', 'vehicle.v']], + inplist=['control.vref', 'vehicle.gear', 'vehicle.theta'], + outlist=['control.u', 'vehicle.v'], outputs=['u', 'v']) # Figure 4.3b shows the response of the closed loop system. The figure shows # that even if the hill is so steep that the throttle changes from 0.17 to @@ -409,12 +409,12 @@ def sf_output(t, z, u, params={}): # Create the closed loop system for the state space controller cruise_sf = ct.InterconnectedSystem( (vehicle, control_sf), name='cruise', - connections=( + connections=[ ['vehicle.u', 'control.u'], ['control.x', 'vehicle.v'], - ['control.y', 'vehicle.v']), - inplist=('control.r', 'vehicle.gear', 'vehicle.theta'), - outlist=('control.u', 'vehicle.v'), outputs=['u', 'v']) + ['control.y', 'vehicle.v']], + inplist=['control.r', 'vehicle.gear', 'vehicle.theta'], + outlist=['control.u', 'vehicle.v'], outputs=['u', 'v']) # Compute the linearization of the dynamics around the equilibrium point diff --git a/examples/cruise.ipynb b/examples/cruise.ipynb index 7be0c8644..c3e76aec1 100644 --- a/examples/cruise.ipynb +++ b/examples/cruise.ipynb @@ -328,13 +328,13 @@ "\n", "# Create the closed loop system for the state space controller\n", "cruise_sf = ct.InterconnectedSystem(\n", - " (vehicle, control_sf), name='cruise',\n", - " connections=(\n", - " ('vehicle.u', 'control.u'),\n", - " ('control.x', 'vehicle.v'),\n", - " ('control.y', 'vehicle.v')),\n", - " inplist=('control.r', 'vehicle.gear', 'vehicle.theta'),\n", - " outlist=('control.u', 'vehicle.v'), outputs=['u', 'v'])\n", + " [vehicle, control_sf], name='cruise',\n", + " connections=[\n", + " ['vehicle.u', 'control.u'],\n", + " ['control.x', 'vehicle.v'],\n", + " ['control.y', 'vehicle.v']],\n", + " inplist=['control.r', 'vehicle.gear', 'vehicle.theta'],\n", + " outlist=['control.u', 'vehicle.v'], outputs=['u', 'v'])\n", "\n", "# Define the time and input vectors\n", "T = np.linspace(0, 25, 501)\n", @@ -460,14 +460,14 @@ "# Construct the closed loop system and plot the response\n", "# Create the closed loop system for the state space controller\n", "cruise_pz = ct.InterconnectedSystem(\n", - " (vehicle, control_pz), name='cruise_pz',\n", - " connections = (\n", - " ('control.u', '-vehicle.v'),\n", - " ('vehicle.u', 'control.y')),\n", - " inplist = ('control.u', 'vehicle.gear', 'vehicle.theta'),\n", - " inputs = ('vref', 'gear', 'theta'),\n", - " outlist = ('vehicle.v', 'vehicle.u'),\n", - " outputs = ('v', 'u'))\n", + " [vehicle, control_pz], name='cruise_pz',\n", + " connections = [\n", + " ['control.u', '-vehicle.v'],\n", + " ['vehicle.u', 'control.y']],\n", + " inplist = ['control.u', 'vehicle.gear', 'vehicle.theta'],\n", + " inputs = ['vref', 'gear', 'theta'],\n", + " outlist = ['vehicle.v', 'vehicle.u'],\n", + " outputs = ['v', 'u'])\n", "\n", "# Find the equilibrium point\n", "X0, U0 = ct.find_eqpt(\n", @@ -546,11 +546,11 @@ " \n", " # Construct the closed loop system by interconnecting process and controller\n", " cruise_tf = ct.InterconnectedSystem(\n", - " (vehicle, control_tf), name='cruise',\n", - " connections = [('control.u', '-vehicle.v'), ('vehicle.u', 'control.y')],\n", - " inplist = ('control.u', 'vehicle.gear', 'vehicle.theta'), \n", - " inputs = ('vref', 'gear', 'theta'),\n", - " outlist = ('vehicle.v', 'vehicle.u'), outputs = ('v', 'u'))\n", + " [vehicle, control_tf], name='cruise',\n", + " connections = [['control.u', '-vehicle.v'], ['vehicle.u', 'control.y']],\n", + " inplist = ['control.u', 'vehicle.gear', 'vehicle.theta'], \n", + " inputs = ['vref', 'gear', 'theta'],\n", + " outlist = ['vehicle.v', 'vehicle.u'], outputs = ['v', 'u'])\n", "\n", " # Plot the velocity response\n", " X0, U0 = ct.find_eqpt(\n", @@ -593,11 +593,11 @@ " \n", " # Construct the closed loop system by interconnecting process and controller\n", " cruise_tf = ct.InterconnectedSystem(\n", - " (vehicle, control_tf), name='cruise',\n", - " connections = [('control.u', '-vehicle.v'), ('vehicle.u', 'control.y')],\n", - " inplist = ('control.u', 'vehicle.gear', 'vehicle.theta'), \n", - " inputs = ('vref', 'gear', 'theta'),\n", - " outlist = ('vehicle.v', 'vehicle.u'), outputs = ('v', 'u'))\n", + " [vehicle, control_tf], name='cruise',\n", + " connections = [['control.u', '-vehicle.v'], ['vehicle.u', 'control.y']],\n", + " inplist = ['control.u', 'vehicle.gear', 'vehicle.theta'], \n", + " inputs = ['vref', 'gear', 'theta'],\n", + " outlist = ['vehicle.v', 'vehicle.u'], outputs = ['v', 'u'])\n", "\n", " # Plot the velocity response\n", " X0, U0 = ct.find_eqpt(\n", @@ -630,10 +630,10 @@ " name='control', inputs='u', outputs='y')\n", "\n", "cruise_tf = ct.InterconnectedSystem(\n", - " (vehicle, control_tf), name='cruise',\n", - " connections = [('control.u', '-vehicle.v'), ('vehicle.u', 'control.y')],\n", - " inplist = ('control.u', 'vehicle.gear', 'vehicle.theta'), inputs = ('vref', 'gear', 'theta'),\n", - " outlist = ('vehicle.v', 'vehicle.u'), outputs = ('v', 'u'))" + " [vehicle, control_tf], name='cruise',\n", + " connections = [['control.u', '-vehicle.v'], ['vehicle.u', 'control.y']],\n", + " inplist = ['control.u', 'vehicle.gear', 'vehicle.theta'], inputs = ['vref', 'gear', 'theta'],\n", + " outlist = ['vehicle.v', 'vehicle.u'], outputs = ['v', 'u'])" ] }, { @@ -750,12 +750,12 @@ "\n", "# Create the closed loop system\n", "cruise_pi = ct.InterconnectedSystem(\n", - " (vehicle, control_pi), name='cruise',\n", - " connections=(\n", - " ('vehicle.u', 'control.u'),\n", - " ('control.v', 'vehicle.v')),\n", - " inplist=('control.vref', 'vehicle.gear', 'vehicle.theta'),\n", - " outlist=('control.u', 'vehicle.v'), outputs=['u', 'v'])" + " [vehicle, control_pi], name='cruise',\n", + " connections=[\n", + " ['vehicle.u', 'control.u'],\n", + " ['control.v', 'vehicle.v']],\n", + " inplist=['control.vref', 'vehicle.gear', 'vehicle.theta'],\n", + " outlist=['control.u', 'vehicle.v'], outputs=['u', 'v'])" ] }, {