API

etlhelper

Library to simplify data transfer between databases

class etlhelper.DbParams(dbtype='dbtype not set', **kwargs)[source]

Generic data holder class for database connection parameters.

connect(password_variable=None, **kwargs)[source]

Return database connection.

Parameters:
  • password_variable – str, name of environment variable with password

  • kwargs – connection specific keyword arguments e.g. row_factory

Returns:

Connection object

copy()[source]

Return a shallow copy of DbParams object.

Return DbParams:

DbParams object with same attributes as original.

classmethod from_environment(prefix='ETLHelper_')[source]

Create DbParams object from parameters specified by environment variables e.g. ETLHelper_dbtype, ETLHelper_host, ETLHelper_port, etc.

Parameters:

prefix – str, prefix to environment variable names

get_connection_string(password_variable=None)[source]

Return the connection string.

Parameters:

password_variable – str, name of environment variable with password

Returns:

str, Connection string

get_sqlalchemy_connection_string(password_variable=None)[source]

Get a SQLAlchemy connection string.

Parameters:

password_variable – str, name of environment variable with password

Returns:

str, Connection string

is_reachable()[source]

Test whether the network allows a TCP/IP connection to the database to be opened. No username or password is required.

Return bool:

True if a connection could be opened, otherwise False.

property paramstyle

The DBAPI2 paramstyle attribute for database type

validate_params()[source]

Validate database parameters.

Should validate that the dbtype is a recognised one and that the appropriate parameters have been passed for that dbtype.

Raises:

ETLHelperDbParamsError – Error if params are invalid

etlhelper.abort_etlhelper_threads()[source]

Abort the ETL Helper process at the end of the current chunk. During threaded operation, this will cause an ETLHelperAbort exception to be raised.

etlhelper.connect(db_params, password_variable=None, **kwargs)[source]

Return database connection.

Parameters:
  • db_params – DbParams object or similar with appropriate attributes

  • password_variable – str, name of environment variable with password

  • kwargs – connection specific keyword arguments e.g. row_factory

Returns:

Connection object

etlhelper.copy_rows(select_query: str, source_conn: ~etlhelper.types.Connection, insert_query: str, dest_conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, on_error: ~typing.Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int][source]

Copy rows from source_conn to dest_conn. select_query and insert_query specify the data to be transferred.

Note: ODBC driver requires separate connections for source_conn and dest_conn, even if they represent the same database.

Geometry columns from Oracle spatial should be selected with:

SDO_UTIL.TO_WKTGEOMETRY(shape_bng) AS geom_wkt

and inserted into PostGIS with:

ST_GeomFromText(%s, 27700)

Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.

Parameters:
  • select_query – SQL query to select data

  • source_conn – dbapi connection

  • insert_query – SQL query to insert data

  • dest_conn – dbapi connection

  • parameters – bind variables to insert in the select query

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • on_error – function to be applied to failed rows in each chunk

  • commit_chunks – commit after each chunk (see executemany)

  • chunk_size – size of chunks to group data by

Returns:

the number of rows processed and the number of rows failed

etlhelper.copy_table_rows(table: str, source_conn: ~etlhelper.types.Connection, dest_conn: ~etlhelper.types.Connection, target: str | None = None, row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, on_error: ~typing.Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int][source]

Copy rows from ‘table’ in source_conn to the same or a target table in dest_conn. This is a simple copy of all columns and rows, using load to insert the data. It is possible to apply a transform e.g. to change the case of column names. For more control, use copy_rows.

Note: ODBC driver requires separate connections for source_conn and dest_conn, even if they represent the same database.

Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.

Parameters:
  • table – name of table

  • source_conn – dbapi connection

  • dest_conn – dbapi connection

  • target – name of target table, if different from source

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • on_error – function to be applied to failed rows in each chunk

  • commit_chunks – commit after each chunk (see executemany)

  • chunk_size – size of chunks to group data by

Returns:

the number of rows processed and the number of rows failed

etlhelper.execute(query: str, conn: Connection, parameters: Collection[Any] = ()) None[source]

Run SQL query against connection.

Parameters:
  • query – SQL query to execute

  • conn – dbapi connection

  • parameters – bind variables to insert in the query

Raises:

ETLHelperQueryError – if SQL raises an error

