Error Handling¶
This section describes exception classes, on_error functions and error handling via SQL.
ETLHelperError¶
ETL Helper has a variety of exception classes, all of which are subclasses of the ETLHelperError base class.
To aid debugging, the ETLHelperQueryError, ETLHelperExtractError and ETLHelperInsertError classes print the SQL query and the required paramstyle, as well as the error message returned by the database.
"""ETL Helper script to demonstrate an extract error."""
import sqlite3
import etlhelper as etl
db_file = "igneous_rocks.db"
select_sql = "SELECT * FROM bad_table"
with sqlite3.connect(db_file) as conn:
rows = etl.fetchall(select_sql, conn)
The output is:
etlhelper.exceptions.ETLHelperExtractError: SQL query raised an error.
SELECT * FROM bad_table
Required paramstyle: qmark
no such table: bad_table
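Because all of these classes are subclasses of ETLHelperError, a single except clause can catch any ETL Helper error. A minimal sketch (the classes are importable from etlhelper.exceptions, as the traceback above shows):

"""Sketch: catch any ETL Helper error via the common base class."""
import sqlite3

import etlhelper as etl
from etlhelper.exceptions import ETLHelperError

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
    try:
        rows = etl.fetchall("SELECT * FROM bad_table", conn)
    except ETLHelperError as exc:
        # The message includes the query, paramstyle and database error
        print(f"Extract failed: {exc}")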
Errors can also be handled within the SQL query itself, e.g. using ON CONFLICT clauses; see Error handling via SQL below.
on_error¶
The default behaviour of etlhelper is to raise an exception on the first error and abort the transfer. Sometimes it is desirable to ignore the errors and to do something else with the failed rows. The on_error parameter allows a function to be passed that is applied to the failed rows of each chunk. The input is a list of (row, exception) tuples.
Different examples are given below. The simplest approach is to collect all the errors into a list to process at the end.
errors = []
etl.executemany(sql, conn, rows, on_error=errors.extend)

if errors:
    do_something()
Errors can be logged to the etlhelper logger.
"""ETL Helper script to demonstrate logging errors."""
import logging
import sqlite3
import etlhelper as etl
etl.log_to_console()
logger = logging.getLogger("etlhelper")
db_file = "igneous_rocks.db"
rows = [
{"name": "basalt", "grain_size": "fine"},
{"name": "granite", "grain_size": "coarse"}
]
def log_errors(failed_rows: list[tuple[dict, Exception]]) -> None:
for row, exception in failed_rows:
logger.error(exception)
with sqlite3.connect(db_file) as conn:
etl.load("igneous_rock", conn, rows, on_error=log_errors)
The IDs of failed rows can be written to a file.
def write_bad_ids(failed_rows):
    with open('bad_ids.txt', 'at') as out_file:
        for row, exception in failed_rows:
            out_file.write(f"{row.id}\n")

etl.executemany(sql, conn, rows, on_error=write_bad_ids)
executemany, load, copy_rows and copy_table_rows can all take an on_error parameter. They each return a tuple containing the number of rows processed and the number of rows that failed.
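For example, the returned counts can be unpacked to report the outcome of a transfer. A minimal sketch, reusing the rows and log_errors function from the logging example above:

with sqlite3.connect(db_file) as conn:
    processed, failed = etl.load("igneous_rock", conn, rows, on_error=log_errors)

print(f"{processed} rows processed ({failed} failed)")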
Error handling via SQL¶
The on_error functions allow individual failed rows to be processed; however, this flexibility can come at the expense of speed. Each chunk of data that contains a bad row will be retried on a row-by-row basis. Databases also provide ways to handle errors, e.g. duplicate primary keys, directly in SQL.
By customising an INSERT query (which can be programmatically generated with generate_insert_query(), as sketched below), the database can be instructed how to process such rows. Different databases have different syntax and capabilities, including upsert and merge. The full example that follows uses SQLite and will ignore duplicate rows.
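The sketch below extends a generated query with a conflict clause before use. The generate_insert_query(table, row, conn) signature is an assumption here; check the etlhelper API documentation for the exact form.

import sqlite3

import etlhelper as etl

row = {"id": 1, "name": "basalt", "grain_size": "fine"}

with sqlite3.connect("igneous_rocks.db") as conn:
    # Generate a plain INSERT query for the target table, then
    # append SQLite's clause for ignoring conflicting rows.
    # Signature assumed; see the etlhelper API docs.
    insert_sql = etl.generate_insert_query("igneous_rock", row, conn)
    insert_sql += " ON CONFLICT DO NOTHING"

    etl.executemany(insert_sql, conn, rows=[row])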
"""ETL Helper script to load records to a database table."""
import sqlite3
import etlhelper as etl
db_file = "igneous_rocks.db"
create_sql = """
CREATE TABLE IF NOT EXISTS igneous_rock (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
grain_size TEXT
)"""
insert_sql = """
INSERT INTO igneous_rock (id, name, grain_size)
VALUES (:id, :name, :grain_size)
ON CONFLICT DO NOTHING
"""
igneous_rocks = [
{"id": 1, "name": "basalt", "grain_size": "fine"},
{"id": 1, "name": "basalt", "grain_size": "fine"} # duplicate row
]
with sqlite3.connect(db_file) as conn:
# Create table
etl.execute(create_sql, conn)
# Insert rows
etl.executemany(insert_sql, conn, rows=igneous_rocks)
# Confirm selection
for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
print(row)
The output is:
{'id': 1, 'name': 'basalt', 'grain_size': 'fine'}
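On databases that support upsert, the conflict clause can instead update the existing row. A minimal sketch for SQLite (version 3.24+), reusing the table created above; excluded refers to the row that failed to insert:

upsert_sql = """
    INSERT INTO igneous_rock (name, grain_size)
    VALUES (:name, :grain_size)
    ON CONFLICT (name) DO UPDATE SET grain_size = excluded.grain_size
"""

with sqlite3.connect(db_file) as conn:
    # 'basalt' already exists, so its grain_size is updated in place
    etl.executemany(upsert_sql, conn, rows=[{"name": "basalt", "grain_size": "medium"}])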