Previously, we discussed analyzing CSV files by parsing each CSV file into a native Python object that supports iteration while providing easy access to the data (such as a sum by column header).
For very large files this can be cumbersome, especially where more advanced analytics are desired.
I would like to keep the same simple interface but use an in-memory database connection, thus transforming the CSV files into database tables for deeper analysis.
For example, I would like to do the following (leveraging the built-in sqlite3 module):
>>>
>>> reports = Reports('/home/user/reports-1109')
>>> reports.billing_detail.sum('Tax Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>>
>>> reports.tax_summary.sum('Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>>
>>> len(reports.billing_detail)
719153
>>>
>>> curs = reports.conn.cursor()
>>> curs.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
[(u'billing_detail',), (u'billing_summary',), (u'tax_summary',)]
>>>
This approach can be orders of magnitude faster, even for the most basic analysis. Furthermore, it allows OLAP-cube-style analysis across the data from several CSV files, for example:
>>>
>>> curs.execute('CREATE TABLE t_fact(id TEXT UNIQUE, b INT, t INT, r INT)').fetchall()
[]
>>> curs.execute('CREATE INDEX IF NOT EXISTS idxt ON t_fact(id)').fetchall()
[]
>>>
>>> ## load some data into the fact table
>>> ## (query_dict maps 'id', 'tax_id' and 'ref_id' to the stored column names, e.g. 'c0')
>>> curs.execute('''INSERT OR REPLACE INTO t_fact(id,b,t,r)
...     SELECT bd.%(id)s as id, bd.ROWID as b, ts.ROWID as t, rf.ROWID as r
...     FROM billing_detail bd
...     LEFT OUTER JOIN tax_summary ts ON bd.%(id)s = ts.%(tax_id)s
...     LEFT OUTER JOIN refunds rf ON bd.%(id)s = rf.%(ref_id)s
...     ''' % query_dict).fetchall()
[]
>>>
>>> ## e.g., find billing records without tax summaries
>>> billings_without_tax = curs.execute('SELECT id FROM t_fact WHERE t IS NULL').fetchall()
>>>
Using the same Report and Reports classes discussed previously, the Report class can be modified to load its data through a database connection and so support this type of analysis:
try:  # Python 3.3+: the container ABCs live in collections.abc (gone from collections in 3.10)
    from collections.abc import Mapping as _Mapping
except ImportError:  # Python 2
    from collections import Mapping as _Mapping


class Report(_Mapping):
    """One CSV report loaded into a table of a SQLite connection.

    The CSV file's first row is kept in ``info`` and its second row in
    ``headers``; every remaining row becomes a row of ``table``, stored in
    TEXT columns named ``c0``, ``c1``, ... in header order.  Column lookups
    by header name (``get``, ``sum``, ``create_index``) are
    case-insensitive.

    ``report[i]`` returns the row with ROWID ``i + 1`` as a list, iterating
    yields every row as a tuple, and ``len(report)`` is the row count.
    """

    def __init__(self, filehint, table=None, conn=sqlite3.connect(':memory:')):
        """Locate the report for *filehint* and load it into *table*.

        :param filehint: passed to ``Reports.find_report`` to locate the CSV file.
        :param table: name of the SQLite table to load into.  It is inserted
            into SQL unquoted, so it must be a plain identifier.
            NOTE(review): the default None creates a table literally named "None".
        :param conn: SQLite connection.  The default is evaluated once, when
            the class is defined, so every Report created without ``conn``
            shares the same in-memory database.
        """
        self.filename = Reports.find_report(filehint)
        self.info = []
        self.headers = []
        self.table = table
        self.conn = conn
        self.indexes = []  # names of indexes created through create_index()
        self._load()

    def _query(self, sql, params=()):
        """Execute *sql* with bound *params* on a fresh cursor; return all rows.

        The cursor is closed even when the statement raises.
        """
        curs = self.conn.cursor()
        try:
            return curs.execute(sql, params).fetchall()
        finally:
            curs.close()

    def _load(self):
        """Read ``self.filename`` into ``self.table`` unless that table already exists.

        :raises sqlite3.OperationalError: if the table cannot be created for
            any reason other than already existing.
        """
        logging.debug('loading %s', self.filename)
        with open(self.filename) as fh:  # closed even if parsing or inserting fails
            reader = csv.reader(fh)
            self.info = next(reader)
            self.headers = next(reader)
            names = ['c%d' % i for i in range(len(self.headers))]
            curs = self.conn.cursor()
            try:
                try:
                    curs.execute('CREATE TABLE %s(%s)' % (
                        self.table, ', '.join(name + ' TEXT' for name in names)))
                except sqlite3.OperationalError as e:
                    # Reuse a table loaded earlier on the same connection, but do
                    # not mask other failures (e.g. an invalid table name).
                    if 'already exists' not in str(e):
                        raise
                    logging.debug('%s -- using existing table', e)
                else:
                    # The reader streams the remaining CSV rows straight into the table.
                    curs.executemany('INSERT INTO %s (%s) VALUES(%s)' % (
                        self.table, ', '.join(names), ', '.join('?' * len(names))),
                        reader)
            finally:
                curs.close()
        self.conn.commit()

    def _column(self, key):
        """Map a header name (case-insensitive) to its storage column ``c<N>``.

        :raises IndexError: if no header matches *key*.
        """
        lowered = [header.lower() for header in self.headers]
        try:
            return 'c%d' % lowered.index(key.lower())
        except ValueError:
            raise IndexError('%s not in %s' % (key, self.table))

    def create_index(self, col):
        """Create (once) an index on the column whose header is *col*."""
        col = self._column(col)
        # Index names are unique per database and several reports may share
        # one connection, so qualify the index name with the table name.
        icol = 'i_%s_%s' % (self.table, col)
        if icol not in self.indexes:
            logging.debug('adding index %s to %s(%s)', icol, self.table, col)
            self._query('CREATE INDEX IF NOT EXISTS %s ON %s(%s)' % (icol, self.table, col))
            self.indexes.append(icol)

    def __getitem__(self, key):
        """Return the row at zero-based position *key* as a list.

        :raises IndexError: if there is no row with ROWID ``key + 1``.
        """
        rows = self._query('SELECT * FROM %s WHERE ROWID = ?' % (self.table,), (key + 1,))
        if not rows:
            raise IndexError('%s not in %s' % (key, self.table))
        return list(rows[0])

    def __iter__(self):
        """Start iterating over all rows (as tuples) and return ``self``.

        The cursor is deliberately left open: rows are fetched lazily by
        ``next`` and a closed cursor cannot be iterated.  The iteration state
        lives on the instance, so two simultaneous loops over the same Report
        interfere with each other.
        """
        curs = self.conn.cursor()
        self.__iter = iter(curs.execute('SELECT * FROM %s' % (self.table,)))
        return self

    def next(self):
        """Return the next row of the iteration started by ``__iter__``."""
        return next(self.__iter)

    __next__ = next  # Python 3 iterator protocol

    def __len__(self):
        """Return the number of rows in the table."""
        return self._query('SELECT COUNT(*) FROM %s' % (self.table,))[0][0]

    def get(self, column, value):
        """Return all rows (as tuples) whose *column* equals *value*.

        *value* is bound as text (``'%s' % value``), matching the TEXT storage.
        Note that this intentionally shadows ``Mapping.get``.
        """
        sql = 'SELECT * FROM %s WHERE %s = ?' % (self.table, self._column(column))
        return self._query(sql, ('%s' % (value,),))

    def sum(self, col, filter=None):
        """Return the SQL SUM of column *col* as a Decimal.

        :param filter: optional mapping of header name -> value; only rows
            whose columns equal every given value (compared as text) are summed.
        :returns: ``Decimal('0')`` when no row matches.
        """
        clauses, params = [], []
        for k, v in (filter or {}).items():
            clauses.append('%s = ?' % (self._column(k),))
            params.append('%s' % (v,))
        where = ' WHERE ' + ' AND '.join(clauses) if clauses else ''
        total = self._query('SELECT SUM(%s) FROM %s%s' % (
            self._column(col), self.table, where), params)[0][0]
        # SUM() over zero rows yields NULL; str() keeps the REAL's decimal text.
        return Decimal('0') if total is None else Decimal(str(total))