I would like to analyze a collection of CSV (comma-separated values) files in Python. Ideally, I would like to treat the CSV data as a native Python object.
For example,
>>> financial_detail = Report('financial-detail.csv')
>>> transactions = []
>>> for row in financial_detail:
...     transactions.append(row['Transaction'])
...
>>> financial_detail.sum('Tax Amount')
Decimal('123456.10')
>>>
Additionally, I would like to easily retrieve row data by column name, e.g.,
>>> financial_detail = Report('financial-detail.csv')
>>>
>>> financial_detail[0]['Transaction']
'6324565'
>>>
>>> ## the above should be equivalent to
>>> financial_detail.headers.index('Transaction')
7
>>> financial_detail[0][7]
'6324565'
>>>
Given a directory of CSV files, I would like to retrieve these files (magically) by name, e.g.,
>>> reports = Reports('/directory/of/reports/')
>>> len(reports.sales_tax)
4884
>>>
>>> len(reports.financial_detail)
512916
>>> reports.sales_tax.sum('Amount') - reports.financial_detail.sum('Tax Amount')
Decimal('0.0')
>>>
This can be accomplished by leveraging the Python standard-library csv module, the collections.Mapping abstract base class, and Python's magic __getattr__ method, i.e.,
#!/usr/bin/env python
# vim: set tabstop=4 shiftwidth=4 autoindent smartindent:
import csv, glob, os, sys
import collections
from decimal import Decimal
## set the logging level
import logging
# Root logger: emit everything from DEBUG upward, prefixing each message with
# its level, the source file name and the line number.
logging.basicConfig(
    format='%(levelname)s:%(filename)s:%(lineno)d -- %(message)s',
    level=logging.DEBUG,
)
try:  # Python 3.3+: the ABCs live in collections.abc (collections.Mapping was removed in 3.10)
    from collections.abc import Mapping as _Mapping
except ImportError:  # Python 2
    from collections import Mapping as _Mapping


class Report(_Mapping):
    '''
    Interface into a CSV Report

    Parses the csv into an iterable collection and provides memory of pivot sums.
    The first line of the file is stored in ``info``, the second in ``headers``
    and every remaining line becomes one row of ``data``.

    NOTE: although the class derives from Mapping, ``report[n]`` indexes rows by
    position and iterating a report yields rows (VirtualRecord), not keys.
    '''

    def __init__(self, *filehints, **options):
        '''
        Locate and load a report file.

        filehints -- path components (glob patterns allowed) that are handed to
                     Reports.find_report; they must match exactly one file
        table     -- optional keyword-only label for the report, kept in
                     ``self.table``

        Raises TypeError for any other keyword argument and IOError when no
        single file matches the hints.
        '''
        # Python 2 compatible spelling of a keyword-only argument.
        table = options.pop('table', None)
        if options:
            raise TypeError(
                'unexpected keyword argument(s): %s' % ', '.join(sorted(options))
            )
        self.table = table
        self.filename = Reports.find_report(*filehints)
        self.info = []
        self.headers = []
        self.data = []
        self.pivot_sums = {}
        self.load()

    def load(self):
        '''
        (Re)read ``self.filename`` into ``info``, ``headers`` and ``data``.

        Any previously loaded rows and cached sums are discarded, so calling
        load() again refreshes the report instead of duplicating its rows.
        '''
        if self.filename is None:
            # find_report() returns None when zero or several files match.
            raise IOError('no single report file matched the given hints')
        logging.debug('loading %s', self.filename)
        # The context manager guarantees the handle is closed, even on error.
        with open(self.filename) as fh:
            reader = csv.reader(fh)
            # next(reader, []) tolerates files shorter than two lines.
            self.info = next(reader, [])
            self.headers = next(reader, [])
            self.data = list(reader)
        self.pivot_sums = {}
        self.curval = 0

    def __getitem__(self, key):
        '''Return row number *key* wrapped in a VirtualRecord.'''
        return VirtualRecord(self.data[key], self)

    def __iter__(self):
        '''Rewind the cursor and return self (the report is its own iterator).'''
        self.curval = 0
        return self

    def next(self):
        '''
        Return the row under the cursor as a VirtualRecord and advance.

        Raises StopIteration once every row has been returned.
        '''
        if self.curval < 0 or self.curval >= len(self.data):
            raise StopIteration
        # Read the row first and advance afterwards, so that row 0 is not skipped.
        record = VirtualRecord(self.data[self.curval], self)
        self.curval += 1
        return record

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __len__(self):
        '''Return the number of data rows (info and header lines excluded).'''
        return len(self.data)

    def _column_index(self, col):
        '''Translate a header name (case-insensitive) into a column position.'''
        if isinstance(col, str):
            wanted = col.lower()
            # list.index raises ValueError when no header matches.
            return [header.lower() for header in self.headers].index(wanted)
        return col

    def sum(self, col):
        '''
        Return the Decimal total of column *col* over all rows.

        col -- header name (matched case-insensitively) or integer position

        Empty cells count as zero. Results are cached in ``pivot_sums`` under
        the value of *col* exactly as given.
        '''
        if col not in self.pivot_sums:
            logging.debug('computing sum for %s in %s', col, ', '.join(self.info))
            index = self._column_index(col)
            total = Decimal(0)
            for row in self.data:
                cell = row[index]
                if cell != '':
                    total += Decimal(cell)
            self.pivot_sums[col] = total
        return self.pivot_sums[col]
