Exercise 3.3 - Solution
(a) Raising Exceptions
# fileparse.py
import csv
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=','):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename    -- path of the CSV file to read.
    select      -- optional list of column names to keep (in that order);
                   only valid when the file has a header row.
    types       -- optional list of conversion callables applied by position
                   to each (possibly filtered) row.  Because the pairing is
                   done with zip(), values beyond len(types) are dropped.
    has_headers -- True if the first line of the file holds column names.
    delimiter   -- field separator handed to csv.reader.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  Rows with no data are skipped.

    Raises RuntimeError if select is given while has_headers is false.
    A selected name missing from the header row raises ValueError (from
    list.index), and any exception raised by a conversion callable
    propagates to the caller.
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block guarantees the file is closed even when a bad column
    # name or a failed conversion raises part-way through parsing.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any)
        headers = next(f_csv) if has_headers else []

        # If specific columns have been selected, make indices for
        # filtering and set the output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        for row in f_csv:
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                row = [func(val) for func, val in zip(types, row)]

            # Make a dictionary (column names known) or a tuple
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records
(b) Catching Exceptions
# fileparse.py
import csv
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=','):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename    -- path of the CSV file to read.
    select      -- optional list of column names to keep (in that order);
                   only valid when the file has a header row.
    types       -- optional list of conversion callables applied by position
                   to each (possibly filtered) row.  Because the pairing is
                   done with zip(), values beyond len(types) are dropped.
    has_headers -- True if the first line of the file holds column names.
    delimiter   -- field separator handed to csv.reader.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  Rows with no data are skipped.
    A row whose conversion raises ValueError is reported on standard
    output and left out of the result.

    Raises RuntimeError if select is given while has_headers is false.
    A selected name missing from the header row raises ValueError (from
    list.index).
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block guarantees the file is closed even when an exception
    # (e.g. an unknown column name) escapes part-way through parsing.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any)
        headers = next(f_csv) if has_headers else []

        # If specific columns have been selected, make indices for
        # filtering and set the output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        # Row numbers start at 1 on the first line after the header and
        # also count the blank rows that get skipped.
        for rowno, row in enumerate(f_csv, 1):
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except ValueError as e:
                    # Single-argument print(...) is valid as both a
                    # Python 2 statement and a Python 3 function call.
                    print("Row %d: Couldn't convert %s" % (rowno, row))
                    print("Row %d: Reason %s" % (rowno, e))
                    continue

            # Make a dictionary (column names known) or a tuple
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records
(c) Silencing Errors
# fileparse.py
import csv
def parse_csv(filename, select=None, types=None, has_headers=True, delimiter=',', silence_errors=False):
    '''
    Parse a CSV file into a list of records with type conversion.

    filename       -- path of the CSV file to read.
    select         -- optional list of column names to keep (in that
                      order); only valid when the file has a header row.
    types          -- optional list of conversion callables applied by
                      position to each (possibly filtered) row.  Because
                      the pairing is done with zip(), values beyond
                      len(types) are dropped.
    has_headers    -- True if the first line of the file holds column names.
    delimiter      -- field separator handed to csv.reader.
    silence_errors -- if true, rows that fail conversion are skipped
                      without printing a diagnostic.

    Returns a list of dicts keyed by column name when column names are
    available, otherwise a list of tuples.  Rows with no data are skipped.
    A row whose conversion raises ValueError is left out of the result and,
    unless silence_errors is set, reported on standard output.

    Raises RuntimeError if select is given while has_headers is false.
    A selected name missing from the header row raises ValueError (from
    list.index).
    '''
    if select and not has_headers:
        raise RuntimeError('select requires column headers')

    # The with-block guarantees the file is closed even when an exception
    # (e.g. an unknown column name) escapes part-way through parsing.
    with open(filename) as f:
        f_csv = csv.reader(f, delimiter=delimiter)

        # Read the file headers (if any)
        headers = next(f_csv) if has_headers else []

        # If specific columns have been selected, make indices for
        # filtering and set the output columns
        if select:
            indices = [headers.index(colname) for colname in select]
            output_columns = select
        else:
            indices = []
            output_columns = headers

        records = []
        # Row numbers start at 1 on the first line after the header and
        # also count the blank rows that get skipped.
        for rowno, row in enumerate(f_csv, 1):
            if not row:     # Skip rows with no data
                continue

            # If specific column indices are selected, pick them out
            if indices:
                row = [row[index] for index in indices]

            # Apply type conversion to the row
            if types:
                try:
                    row = [func(val) for func, val in zip(types, row)]
                except ValueError as e:
                    if not silence_errors:
                        # Single-argument print(...) is valid as both a
                        # Python 2 statement and a Python 3 function call.
                        print("Row %d: Couldn't convert %s" % (rowno, row))
                        print("Row %d: Reason %s" % (rowno, e))
                    # The bad row is dropped whether or not it was
                    # reported; silencing only affects the output.
                    continue

            # Make a dictionary (column names known) or a tuple
            if output_columns:
                record = dict(zip(output_columns, row))
            else:
                record = tuple(row)
            records.append(record)

    return records
[ Back ]