import statistics
import copy
from bisect import bisect_left
from typing import List, Dict, Final
from models.exception.invalid_parameter_value import InvalidParameterValue
from models.exception.missing_parameter import MissingParameterError
from models.framework_data import FrameworkData
from models.node.processing.processing_node import ProcessingNode
[docs]class Synchronize(ProcessingNode):
_MODULE_NAME: Final[str] = 'node.processing.synchronize'
INPUT_MASTER_MAIN: Final[str] = 'master_main'
INPUT_MASTER_TIMESTAMP: Final[str] = 'master_timestamp'
INPUT_SLAVE_MAIN: Final[str] = 'slave_main'
INPUT_SLAVE_TIMESTAMP: Final[str] = 'slave_timestamp'
OUTPUT_SYNCHRONIZED_SLAVE: Final[str] = 'synchronized_slave'
OUTPUT_SYNCHRONIZED_MASTER: Final[str] = 'synchronized_master'
OUTPUT_SYNCHRONIZED_TIMESTAMP: Final[str] = 'synchronized_timestamp'
FILL_TYPE_ZEROFILL: Final[str] = 'zero_fill'
FILL_TYPE_SAMPLE_AND_HOLD: Final[str] = 'sample_and_hold'
def _validate_parameters(self, parameters: dict):
parameters['buffer_options']['clear_input_buffer_after_process'] = True
super()._validate_parameters(parameters)
if 'slave_filling' not in parameters:
raise MissingParameterError(module=self._MODULE_NAME, name=self.name,
parameter='slave_filling')
if parameters['slave_filling'] not in [self.FILL_TYPE_ZEROFILL, self.FILL_TYPE_SAMPLE_AND_HOLD]:
raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
parameter='slave_filling',
cause=f'not_in_[{self.FILL_TYPE_ZEROFILL},{self.FILL_TYPE_SAMPLE_AND_HOLD}]')
if 'statistics_enabled' in parameters and type(parameters['statistics_enabled']) is not bool:
raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
parameter='statistics_enabled',
cause='must_be_bool')
def _initialize_parameter_fields(self, parameters: dict):
super()._initialize_parameter_fields(parameters)
self._statistics_enabled = parameters['statistics_enabled'] if 'statistics_enabled' in parameters else False
self._zero_fill = parameters['slave_filling'] == self.FILL_TYPE_ZEROFILL
self._sample_and_hold = parameters['slave_filling'] == self.FILL_TYPE_SAMPLE_AND_HOLD
self._sync_errors: List[float] = []
self._exec_index = 0
self._last_valid_data = None
self._initialize_sync_buffer()
def _is_next_node_call_enabled(self) -> bool:
return True
def _initialize_sync_buffer(self):
"""Sets sync buffer to new empty object for each input name
"""
self._sync_buffer = {}
for input_name in self._get_inputs():
self._sync_buffer[input_name] = FrameworkData()
def _insert_data_in_sync_buffer(self, data: Dict[str, FrameworkData]):
input_data = copy.deepcopy(data)
for input_name in self._get_inputs():
self._sync_buffer[input_name].extend(input_data[input_name])
def _check_for_timestamp_intersection(self, slave_timestamp_data: List[float],
master_timestamp_data: List[float]) -> bool:
slave_start = slave_timestamp_data[0]
slave_end = slave_timestamp_data[-1]
master_start = master_timestamp_data[0]
master_end = master_timestamp_data[-1]
return slave_start < master_end and slave_end > master_start
def _move_input_buffer_to_sync_buffer(self):
slave_main_length = self._input_buffer[self.INPUT_SLAVE_MAIN].get_data_count()
slave_timestamp_length = self._input_buffer[self.INPUT_SLAVE_TIMESTAMP].get_data_count()
master_main_length = self._input_buffer[self.INPUT_MASTER_MAIN].get_data_count()
master_timestamp_length = self._input_buffer[self.INPUT_MASTER_TIMESTAMP].get_data_count()
slave_main = self._input_buffer[self.INPUT_SLAVE_MAIN]
slave_timestamp = self._input_buffer[self.INPUT_SLAVE_TIMESTAMP]
master_main = self._input_buffer[self.INPUT_MASTER_MAIN]
master_timestamp = self._input_buffer[self.INPUT_MASTER_TIMESTAMP]
self._sync_buffer[self.INPUT_SLAVE_MAIN].extend(slave_main.splice(0, min(slave_main_length, slave_timestamp_length)))
self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].extend(slave_timestamp.splice(0, min(slave_main_length, slave_timestamp_length)))
self._sync_buffer[self.INPUT_MASTER_MAIN].extend(master_main.splice(0, min(master_main_length, master_timestamp_length)))
self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].extend(master_timestamp.splice(0, min(master_main_length, master_timestamp_length)))
def _process_input_buffer(self):
self._move_input_buffer_to_sync_buffer()
if not self._is_processing_condition_satisfied():
return
processed_data = self._process(copy.deepcopy(self._sync_buffer))
if self._clear_output_buffer_after_process:
self._clear_output_buffer()
self.print('Outputting data')
for output_name in self._get_outputs():
self._insert_new_output_data(processed_data[output_name], output_name)
def _is_processing_condition_satisfied(self) -> bool:
input_data = self._sync_buffer
slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
slave_main = input_data[self.INPUT_SLAVE_MAIN]
master_main = input_data[self.INPUT_MASTER_MAIN]
return (slave_timestamp.get_data_count() > 2
and master_timestamp.get_data_count() > 2
and slave_main.get_data_count() == slave_timestamp.get_data_count()
and master_main.get_data_count() == master_timestamp.get_data_count()
and self._check_for_timestamp_intersection(slave_timestamp.get_data_single_channel(), master_timestamp.get_data_single_channel()))
def _process(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
self._exec_index += 1
if self._exec_index == 1:
input_data = self._trim_start(input_data)
input_data = self._trim_end(input_data)
master_timestamp_data = input_data[self.INPUT_MASTER_TIMESTAMP].get_data_single_channel()
slave_main = input_data[self.INPUT_SLAVE_MAIN]
slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
slave_timestamp_data = slave_timestamp.get_data_single_channel()
if slave_timestamp.get_data_count() < 1 or len(master_timestamp_data) < 1:
return {
self.OUTPUT_SYNCHRONIZED_SLAVE: slave_main,
self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
}
filled_slave_data = self._fill(master_timestamp_data, slave_timestamp_data, slave_main,
input_data[self.INPUT_MASTER_TIMESTAMP].sampling_frequency)
return {
self.OUTPUT_SYNCHRONIZED_SLAVE: filled_slave_data,
self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
}
def _statistics(self, sync_error_microseconds: float):
if self._statistics_enabled:
self._sync_errors.append(sync_error_microseconds)
print(f'---------------------------------------------------------------'
f'\nError is:\t\t\t\t{sync_error_microseconds} uS'
f'\nMean Error is:\t\t\t{statistics.mean(self._sync_errors)} uS'
f'\n---------------------------------------------------------------')
def _get_inputs(self) -> List[str]:
return [
self.INPUT_MASTER_MAIN,
self.INPUT_MASTER_TIMESTAMP,
self.INPUT_SLAVE_MAIN,
self.INPUT_SLAVE_TIMESTAMP,
]
def _get_outputs(self) -> List[str]:
return [
self.OUTPUT_SYNCHRONIZED_SLAVE,
self.OUTPUT_SYNCHRONIZED_MASTER,
self.OUTPUT_SYNCHRONIZED_TIMESTAMP
]
def _get_closest_timestamp_index_in_master(
self,
master_timestamp: List[float],
slave_timestamp: float,
master_timestamp_avg_increment: float,
master_max_index: int) -> int:
if slave_timestamp < master_timestamp[0]:
return 0
estimated_index = int((slave_timestamp - master_timestamp[0]) / master_timestamp_avg_increment)
bisect_index = bisect_left(master_timestamp, slave_timestamp)
if estimated_index < 0:
return 0
if estimated_index < 0 and slave_timestamp <= master_timestamp[0]:
return 0
elif estimated_index > master_max_index and slave_timestamp >= master_timestamp[master_max_index]:
return master_max_index
elif estimated_index == 0 and slave_timestamp < master_timestamp[1]:
return estimated_index
elif estimated_index == master_max_index and (slave_timestamp > master_timestamp[master_max_index - 1]):
return estimated_index
while 0 < estimated_index < master_max_index:
if master_timestamp[estimated_index - 1] <= slave_timestamp <= master_timestamp[estimated_index + 1]:
return estimated_index
elif slave_timestamp < master_timestamp[estimated_index]:
estimated_index -= 1
elif slave_timestamp > master_timestamp[estimated_index]:
estimated_index += 1
closest_point: int = min(range(len(master_timestamp)),
key=lambda i: abs(master_timestamp[i] - slave_timestamp))
return closest_point
def _trim_start(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
index = 0
slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
slave_main = input_data[self.INPUT_SLAVE_MAIN]
master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
master_main = input_data[self.INPUT_MASTER_MAIN]
slave_timestamp_data = slave_timestamp.get_data_single_channel()
master_timestamp_data = master_timestamp.get_data_single_channel()
slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / len(slave_timestamp_data)
master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / len(master_timestamp_data)
should_trim_slave = slave_timestamp_data[index] < master_timestamp_data[index]
should_trim_master = master_timestamp_data[index] < slave_timestamp_data[index]
if should_trim_slave:
slave_index = self._get_closest_timestamp_index_in_master(
slave_timestamp_data,
master_timestamp_data[index],
slave_avg_increment,
len(slave_timestamp_data) - 1
)
start_index = 0
remove_count = slave_index
slave_timestamp.splice(start_index, remove_count)
slave_main.splice(start_index, remove_count)
self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index)
self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index)
elif should_trim_master:
master_index = self._get_closest_timestamp_index_in_master(
master_timestamp_data,
slave_timestamp_data[index],
master_avg_increment,
len(master_timestamp_data) - 1
)
start_index = 0
remove_count = master_index
master_timestamp.splice(start_index, remove_count)
master_main.splice(start_index, remove_count)
self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index)
self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index)
return {
self.INPUT_MASTER_MAIN: master_main,
self.INPUT_MASTER_TIMESTAMP: master_timestamp,
self.INPUT_SLAVE_MAIN: slave_main,
self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
}
def _trim_end(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
index = -1
slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
slave_main = input_data[self.INPUT_SLAVE_MAIN]
master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
master_main = input_data[self.INPUT_MASTER_MAIN]
slave_timestamp_data = slave_timestamp.get_data_single_channel()
master_timestamp_data = master_timestamp.get_data_single_channel()
slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / len(slave_timestamp_data)
master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / len(master_timestamp_data)
should_trim_slave = slave_timestamp_data[index] > master_timestamp_data[index]
should_trim_master = master_timestamp_data[index] > slave_timestamp_data[index]
if should_trim_slave:
while int((slave_timestamp_data[index] - master_timestamp_data[0]) / master_avg_increment) > len(
master_timestamp_data) - 1:
index -= 1
if index < -len(slave_timestamp_data):
return {
self.INPUT_MASTER_MAIN: FrameworkData(sampling_frequency_hz=master_main.sampling_frequency,
channels=master_main.channels),
self.INPUT_MASTER_TIMESTAMP: FrameworkData(
sampling_frequency_hz=master_timestamp.sampling_frequency,
channels=master_timestamp.channels),
self.INPUT_SLAVE_MAIN: FrameworkData(sampling_frequency_hz=slave_main.sampling_frequency,
channels=slave_main.channels),
self.INPUT_SLAVE_TIMESTAMP: FrameworkData(
sampling_frequency_hz=slave_timestamp.sampling_frequency, channels=slave_timestamp.channels)
}
master_index = self._get_closest_timestamp_index_in_master(
master_timestamp_data,
slave_timestamp_data[index],
master_avg_increment,
len(master_timestamp_data) - 1
)
# keep slave data from index to end in sync buffer and remove the rest
self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, len(slave_timestamp_data) + index + 1)
self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, len(slave_timestamp_data) + index + 1)
# process slave data from 0 to index
slave_main.splice(len(slave_timestamp_data) + index + 1, -index)
slave_timestamp.splice(len(slave_timestamp_data) + index + 1, -index)
# keep master data from master_index to end in sync buffer and remove the rest
self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index + 1)
self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index + 1)
# process master data from 0 to master_index
master_main.splice(master_index + 1, len(master_timestamp_data) - master_index)
master_timestamp.splice(master_index + 1, len(master_timestamp_data) - master_index)
elif should_trim_master:
while int((master_timestamp_data[index] - slave_timestamp_data[0]) / slave_avg_increment) > len(
slave_timestamp_data) - 1:
index -= 1
if index < -len(master_timestamp_data):
# return empty data
return {
self.INPUT_MASTER_MAIN: FrameworkData(sampling_frequency_hz=master_main.sampling_frequency,
channels=master_main.channels),
self.INPUT_MASTER_TIMESTAMP: FrameworkData(
sampling_frequency_hz=master_timestamp.sampling_frequency,
channels=master_timestamp.channels),
self.INPUT_SLAVE_MAIN: FrameworkData(sampling_frequency_hz=slave_main.sampling_frequency,
channels=slave_main.channels),
self.INPUT_SLAVE_TIMESTAMP: FrameworkData(
sampling_frequency_hz=slave_timestamp.sampling_frequency, channels=slave_timestamp.channels)
}
slave_index = self._get_closest_timestamp_index_in_master(
slave_timestamp_data,
master_timestamp_data[index],
slave_avg_increment,
len(slave_timestamp_data) - 1
)
# keep master data from index to end in sync buffer and remove the rest
self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, len(master_timestamp_data) + index + 1)
self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, len(master_timestamp_data) + index + 1)
# process master data from 0 to index
master_main.splice(len(master_timestamp_data) + index + 1, -index)
master_timestamp.splice(len(master_timestamp_data) + index + 1, -index)
# keep slave data from slave_index to end in sync buffer and remove the rest
self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index + 1)
self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index + 1)
# process slave data from 0 to slave_index
slave_main.splice(slave_index + 1, len(master_timestamp_data) - slave_index)
slave_timestamp.splice(slave_index + 1, len(master_timestamp_data) - slave_index)
return {
self.INPUT_MASTER_MAIN: master_main,
self.INPUT_MASTER_TIMESTAMP: master_timestamp,
self.INPUT_SLAVE_MAIN: slave_main,
self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
}
def _fill(self, master_timestamp: List[float], slave_timestamp: List[float],
slave_main: FrameworkData, master_sampling_frequency: float) -> FrameworkData:
"""Fills slave data to align with master timestamps using sample-and-hold."""
# Create FrameworkData to store filled data
fill_data = FrameworkData(master_sampling_frequency, slave_main.channels)
# Calculate average increment between master timestamps
max_slave_index = len(slave_timestamp) - 1
max_master_index = len(master_timestamp) - 1
previous_master_index = -1 # Start before the first index
# Iterate over slave timestamps
for current_slave_index, slave_time in enumerate(slave_timestamp):
# Ignore repeated slave timestamps
if current_slave_index > 0 and slave_time == slave_timestamp[current_slave_index - 1]:
continue
# Use binary search to find the closest master timestamp index
master_index = bisect_left(master_timestamp, slave_time)
# Ensure master_index does not exceed valid range
if master_index > max_master_index:
master_index = max_master_index
# Calculate how many master timestamps need to be filled
fill_size = master_index - previous_master_index - 1
# If there's a gap, fill it using the last valid data or 0
if fill_size > 0 and self._last_valid_data is not None:
for channel in slave_main.channels:
fill_content = [self._last_valid_data[channel]] * fill_size if self._sample_and_hold else [
0] * fill_size
fill_data.input_data_on_channel(fill_content, channel)
# If aligning slave data, or it's the first valid data point
if (
self._last_valid_data is None or master_index >= previous_master_index) and current_slave_index <= max_slave_index:
# Update last valid data with current slave data
self._last_valid_data = slave_main.get_data_at_index(current_slave_index)
# Store slave data for all channels at the aligned master timestamp
for channel in slave_main.channels:
fill_data.input_data_on_channel([self._last_valid_data[channel]], channel)
# Update the previous master index for the next iteration
previous_master_index = master_index
# Fill any remaining master timestamps with sample-and-hold or 0
remaining_fill_size = max_master_index - previous_master_index
if remaining_fill_size > 0 and self._last_valid_data is not None:
for channel in slave_main.channels:
fill_content = [self._last_valid_data[channel]] * remaining_fill_size if self._sample_and_hold else [
0] * remaining_fill_size
fill_data.input_data_on_channel(fill_content, channel)
return fill_data