etlhelper.executemany(query: str, conn: Connection, rows: Iterable[Collection[Any]], transform: Callable[[list[Collection[Any]]], list[Collection[Any]]] | None = None, on_error: Callable[[list[FailedRow]], Any] | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int][source]

Use query to insert/update data from rows to database at conn. This method uses the executemany or execute_batch (PostgreSQL) commands to process the data in chunks and avoid creating a new database connection for each row. Row data are passed as parameters into query.

Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.

commit_chunks controls whether the transaction should be committed after each chunk has been inserted. Committing chunks means that errors during a long-running insert do not require all data to be loaded again. The disadvantage is that investigation may be required to determine exactly which records have been successfully transferred.

Parameters:
  • query – SQL insert command with placeholders for data

  • conn – dbapi connection

  • rows – an iterable of rows containing data to be inserted/updated

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • on_error – function to be applied to failed rows in each chunk

  • commit_chunks – commit after each chunk has been inserted/updated

  • chunk_size – size of chunks to group data by

Returns:

the number of rows processed and the number of rows failed

Raises:

ETLHelperInsertError – if SQL raises an error

etlhelper.fetchall(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) list[Collection[Any]][source]

Get all results of query as a list. See iter_rows for details.

Parameters:
  • select_query – SQL query to execute

  • conn – dbapi connection

  • parameters – bind variables to insert in the query

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • chunk_size – size of chunks to group data by

Returns:

a list of rows, each in the format produced by the given row_factory

etlhelper.fetchone(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 1) Collection[Any] | None[source]

Get first result of query. See iter_rows for details. Note: iter_rows is recommended for looping over rows individually.

Parameters:
  • select_query – SQL query to execute

  • conn – dbapi connection

  • parameters – bind variables to insert in the query

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • chunk_size – size of chunks to group data by

Returns:

None or a row of data using the given row_factory

etlhelper.generate_insert_sql(table: str, row: Collection[Any], conn: Connection) str[source]

Generate insert SQL for table, getting column names from row and the placeholder style from the connection. row is either a namedtuple or a dictionary.

Parameters:
  • table – name of table

  • row – a single row as a namedtuple or dict

  • conn – dbapi connection

Returns:

SQL statement to insert data into the given table

Raises:

ETLHelperInsertError – if ‘row’ is not a namedtuple or a dict, or if the database connection encounters a parameter error

etlhelper.get_connection_string(db_params, password_variable)[source]

Get a connection string.

Parameters:
  • db_params – DbParams object or similar with appropriate attributes

  • password_variable – str, name of environment variable with password

Returns:

str, Connection string

etlhelper.get_sqlalchemy_connection_string(db_params, password_variable)[source]

Get a SQLAlchemy connection string.

Parameters:
  • db_params – DbParams object or similar with appropriate attributes

  • password_variable – str, name of environment variable with password

Returns:

str, Connection string

etlhelper.iter_chunks(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) Iterator[list[Collection[Any]]][source]

Run SQL query against connection and return an iterator object to loop over results in batches of chunk_size (default 5000).

The row_factory changes the output format of the results. Other row factories, e.g. namedtuple_row_factory, are available.

The transform function is applied to chunks of data as they are extracted from the database.

All data extraction functions call this function, directly or indirectly.

Parameters:
  • select_query – SQL query to execute

  • conn – dbapi connection

  • parameters – bind variables to insert in the query

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • chunk_size – size of chunks to group data by

Returns:

generator yielding lists of rows, each row in the format produced by the given row_factory

Raises:

ETLHelperExtractError – if SQL raises an error

etlhelper.iter_rows(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) Iterator[Collection[Any]][source]

Run SQL query against connection and return an iterator object to loop over results, row by row.

Parameters:
  • select_query – SQL query to execute

  • conn – dbapi connection

  • parameters – bind variables to insert in the query

  • row_factory – function that accepts a cursor and returns a function for parsing each row

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • chunk_size – size of chunks to group data by

Returns:

generator yielding individual rows of data, each in the format produced by the given row_factory

etlhelper.load(table: str, conn: Connection, rows: Iterable[Collection[Any]], transform: Callable[[list[Collection[Any]]], list[Collection[Any]]] | None = None, on_error: Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int][source]

