Exercise 3.3 - Solution

(a) Raising Exceptions

# fileparse.py
import csv

def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=','):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename    -- path of the file to read.
    select      -- optional list of column names to keep; requires headers.
    types       -- optional list of conversion callables, applied position by
                   position to each row after column selection.  zip() stops
                   at the shorter of types and the row.
    has_headers -- whether the first row of the file holds column names.
    delimiter   -- field separator handed to csv.reader.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  An empty file yields [].

    Raises RuntimeError if select is given while has_headers is false.
    A ValueError from an unknown selected column or from a failed type
    conversion propagates to the caller.
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block closes the file even when a lookup or conversion raises.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any).  The default keeps an empty file
        # from leaking StopIteration out of this function.
        headers = next(f_csv, []) if has_headers else []

        # If specific columns have been selected, make indices for filtering
        # and set output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        for row in f_csv:
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                row = [func(val) for func, val in zip(types, row)]

            # Make a dictionary (column names known) or a tuple (no names)
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records

(b) Catching Exceptions

# fileparse.py
import csv

def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=','):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename    -- path of the file to read.
    select      -- optional list of column names to keep; requires headers.
    types       -- optional list of conversion callables, applied position by
                   position to each row after column selection.  zip() stops
                   at the shorter of types and the row.
    has_headers -- whether the first row of the file holds column names.
    delimiter   -- field separator handed to csv.reader.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  An empty file yields [].

    A row whose conversion raises ValueError is reported on standard output
    and left out of the result.  Row numbers count rows after the header,
    starting at 1, and include blank rows.

    Raises RuntimeError if select is given while has_headers is false.
    A ValueError from an unknown selected column propagates to the caller.
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block closes the file even when a lookup or conversion raises.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any).  The default keeps an empty file
        # from leaking StopIteration out of this function.
        headers = next(f_csv, []) if has_headers else []

        # If specific columns have been selected, make indices for filtering
        # and set output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        for rowno, row in enumerate(f_csv, 1):
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except ValueError as e:
                    # Single-argument print(...) emits the same text on
                    # Python 2 and Python 3.
                    print("Row %d: Couldn't convert %s" % (rowno, row))
                    print("Row %d: Reason %s" % (rowno, e))
                    continue

            # Make a dictionary (column names known) or a tuple (no names)
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records

(c) Silencing Errors

# fileparse.py
import csv

def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=',', silence_errors=False):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename       -- path of the file to read.
    select         -- optional list of column names to keep; requires headers.
    types          -- optional list of conversion callables, applied position
                      by position to each row after column selection.  zip()
                      stops at the shorter of types and the row.
    has_headers    -- whether the first row of the file holds column names.
    delimiter      -- field separator handed to csv.reader.
    silence_errors -- when true, rows that fail conversion are dropped
                      without printing anything.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  An empty file yields [].

    A row whose conversion raises ValueError is left out of the result and,
    unless silence_errors is set, reported on standard output.  Row numbers
    count rows after the header, starting at 1, and include blank rows.

    Raises RuntimeError if select is given while has_headers is false.
    A ValueError from an unknown selected column propagates to the caller.
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block closes the file even when a lookup or conversion raises.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any).  The default keeps an empty file
        # from leaking StopIteration out of this function.
        headers = next(f_csv, []) if has_headers else []

        # If specific columns have been selected, make indices for filtering
        # and set output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        for rowno, row in enumerate(f_csv, 1):
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except ValueError as e:
                    if not silence_errors:
                        # Single-argument print(...) emits the same text on
                        # Python 2 and Python 3.
                        print("Row %d: Couldn't convert %s" % (rowno, row))
                        print("Row %d: Reason %s" % (rowno, e))
                    continue    # The bad row is dropped either way

            # Make a dictionary (column names known) or a tuple (no names)
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records

[ Back ]