class VirtualRecord(object):
    '''
    A virtual record within a Report object, only one instance exists at a time (not-thread safe)
    to avoid the memory crunch from large report files

    Because the single instance is re-pointed at a new row on every
    construction, a previously obtained record reflects the most recent row;
    copy out the values you need before fetching another record.
    '''

    # The one shared instance, created lazily by __new__.
    _instance = None

    def __new__(cls, *args, **kwargs):
        '''Return the shared instance, creating it on first use.'''
        # Compare against None explicitly: the class defines __len__, so a plain
        # truthiness test would treat a record holding an empty row as missing
        # and silently replace the shared instance.
        if cls._instance is None:
            cls._instance = super(VirtualRecord, cls).__new__(cls)
        return cls._instance

    def __init__(self, row, parent):
        '''
        Point the shared record at a row.

        row    -- sequence of cell values
        parent -- object exposing ``headers``, used to resolve column names
        '''
        self.row = row
        self.parent = parent

    def __getitem__(self, key):
        '''
        Return one cell, addressed by position or by header name.

        String keys are matched case-insensitively against ``parent.headers``;
        an unknown name raises ValueError (from list.index).
        '''
        index = key
        if isinstance(key, str):
            wanted = key.lower()
            index = [header.lower() for header in self.parent.headers].index(wanted)
        return self.row[index]

    def __len__(self):
        '''Return the number of cells in the current row.'''
        return len(self.row)
class Reports(object):
    '''
    Magic collection of billing reports

    Attribute access is translated into a file lookup inside ``reportdir``;
    loaded reports are cached per instance in ``reports``.
    '''

    # Class-level default kept for backward compatibility; __init__ shadows it
    # with a per-instance dict so that collections for different directories
    # never share cached reports.
    reports = {}

    def __init__(self, reportdir='.'):
        '''
        reportdir -- directory that is searched for report files
        '''
        self.reportdir = reportdir
        self.reports = {}

    def __getattr__(self, report):
        '''
        magically find reports within self.reportdir
        For example, if there is a report 'sales-tax.2011-09.20111018.csv'
        You can access this with an attribute 'sales_tax', e.g.,
        >>>
        >>> reports = Reports()
        >>> len(reports.sales_tax)
        4884
        >>>

        Returns None when no single matching file exists. Special (dunder)
        names raise AttributeError instead.
        '''
        # __getattr__ only runs when normal lookup fails. Answering protocol
        # probes (e.g. __getstate__, __setstate__) with None breaks copy/pickle,
        # and falling through for a missing 'reportdir' would recurse forever.
        is_dunder = report.startswith('__') and report.endswith('__')
        if is_dunder or report == 'reportdir':
            raise AttributeError(report)
        # Each '_' becomes a single-character wildcard so that 'sales_tax'
        # matches 'sales-tax.<anything>.csv'.
        pattern = report.replace('_', '?') + '.*.csv'
        filename = Reports.find_report(self.reportdir, pattern)
        if filename is None or not os.path.isfile(filename):
            return None
        if report not in self.reports:
            self.reports[report] = Report(filename)
        return self.reports[report]

    @classmethod
    def find_report(cls, *hints):
        '''
        Resolve path components / glob patterns to exactly one file name.

        hints -- path components, joined with os.path.join and expanded with glob

        Returns the matching path, or None (after logging an error) when the
        pattern matches no file or more than one file.
        '''
        pattern = os.path.join(*hints)
        files = glob.glob(pattern)
        if len(files) == 1:
            return files[0]
        if not files:
            logging.error('No reports found: find_report(%s)', pattern)
        else:
            logging.error(
                'Found too many matching reports: find_report(%s) => %s',
                pattern, ', '.join(files)
            )
        return None