{"id":543,"date":"2011-11-03T19:43:06","date_gmt":"2011-11-03T19:43:06","guid":{"rendered":"http:\/\/tech.avant.net\/q\/?p=543"},"modified":"2012-12-25T22:38:42","modified_gmt":"2012-12-25T22:38:42","slug":"543","status":"publish","type":"post","link":"https:\/\/tech.avant.net\/q\/543\/","title":{"rendered":"python, analyzing csv files, part 2"},"content":{"rendered":"<p>Previously, we discussed <a href=\"\/q\/2011\/11\/python-analyzing-csv-files-part-1\/\">analyzing CSV files<\/a>, parsing the CSV into a native Python object that supports iteration while providing easy access to the data (such as a sum by column header).<\/p>\n<p>For very large files this can be cumbersome, especially where more advanced analytics are desired.<\/p>\n<p>I would like to keep the same simple interface but use an in-memory database connection, thus transforming the CSV files into database tables for deeper analysis.<\/p>\n<p>For example, I would like to do the following (leveraging the built-in sqlite3 module):<\/p>\n<pre class=\"sh_python\">\r\n>>> \r\n>>> reports = Reports('\/home\/user\/reports-1109')\r\n>>> reports.billing_detail.sum('Tax Amount', {'Fiscal Period':'2011-09'})\r\nDecimal('123321.1')\r\n>>> \r\n>>> reports.tax_summary.sum('Amount', {'Fiscal Period':'2011-09'})\r\nDecimal('123321.1')\r\n>>> \r\n>>> len(reports.billing_detail)\r\n719153\r\n>>> \r\n>>> curs = reports.conn.cursor()\r\n>>> curs.execute('SELECT name FROM sqlite_master WHERE type=\"table\"').fetchall()\r\n[(u'billing_detail',), (u'tax_summary',)]\r\n>>> \r\n<\/pre>\n<p>This approach can be orders of magnitude faster for even the most basic analysis. 
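<\/p>\n<p>As an aside (an editor's sketch, not code from this post): the queries here splice values into the SQL text with string formatting, but sqlite3 can also bind values through <code>?<\/code> placeholders, which avoids quoting problems when the data itself contains quotes. The table and column names below are made up for illustration.<\/p>\n

```python
import sqlite3
from decimal import Decimal

conn = sqlite3.connect(':memory:')
curs = conn.cursor()

# hypothetical two-column table: c0 = Fiscal Period, c1 = Tax Amount
curs.execute('CREATE TABLE billing_detail(c0 TEXT, c1 TEXT)')
curs.executemany('INSERT INTO billing_detail VALUES(?, ?)',
                 [('2011-09', '10.50'), ('2011-09', '2.25'), ('2011-10', '99.99')])

# the filter value is bound with ?, not pasted into the SQL string
total = curs.execute('SELECT SUM(c1) FROM billing_detail WHERE c0 = ?',
                     ('2011-09',)).fetchone()[0]
print(Decimal(str(total)))
```

\n<p>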
Furthermore, this allows OLAP cube analysis of the data from the CSV files, e.g.,<\/p>\n<pre class=\"sh_python\">\r\n>>> \r\n>>> curs.execute('CREATE TABLE t_fact(id TEXT UNIQUE, b INT, t INT, r INT)').fetchall()\r\n[]\r\n>>> curs.execute('CREATE INDEX IF NOT EXISTS idxt ON t_fact(id)').fetchall()\r\n[]\r\n>>> \r\n>>> ## load some data into the fact table\r\n>>> curs.execute('''INSERT OR REPLACE INTO t_fact(id,b,t,r)\r\n                SELECT bd.%(id)s as id, bd.ROWID as b, ts.ROWID as t, rf.ROWID as r\r\n                FROM billing_detail bd\r\n                LEFT OUTER JOIN tax_summary ts ON bd.%(id)s = ts.%(tax_id)s\r\n                LEFT OUTER JOIN refunds rf ON bd.%(id)s = rf.%(ref_id)s\r\n                ''' % query_dict).fetchall()\r\n[]\r\n>>> \r\n>>> ## e.g., find billing records without tax summaries\r\n>>> billings_without_tax = curs.execute('SELECT id FROM t_fact WHERE t IS NULL').fetchall()\r\n>>> \r\n<\/pre>\n<p>Using the same Report and Reports objects discussed previously, the code can be modified to leverage a database connection to support this type of analytics:<\/p>\n<pre class=\"sh_python\">\r\nclass Report(collections.Mapping):\r\n    def __init__(self, filehint, table = None, conn = sqlite3.connect(':memory:')):\r\n        self.filename = Reports.find_report(filehint)\r\n        self.info = []\r\n        self.headers = []\r\n        self.table = table\r\n        self.conn = conn\r\n        self.indexes = []\r\n        self._load()\r\n\r\n    def _load(self):\r\n        logging.debug('loading %s' %(self.filename))\r\n        curs = self.conn.cursor()\r\n        fh = open(self.filename)\r\n        reader = csv.reader(fh)\r\n        self.info = reader.next()\r\n        self.headers = reader.next()\r\n        columns = ', '.join(['c'+str(x) for x in range(len(self.headers))])\r\n        columnTypes = ' TEXT, '.join(['c'+str(x) for x in range(len(self.headers))]) + ' TEXT'\r\n        try:\r\n            curs.execute('CREATE TABLE %s(%s)' 
%(self.table, columnTypes))\r\n        except sqlite3.OperationalError as e:\r\n            logging.debug('%s -- using existing table' %(e))\r\n        else:\r\n            curs.executemany('INSERT INTO %s (%s) VALUES(%s)' %(\r\n                self.table, columns,\r\n                '?, ' * (len(self.headers) -1) + '?'\r\n            ), reader)\r\n            self.conn.commit()\r\n        curs.close()\r\n\r\n    def _column(self, key):\r\n        if key.lower() not in [x.lower() for x in self.headers]:\r\n            raise IndexError('%s not in %s'%(key, self.table))\r\n        return 'c' + str([x.lower() for x in self.headers].index(key.lower()))\r\n\r\n    def create_index(self, col):\r\n        col = self._column(col)\r\n        icol = 'i' + col\r\n        if icol not in self.indexes:\r\n            logging.debug('adding index %s to %s(%s)' %(icol, self.table, col))\r\n            curs = self.conn.cursor()\r\n            curs.execute('CREATE INDEX IF NOT EXISTS %s ON %s(%s)' %(icol, self.table, col))\r\n            curs.close()\r\n            self.indexes.append(icol)\r\n\r\n    def __getitem__(self, key):\r\n        curs = self.conn.cursor()\r\n        res = list(curs.execute('SELECT * FROM %s WHERE ROWID = %s' %(self.table, key+1)).fetchall()[0])\r\n        curs.close()\r\n        return res\r\n\r\n    def __iter__(self):\r\n        ## keep this cursor open -- closing it here would break iteration in next()\r\n        self.__iter = self.conn.cursor().execute('SELECT * FROM %s' %(self.table))\r\n        return self\r\n\r\n    def next(self):\r\n        return self.__iter.next()\r\n\r\n    def __len__(self):\r\n        curs = self.conn.cursor()\r\n        ret = curs.execute('SELECT COUNT(*) FROM %s' %(self.table)).fetchall()[0][0]\r\n        curs.close()\r\n        return ret\r\n\r\n    def get(self, column, value):\r\n        '''get rows where column matches value'''\r\n        curs = self.conn.cursor()\r\n        column = self._column(column)\r\n        ret = curs.execute('SELECT * FROM %s WHERE %s 
= \"%s\"' %(self.table, column, value)).fetchall()\r\n        curs.close()\r\n        return ret\r\n\r\n    def sum(self, col, filter = None):\r\n        curs = self.conn.cursor()\r\n        _where = []\r\n        for k,v in (filter or {}).iteritems():\r\n            _where.append(' %s = \"%s\" ' %(self._column(k),v) )\r\n        ret = Decimal(str(curs.execute('SELECT SUM(%s) FROM %s %s' %(\r\n            self._column(col),\r\n            self.table,\r\n            ' WHERE ' + ' AND '.join(_where) if _where else ''\r\n            )).fetchall()[0][0]))\r\n        curs.close()\r\n        return ret\r\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Previously, we discussed analyzing CSV files, parsing the CSV into a native Python object that supports iteration while providing easy access to the data (such as a sum by column header). For very large files this can be cumbersome, especially where more advanced analytics are desired. I would like to keep the same simple interface [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[16,6],"tags":[],"_links":{"self":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/543"}],"collection":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/comments?post=543"}],"version-history":[{"count":5,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/543\/revisions"}],"predecessor-version":[{"id":707,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/543\/revisions\/707"}],"wp:attachment":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/media?parent=543"}],"wp:term":[{"taxonomy":"category","embeddable":
true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/categories?post=543"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/tags?post=543"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}