Load data from an iterable of named tuples or dictionaries into a pre-existing table in the database on conn.

Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.

Parameters:
  • table – name of table

  • conn – dbapi connection

  • rows – iterable of named tuples or dictionaries of data

  • transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)

  • on_error – function to be applied to failed rows in each chunk

  • commit_chunks – commit after each chunk (see executemany)

  • chunk_size – size of chunks to group data by

Returns:

the number of rows processed and the number of rows failed

etlhelper.log_to_console(level: int = 20, output: ~typing.TextIO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>) None[source]

Log ETL Helper messages to the given output.

Parameters:
  • level – logger level

  • output – the output location of the logger messages

etlhelper.table_info(table, conn, schema=None)[source]

Return basic metadata for each of the columns of ‘table’ on ‘conn’.

Parameters:
  • table – str, the table to describe

  • conn – dbapi connection

  • schema – str, optional name of schema for table

Returns columns:

list, tuples of (name, type, not_null, has_default)

etlhelper.row_factories

Row factories are functions that process row data as it comes from the database. These are applied by the iter_rows function.

A row_factory function must:
  • accept a cursor object as an input

  • only use methods on the cursor that are described by DBAPI

  • return a function that takes a tuple

etlhelper.row_factories.dict_row_factory(cursor)[source]

Return function to convert output row to a dictionary.

Dictionaries allow access to attributes via name (using key notation, e.g. row["id"]). They are mutable, so are convenient to modify directly in transform functions. Insert statements based on dictionaries must use named placeholders for parameters (e.g. :id, %(id)s).

etlhelper.row_factories.list_row_factory(cursor)[source]

Return function to convert output row to a list.

Lists allow access to attributes via position (e.g. row[0]). They are mutable, so are convenient to modify directly in transform functions. Insert statements based on lists must use positional placeholders for parameters (e.g. ?, :1, %s).

etlhelper.row_factories.namedtuple_row_factory(cursor)[source]

Return function to convert output row to a named tuple.

Named tuples allow access to attributes via both position (e.g. row[0]) and name (using dot notation, e.g. row.id). They are immutable, so cannot be modified directly in transform functions. Insert statements based on named tuples must use positional placeholders for parameters (e.g. ?, :1, %s).

etlhelper.row_factories.tuple_row_factory(cursor)[source]

Return function to convert output row to a tuple.

Tuples allow access to attributes via position (e.g. row[0]). They are immutable, so cannot be modified directly in transform functions. Insert statements based on tuples must use positional placeholders for parameters (e.g. ?, :1, %s).

As the DBAPI default is already to return rows as tuples, using the tuple row factory minimises processing overhead.

DB Helpers

DbHelper classes are used behind-the-scenes for customising the behaviour of different database drivers. They are not normally called directly.

For more details, see the source code in each module.

class etlhelper.db_helpers.DbHelper[source]

Abstract Base Class for DBHelpers

class etlhelper.db_helpers.SQLiteDbHelper[source]

SQLite DB helper class

class etlhelper.db_helpers.PostgresDbHelper[source]

Postgres db helper class

class etlhelper.db_helpers.OracleDbHelper[source]

Oracle DB helper class

class etlhelper.db_helpers.MSSQLDbHelper[source]

MS SQL Server helper class

Exceptions

ETLHelper exceptions are derived from the ETLHelperError base class.

class etlhelper.exceptions.ETLHelperError[source]

Base class for exceptions in this module

class etlhelper.exceptions.ETLHelperConnectionError[source]

Exception raised for bad database connections

class etlhelper.exceptions.ETLHelperQueryError[source]

Exception raised for SQL query errors and similar

class etlhelper.exceptions.ETLHelperDbParamsError[source]

Exception raised for bad database parameters

class etlhelper.exceptions.ETLHelperExtractError[source]

Exception raised when extracting data.

class etlhelper.exceptions.ETLHelperInsertError[source]

Exception raised when inserting data.

class etlhelper.exceptions.ETLHelperAbort[source]

Exception raised when abort is called.

class etlhelper.exceptions.ETLHelperHelperError[source]

Exception raised when helper selection fails.

class etlhelper.exceptions.ETLHelperBadIdentifierError[source]

Exception raised when identifier contains invalid characters. This may indicate an attempt at SQL injection.