Source code for models.node.processing.synchronize

import statistics
import copy
from bisect import bisect_left
from typing import List, Dict, Final

from models.exception.invalid_parameter_value import InvalidParameterValue
from models.exception.missing_parameter import MissingParameterError
from models.framework_data import FrameworkData
from models.node.processing.processing_node import ProcessingNode


class Synchronize(ProcessingNode):
    """Processing node that resamples a "slave" stream onto a "master" stream's timestamps.

    The node receives four inputs (master/slave main data plus one timestamp
    stream for each) and emits three outputs: the master data, the master
    timestamps and the slave data re-aligned to those timestamps.

    Parameters read from the ``parameters`` dict:

    * ``slave_filling`` (required): ``'zero_fill'`` or ``'sample_and_hold'``;
      selects what is written for master timestamps that have no slave sample.
    * ``statistics_enabled`` (optional, bool, defaults to ``False``): enables
      the console report produced by :meth:`_statistics`.

    ``buffer_options.clear_input_buffer_after_process`` is always forced to
    ``True`` by :meth:`_validate_parameters`.
    """

    _MODULE_NAME: Final[str] = 'node.processing.synchronize'

    INPUT_MASTER_MAIN: Final[str] = 'master_main'
    INPUT_MASTER_TIMESTAMP: Final[str] = 'master_timestamp'
    INPUT_SLAVE_MAIN: Final[str] = 'slave_main'
    INPUT_SLAVE_TIMESTAMP: Final[str] = 'slave_timestamp'
    OUTPUT_SYNCHRONIZED_SLAVE: Final[str] = 'synchronized_slave'
    OUTPUT_SYNCHRONIZED_MASTER: Final[str] = 'synchronized_master'
    OUTPUT_SYNCHRONIZED_TIMESTAMP: Final[str] = 'synchronized_timestamp'

    FILL_TYPE_ZEROFILL: Final[str] = 'zero_fill'
    FILL_TYPE_SAMPLE_AND_HOLD: Final[str] = 'sample_and_hold'

    def _validate_parameters(self, parameters: dict):
        """Validate the node parameters.

        Raises:
            MissingParameterError: if ``slave_filling`` is absent.
            InvalidParameterValue: if ``slave_filling`` is not one of the two
                supported fill types, or ``statistics_enabled`` is present but
                not a ``bool``.
            KeyError: if ``parameters`` has no ``buffer_options`` entry (the
                flag below is written before the parent validation runs).
        """
        # The input buffer is always cleared after processing; pending samples
        # are kept in the node's own sync buffer instead.
        parameters['buffer_options']['clear_input_buffer_after_process'] = True
        super()._validate_parameters(parameters)

        if 'slave_filling' not in parameters:
            raise MissingParameterError(module=self._MODULE_NAME, name=self.name,
                                        parameter='slave_filling')
        if parameters['slave_filling'] not in [self.FILL_TYPE_ZEROFILL, self.FILL_TYPE_SAMPLE_AND_HOLD]:
            raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
                                        parameter='slave_filling',
                                        cause=f'not_in_[{self.FILL_TYPE_ZEROFILL},{self.FILL_TYPE_SAMPLE_AND_HOLD}]')
        if 'statistics_enabled' in parameters and not isinstance(parameters['statistics_enabled'], bool):
            raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
                                        parameter='statistics_enabled',
                                        cause='must_be_bool')

    def _initialize_parameter_fields(self, parameters: dict):
        """Store the validated parameters and reset the node's internal state."""
        super()._initialize_parameter_fields(parameters)
        self._statistics_enabled = parameters.get('statistics_enabled', False)
        self._zero_fill = parameters['slave_filling'] == self.FILL_TYPE_ZEROFILL
        self._sample_and_hold = parameters['slave_filling'] == self.FILL_TYPE_SAMPLE_AND_HOLD
        self._sync_errors: List[float] = []
        # Number of times _process has run; the first run also trims the start.
        self._exec_index = 0
        # Last slave sample written by _fill; kept across calls for sample-and-hold.
        self._last_valid_data = None
        self._initialize_sync_buffer()

    def _is_next_node_call_enabled(self) -> bool:
        """Always allow the next node to be called."""
        return True

    def _initialize_sync_buffer(self):
        """Set the sync buffer to a new empty ``FrameworkData`` for each input name."""
        self._sync_buffer = {}
        for input_name in self._get_inputs():
            self._sync_buffer[input_name] = FrameworkData()

    def _insert_data_in_sync_buffer(self, data: Dict[str, FrameworkData]):
        """Append a deep copy of ``data`` to the sync buffer, input by input."""
        input_data = copy.deepcopy(data)
        for input_name in self._get_inputs():
            self._sync_buffer[input_name].extend(input_data[input_name])

    def _check_for_timestamp_intersection(self, slave_timestamp_data: List[float],
                                          master_timestamp_data: List[float]) -> bool:
        """Return ``True`` when the slave and master time ranges strictly overlap.

        Only the first and last element of each list are inspected.
        """
        slave_start = slave_timestamp_data[0]
        slave_end = slave_timestamp_data[-1]
        master_start = master_timestamp_data[0]
        master_end = master_timestamp_data[-1]
        return slave_start < master_end and slave_end > master_start

    def _move_input_buffer_to_sync_buffer(self):
        """Move paired (main, timestamp) samples from the input buffer to the sync buffer.

        For each stream only as many samples are moved as exist in BOTH its
        main and timestamp buffers, so the two stay the same length in the
        sync buffer.
        """
        slave_main = self._input_buffer[self.INPUT_SLAVE_MAIN]
        slave_timestamp = self._input_buffer[self.INPUT_SLAVE_TIMESTAMP]
        master_main = self._input_buffer[self.INPUT_MASTER_MAIN]
        master_timestamp = self._input_buffer[self.INPUT_MASTER_TIMESTAMP]

        slave_count = min(slave_main.get_data_count(), slave_timestamp.get_data_count())
        master_count = min(master_main.get_data_count(), master_timestamp.get_data_count())

        self._sync_buffer[self.INPUT_SLAVE_MAIN].extend(slave_main.splice(0, slave_count))
        self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].extend(slave_timestamp.splice(0, slave_count))
        self._sync_buffer[self.INPUT_MASTER_MAIN].extend(master_main.splice(0, master_count))
        self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].extend(master_timestamp.splice(0, master_count))

    def _process_input_buffer(self):
        """Drain the input buffer and, when enough data is available, emit synchronized output."""
        self._move_input_buffer_to_sync_buffer()
        if not self._is_processing_condition_satisfied():
            return

        # _process works on a deep copy; the trim helpers remove the consumed
        # part from the real sync buffer themselves.
        processed_data = self._process(copy.deepcopy(self._sync_buffer))

        if self._clear_output_buffer_after_process:
            self._clear_output_buffer()
        self.print('Outputting data')
        for output_name in self._get_outputs():
            self._insert_new_output_data(processed_data[output_name], output_name)

    def _is_processing_condition_satisfied(self) -> bool:
        """Return ``True`` when the sync buffer holds enough overlapping data to process.

        Requires more than two timestamps per stream, equal main/timestamp
        counts for each stream, and intersecting time ranges.
        """
        input_data = self._sync_buffer
        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
        slave_main = input_data[self.INPUT_SLAVE_MAIN]
        master_main = input_data[self.INPUT_MASTER_MAIN]
        return (slave_timestamp.get_data_count() > 2
                and master_timestamp.get_data_count() > 2
                and slave_main.get_data_count() == slave_timestamp.get_data_count()
                and master_main.get_data_count() == master_timestamp.get_data_count()
                and self._check_for_timestamp_intersection(slave_timestamp.get_data_single_channel(),
                                                           master_timestamp.get_data_single_channel()))

    def _process(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
        """Trim both streams to their common time range and align the slave to the master.

        Args:
            input_data: the four input streams keyed by input name.

        Returns:
            Dict keyed by the three output names. When trimming leaves either
            timestamp stream empty, the (trimmed) inputs are returned as-is.
        """
        self._exec_index += 1
        # The start only needs aligning once; afterwards the sync buffer
        # already begins where the previous run stopped.
        if self._exec_index == 1:
            input_data = self._trim_start(input_data)
        input_data = self._trim_end(input_data)

        master_timestamp_data = input_data[self.INPUT_MASTER_TIMESTAMP].get_data_single_channel()
        slave_main = input_data[self.INPUT_SLAVE_MAIN]
        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
        slave_timestamp_data = slave_timestamp.get_data_single_channel()

        if slave_timestamp.get_data_count() < 1 or len(master_timestamp_data) < 1:
            return {
                self.OUTPUT_SYNCHRONIZED_SLAVE: slave_main,
                self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
                self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
            }

        filled_slave_data = self._fill(master_timestamp_data,
                                       slave_timestamp_data,
                                       slave_main,
                                       input_data[self.INPUT_MASTER_TIMESTAMP].sampling_frequency)
        return {
            self.OUTPUT_SYNCHRONIZED_SLAVE: filled_slave_data,
            self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
            self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
        }

    def _statistics(self, sync_error_microseconds: float):
        """Record a sync error and print it with the running mean, if statistics are enabled."""
        if self._statistics_enabled:
            self._sync_errors.append(sync_error_microseconds)
            print(f'---------------------------------------------------------------'
                  f'\nError is:\t\t\t\t{sync_error_microseconds} uS'
                  f'\nMean Error is:\t\t\t{statistics.mean(self._sync_errors)} uS'
                  f'\n---------------------------------------------------------------')

    def _get_inputs(self) -> List[str]:
        """Return the names of the four inputs of this node."""
        return [
            self.INPUT_MASTER_MAIN,
            self.INPUT_MASTER_TIMESTAMP,
            self.INPUT_SLAVE_MAIN,
            self.INPUT_SLAVE_TIMESTAMP,
        ]

    def _get_outputs(self) -> List[str]:
        """Return the names of the three outputs of this node."""
        return [
            self.OUTPUT_SYNCHRONIZED_SLAVE,
            self.OUTPUT_SYNCHRONIZED_MASTER,
            self.OUTPUT_SYNCHRONIZED_TIMESTAMP
        ]

    def _get_closest_timestamp_index_in_master(
            self,
            master_timestamp: List[float],
            slave_timestamp: float,
            master_timestamp_avg_increment: float,
            master_max_index: int) -> int:
        """Locate the index in ``master_timestamp`` that matches ``slave_timestamp``.

        An index is first estimated from the average increment and then walked
        until its two neighbours bracket ``slave_timestamp``. When that search
        yields nothing, the index with the smallest absolute difference is
        returned instead.

        Args:
            master_timestamp: timestamps to search (the walk assumes ascending
                order - TODO confirm with callers).
            slave_timestamp: the timestamp to locate.
            master_timestamp_avg_increment: average spacing of
                ``master_timestamp``; a value of ``0`` skips the estimate and
                goes straight to the nearest-value scan.
            master_max_index: highest index considered valid by the caller.

        Returns:
            An index into ``master_timestamp``; ``0`` when ``slave_timestamp``
            precedes the first element.
        """
        if slave_timestamp < master_timestamp[0]:
            return 0

        # A zero increment (all timestamps identical) cannot produce an
        # estimate; fall through to the exhaustive scan instead of dividing by 0.
        if master_timestamp_avg_increment != 0:
            estimated_index = int((slave_timestamp - master_timestamp[0]) / master_timestamp_avg_increment)

            if estimated_index < 0:
                return 0
            if estimated_index > master_max_index and slave_timestamp >= master_timestamp[master_max_index]:
                return master_max_index
            # The length check keeps a one-element list from being indexed at [1].
            if len(master_timestamp) > 1 and estimated_index == 0 and slave_timestamp < master_timestamp[1]:
                return estimated_index
            if estimated_index == master_max_index and slave_timestamp > master_timestamp[master_max_index - 1]:
                return estimated_index

            while 0 < estimated_index < master_max_index:
                if master_timestamp[estimated_index - 1] <= slave_timestamp <= master_timestamp[estimated_index + 1]:
                    return estimated_index
                elif slave_timestamp < master_timestamp[estimated_index]:
                    estimated_index -= 1
                elif slave_timestamp > master_timestamp[estimated_index]:
                    estimated_index += 1

        # Fallback: linear scan for the nearest timestamp.
        closest_point: int = min(range(len(master_timestamp)),
                                 key=lambda i: abs(master_timestamp[i] - slave_timestamp))
        return closest_point

    def _trim_start(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
        """Drop leading samples of whichever stream starts earlier.

        The same samples are removed from ``input_data`` and from the node's
        sync buffer, so both begin at (approximately) the other stream's first
        timestamp.

        Returns:
            The four (possibly shortened) input streams keyed by input name.
        """
        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
        slave_main = input_data[self.INPUT_SLAVE_MAIN]
        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
        master_main = input_data[self.INPUT_MASTER_MAIN]

        slave_timestamp_data = slave_timestamp.get_data_single_channel()
        master_timestamp_data = master_timestamp.get_data_single_channel()

        slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / len(slave_timestamp_data)
        master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / len(master_timestamp_data)

        slave_start = slave_timestamp_data[0]
        master_start = master_timestamp_data[0]

        if slave_start < master_start:
            # Slave starts first: find where the master's first timestamp falls in it.
            slave_index = self._get_closest_timestamp_index_in_master(
                slave_timestamp_data,
                master_start,
                slave_avg_increment,
                len(slave_timestamp_data) - 1
            )
            slave_timestamp.splice(0, slave_index)
            slave_main.splice(0, slave_index)
            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index)
            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index)
        elif master_start < slave_start:
            # Master starts first: find where the slave's first timestamp falls in it.
            master_index = self._get_closest_timestamp_index_in_master(
                master_timestamp_data,
                slave_start,
                master_avg_increment,
                len(master_timestamp_data) - 1
            )
            master_timestamp.splice(0, master_index)
            master_main.splice(0, master_index)
            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index)
            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index)

        return {
            self.INPUT_MASTER_MAIN: master_main,
            self.INPUT_MASTER_TIMESTAMP: master_timestamp,
            self.INPUT_SLAVE_MAIN: slave_main,
            self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
        }

    def _empty_inputs_like(self,
                           master_main: FrameworkData,
                           master_timestamp: FrameworkData,
                           slave_main: FrameworkData,
                           slave_timestamp: FrameworkData) -> Dict[str, FrameworkData]:
        """Build empty input streams that keep each original's sampling frequency and channels."""
        return {
            self.INPUT_MASTER_MAIN: FrameworkData(sampling_frequency_hz=master_main.sampling_frequency,
                                                  channels=master_main.channels),
            self.INPUT_MASTER_TIMESTAMP: FrameworkData(
                sampling_frequency_hz=master_timestamp.sampling_frequency,
                channels=master_timestamp.channels),
            self.INPUT_SLAVE_MAIN: FrameworkData(sampling_frequency_hz=slave_main.sampling_frequency,
                                                 channels=slave_main.channels),
            self.INPUT_SLAVE_TIMESTAMP: FrameworkData(
                sampling_frequency_hz=slave_timestamp.sampling_frequency,
                channels=slave_timestamp.channels)
        }

    def _trim_end(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
        """Cut both streams at the end of their common time range.

        The stream that ends later is walked backwards to its last sample that
        still falls inside the other stream; the matching position in the
        other stream is then looked up. Everything up to those positions stays
        in ``input_data`` (to be processed now) and is removed from the sync
        buffer; the remaining tail stays in the sync buffer for the next run.

        Returns:
            The four trimmed input streams keyed by input name, or four empty
            streams when the later-ending stream has no sample inside the
            other one (the sync buffer is left untouched in that case).

        Raises:
            ZeroDivisionError: if the average increment of the earlier-ending
                stream is zero (all of its timestamps identical).
        """
        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
        slave_main = input_data[self.INPUT_SLAVE_MAIN]
        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
        master_main = input_data[self.INPUT_MASTER_MAIN]

        slave_timestamp_data = slave_timestamp.get_data_single_channel()
        master_timestamp_data = master_timestamp.get_data_single_channel()

        # Lengths are captured once, before anything is spliced, so every
        # removal count below refers to the untrimmed streams.
        slave_count = len(slave_timestamp_data)
        master_count = len(master_timestamp_data)

        slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / slave_count
        master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / master_count

        should_trim_slave = slave_timestamp_data[-1] > master_timestamp_data[-1]
        should_trim_master = master_timestamp_data[-1] > slave_timestamp_data[-1]

        index = -1
        if should_trim_slave:
            # Step back over slave samples whose estimated master index is out of range.
            while int((slave_timestamp_data[index] - master_timestamp_data[0]) / master_avg_increment) \
                    > master_count - 1:
                index -= 1
                if index < -slave_count:
                    return self._empty_inputs_like(master_main, master_timestamp, slave_main, slave_timestamp)

            master_index = self._get_closest_timestamp_index_in_master(
                master_timestamp_data,
                slave_timestamp_data[index],
                master_avg_increment,
                master_count - 1
            )
            slave_keep = slave_count + index + 1

            # keep slave data from index to end in sync buffer and remove the rest
            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_keep)
            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_keep)
            # process slave data from 0 to index
            slave_main.splice(slave_keep, -index)
            slave_timestamp.splice(slave_keep, -index)

            # keep master data from master_index to end in sync buffer and remove the rest
            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index + 1)
            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index + 1)
            # process master data from 0 to master_index
            master_main.splice(master_index + 1, master_count - master_index)
            master_timestamp.splice(master_index + 1, master_count - master_index)
        elif should_trim_master:
            # Step back over master samples whose estimated slave index is out of range.
            while int((master_timestamp_data[index] - slave_timestamp_data[0]) / slave_avg_increment) \
                    > slave_count - 1:
                index -= 1
                if index < -master_count:
                    return self._empty_inputs_like(master_main, master_timestamp, slave_main, slave_timestamp)

            slave_index = self._get_closest_timestamp_index_in_master(
                slave_timestamp_data,
                master_timestamp_data[index],
                slave_avg_increment,
                slave_count - 1
            )
            master_keep = master_count + index + 1

            # keep master data from index to end in sync buffer and remove the rest
            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_keep)
            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_keep)
            # process master data from 0 to index
            master_main.splice(master_keep, -index)
            master_timestamp.splice(master_keep, -index)

            # keep slave data from slave_index to end in sync buffer and remove the rest
            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index + 1)
            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index + 1)
            # process slave data from 0 to slave_index; the tail length is
            # measured against the SLAVE stream (mirrors the branch above).
            slave_main.splice(slave_index + 1, slave_count - slave_index)
            slave_timestamp.splice(slave_index + 1, slave_count - slave_index)

        return {
            self.INPUT_MASTER_MAIN: master_main,
            self.INPUT_MASTER_TIMESTAMP: master_timestamp,
            self.INPUT_SLAVE_MAIN: slave_main,
            self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
        }

    def _fill(self,
              master_timestamp: List[float],
              slave_timestamp: List[float],
              slave_main: FrameworkData,
              master_sampling_frequency: float) -> FrameworkData:
        """Spread slave samples over the master timestamps, filling the gaps.

        Each distinct slave timestamp is mapped to a master index with
        ``bisect_left``. Master positions skipped between two consecutive
        slave samples are filled with the last written slave sample
        (sample-and-hold) or with ``0`` (zero fill), depending on the
        ``slave_filling`` parameter. The last written sample is kept on the
        instance, so it carries over to the next call.

        Args:
            master_timestamp: master timestamps (must be sorted for
                ``bisect_left`` to be meaningful).
            slave_timestamp: slave timestamps, one per slave sample.
            slave_main: slave data to redistribute.
            master_sampling_frequency: sampling frequency given to the result.

        Returns:
            A new ``FrameworkData`` with the channels of ``slave_main``.
        """
        fill_data = FrameworkData(master_sampling_frequency, slave_main.channels)
        max_master_index = len(master_timestamp) - 1
        previous_master_index = -1  # Start before the first index

        def append_fill(count: int) -> None:
            # Write `count` filler values on every channel.
            for channel in slave_main.channels:
                fill_content = [self._last_valid_data[channel]] * count if self._sample_and_hold else [
                    0] * count
                fill_data.input_data_on_channel(fill_content, channel)

        for current_slave_index, slave_time in enumerate(slave_timestamp):
            # Ignore repeated slave timestamps
            if current_slave_index > 0 and slave_time == slave_timestamp[current_slave_index - 1]:
                continue

            # Binary search for the master position of this slave sample,
            # clamped to the last valid master index.
            master_index = bisect_left(master_timestamp, slave_time)
            if master_index > max_master_index:
                master_index = max_master_index

            # Master positions skipped since the previous slave sample.
            fill_size = master_index - previous_master_index - 1
            if fill_size > 0 and self._last_valid_data is not None:
                append_fill(fill_size)

            # NOTE(review): with `>=`, several slave samples that map to the
            # same master index are all written, and a leading gap is not
            # filled while no sample has been seen yet; either can make the
            # result's length differ from len(master_timestamp) - confirm
            # whether that is intended.
            if self._last_valid_data is None or master_index >= previous_master_index:
                # Update last valid data with current slave data
                self._last_valid_data = slave_main.get_data_at_index(current_slave_index)
                for channel in slave_main.channels:
                    fill_data.input_data_on_channel([self._last_valid_data[channel]], channel)

            # Update the previous master index for the next iteration
            previous_master_index = master_index

        # Fill any remaining master timestamps with sample-and-hold or 0
        remaining_fill_size = max_master_index - previous_master_index
        if remaining_fill_size > 0 and self._last_valid_data is not None:
            append_fill(remaining_fill_size)

        return fill_data