I would like to analyze a collection of CSV (comma-separated-values) files in python. Ideally, I would like to treat the csv data as a native python object.
For example,
>>> financial_detail = Report('financial-detail.csv')
>>> transactions = []
>>> for row in financial_detail:
...     transactions.append(row['Transaction'])
...
>>> financial_detail.sum('Tax Amount')
Decimal('123456.10')
>>>
Additionally, I would like to easily retrieve row data by column name, e.g.,
>>> financial_detail = Report('financial-detail.csv')
>>>
>>> financial_detail[0]['Transaction']
'6324565'
>>>
>>> ## the above should be equivalent to
>>> financial_detail.headers.index('Transaction')
7
>>> financial_detail[0][7]
'6324565'
>>>
Given a directory of CSV files, I would like to retrieve these files (magically) by name, e.g.,
>>> reports = Reports('/directory/of/reports/')
>>> len(reports.sales_tax)
4884
>>>
>>> len(reports.financial_detail)
512916
>>> reports.sales_tax.sum('Amount') - reports.financial_detail.sum('Tax Amount')
Decimal('0.0')
>>>
This can be accomplished by leveraging the python builtin csv module, the collections.Mapping abstract base class, as well as the python magic __getattr__ methods, i.e.,
#!/usr/bin/env python
# vim: set tabstop=4 shiftwidth=4 autoindent smartindent:
import csv
import glob
import os
import sys
import collections
from decimal import Decimal

try:
    # The ABC aliases were removed from the top-level ``collections``
    # namespace in Python 3.10.
    from collections.abc import Mapping
except ImportError:  # Python 2
    from collections import Mapping

## set the logging level
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)s:%(filename)s:%(lineno)d -- %(message)s'
)


def _column_index(headers, key):
    '''
    Resolve a column key to a positional index.

    A ``str`` key is matched case-insensitively against ``headers`` (the
    first match wins) and raises ValueError when no header matches.  Any
    other key is returned unchanged so callers can pass an index directly.
    '''
    if isinstance(key, str):
        return [name.lower() for name in headers].index(key.lower())
    return key


class Report(Mapping):
    '''
    Interface into a CSV Report

    Parses the csv into an iterable collection and provides memory of
    pivot sums.  The first line of the file is kept as ``info``, the second
    as ``headers`` and every remaining line as a row in ``data``.
    '''

    def __init__(self, *filehints, **kwargs):
        '''
        Locate and load a report.

        filehints -- path components joined and globbed by
                     Reports.find_report; they must match exactly one file.
                     When they do not, ``filename`` is None and opening it
                     in load() raises TypeError.
        table     -- optional keyword: the attribute name this report was
                     requested under (stored as ``self.table``).
        '''
        table = kwargs.pop('table', None)
        if kwargs:
            raise TypeError(
                'unexpected keyword arguments: %s' % ', '.join(sorted(kwargs)))
        self.filename = Reports.find_report(*filehints)
        self.table = table
        self.info = []
        self.headers = []
        self.data = []
        self.pivot_sums = {}
        self.curval = 0
        self.load()

    def load(self):
        '''(Re)read the csv file, replacing any rows and sums already held.'''
        logging.debug('loading %s', self.filename)
        with open(self.filename) as fh:
            reader = csv.reader(fh)
            # A default keeps an empty or one-line file from leaking
            # StopIteration out of the constructor.
            self.info = next(reader, [])
            self.headers = next(reader, [])
            self.data = list(reader)
        # Cached sums describe the previous contents; drop them.
        self.pivot_sums = {}
        self.curval = 0

    def __getitem__(self, key):
        '''Return the row at position ``key`` wrapped in a VirtualRecord.'''
        return VirtualRecord(self.data[key], self)

    def __iter__(self):
        '''Restart iteration; the report is its own (single) iterator.'''
        self.curval = 0
        return self

    def __next__(self):
        '''Return the next row as a VirtualRecord, starting at data[0].'''
        if self.curval < 0 or self.curval >= len(self.data):
            raise StopIteration
        record = VirtualRecord(self.data[self.curval], self)
        # Advance only after reading so the first row is not skipped.
        self.curval += 1
        return record

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def __len__(self):
        '''Number of data rows (the info and header lines are excluded).'''
        return len(self.data)

    def sum(self, col):
        '''
        Return the Decimal sum of column ``col``; empty cells count as zero.

        ``col`` is a header name (case-insensitive) or a positional index.
        Results are remembered per ``col`` in ``pivot_sums``.
        '''
        if col not in self.pivot_sums:
            logging.debug('computing sum for %s in %s',
                          col, ', '.join(self.info))
            index = _column_index(self.headers, col)
            total = Decimal(0)
            for row in self.data:
                if row[index] != '':
                    total += Decimal(row[index])
            self.pivot_sums[col] = total
        return self.pivot_sums[col]


class VirtualRecord(object):
    '''
    A virtual record within a Report object, only one instance exists at a
    time (not-thread safe) to avoid the memory crunch from large report files

    Because the single instance is re-pointed at a new row on every
    construction, copy the values you need out of a record rather than
    keeping a reference to the record itself.
    '''
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Compare with None explicitly: truthiness would go through
        # __len__, and a record holding an empty row is falsy.
        if cls._instance is None:
            cls._instance = super(VirtualRecord, cls).__new__(cls)
        return cls._instance

    def __init__(self, row, parent):
        '''Point the shared record at ``row``, which belongs to ``parent``.'''
        self.row = row
        self.parent = parent

    def __getitem__(self, key):
        '''Return a cell by header name (case-insensitive) or by index.'''
        return self.row[_column_index(self.parent.headers, key)]

    def __len__(self):
        '''Number of cells in the current row.'''
        return len(self.row)


class Reports(object):
    '''
    Magic collection of billing reports
    '''
    # Kept for backward compatibility; every instance created through
    # __init__ gets its own cache so directories never share reports.
    reports = {}

    def __init__(self, reportdir = '.'):
        self.reportdir = reportdir
        self.reports = {}

    def __getattr__(self, report):
        '''
        magically find reports within self.reportdir

        For example, if there is a report 'sales-tax.2011-09.20111018.csv'
        You can access this with an attribute 'sales_tax', e.g.,
        >>>
        >>> reports = Reports()
        >>> len(reports.sales_tax)
        4884
        >>>

        Returns None when the attribute does not match exactly one file
        (find_report logs the reason).
        '''
        # Leave special-method probes (copy, pickle, ...) to the normal
        # protocol instead of globbing the directory for them.
        if report.startswith('__'):
            raise AttributeError(report)
        filename = Reports.find_report(
            self.reportdir, report.replace('_', '?') + '.*.csv')
        if filename is None:
            return None
        if report not in self.reports:
            self.reports[report] = Report(filename, table=report)
        return self.reports[report]

    @classmethod
    def find_report(cls, *hints):
        '''
        Glob for os.path.join(*hints) and return the single matching path.

        Logs an error and returns None when there are no matches or more
        than one.
        '''
        pattern = os.path.join(*hints)
        files = glob.glob(pattern)
        if len(files) == 1:
            return files[0]
        if not files:
            logging.error('No reports found: find_report(%s)', pattern)
        else:
            logging.error(
                'Found too many matching reports: find_report(%s) => %s',
                pattern, ', '.join(files))
        return None