Source code for models.node.processing.processing_node

import abc
from typing import List, Dict, Final

from models.exception.missing_parameter import MissingParameterError
from models.framework_data import FrameworkData
from models.node.node import Node


class ProcessingNode(Node):
    """Abstract node that processes its input buffer to produce output data.

    ``ProcessingNode`` extends :class:`Node`. Each call to :meth:`_run`
    registers the incoming data through ``_insert_new_input_data`` and then
    attempts a processing pass: when
    :meth:`_is_processing_condition_satisfied` returns ``True`` the input
    buffer is given to :meth:`_process`, and every entry of the returned
    mapping is registered through ``_insert_new_output_data``.

    Three ``buffer_options`` entries, all mandatory, decide when the buffers
    are cleared:

    * ``clear_output_buffer_on_data_input``: clear the output buffer each
      time new input data arrives.
    * ``clear_input_buffer_after_process``: clear the input buffer once
      :meth:`_process` has returned.
    * ``clear_output_buffer_after_process``: clear the output buffer once
      :meth:`_process` has returned, before the new results are inserted.

    Concrete subclasses must implement :meth:`_validate_parameters`,
    :meth:`_initialize_parameter_fields`, :meth:`_is_next_node_call_enabled`,
    :meth:`_is_processing_condition_satisfied`, :meth:`_process`,
    :meth:`_get_inputs` and :meth:`_get_outputs`.
    """

    # Module name reported in the MissingParameterError raised by validation.
    _MODULE_NAME: Final[str] = 'models.node.processing'

    def __init__(self, parameters=None) -> None:
        """Build the node by delegating to the ``Node`` constructor.

        :param parameters: Node configuration, forwarded unchanged.
        """
        super().__init__(parameters=parameters)

    @abc.abstractmethod
    def _validate_parameters(self, parameters: dict):
        """Check that every mandatory buffer option is present.

        The parent validation runs first. Although abstract, this method
        carries a body so that subclasses can reuse it through ``super()``.

        :param parameters: Node configuration; ``parameters['buffer_options']``
            is inspected here.
        :type parameters: dict
        :raises MissingParameterError: if any of the three buffer options is
            absent from ``parameters['buffer_options']``.
        """
        super()._validate_parameters(parameters)

        buffer_options = parameters['buffer_options']
        # The order matters: the first missing option is the one reported.
        for option_name in (
            'clear_output_buffer_on_data_input',
            'clear_input_buffer_after_process',
            'clear_output_buffer_after_process',
        ):
            if option_name in buffer_options:
                continue
            raise MissingParameterError(
                module=self._MODULE_NAME,
                name=self.name,
                parameter=f'buffer_options.{option_name}'
            )

    @abc.abstractmethod
    def _initialize_parameter_fields(self, parameters: dict):
        """Initialize parameter-backed fields; this level only calls the parent.

        :param parameters: Node configuration.
        :type parameters: dict
        """
        super()._initialize_parameter_fields(parameters)

    def _initialize_buffer_options(self, buffer_options: dict) -> None:
        """Store the buffer behaviour flags on the instance.

        :param buffer_options: Buffer behaviour options. Must contain the keys
            'clear_output_buffer_on_data_input',
            'clear_input_buffer_after_process' and
            'clear_output_buffer_after_process'.
        :type buffer_options: dict
        """
        self._clear_output_buffer_on_data_input = \
            buffer_options['clear_output_buffer_on_data_input']
        self._clear_input_buffer_after_process = \
            buffer_options['clear_input_buffer_after_process']
        self._clear_output_buffer_after_process = \
            buffer_options['clear_output_buffer_after_process']

    def _run(self, data: FrameworkData, input_name: str) -> None:
        """Register new input data and attempt a processing pass.

        :param data: Data received for the given input.
        :type data: FrameworkData
        :param input_name: Name of the input the data arrived on.
        :type input_name: str
        """
        self.print(f'Inserting data in input buffer {input_name}')
        self._insert_new_input_data(data, input_name)

        if self._clear_output_buffer_on_data_input:
            self._clear_output_buffer()

        self._process_input_buffer()

    def _process_input_buffer(self):
        """Process the input buffer if the processing condition is satisfied.

        Does nothing when :meth:`_is_processing_condition_satisfied` returns a
        falsy value. Otherwise the input buffer is processed, the buffers are
        cleared according to the configured flags, and each processed entry is
        inserted as output data under its own name.
        """
        if not self._is_processing_condition_satisfied():
            return

        self.print('Starting processing of input buffer')
        outputs = self._process(self._input_buffer)

        # Clearing happens after processing and before the new results are
        # inserted, so only stale content is discarded.
        if self._clear_input_buffer_after_process:
            self._clear_input_buffer()
        if self._clear_output_buffer_after_process:
            self._clear_output_buffer()

        self.print('Outputting data')
        for output_name, output_data in outputs.items():
            self._insert_new_output_data(output_data, output_name)

    @abc.abstractmethod
    def _is_next_node_call_enabled(self) -> bool:
        """Tell whether the next node may be called; subclasses must implement."""
        raise NotImplementedError()

    @abc.abstractmethod
    def _is_processing_condition_satisfied(self) -> bool:
        """Tell whether the input buffer should be processed now.

        Subclasses must implement this; :meth:`_process_input_buffer` skips
        processing whenever it returns a falsy value.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _process(self, data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
        """Node-specific data processing, relating inputs to outputs.

        :param data: Data to be processed (the node's input buffer).
        :type data: Dict[str, FrameworkData]
        :return: Processed data; each key is used as the output name.
        :rtype: Dict[str, FrameworkData]
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _get_inputs(self) -> List[str]:
        """Return the names of this node's inputs; subclasses must implement."""
        raise NotImplementedError()

    @abc.abstractmethod
    def _get_outputs(self) -> List[str]:
        """Return the names of this node's outputs; subclasses must implement."""
        raise NotImplementedError()

    def dispose(self) -> None:
        """Release the node's buffered data by clearing both buffers."""
        self._clear_input_buffer()
        self._clear_output_buffer()