Saturday, February 21

Load ABS files to MySQL

When I am working in R, I tend to keep my working data in a MySQL database. I found that R did not always play nicely (or quickly) with complex Microsoft Excel files.

Previously, I had quite a complex bit of Python code to read these Excel files and upload them to MySQL. I have now retooled the way I load files from the Australian Bureau of Statistics (ABS) into MySQL, using pandas. The code is much simpler.

First, I store my MySQL database username and password in a file named MysqlConnect.py (it is used by a number of different programs). It lives in the bin directory (../bin from where I do my work). And, just in case you are wondering: no, it is not my real password.

host     = 'localhost'
user     = 'root'
password = 'BigRedCar'
database = 'dbase1'

Now let's move on to the function that loads ABS files into MySQL. It also lives in the bin directory, in a file named LoadABSToMySQL.py.

import pandas as pd
import pymysql
from sqlalchemy import create_engine
import os.path
import re

# local imports - a file that contains database login details
import MysqlConnect as MSC

def LoadABSToMySQL(pathName):
    """ Read an Excel file from the Australian Bureau of Statistics
        and load it into a MySQL database"""

    # --- 1 --- open MySQL
    s = 'mysql+pymysql://'+MSC.user+':'+MSC.password+'@'+MSC.host+'/'+MSC.database
    engine = create_engine(s)

    # --- 2 --- identify proposed table name from file name
    (head,tail) = os.path.split(pathName)
    tail = re.split(r'\.', tail)  # split off the file extension
    tablename = tail[0]

    # --- 3 --- open the XL file
    wb = pd.ExcelFile(pathName)

    # --- 4 --- load the Data worksheets into a single DataFrame
    df = pd.DataFrame()
    for name in wb.sheet_names:

        # -- ignore junk
        if 'Data' not in name:
            continue

        # -- read
        tmp = wb.parse(sheetname=name, header=9, index_col=0, na_values=['', '-', ' '])

        # -- amalgamate
        df = pd.merge(left=df, right=tmp, how='outer', left_index=True, right_index=True)
        tmp = None

    # --- 5 --- write this DataFrame to MySQL
    df.to_sql(tablename, engine, index=True, if_exists='replace')
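
As a quick check that a load has worked, pandas can read the table straight back from MySQL. This is just a sketch (it reuses the same engine set-up as above, and the table name is one of those loaded in the snippet further below):

import pandas as pd
from sqlalchemy import create_engine
import MysqlConnect as MSC

s = 'mysql+pymysql://'+MSC.user+':'+MSC.password+'@'+MSC.host+'/'+MSC.database
engine = create_engine(s)

# read back one of the tables loaded by py-load.py (below)
check = pd.read_sql('5206001_key_aggregates', engine)
print(check.iloc[:5, :5])   # the top left-hand corner of the table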

Finally, an example code snippet to load some of the ABS National Accounts files to MySQL. This file sits in my national accounts directory and has the rather unimaginative name py-load.py. The ABS Microsoft Excel files live in the ./raw-data sub-directory.

import sys
sys.path.append( '../bin' )

from LoadABSToMySQL import LoadABSToMySQL

dataDirectory = './raw-data/'
dataFiles = [
    '5206001_key_aggregates',
    '5206002_expenditure_volume_measures',
    '5206003_expenditure_current_price',
    '5206004_expenditure_price_indexes',
    '5206006_industry_gva',
    '5206008_household_final_consumption_expenditure',
    '5206022_taxes',
    '5206023_social_assistance_benefits',
    '5206024_selected_analytical_series'
]
dataSuffix = '.xls'

for f in dataFiles :
    LoadABSToMySQL(dataDirectory + f + dataSuffix)

To run this Python load file, I have a Bash shell script, which I use on my iMac. It has another unimaginative name: run-load.sh.

#!/bin/bash

# mac os x fix ...
cd "$(dirname "$0")"

python ./py-load.py

Wednesday, February 11

An Extra Dry Baltic Index

The Baltic Dry Index provides a window on world trade. The view out the window ain't that good at the moment.

[Chart: the Baltic Dry Index]

The Baltic Dry Index was at 663 on 5 December 2008 (its post-GFC low point).

It was at 554 on 9 February 2015. But no need to worry, it rebounded back up to 556 on 10 February.

Sunday, December 28

Updated cheat sheets: python, pandas and matplotlib

I have updated my cheat sheets for python, pandas and matplotlib.

You can find them here: bit.ly/python_cs

Sunday, August 3

Pandas 0.14.1

I have just upgraded to Pandas 0.14.1.

It was a pain. At first, none of my graphics programs worked. It looks like a change to the API for parsing Microsoft Excel files was the problem. I am not sure whether my previous approach was wrong (but worked serendipitously), or whether the API was deliberately changed in a way that breaks old code (an unusual practice for a point release). If someone knows, I'd appreciate something in the comments below.

What follows are the classes I use to upload Australian Bureau of Statistics (ABS) and Reserve Bank of Australia (RBA) data, with the updates to the parsing stage commented.

And yes, I use Python 2.7, not Python 3 (it's what comes with the Apple Mac).

### ABSExcelLoader.py
### written in python 2.7 and pandas 0.14.1

import pandas as pd
from distutils.version import LooseVersion
# note: a plain string comparison of versions misleads (eg '0.9' > '0.14.1')
assert LooseVersion(pd.__version__) >= LooseVersion('0.14.1')

class ABSExcelLoader:

    def load(self, pathName, freq='M', index_name=None, verbose=False):
        """return a dictionary of pandas DataFrame objects for
           each Data work sheet in an ABS Excel spreadsheet"""

        wb = pd.ExcelFile(pathName)
        returnDict = {}

        for name in wb.sheet_names:
            if 'Data' not in name:
                continue

            # ExcelFile.parse: API behaviour change with pandas 0.14.1
            #df = wb.parse(sheetname=name, skiprows=8, header=9, index_col=0, na_values=['', '-', ' '])
            df = wb.parse(sheetname=name, skiprows=9, header=0, index_col=0, na_values=['', '-', ' '])

            periods = pd.PeriodIndex(pd.Series(df.index), freq=freq)
            df.set_index(keys=periods, inplace=True)
            df.index.name = index_name
            returnDict[name] = df

            if verbose:
                print ("\nFile: '{}', sheet: '{}'".format(pathName, name))
                print (df.iloc[:min(5, len(df)), :min(5, len(df.columns))])

        return returnDict

### RBAExcelLoader.py
### written in python 2.7 and pandas 0.14.1

import pandas as pd
from distutils.version import LooseVersion
# note: a plain string comparison of versions misleads (eg '0.9' > '0.14.1')
assert LooseVersion(pd.__version__) >= LooseVersion('0.14.1')

class RBAExcelLoader:

    def load(self, pathName, freq='M', index_name=None, verbose=False):
        """return a pandas DataFrame for an RBA Excel spreadsheet"""
        wb = pd.ExcelFile(pathName)
        sheetname = 'Data'

        # ExcelFile.parse: API behaviour change with pandas 0.14.1
        #df = wb.parse(sheetname, skiprows=9, header=10, index_col=0, na_values=['', '-', ' '])
        df = wb.parse(sheetname, skiprows=10, header=0, index_col=0, na_values=['', '-', ' '])

        periods = pd.PeriodIndex(pd.Series(df.index), freq=freq)
        df.set_index(keys=periods, inplace=True)

        if verbose:
            print "\nFile: '{}', sheet: '{}'".format(pathName, sheetname)
            print 'Columns: {}'.format(df.columns.tolist())
            print 'Top left hand corner ...'
            print '------------------------'
            print df.iloc[:min(5, len(df)), :min(5, len(df.columns))]

        return df
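
For completeness, this is roughly how I call the two loaders. A sketch only: the file names here are hypothetical examples, and the frequency needs to match the data (quarterly for the national accounts, monthly for most RBA series):

from ABSExcelLoader import ABSExcelLoader
from RBAExcelLoader import RBAExcelLoader

# ABS: a dictionary of DataFrames, one per Data sheet
absLoader = ABSExcelLoader()
absFrames = absLoader.load('./raw-data/5206001_key_aggregates.xls',
    freq='Q', index_name='Quarter', verbose=True)
for name in absFrames:
    print("{}: {}".format(name, absFrames[name].shape))

# RBA: a single DataFrame from the Data sheet
rbaLoader = RBAExcelLoader()
rbaFrame = rbaLoader.load('./raw-data/f01-money-rates.xls', freq='M', verbose=True)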

Tuesday, July 15

Seasonal adjustment and decomposition

Having moved from R to python/pandas, one of the things I really miss is the extensive statistics library available in R.

I particularly like R's stl() function: the seasonal decomposition of a time series by loess (local regression). It allows you to decompose a time series into its seasonal, trend and irregular components. Unfortunately, I could find nothing like it in pandas.

In frustration, I have cobbled together a python/pandas function to decompose a univariate pandas time series into its seasonal, trend and irregular components. The function does not use loess. Rather, it uses a set of processes similar to those used by the Australian Bureau of Statistics.

It also uses a Henderson moving average, which I had coded previously (see here).

Anyway, I'd be interested to know whether this code works for you. Drop me a line if you end up using it.

Caution: still experimental code.

## Decompose.py
## produce a time series decomposition

import pandas as pd
import numpy as np
from Henderson import Henderson


# --- A selection of seasonal smoothing weights, from which you can choose
#     Note: these are end-weights, they are reversed for the start of a series
#     Note: your own weights in this form should also work
s3x3 = (
    np.array([ 5, 11, 11]) / 27.0,
    np.array([ 3,  7, 10,  7]) / 27.0,
    np.array([ 1,  2,  3,  2,  1]) / 9.0,
)

s3x5 = (
    np.array([ 9, 17, 17, 17]) / 60.0,
    np.array([ 4, 11, 15, 15, 15]) / 60.0,
    np.array([ 4,  8, 13, 13, 13,  9]) / 60.0,
    np.array([ 1,  2,  3,  3,  3,  2,  1]) / 15.0,
)

s3x9 = (
    np.array([0.051, 0.112, 0.173, 0.197, 0.221, 0.246]),
    np.array([0.028, 0.092, 0.144, 0.160, 0.176, 0.192, 0.208]),
    np.array([0.032, 0.079, 0.123, 0.133, 0.143, 0.154, 0.163, 0.173]),
    np.array([0.034, 0.075, 0.113, 0.117, 0.123, 0.128, 0.132, 0.137, 0.141]),
    np.array([0.034, 0.073, 0.111, 0.113, 0.114, 0.116, 0.117, 0.118, 0.120, 0.084]),
    np.array([1,     2,     3,     3,     3,    3,    3,    3,    3,    2,    1]) / 27.0,
)
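
# --- sanity check: each row of end-weights above should sum to one,
#     so that the smoothers preserve the overall level of the series
for _weightSet in (s3x3, s3x5, s3x9):
    for _w in _weightSet:
        assert abs(_w.sum() - 1.0) < 0.005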


# --- public Decomposition function
def Decompose(s, periods=None, model='multiplicative', 
    constantSeasonal=False, seasonalSmoother=s3x5):

    """ The simple decomposition of a pandas Series s into its trend, seasonal 
        and irregular components. The default is a multiplicative model: 
        Original(t) = Trend(t) * Seasonal(t) * Irregular(t). Can specify an 
        additive model: Original(t) = Trend(t) + Seasonal(t) + Irregular(t)

        Parameters:
        -   s - the pandas Series, without any missing or NA values, 
            and sorted in ascending order
        -   periods - either a pandas Series indicating the period to 
            which each value of s belongs (of the same length as s, 
            with the same index as s), or an int for the number of periods
            into which to decompose the series
        -   model - string - either 'multiplicative' or 'additive'
        -   constantSeasonal - bool - whether the seasonal component is 
            constant or (slowly) variable
        -   seasonalSmoother - when not using a constantSeasonal, which 
            of the seasonal smoothers to use (s3x3, s3x5 or s3x9) - 
            default is s3x5 (i.e. over 7 years for monthly or quarterly data)

        Returns a pandas DataFrame with columns for each step in the 
        decomposition process (enables debugging). The key columns in the 
        DataFrame are:
        -   'Original' - the original series
        -   'SeasAdj' - the seasonally adjusted series
        -   'Trend' - the trend of the seasonally adjusted series
        -   'Seasonal' - the seasonal component found through the 
            decomposition process
        -   'Irregular' - the irregular component found through the 
            decomposition process

        Notes:
        Based on ideas gleaned from the Australian Bureau of Statistics:
            ABS (2005), "An Introductory Course on Times Series
            Analysis -- Electronic Delivery", Catalogue: 1346,0.55.001, online at:
            http://www.ausstats.abs.gov.au/ausstats/free.nsf/0/C2714BAD1DD803E6CA256F960072B0C0/$File/1346055001_jan%202005.pdf
        Does not adjust for moving holidays, public holidays, variable number 
        of working days in month, etc. (ie. it is quite a simple decomposition)"""

    ### --- Sanity checks and initialisation --- ###

    # --- sanity checks
    if periods is None:
        raise ValueError('The periods parameter must be specified: '
            'an int or a pandas Series of ints')
    if not isinstance(s, pd.Series):
        raise TypeError('The s parameter should be a pandas Series')
    if not(s.index.is_monotonic and s.index.is_unique):
        raise ValueError('The index for the s parameter should be unique and sorted')
    if any(s.isnull()) or not all(np.isfinite(s)):
        raise ValueError('The s parameter contains NA or infinite values')

    # --- initialise
    result = pd.DataFrame(s)
    result.columns = ['Original']

    # --- determine the period
    if isinstance(periods, pd.Series):
        if not (len(s) == len(periods) and all(s.index == periods.index)) :
            raise ValueError('The s and periods parameters must have the same index')
        result['period'] = periods
        periods = len(periods.unique())
    else:
        periods = int(periods)
        result['period'] = pd.Series(range(len(result)), index=s.index) % periods
    if periods < 2:
        raise ValueError('The periods parameter should be >= 2')
    if len(s) < (periods * 2) + 1:
        raise ValueError('The s parameter is not long enough to decompose')

    # --- settle the length of the Henderson moving average
    h = max(periods, 7) # ABS uses 13-term HMA for monthly and 7-term for quarterly
    if h % 2 == 0 :
        h += 1 # we need an odd number

    ### --- On to the decomposition process --- ###

    # --- 1 - derive an initial estimate for the trend component
    result['1stTrendEst'] = pd.rolling_mean(s, window=periods+1, 
        min_periods=periods+1, center=True)
    # Note: rolling mean leaves NA values at the start/end of the trend estimate.

    # --- 2 - preliminary estimate of the seasonal component
    if model == 'multiplicative':
        result['1stSeasonalEst'] = result['Original'] / result['1stTrendEst']
    else:
        result['1stSeasonalEst'] = result['Original'] - result['1stTrendEst']

    # --- 3 - smooth the seasonal
    result = _smoothSeasonalComponent(result, periods=periods, 
        constantSeasonal=constantSeasonal, seasonalSmoother=seasonalSmoother, 
        columnToBeSmoothed='1stSeasonalEst', newColumn='2ndSeasonalEst')

    # --- 4 - extend the smoothed seasonal estimate to full scale
    if any(result['2ndSeasonalEst'].isnull()) :
        result = _extendSeries(result, periods=periods,
            columnToBeExtended='2ndSeasonalEst', newColumn='3rdSeasonalEst')
    else:
        result['3rdSeasonalEst'] = result['2ndSeasonalEst']

    # --- 5 - preliminary estimate of the seasonally adjusted data
    if model == 'multiplicative':
        result['1stSeasAdjEst'] = result['Original'] / result['3rdSeasonalEst']
    else:
        result['1stSeasAdjEst'] = result['Original'] - result['3rdSeasonalEst']

    # --- 6 - a better estimate of the trend
    result['2ndTrendEst'] =  Henderson(result['1stSeasAdjEst'], h)

    # --- 7 - final estimate of the seasonal component
    if model == 'multiplicative':
        result['4thSeasonalEst'] = result['Original'] / result['2ndTrendEst']
    else:
        result['4thSeasonalEst'] = result['Original'] - result['2ndTrendEst']

    result = _smoothSeasonalComponent(result, periods=periods, 
        constantSeasonal=constantSeasonal, seasonalSmoother=seasonalSmoother, 
        columnToBeSmoothed='4thSeasonalEst', newColumn='Seasonal')

    # --- 8 - final estimate of the seasonally adjusted series
    if model == 'multiplicative':
        result['SeasAdj'] = result['Original'] / result['Seasonal']
    else:
        result['SeasAdj'] = result['Original'] - result['Seasonal']

    # --- 9 - final trend estimate
    result['Trend'] =  Henderson(result['SeasAdj'], h)

    # --- 10 - final irregular
    if model == 'multiplicative':
        result['Irregular'] = result['SeasAdj'] / result['Trend']
    else:
        result['Irregular'] = result['SeasAdj'] - result['Trend']

    # --- 11 - our job here is done
    return (result)


# --- apply seasonal smoother
def _smoothSeasonalComponent(result, periods, constantSeasonal, seasonalSmoother,
    columnToBeSmoothed, newColumn):

    # get the key smoothing constants
    if not constantSeasonal:
        kS = len(seasonalSmoother)
        lenS = (len(seasonalSmoother) * 2) -1
        centralS = seasonalSmoother[len(seasonalSmoother)-1]

    # establish an empty return column ...
    result[newColumn] = np.repeat(np.nan, len(result))

    # populate the return column ...
    for u in result['period'].unique() :

        # get each of the seasonals
        thisSeason = result[result['period'] == u][columnToBeSmoothed]

        # smooth to a constant seasonal value
        if constantSeasonal:
            thisSeasonSmoothed = pd.Series(np.repeat(thisSeason.mean(skipna=True),
                len(thisSeason)), index=thisSeason.index)

        # smooth to a slowly changing seasonal value
        else:
            # drop NA values which result from step 1 in the decomp process
            thisSeason = thisSeason.dropna()

            # apply the seasonalSmoother
            thisSeasonSmoothed = pd.rolling_apply(thisSeason, window=lenS,
                func=lambda x: (x * centralS).sum(), min_periods=lenS, center=True)

            # for short series the above process results in no data ... find a simple mean
            if all(thisSeasonSmoothed.isnull()) :
                # same treatment as constant seasonal value above
                thisSeasonSmoothed = pd.Series(np.repeat(thisSeason.mean(skipna=True),
                    len(thisSeason)), index=thisSeason.index)

            # handle the end-point problem ...
            for i in range(kS-1) :
                if np.isnan(thisSeasonSmoothed.iat[i]) :
                    thisSeasonSmoothed.iat[i] = (thisSeason.iloc[0: i+kS] *
                        (seasonalSmoother[i][::-1])).sum() # note: reversed order at start

            for i in range(len(thisSeason)-1, len(thisSeason)-kS, -1) :
                if np.isnan(thisSeasonSmoothed.iat[i]) :
                    thisSeasonSmoothed.iat[i] = (
                        thisSeason.iloc[i-(kS-1):len(thisSeason)] *
                        seasonalSmoother[len(thisSeason)-1-i]).sum()

        # package up season by season ...
        result[newColumn] = result[newColumn].where(result[newColumn].notnull(), 
            other=thisSeasonSmoothed)

    return (result)


# --- extend seasonal components to the full length of series 
def _extendSeries(result, periods, columnToBeExtended, newColumn):

    result[newColumn] = result[columnToBeExtended].copy()

    def fillup(result, fill, startPoint, endPoint):
        # copy the per-period seasonal estimate from fill into result,
        # walking forward from startPoint until endPoint is reached
        i = startPoint
        while True:
            p = result.index[i]
            result[newColumn].iat[i] = fill[newColumn].at[result['period'].iat[i]]
            if p >= endPoint:
                break
            i += 1

    # back-cast
    if np.isnan(result.iat[0, result.columns.get_loc(newColumn)]):
        fill = pd.DataFrame(result[newColumn].dropna().iloc[0:periods])
        fill['period'] = result['period'][fill.index[0]:fill.index[len(fill)-1]]
        endPoint = fill.index[0] - 1
        fill.index = fill['period']
        fillup(result=result, fill=fill, startPoint=0, endPoint=endPoint)

    # forward-cast
    if np.isnan(result.iat[len(result)-1, result.columns.get_loc(newColumn)]):
        fill = result[newColumn].dropna()
        fill = pd.DataFrame(fill[len(fill)-periods:len(fill)])
        fill['period'] = result['period'][fill.index[0]:fill.index[len(fill)-1]]
        startPoint = result.index.get_loc(fill.index[len(fill)-1] + 1)
        fill.index = fill['period']
        endPoint = result.index[len(result)-1]
        fillup(result=result, fill=fill, startPoint=startPoint, endPoint=endPoint)

    return (result)
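
To give a feel for how the function is called, here is a minimal smoke test on synthetic quarterly data. A sketch only: it assumes Decompose.py and Henderson.py are both importable, and it targets the same pandas 0.14-era API as the code above.

## DecomposeExample.py
## a minimal smoke test for Decompose, using synthetic data

import pandas as pd
import numpy as np
from Decompose import Decompose

# build a synthetic quarterly series: trend * seasonal * noise
n = 48  # twelve years of quarterly data
index = pd.period_range('2003Q1', periods=n, freq='Q')
trend = np.linspace(100.0, 150.0, n)
seasonal = np.tile([0.95, 1.05, 1.10, 0.90], n // 4)
np.random.seed(42)
noise = 1.0 + np.random.normal(0.0, 0.01, n)
s = pd.Series(trend * seasonal * noise, index=index)

# decompose and eyeball the key columns
result = Decompose(s, periods=4, model='multiplicative')
print(result[['Original', 'Trend', 'Seasonal', 'Irregular']].head(8))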