from ..base import StreamElement
import logging
import csv
import os
[docs]class Writer(StreamElement):
log = logging.getLogger('pewpew.csv.writer')
[docs] def on_start(self):
self.output_name = self.config.get("filename", "output")
self.n_events = self.config.get("events_per_file", None)
self.output_path = self.config.get("output_path", ".")
self.output_file = None
self.writer = None
self.columns = None
self.current_cycle = 0
self.current_event = 0
@property
def filename(self):
return "{}_{}.csv".format(self.output_name, self.current_cycle)
@property
def path(self):
return os.path.join(self.output_path, self.filename)
[docs] def on_completion(self):
self.log.info("Closing out file and stream")
self.output_file.close()
self.output_file = None
self.writer = None
[docs] def process(self, data):
if self.n_events is not None:
if self.current_event == self.n_events:
msg = "closing out file: {} with events {}/{}"
msg = msg.format(self.path, self.current_event,
self.n_events)
self.log.debug(msg)
self.output_file.close()
self.output_file = None
self.current_event = 0
if self.output_file is None:
if os.path.exists(self.path):
msg = "output path already exists. replacing {}"
self.log.warning(msg.format(self.path))
os.remove(self.path)
self.output_file = open(self.path, 'w')
self.current_cycle += 1
field_names = [i for i in data['data'].keys()]
self.writer = csv.DictWriter(self.output_file,
fieldnames=field_names)
self.writer.writeheader()
self.writer.writerow(data['data'])
self.current_event += 1