Previously, we discussed analyzing CSV files by parsing each CSV into a native Python object that supports iteration while providing easy access to the data (such as a sum by column header).
For very large files this can be cumbersome, especially where more advanced analytics are desired.
I would like to keep the same simple interface but use an in-memory database connection, thus transforming the CSV files into database tables for deeper analysis.
For example, I would like to do the following (leveraging the builtin sqlite3 module):
>>>
>>> reports = Reports('/home/user/reports-1109')
>>> reports.billing_detail.sum('Tax Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>>
>>> reports.tax_summary.sum('Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>>
>>> len(reports.billing_detail)
719153
>>>
>>> curs = reports.conn.cursor()
>>> curs.execute('SELECT name FROM sqlite_master WHERE type="table"').fetchall()
[(u'billing_detail',), (u'billing_summary',)]
>>>
This approach can be orders of magnitude faster for even the most basic analysis. Furthermore, this allows OLAP cube analysis of the data from the CSV files, e.g.,
>>>
>>> curs.execute('CREATE TABLE t_fact(id TEXT UNIQUE, b INT, t INT, r INT)').fetchall()
[]
>>> curs.execute('CREATE INDEX IF NOT EXISTS idxt ON t_fact(id)').fetchall()
[]
>>>
>>> ## load some data into the fact table
>>> curs.execute('''INSERT OR REPLACE INTO t_fact(id,b,t,r)
SELECT bd.%(id)s as id, bd.ROWID as b, ts.ROWID as t, rf.ROWID as r
FROM billing_detail bd
LEFT OUTER JOIN tax_summary ts ON bd.%(id)s = ts.%(tax_id)s
LEFT OUTER JOIN refunds rf ON bd.%(id)s = rf.%(ref_id)s
''' % query_dict).fetchall()
[]
>>>
>>> ## e.g., find billing records without tax summaries
>>> billings_without_tax = curs.execute('SELECT id FROM t_fact WHERE t IS NULL').fetchall()
>>>
Using the same Report and Reports objects discussed previously, the code can be modified to leverage a database connection to support this type of analytics:
try:
    from collections.abc import Mapping as _Mapping  # Python 3.3+
except ImportError:
    from collections import Mapping as _Mapping  # Python 2


class Report(_Mapping):
    '''A CSV report loaded into an sqlite3 table.

    The first CSV row is kept in ``info``, the second row in ``headers``;
    every remaining row is inserted into ``table``.  Table columns are named
    ``c0``, ``c1``, ... (all TEXT) in header order, and the public methods
    translate header names to those column names (case-insensitively).

    NOTE: the default ``conn`` is created once, when the class body is
    evaluated, so every Report constructed without an explicit ``conn``
    shares that single in-memory database.
    '''

    def __init__(self, filehint, table = None, conn = sqlite3.connect(':memory:')):
        '''Resolve ``filehint`` through ``Reports.find_report`` and load it.

        filehint -- passed to ``Reports.find_report`` to obtain the filename
        table    -- name of the sqlite table that holds the rows
        conn     -- sqlite3 connection to load into (see class note)
        '''
        self.filename = Reports.find_report(filehint)
        self.info = []
        self.headers = []
        self.table = table
        self.conn = conn
        self.indexes = []
        self._load()

    def _query(self, sql, params=()):
        '''Run ``sql`` with bound ``params`` and return all rows.

        The cursor is closed even when the statement raises.
        '''
        curs = self.conn.cursor()
        try:
            return curs.execute(sql, params).fetchall()
        finally:
            curs.close()

    def _load(self):
        '''Read the CSV file and copy its data rows into ``self.table``.

        If CREATE TABLE raises sqlite3.OperationalError the error is logged
        and the existing table is used as-is (no rows are inserted).
        '''
        logging.debug('loading %s', self.filename)
        # ``with`` guarantees the file handle is released, even on errors.
        with open(self.filename) as fh:
            reader = csv.reader(fh)
            self.info = next(reader)
            self.headers = next(reader)
            names = ['c%d' % i for i in range(len(self.headers))]
            columns = ', '.join(names)
            column_types = ' TEXT, '.join(names) + ' TEXT'
            placeholders = '?, ' * (len(self.headers) - 1) + '?'
            curs = self.conn.cursor()
            try:
                try:
                    curs.execute('CREATE TABLE %s(%s)' % (self.table, column_types))
                except sqlite3.OperationalError as e:
                    logging.debug('%s -- using existing table', e)
                else:
                    # The reader is consumed lazily, so the file must still
                    # be open here.
                    curs.executemany(
                        'INSERT INTO %s (%s) VALUES(%s)' % (
                            self.table, columns, placeholders),
                        reader)
                self.conn.commit()
            finally:
                curs.close()

    def _column(self, key):
        '''Return the table column name (``cN``) for header ``key``.

        The match is case-insensitive.  Raises IndexError when ``key`` is
        not one of the headers.
        '''
        lowered = [x.lower() for x in self.headers]
        try:
            return 'c' + str(lowered.index(key.lower()))
        except ValueError:
            raise IndexError('%s not in %s' % (key, self.table))

    def create_index(self, col):
        '''Create an index on header ``col`` unless this Report already did.'''
        col = self._column(col)
        icol = 'i' + col
        if icol not in self.indexes:
            logging.debug('adding index %s to %s(%s)', icol, self.table, col)
            self._query('CREATE INDEX IF NOT EXISTS %s ON %s(%s)' % (
                icol, self.table, col))
            self.indexes.append(icol)

    def __getitem__(self, key):
        '''Return row number ``key`` (0-based, looked up as ROWID key+1) as a list.

        Raises IndexError when no such row exists.
        '''
        rows = self._query(
            'SELECT * FROM %s WHERE ROWID = ?' % (self.table,), (key + 1,))
        if not rows:
            raise IndexError('row %s not in %s' % (key, self.table))
        return list(rows[0])

    def __iter__(self):
        '''Start iterating over all rows of the table and return ``self``.

        The Report is its own iterator, so only one iteration can be in
        progress at a time.
        '''
        # The cursor must stay open while rows are being consumed; closing
        # it here would make every subsequent next() call fail.
        self.__iter = self.conn.cursor().execute(
            'SELECT * FROM %s' % (self.table,))
        return self

    def next(self):
        '''Return the next row of the iteration started by ``__iter__``.'''
        return next(self.__iter)

    __next__ = next  # Python 3 iterator protocol

    def __len__(self):
        '''Return the number of rows in the table.'''
        return self._query('SELECT COUNT(*) FROM %s' % (self.table,))[0][0]

    def get(self, column, value):
        '''get rows where column matches value'''
        column = self._column(column)
        # Bind the value instead of interpolating it: a double-quoted token
        # in SQLite is resolved as an identifier first, so a value such as
        # "c0" would silently be compared as a column.  The value is bound
        # as the same text the '%s' interpolation used to produce.
        return self._query(
            'SELECT * FROM %s WHERE %s = ?' % (self.table, column),
            ('%s' % (value,),))

    def sum(self, col, filter = None):
        '''Return SUM of header ``col`` as a Decimal.

        filter -- optional dict of {header: value}; only rows where every
                  header equals its value are summed.
        Returns Decimal(0) when no row matches.  Raises IndexError for an
        unknown header.
        '''
        clauses = []
        params = []
        for k, v in (filter or {}).items():
            clauses.append('%s = ?' % (self._column(k),))
            params.append('%s' % (v,))
        sql = 'SELECT SUM(%s) FROM %s' % (self._column(col), self.table)
        if clauses:
            sql += ' WHERE ' + ' AND '.join(clauses)
        total = self._query(sql, params)[0][0]
        # SUM() over zero rows is NULL; report that as zero rather than
        # failing on Decimal('None').
        if total is None:
            return Decimal(0)
        return Decimal(str(total))