models.node.processing package#

Subpackages#

Submodules#

models.node.processing.fill module#

class models.node.processing.fill.Fill(parameters=None)[source]#

Bases: ProcessingNode

This node pads the input buffer up to a configured number of samples (fill_size). It is useful when the input buffer holds fewer samples than required. The buffer can be padded with zeros or with the last sample it contains.

Attributes:

_MODULE_NAME (str): The name of the module (in this case node.processing.fill).

INPUT_MAIN (str): The name of the main input (in this case main).

OUTPUT_MAIN (str): The name of the main output (in this case main).

FILL_TYPE_ZEROFILL (str): The name of the fill type that fills the input buffer with zeros (in this case zero_fill).

FILL_TYPE_SAMPLE_AND_HOLD (str): The name of the fill type that fills the input buffer with the last sample (in this case sample_and_hold).

configuration.json usage:

module (str): The name of the module (node.processing.fill)

type (str): The name of the class (Fill)

fill_size (int): The size of the input buffer after filling.

filling_type (str): The type of filling to use. Can be zero_fill or sample_and_hold.

buffer_options (dict): Buffer options.

clear_output_buffer_after_process (bool): Whether to clear the output buffer after processing.

clear_input_buffer_after_process (bool): Whether to clear the input buffer after processing.

clear_output_buffer_on_data_input (bool): Whether to clear the output buffer when new data is inserted in the input buffer.

FILL_TYPE_SAMPLE_AND_HOLD: Final[str] = 'sample_and_hold'#
FILL_TYPE_ZEROFILL: Final[str] = 'zero_fill'#
INPUT_MAIN: Final[str] = 'main'#
OUTPUT_MAIN: Final[str] = 'main'#
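
The two fill types described above can be illustrated with a minimal sketch. It is not the node's actual implementation: fill_buffer is a hypothetical helper, and it assumes that padding is appended at the end of the buffer, that a buffer already holding fill_size samples is left unchanged, and that an empty buffer falls back to zeros under sample_and_hold.

```python
from typing import List

# Values taken from the constants documented for Fill.
FILL_TYPE_ZEROFILL = "zero_fill"
FILL_TYPE_SAMPLE_AND_HOLD = "sample_and_hold"


def fill_buffer(samples: List[float], fill_size: int, filling_type: str) -> List[float]:
    """Pad `samples` at the end until it holds `fill_size` values (hypothetical helper)."""
    missing = fill_size - len(samples)
    if missing <= 0:
        # Assumption: a buffer that is already long enough is returned unchanged.
        return list(samples)
    if filling_type == FILL_TYPE_ZEROFILL:
        pad_value = 0.0
    elif filling_type == FILL_TYPE_SAMPLE_AND_HOLD:
        # Assumption: with no last sample to hold, fall back to zero.
        pad_value = samples[-1] if samples else 0.0
    else:
        raise ValueError(f"unknown filling_type: {filling_type!r}")
    return list(samples) + [pad_value] * missing


print(fill_buffer([1.0, 2.0, 3.0], 5, FILL_TYPE_ZEROFILL))
print(fill_buffer([1.0, 2.0, 3.0], 5, FILL_TYPE_SAMPLE_AND_HOLD))
```

With fill_size set to 5, the first call pads the three samples with two zeros, and the second repeats the last sample (3.0) twice.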

models.node.processing.merge module#

class models.node.processing.merge.Merge(parameters=None)[source]#

Bases: Synchronize

This node merges the data from the master and the slave. The master data is always kept. If the slave data contains a channel that is also present in the master data, the slave channel is renamed and then added to the master data. The node also synchronizes the data automatically when the slave data is not synchronized with the master data.

Using this node takes some care. It goes in the “common” section of the configuration file and needs at least two upstream nodes (the master and the slave) whose outputs point to this “merge” node. Each upstream node must also expose a “timestamp” output in addition to its data output. The “input” field of each “output” entry selects a master or slave input of the merge node, which determines whether the upstream node acts as the master or the slave. Here’s an example of the “output” and “timestamp” properties of the upstream nodes:

{
    "nodes": {
        "root": {
            "node_a": {
                ...,
                "output": {
                    "data_a": [
                        {
                            "node": "merge",
                            "input": "master_main"
                        }
                    ],
                    "timestamp": [
                        {
                            "node": "merge",
                            "input": "master_timestamp"
                        }
                    ]
                }
            },
            "node_b": {
                ...,
                "output": {
                    "data_b": [
                        {
                            "node": "merge",
                            "input": "slave_main"
                        }
                    ],
                    "timestamp": [
                        {
                            "node": "merge",
                            "input": "slave_timestamp"
                        }
                    ]
                }
            }
        },
        "common": {
            "merge": {
                "module": "models.node.processing",
                "type": "Merge",
                ...
            }
        },
        ...
    }
}
configuration.json usage:

module (str): Current module name (in this case models.node.processing).

type (str): Current node type (in this case Merge).

slave_filling (str): Slave filling type. It can be zero_fill or sample_and_hold.

statistics_enabled (bool): If True, the node will calculate the synchronization error statistics.

buffer_options (dict): Buffer options:

clear_output_buffer_on_data_input (bool): If True, the output buffer will be cleared when new data is inserted into the input buffer.

clear_input_buffer_after_process (bool): If True, the input buffer will be cleared after the node is executed.

clear_output_buffer_after_process (bool): If True, the output buffer will be cleared after the node is executed.

OUTPUT_MERGED_MAIN: Final[str] = 'merged_main'#
OUTPUT_MERGED_TIMESTAMP: Final[str] = 'merged_timestamp'#
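
The channel-collision rule described above (master channels are kept, colliding slave channels are renamed) can be sketched as follows. This is an illustration only: merge_channels, the "_slave" suffix, and the channel names are assumptions, since the documentation does not state how the renamed channel is called.

```python
from typing import Dict, List


def merge_channels(
    master: Dict[str, List[float]],
    slave: Dict[str, List[float]],
    suffix: str = "_slave",  # hypothetical naming scheme for renamed channels
) -> Dict[str, List[float]]:
    """Merge slave channels into a copy of master, renaming on name collisions."""
    # Master data is always kept as-is.
    merged = {name: list(values) for name, values in master.items()}
    for name, values in slave.items():
        new_name = name
        # Keep appending the suffix until the name no longer collides.
        while new_name in merged:
            new_name += suffix
        merged[new_name] = list(values)
    return merged


master_data = {"Fp1": [1.0, 2.0], "Fp2": [3.0, 4.0]}
slave_data = {"Fp2": [5.0, 6.0], "marker": [0.0, 1.0]}
print(merge_channels(master_data, slave_data))
```

Here the slave channel "Fp2" collides with a master channel and is stored under a new name, while "marker" is added unchanged.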

models.node.processing.processing_node module#

class models.node.processing.processing_node.ProcessingNode(parameters=None)[source]#

Bases: Node

A ProcessingNode is a type of Node that processes data from its input buffer to produce output data, based on its implemented processing logic. It extends the Node abstract class.

Attributes:

_MODULE_NAME (Final[str]): a constant string representing the name of the module where this class is defined.

Methods:

__init__(self, parameters=None):

Initializes a ProcessingNode instance with the specified parameters.

_validate_parameters(self, parameters: dict):

Validates the specified parameters for the ProcessingNode instance, raising MissingParameterError if any required parameters are missing.

_initialize_parameter_fields(self, parameters: dict):

Initializes any custom parameter fields for the ProcessingNode instance.

_initialize_buffer_options(self, buffer_options: dict) -> None:

Initializes buffer behavior options for the ProcessingNode instance.

_run(self, data: FrameworkData, input_name: str) -> None:

Runs the ProcessingNode instance by inserting new input data, processing the input buffer, and outputting the processed data.

_process_input_buffer(self):

Processes the input buffer to produce output data.

_is_next_node_call_enabled(self) -> bool:

Returns True if the ProcessingNode instance allows the next node to be called, False otherwise.

_is_processing_condition_satisfied(self) -> bool:

Returns True if the processing condition for the ProcessingNode instance is satisfied, False otherwise.

_process(self, data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:

Processes the specified data to produce output data.

_get_inputs(self) -> List[str]:

Returns a list of input names for the ProcessingNode instance.

_get_outputs(self) -> List[str]:

Returns a list of output names for the ProcessingNode instance.

dispose(self) -> None:

Disposes of the ProcessingNode instance.

dispose() → None[source]#

Node-specific implementation that disposes of allocated resources.
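
The control flow described for _run (insert new input data, process the input buffer, output the processed data) can be sketched with a toy class. ToyProcessingNode is a hypothetical stand-in, not the framework's class: it uses plain lists instead of FrameworkData, assumes the processing condition is "enough samples have accumulated", and always clears the input buffer after processing.

```python
from typing import Dict, List


class ToyProcessingNode:
    """Toy stand-in that mirrors the documented method names (illustration only)."""

    def __init__(self, required_samples: int) -> None:
        self._required_samples = required_samples
        self._input_buffer: Dict[str, List[float]] = {"main": []}
        self.output_buffer: Dict[str, List[float]] = {"main": []}

    def _is_processing_condition_satisfied(self) -> bool:
        # Assumption: processing waits until enough samples have accumulated.
        return len(self._input_buffer["main"]) >= self._required_samples

    def _process(self, data: Dict[str, List[float]]) -> Dict[str, List[float]]:
        # Placeholder processing logic: double every sample.
        return {"main": [2 * x for x in data["main"]]}

    def _process_input_buffer(self) -> None:
        if not self._is_processing_condition_satisfied():
            return
        processed = self._process(self._input_buffer)
        for name, values in processed.items():
            self.output_buffer[name].extend(values)
        # Mimics clear_input_buffer_after_process set to true.
        self._input_buffer["main"].clear()

    def _run(self, data: List[float], input_name: str) -> None:
        # Insert new input data, then try to process the input buffer.
        self._input_buffer[input_name].extend(data)
        self._process_input_buffer()


node = ToyProcessingNode(required_samples=4)
node._run([1.0, 2.0], "main")
print(node.output_buffer["main"])  # still empty: only 2 of 4 samples so far
node._run([3.0, 4.0], "main")
print(node.output_buffer["main"])  # all four samples processed together
```

The first call only buffers data because the condition is not met. The second call satisfies it, so all four samples are processed in one pass.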

models.node.processing.signalcheck module#

class models.node.processing.signalcheck.SignalCheck(parameters=None)[source]#

Bases: ProcessingNode

INPUT_MAIN: Final[str] = 'main'#
OUTPUT_MAIN: Final[str] = 'main'#

models.node.processing.synchronize module#

class models.node.processing.synchronize.Synchronize(parameters=None)[source]#

Bases: ProcessingNode

FILL_TYPE_SAMPLE_AND_HOLD: Final[str] = 'sample_and_hold'#
FILL_TYPE_ZEROFILL: Final[str] = 'zero_fill'#
INPUT_MASTER_MAIN: Final[str] = 'master_main'#
INPUT_MASTER_TIMESTAMP: Final[str] = 'master_timestamp'#
INPUT_SLAVE_MAIN: Final[str] = 'slave_main'#
INPUT_SLAVE_TIMESTAMP: Final[str] = 'slave_timestamp'#
OUTPUT_SYNCHRONIZED_MASTER: Final[str] = 'synchronized_master'#
OUTPUT_SYNCHRONIZED_SLAVE: Final[str] = 'synchronized_slave'#
OUTPUT_SYNCHRONIZED_TIMESTAMP: Final[str] = 'synchronized_timestamp'#
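
This class carries no description here, but Merge (which extends it) mentions synchronizing slave data with master data and a slave_filling option. One way such alignment could work is sketched below. The algorithm is an assumption, not the documented behavior: align_slave_to_master is a hypothetical helper that emits one slave value per master timestamp and fills the gaps according to the fill type.

```python
from typing import List

FILL_TYPE_ZEROFILL = "zero_fill"
FILL_TYPE_SAMPLE_AND_HOLD = "sample_and_hold"


def align_slave_to_master(
    master_ts: List[int],
    slave_ts: List[int],
    slave_values: List[float],
    slave_filling: str,
) -> List[float]:
    """Return one slave value per master timestamp (hypothetical helper).

    Both timestamp lists are assumed to be sorted in ascending order.
    """
    if slave_filling not in (FILL_TYPE_ZEROFILL, FILL_TYPE_SAMPLE_AND_HOLD):
        raise ValueError(f"unknown slave_filling: {slave_filling!r}")
    aligned: List[float] = []
    i = 0
    held = 0.0  # Assumption: value used before the first slave sample arrives.
    for ts in master_ts:
        fresh = False
        # Consume every slave sample that arrived up to this master timestamp.
        while i < len(slave_ts) and slave_ts[i] <= ts:
            held = slave_values[i]
            fresh = True
            i += 1
        if fresh or slave_filling == FILL_TYPE_SAMPLE_AND_HOLD:
            aligned.append(held)  # new sample, or repeat the last one
        else:
            aligned.append(0.0)   # zero_fill: no new sample for this tick
    return aligned


master = [0, 1, 2, 3, 4, 5]
slave = [0, 2, 4]
values = [10.0, 20.0, 30.0]
print(align_slave_to_master(master, slave, values, FILL_TYPE_SAMPLE_AND_HOLD))
print(align_slave_to_master(master, slave, values, FILL_TYPE_ZEROFILL))
```

In this example the slave runs at half the master rate, so every second master tick has no new slave sample and receives either the held value or a zero.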

Module contents#