import csv
import os
from typing import List, Final, Dict
from models.exception.invalid_parameter_value import InvalidParameterValue
from models.exception.missing_parameter import MissingParameterError
from models.framework_data import FrameworkData
from models.node.output.output_node import OutputNode
[docs]class CSVFile(OutputNode):
""" This node is capable of creating/writing the output data to a CSV file.
Attributes:
_MODULE_NAME (str): The name of this module (in this case, 'node.output.file.csvfile').
INPUT_MAIN (str): The name of the main input (in this case, 'main').
``configuration.json`` usage example:
**module**: Current module name (in this case ``models.node.output.file``).\n
**name**: Current node instance name (in this case, ``CSVFile``).\n
**file_path** (str): The path to the CSV file that will be created/written to.\n
**buffer_options** (dict): The buffer options.\n
**clear_output_buffer_on_data_input** (bool): Whether to clear the output buffer when data is inputted.\n
**clear_input_buffer_after_process** (bool): Whether to clear the input buffer after the process method is called.\n
**clear_output_buffer_after_process** (bool): Whether to clear the output buffer after the process method is called.\n
"""
def _is_processing_condition_satisfied(self) -> bool:
return True
_MODULE_NAME: Final[str] = 'node.output.file.csvfile'
INPUT_MAIN: Final[str] = 'main'
def _validate_parameters(self, parameters: dict):
""" Validates the parameters that were passed to the node.
:param parameters: The parameters that were passed to the node.
:type parameters: dict
:raises MissingParameterError: The ``file_path`` parameter is required.
:raises InvalidParameterValue: The ``file_path`` parameter must be a string.
:raises InvalidParameterValue: The ``file_path`` parameter must be a CSV file.
"""
if 'file_path' not in parameters:
raise MissingParameterError(module=self._MODULE_NAME, name=self.name,
parameter='file_path')
if type(parameters['file_path']) is not str:
raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
parameter='file_path',
cause='must_be_string')
if os.path.splitext(parameters['file_path'])[1] != '.csv':
raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
parameter='file_path',
cause='must_be_csv_file')
def _initialize_parameter_fields(self, parameters: dict):
""" Initializes the parameters that were passed to the node.
:param parameters: The parameters that were passed to the node.
:type parameters: dict
"""
super()._initialize_parameter_fields(parameters)
self.file_path = parameters['file_path']
if os.path.exists(self.file_path):
os.remove(self.file_path)
# self.file_path = f'{self.file_path[:-4]}_{int(time.time() * 1000)}.csv'
self._csv_file = None
def _get_inputs(self) -> List[str]:
""" Returns the input names of this node.
"""
return [
self.INPUT_MAIN
]
def _init_csv_writer(self, data: FrameworkData) -> None:
""" Initializes the CSV writer.
"""
if self._csv_file is None:
if not os.path.exists(os.sep.join(self.file_path.split(os.sep)[0:-1])):
os.makedirs(os.sep.join(self.file_path.split(os.sep)[0:-1]))
self.print('Creating csv file')
self._csv_file = open(self.file_path, "w", newline='')
self._csv_writer = csv.writer(self._csv_file)
self._channels = None
def _write_csv_columns(self, channels: List[str]) -> None:
""" Writes the CSV columns labels.
:param channels: The channels to write.
:type channels: List[str]
"""
if self._channels is not None:
return
if len(channels) == 0:
return
self._channels = channels
self.print('Writing columns')
self._csv_writer.writerow(self._channels)
def _write_data(self, data: FrameworkData) -> None:
""" Writes the data to the CSV file.
:param data: The data to write.
:type data: FrameworkData
"""
self.print(f'Writing data to file')
formatted_data = zip(*data.get_data().values())
self._csv_writer.writerows(formatted_data)
self.print(f'Done')
def _process(self, data: Dict[str, FrameworkData]) -> None:
""" Runs the node.
"""
self.print(f'Writing {data[self.INPUT_MAIN].get_data_count()} samples to file')
input_data = data[self.INPUT_MAIN]
self._init_csv_writer(input_data)
self._write_csv_columns(input_data.channels)
self._write_data(input_data)
self._csv_file.flush()
[docs] def dispose(self) -> None:
""" Node self implementation of disposal of allocated resources.
"""
self._clear_output_buffer()
self._clear_input_buffer()
if self._csv_file is not None and not self._csv_file.closed:
self._csv_file.close()
self._csv_file = None