from __future__ import annotations
import abc
import copy
import traceback
import time
from queue import Queue
from threading import Thread, Event, Condition
from typing import List, Dict, Final, Any
from models.exception.invalid_parameter_value import InvalidParameterValue
from models.exception.missing_parameter import MissingParameterError
from models.framework_data import FrameworkData
[docs]class Node:
_MODULE_NAME: Final[str] = 'models.node'
"""Abstract base class for processing pipeline execution on this framework.
A node is a component that receives data from its inputs, process it and send it to its outputs.
"""
def __init__(self, parameters=None) -> None:
super().__init__()
self.name: Final[str] = parameters['name']
self._enable_log = True
self.print("Initializing")
self._validate_parameters(parameters)
self.parameters = parameters
self._initialize_buffer_options(parameters['buffer_options'])
self._type: Final[str] = parameters['type']
self._initialize_parameter_fields(parameters)
self._clear_input_buffer()
self._clear_output_buffer()
self._initialize_children()
self._child_input_relation: Dict[Node, List[str]] = {}
self._is_disposed = False
# Threading attributes
self.local_storage = Queue()
self.running = False
self.thread = None
self.new_data_available = False
self.condition = Condition()
self._stop_event = Event()
self.is_running_main_process = False
self.thread = Thread(target=self._thread_runner, name=self.name)
self.thread.start()
def _build_graph_inputs(self):
return f"""
<TR>
<TD BORDER="0">
<TABLE BORDER="0" CELLBORDER="" CELLSPACING="0" CELLPADDING="0">
<TR>
<TD WIDTH="20"></TD>
{[f'<TD PORT="in_{input_name}" BORDER="1" CELLPADDING="1"><FONT POINT-SIZE="8">{input_name}</FONT></TD><TD WIDTH="10"></TD>' for input_name in self._get_inputs()]}
<TD WIDTH="10"></TD>
</TR>
</TABLE>
</TD>
</TR>
"""
def _build_graph_outputs(self):
return f"""
<TR>
<TD BORDER="0">
<TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0" CELLPADDING="0">
<TR>
<TD WIDTH="20"></TD>
{[f'<TD PORT="out_{output_name}" BORDER="1" CELLPADDING="1"><FONT POINT-SIZE="8">{output_name}</FONT></TD><TD WIDTH="10"></TD>' for output_name in self._get_outputs()]}
<TD WIDTH="10"></TD>
</TR>
</TABLE>
</TD>
</TR>
"""
[docs] def build_graphviz_representation(self):
return f"""
{self.name} [
shape=plaintext
tooltip="{self.parameters}"
label=<
<TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0" CELLPADDING="0">
{self._build_graph_inputs()}
<TR>
<TD BORDER="1" STYLE="ROUNDED" CELLPADDING="4" COLOR="black">{self.name}<BR/><FONT POINT-SIZE="5">{self._MODULE_NAME}</FONT></TD>
</TR>
{self._build_graph_outputs()}
</TABLE>
>
];
"""
@abc.abstractmethod
def _validate_parameters(self, parameters: dict):
"""
Validates parameters passed to this node.
:param parameters: Parameters passed to this node.
:type parameters: dict
:raises MissingParameterError: If a required parameter is missing.
"""
if 'module' not in parameters:
raise MissingParameterError(
module=self._MODULE_NAME, name=self.name,
parameter='module'
)
if 'models.node.' not in parameters['module']:
raise InvalidParameterValue(
module=self._MODULE_NAME, name=self.name,
parameter='module',
cause='must_be_part_of_[models.node]_module'
)
if 'type' not in parameters:
raise MissingParameterError(
module=self._MODULE_NAME, name=self.name,
parameter='type'
)
if 'enable_log' not in parameters:
parameters['enable_log'] = False
if 'buffer_options' not in parameters:
raise MissingParameterError(
module=self._MODULE_NAME, name=self.name,
parameter='buffer_options'
)
if 'outputs' not in parameters:
raise MissingParameterError(
module=self._MODULE_NAME, name=self.name,
parameter='outputs'
)
if 'name' not in parameters:
raise MissingParameterError(
module=self._MODULE_NAME, name=self.name,
parameter='name'
)
if 'print_buffer_size' not in parameters['buffer_options']:
parameters['buffer_options']['print_buffer_size'] = False
elif type(parameters['buffer_options']['print_buffer_size']) is not bool:
raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
parameter='buffer_options.print_buffer_size',
cause='must_be_bool')
@abc.abstractmethod
def _initialize_parameter_fields(self, parameters: dict):
"""
Initializes parameter fields of this node. This is an abstract method and should be implemented by subclasses.
:param parameters: Parameters passed to this node.
:type parameters: dict
"""
self._enable_log = parameters['enable_log']
self._should_print_buffer_size = parameters['buffer_options']['print_buffer_size']
return
def _clear_input_buffer(self):
"""Sets input buffer to new empty object for each input name
"""
self._input_buffer = {}
for input_name in self._get_inputs():
self._input_buffer[input_name] = FrameworkData()
def _clear_output_buffer(self):
"""Sets output buffer to new empty object for each output name
"""
self._output_buffer = {}
for output_name in self._get_outputs():
self._output_buffer[output_name] = FrameworkData()
@staticmethod
def _insert_data_in_buffer(data: FrameworkData, buffer_data_name: str, buffer: Dict[str, FrameworkData]):
buffer[buffer_data_name].extend(copy.deepcopy(data))
def _print_buffer_size(self, buffer_name: str, buffer: Dict[str, FrameworkData]):
formatted_buffer_sizes = f'Buffer:{buffer_name}\t'
for key in buffer:
formatted_buffer_sizes += f'###Key:{key}=Length:{buffer[key].get_data_count()}### '
self.print(formatted_buffer_sizes)
def _insert_new_input_data(self, data: FrameworkData, input_name: str):
"""Appends new data to the end of already existing input buffer
:param data: Data to be added. Should be in channel X sample format
:type data: FrameworkData
:param input_name: Node input name.
:type input_name: str
"""
self._input_buffer[input_name].extend(data)
if self._should_print_buffer_size:
self._print_buffer_size('input', self._input_buffer)
def _insert_new_output_data(self, data: FrameworkData, output_name: str):
"""
Appends new data to the end of already existing output buffer
:param data: Data to be added. Should be in channel X sample format
:type data: FrameworkData
:param output_name: Node output name.
:type output_name: str
"""
self._output_buffer[output_name].extend(data)
if self._should_print_buffer_size:
self._print_buffer_size('output', self._output_buffer)
def _initialize_children(self):
"""Sets child nodes dictionary to a new, empty dict
"""
self._children: Dict[str, List[Dict[str, Any]]] = {}
for output_name in self._get_outputs():
self._children[output_name] = []
[docs] def add_child(self, output_name: str, node: Node, input_name: str):
"""Adds a new child node to child nodes dictionary
:param output_name: Current node output name, used as key.
:type output_name: str
:param node: Child node object.
:type node: Node
:param input_name: Child node input name.
:type input_name: str
"""
# TODO Melhorar o objeto guardado em self._children
if node not in self._child_input_relation:
self._child_input_relation[node] = []
if input_name in self._child_input_relation[node]:
raise InvalidParameterValue(module='node', parameter=f'outputs.{output_name}', cause='already_added',
name=self.name)
self._children[output_name].append(
{
'input_name': input_name,
'node': node,
'run': lambda data: node.run(data, input_name),
'run_': lambda data: node.run(),
'dispose': lambda x: node.dispose_all()
}
)
def _dispose_all_children(self):
for output_name in self._get_outputs():
output_children = self._children[output_name]
for child in output_children:
child['dispose'](child)
def _call_children(self):
"""Calls child nodes to execute their processing given current node output buffer content.
"""
for output_name in self._get_outputs():
output = self._output_buffer[output_name]
if output.get_data_count()==0:
continue
output_children = self._children[output_name]
for child in output_children:
self.print(f'Output {output_name} calling child {child["node"].name} input {child["input_name"]} ({output.get_data_count()} samples)')
child['run'](output)
def _thread_runner(self):
while not self._stop_event.is_set():
with self.condition:
self.condition.wait()
while not self.local_storage.empty():
input_name, data = self.local_storage.get()
try:
self.is_running_main_process = True
self._run(data, input_name)
except Exception as e:
self.print(f'Error: {e}', exception=e)
raise e
finally:
self.is_running_main_process = False
if self._is_next_node_call_enabled():
self._call_children()
self.new_data_available = False
[docs] def run(self, data: FrameworkData = None, input_name: str = None) -> None:
self.local_storage.put((input_name, data))
with self.condition:
self.new_data_available = True
self.condition.notify()
[docs] def check_output(self, output_name: str) -> None:
if output_name not in self._get_outputs():
raise ValueError('error'
'.invalid'
'.value'
'.node'
'.output')
[docs] @classmethod
def from_config_json(cls, parameters: dict):
"""Returns node instance from given parameters in dict form
:param parameters: Node parameters in dict form.
:type parameters: dict
"""
return cls(parameters)
@abc.abstractmethod
def _run(self, data: FrameworkData, input_name: str) -> None:
"""Node self implementation of processing on input data
:param data: Node input data.
:type data: FrameworkData
:param input_name: Node input name.
:type input_name: str
"""
raise NotImplementedError()
@abc.abstractmethod
def _is_next_node_call_enabled(self) -> bool:
"""Node self implementation to check if child nodes should be called.
"""
raise NotImplementedError()
@abc.abstractmethod
def _initialize_buffer_options(self, buffer_options: dict) -> None:
"""Node self implementation of buffer behaviour options initialization
:param buffer_options: Buffer behaviour options.
:type buffer_options: dict
"""
raise NotImplementedError()
@abc.abstractmethod
def _get_inputs(self) -> List[str]:
"""Returns the input names in list form.
"""
raise NotImplementedError()
@abc.abstractmethod
def _get_outputs(self) -> List[str]:
"""Returns the output names in list form.
"""
raise NotImplementedError()
[docs] def dispose_all(self) -> None:
"""Disposes itself and all its children nodes
"""
if self._is_disposed:
return
self._is_disposed = True
self._dispose_all_children()
self.dispose()
self._dispose()
def _dispose(self) -> None:
self.print('Disposing...')
self._stop_event.set()
with self.condition:
self.condition.notify()
if self.thread:
self.thread.join()
self.running = False
return
[docs] @abc.abstractmethod
def dispose(self) -> None:
"""Node self implementation of disposal of allocated resources.
"""
raise NotImplementedError()
[docs] def print(self, message: str, exception: Exception = None) -> None:
if self._enable_log or not exception is None:
print(f'{time.time()} - {self._MODULE_NAME}.{self.name} - {message}\n')
if exception:
print('Stack trace:')
traceback.print_exc()
@property
def module_name(self):
return self._MODULE_NAME