API¶
etlhelper¶
Library to simplify data transfer between databases
- class etlhelper.DbParams(dbtype='dbtype not set', **kwargs)[source]¶
Generic data holder class for database connection parameters.
- connect(password_variable=None, **kwargs)[source]¶
Return database connection.
- Parameters:
password_variable – str, name of environment variable with password
kwargs – connection specific keyword arguments e.g. row_factory
- Returns:
Connection object
- copy()[source]¶
Return a shallow copy of DbParams object.
- Return DbParams:
DbParams object with same attributes as original.
- classmethod from_environment(prefix='ETLHelper_')[source]¶
Create DbParams object from parameters specified by environment variables, e.g. ETLHelper_dbtype, ETLHelper_host, ETLHelper_port, etc.
- Parameters:
prefix – str, prefix to environment variable names
- get_connection_string(password_variable=None)[source]¶
Return the connection string.
- Parameters:
password_variable – str, name of environment variable with password
- Returns:
str, Connection string
- get_sqlalchemy_connection_string(password_variable=None)[source]¶
Get a SQLAlchemy connection string.
- Parameters:
password_variable – str, name of environment variable with password
- Returns:
str, Connection string
- is_reachable()[source]¶
Test whether the network allows opening of a TCP/IP connection to the database. No username or password is required.
- Return bool:
True if a connection can be opened, otherwise False.
- property paramstyle¶
The DBAPI2 paramstyle attribute for the database type
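As an illustrative sketch only, the kind of string that get_connection_string returns for a PostgreSQL-style database can be assembled by hand from the same attributes a DbParams object holds. The connection_string helper and the parameter values below are hypothetical, not etlhelper's implementation:

```python
import os

# Hypothetical parameters mirroring the attributes a DbParams object holds
# (dbtype, host, port, dbname, user follow the etlhelper documentation).
params = {
    "dbtype": "PG",
    "host": "localhost",
    "port": 5432,
    "dbname": "etlhelper",
    "user": "etlhelper_user",
}

def connection_string(params: dict, password_variable: str) -> str:
    """Format a PostgreSQL-style connection string, reading the
    password from the named environment variable."""
    password = os.environ[password_variable]
    return (
        f"host={params['host']} port={params['port']} "
        f"dbname={params['dbname']} user={params['user']} "
        f"password={password}"
    )

os.environ["ETLHELPER_PASSWORD"] = "s3cret"  # normally set outside the script
print(connection_string(params, "ETLHELPER_PASSWORD"))
```

Reading the password from an environment variable, rather than storing it in the DbParams object, keeps secrets out of code and logs.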
- etlhelper.abort_etlhelper_threads()[source]¶
Abort the ETL Helper process at the end of the current chunk. During threaded operation, this will cause an ETLHelperAbort error to be raised.
- etlhelper.connect(db_params, password_variable=None, **kwargs)[source]¶
Return database connection.
- Parameters:
db_params – DbParams object or similar with appropriate attributes
password_variable – str, name of environment variable with password
kwargs – connection specific keyword arguments e.g. row_factory
- Returns:
Connection object
- etlhelper.copy_rows(select_query: str, source_conn: ~etlhelper.types.Connection, insert_query: str, dest_conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, on_error: ~typing.Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int] [source]¶
Copy rows from source_conn to dest_conn. select_query and insert_query specify the data to be transferred.
Note: ODBC driver requires separate connections for source_conn and dest_conn, even if they represent the same database.
- Geometry columns from Oracle spatial should be selected with:
SDO_UTIL.TO_WKTGEOMETRY(shape_bng) AS geom_wkt
- and inserted into PostGIS with:
ST_GeomFromText(%s, 27700)
Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.
- Parameters:
select_query – SQL query to select data
source_conn – dbapi connection
insert_query – SQL query to insert data
dest_conn – dbapi connection
parameters – bind variables to insert in the select query
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
on_error – function to be applied to failed rows in each chunk
commit_chunks – commit after each chunk (see executemany)
chunk_size – size of chunks to group data by
- Returns:
the number of rows processed and the number of rows failed
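The select/insert pairing that copy_rows automates can be sketched with two stdlib sqlite3 connections. This hand-rolled loop shows the chunked read, write, and per-chunk commit pattern the parameters above describe; it is not etlhelper's implementation:

```python
import sqlite3

# Two separate connections, as copy_rows requires even for the same database.
source = sqlite3.connect(":memory:")
dest = sqlite3.connect(":memory:")

source.executescript(
    "CREATE TABLE src (id INTEGER, name TEXT);"
    "INSERT INTO src VALUES (1, 'alpha'), (2, 'beta');"
)
dest.execute("CREATE TABLE tgt (id INTEGER, name TEXT)")

select_query = "SELECT id, name FROM src"
insert_query = "INSERT INTO tgt (id, name) VALUES (?, ?)"

# Read in chunks and write with executemany, committing per chunk,
# which is the behaviour commit_chunks=True describes.
cursor = source.execute(select_query)
chunk_size = 1000
while True:
    chunk = cursor.fetchmany(chunk_size)
    if not chunk:
        break
    dest.executemany(insert_query, chunk)
    dest.commit()

print(dest.execute("SELECT COUNT(*) FROM tgt").fetchone()[0])  # 2
```

A transform function would slot in between fetchmany and executemany, reshaping each chunk before insertion.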
- etlhelper.copy_table_rows(table: str, source_conn: ~etlhelper.types.Connection, dest_conn: ~etlhelper.types.Connection, target: str | None = None, row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, on_error: ~typing.Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int] [source]¶
Copy rows from ‘table’ in source_conn to same or target table in dest_conn. This is a simple copy of all columns and rows using load to insert data. It is possible to apply a transform e.g. to change the case of table names. For more control, use copy_rows.
Note: ODBC driver requires separate connections for source_conn and dest_conn, even if they represent the same database.
Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.
- Parameters:
table – name of table
source_conn – dbapi connection
dest_conn – dbapi connection
target – name of target table, if different from source
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
on_error – function to be applied to failed rows in each chunk
commit_chunks – commit after each chunk (see executemany)
chunk_size – size of chunks to group data by
- Returns:
the number of rows processed and the number of rows failed
- etlhelper.execute(query: str, conn: Connection, parameters: Collection[Any] = ()) None [source]¶
Run SQL query against connection.
- Parameters:
query – SQL query to execute
conn – dbapi connection
parameters – bind variables to insert in the query
- Raises:
ETLHelperQueryError – if SQL raises an error
- etlhelper.executemany(query: str, conn: Connection, rows: Iterable[Collection[Any]], transform: Callable[[list[Collection[Any]]], list[Collection[Any]]] | None = None, on_error: Callable[[list[FailedRow]], Any] | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int] [source]¶
Use query to insert/update data from rows to database at conn. This method uses the executemany or execute_batch (PostgreSQL) commands to process the data in chunks and avoid creating a new database connection for each row. Row data are passed as parameters into query.
Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.
commit_chunks controls whether the transaction is committed after each chunk has been inserted. Committing chunks means that errors during a long-running insert do not require all data to be loaded again. The disadvantage is that investigation may be required to determine exactly which records have been successfully transferred.
- Parameters:
query – SQL insert command with placeholders for data
conn – dbapi connection
rows – an iterable of rows containing data to be inserted/updated
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
on_error – function to be applied to failed rows in each chunk
commit_chunks – commit after each chunk has been inserted/updated
chunk_size – size of chunks to group data by
- Returns:
the number of rows processed and the number of rows failed
- Raises:
ETLHelperInsertError – if SQL raises an error
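The chunking and on_error retry behaviour described above can be sketched with stdlib sqlite3. This is an illustrative hand-rolled loop, not etlhelper's internals:

```python
import sqlite3
from itertools import islice

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")

# (1, "dup") will violate the primary key constraint.
rows = iter([(1, "a"), (2, "b"), (1, "dup"), (3, "c")])
query = "INSERT INTO t VALUES (?, ?)"

all_failed = []

def on_error(failed_rows):
    # Receives the chunk's list of (row, exception) tuples.
    all_failed.extend(failed_rows)

chunk_size = 2
while chunk := list(islice(rows, chunk_size)):
    failed_rows = []
    try:
        conn.executemany(query, chunk)
    except sqlite3.DatabaseError:
        conn.rollback()
        # Retry the rows of the failed chunk individually,
        # collecting (row, exception) tuples for any that still fail.
        for row in chunk:
            try:
                conn.execute(query, row)
            except sqlite3.DatabaseError as exc:
                failed_rows.append((row, exc))
    conn.commit()  # the commit_chunks=True behaviour
    if failed_rows:
        on_error(failed_rows)

print(len(all_failed))  # 1 failed row: (1, "dup")
```

The good rows of a failed chunk still reach the database; only the rows that fail on individual retry end up in the on_error list.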
- etlhelper.fetchall(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) list[Collection[Any]] [source]¶
Get all results of query as a list. See iter_rows for details.
- Parameters:
select_query – SQL query to execute
conn – dbapi connection
parameters – bind variables to insert in the query
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
chunk_size – size of chunks to group data by
- Returns:
a list of rows of data using the given row_factory
- etlhelper.fetchone(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 1) Collection[Any] | None [source]¶
Get first result of query. See iter_rows for details. Note: iter_rows is recommended for looping over rows individually.
- Parameters:
select_query – SQL query to execute
conn – dbapi connection
parameters – bind variables to insert in the query
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
chunk_size – size of chunks to group data by
- Returns:
None or a row of data using the given row_factory
- etlhelper.generate_insert_sql(table: str, row: Collection[Any], conn: Connection) str [source]¶
Generate insert SQL for table, getting column names from row and the placeholder style from the connection. row is either a namedtuple or a dictionary.
- Parameters:
table – name of table
row – a single row as a namedtuple or dict
conn – dbapi connection
- Returns:
SQL statement to insert data into the given table
- Raises:
ETLHelperInsertError – if ‘row’ is not a namedtuple or a dict, or if the database connection encounters a parameter error
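A minimal sketch of the idea, assuming dict rows and the "named" placeholder style; the real function selects the placeholder style from the connection's paramstyle, and make_insert_sql here is a hypothetical stand-in:

```python
def make_insert_sql(table: str, row: dict) -> str:
    """Build an INSERT statement from a row's keys, using named
    placeholders (one paramstyle the DBAPI defines)."""
    columns = ", ".join(row.keys())
    placeholders = ", ".join(f":{key}" for key in row.keys())
    return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

row = {"id": 1, "name": "alpha"}
sql = make_insert_sql("my_table", row)
print(sql)  # INSERT INTO my_table (id, name) VALUES (:id, :name)
```

Because dicts preserve insertion order, the column list and placeholder list stay aligned with the row's values.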
- etlhelper.get_connection_string(db_params, password_variable)[source]¶
Get a connection string.
- Parameters:
db_params – DbParams object or similar with appropriate attributes
password_variable – str, name of environment variable with password
- Returns:
str, Connection string
- etlhelper.get_sqlalchemy_connection_string(db_params, password_variable)[source]¶
Get a SQLAlchemy connection string.
- Parameters:
db_params – DbParams object or similar with appropriate attributes
password_variable – str, name of environment variable with password
- Returns:
str, Connection string
- etlhelper.iter_chunks(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) Iterator[list[Collection[Any]]] [source]¶
Run SQL query against connection and return an iterator object to loop over results in batches of chunk_size (default 5000).
The row_factory changes the output format of the results. Other row factories, e.g. namedtuple_row_factory, are available.
The transform function is applied to chunks of data as they are extracted from the database.
All data extraction functions call this function, directly or indirectly.
- Parameters:
select_query – SQL query to execute
conn – dbapi connection
parameters – bind variables to insert in the query
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
chunk_size – size of chunks to group data by
- Returns:
generator returning a list of objects which each represent a row of data using the given row_factory
- Raises:
ETLHelperExtractError – if SQL raises an error
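The fetchmany-based pattern behind chunked extraction can be sketched with stdlib sqlite3; this omits the row_factory and transform handling of the real function:

```python
import sqlite3
from typing import Iterator

def iter_chunks_sketch(select_query: str, conn, chunk_size: int = 5000) -> Iterator[list]:
    """Yield lists of up to chunk_size rows using cursor.fetchmany."""
    cursor = conn.execute(select_query)
    # fetchmany returns an empty list when the results are exhausted.
    while chunk := cursor.fetchmany(chunk_size):
        yield chunk

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(7)])

chunks = list(iter_chunks_sketch("SELECT n FROM t ORDER BY n", conn, chunk_size=3))
print([len(c) for c in chunks])  # [3, 3, 1]
```

Because rows are fetched lazily, memory use is bounded by chunk_size rather than by the size of the result set.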
- etlhelper.iter_rows(select_query: str, conn: ~etlhelper.types.Connection, parameters: tuple = (), row_factory: ~typing.Callable = <function dict_row_factory>, transform: ~typing.Callable[[list[~typing.Collection[~typing.Any]]], list[~typing.Collection[~typing.Any]]] | None = None, chunk_size: int = 5000) Iterator[Collection[Any]] [source]¶
Run SQL query against connection and return an iterator object to loop over results, row-by-row.
- Parameters:
select_query – SQL query to execute
conn – dbapi connection
parameters – bind variables to insert in the query
row_factory – function that accepts a cursor and returns a function for parsing each row
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
chunk_size – size of chunks to group data by
- Returns:
generator yielding objects which each represent a row of data using the given row_factory
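A row-by-row sketch of the same idea, flattening fetchmany chunks in the way iter_rows builds on iter_chunks (again omitting row_factory and transform):

```python
import sqlite3

def iter_rows_sketch(select_query: str, conn, chunk_size: int = 5000):
    """Yield rows one at a time by flattening fetchmany chunks."""
    cursor = conn.execute(select_query)
    while chunk := cursor.fetchmany(chunk_size):
        yield from chunk

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(3)])

for row in iter_rows_sketch("SELECT n FROM t ORDER BY n", conn):
    print(row)
```

The caller sees a plain row iterator, while fetching still happens in memory-friendly batches underneath.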
- etlhelper.load(table: str, conn: Connection, rows: Iterable[Collection[Any]], transform: Callable[[list[Collection[Any]]], list[Collection[Any]]] | None = None, on_error: Callable | None = None, commit_chunks: bool = True, chunk_size: int = 5000) tuple[int, int] [source]¶
Load data from an iterable of named tuples or dictionaries into a pre-existing table in the database at conn.
Default behaviour is to raise an exception in the case of SQL errors such as primary key violations. If the on_error parameter is specified, the exception will be caught, then the rows of each chunk will be retried individually. Further errors will be caught and appended to a list of (row, exception) tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument.
- Parameters:
table – name of table
conn – dbapi connection
rows – iterable of named tuples or dictionaries of data
transform – function that accepts a list of rows and returns a list of rows (possibly of different shape)
on_error – function to be applied to failed rows in each chunk
commit_chunks – commit after each chunk (see executemany)
chunk_size – size of chunks to group data by
- Returns:
the number of rows processed and the number of rows failed
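The load pattern (derive the insert statement from the first row's keys, then insert the batch) can be sketched with stdlib sqlite3, assuming dict rows and named placeholders; this is not etlhelper's implementation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ages (name TEXT, age INTEGER)")

rows = [{"name": "ada", "age": 36}, {"name": "alan", "age": 41}]

# Build the INSERT statement from the keys of the first row.
first = rows[0]
columns = ", ".join(first)
placeholders = ", ".join(f":{key}" for key in first)
insert_sql = f"INSERT INTO ages ({columns}) VALUES ({placeholders})"

conn.executemany(insert_sql, rows)
conn.commit()
print(conn.execute("SELECT COUNT(*) FROM ages").fetchone()[0])  # 2
```

This is why load needs a pre-existing table: the column names come from the rows, but the table itself is never created.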
- etlhelper.log_to_console(level: int = 20, output: ~typing.TextIO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>) None [source]¶
Log ETL Helper messages to the given output.
- Parameters:
level – logger level
output – the output location of the logger messages
- etlhelper.table_info(table, conn, schema=None)[source]¶
Return basic metadata for each of the columns of ‘table’ on ‘conn’.
- Parameters:
table – str, the table to describe
conn – dbapi connection
schema – str, optional name of schema for table
- Returns columns:
list, tuples of (name, type, not_null, has_default)
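For SQLite, equivalent metadata can be gathered from PRAGMA table_info. This stdlib sketch mirrors the (name, type, not_null, has_default) tuples described above; it is an illustration, not etlhelper's implementation:

```python
import sqlite3
from collections import namedtuple

Column = namedtuple("Column", "name type not_null has_default")

def sqlite_table_info(table: str, conn) -> list:
    """Collect (name, type, not_null, has_default) for each column
    of an SQLite table via PRAGMA table_info."""
    pragma = conn.execute(f"PRAGMA table_info({table})").fetchall()
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    return [
        Column(name, type_, bool(notnull), dflt is not None)
        for _cid, name, type_, notnull, dflt, _pk in pragma
    ]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sample (id INTEGER NOT NULL, name TEXT DEFAULT 'anon')"
)
for col in sqlite_table_info("sample", conn):
    print(col)
```
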
etlhelper.row_factories¶
Row factories are functions that process row data as it comes from the database. These are applied by the iter_rows function.
- A row_factory function must:
accept a cursor object as an input
only use methods on the cursor that are described by the DBAPI
return a function that takes a tuple
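A custom row factory satisfying this contract can be written using only standard DBAPI cursor features. This sketch mirrors the behaviour of dict_row_factory, using stdlib sqlite3 to exercise it:

```python
import sqlite3

def my_dict_row_factory(cursor):
    """Accept a cursor, read column names from the DBAPI 'description'
    attribute, and return a function that converts one result tuple
    into a dict."""
    column_names = [desc[0] for desc in cursor.description]

    def create_row(row: tuple) -> dict:
        return dict(zip(column_names, row))

    return create_row

conn = sqlite3.connect(":memory:")
cursor = conn.execute("SELECT 1 AS id, 'alpha' AS name")
make_row = my_dict_row_factory(cursor)
print(make_row(cursor.fetchone()))  # {'id': 1, 'name': 'alpha'}
```

Because only cursor.description is used, the same factory works unchanged across DBAPI-compliant drivers.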
- etlhelper.row_factories.dict_row_factory(cursor)[source]¶
Return function to convert output row to a dictionary.
Dictionaries allow access to attributes by name (using key notation, e.g. row["id"]). They are mutable, so are convenient to modify directly in transform functions. Insert statements based on dictionaries must use named placeholders for parameters (e.g. :id, %(id)s).
- etlhelper.row_factories.list_row_factory(cursor)[source]¶
Return function to convert output row to a list.
Lists allow access to attributes by position (e.g. row[0]). They are mutable, so are convenient to modify directly in transform functions. Insert statements based on lists must use positional placeholders for parameters (e.g. ?, :1, %s).
- etlhelper.row_factories.namedtuple_row_factory(cursor)[source]¶
Return function to convert output row to a named tuple.
Named tuples allow access to attributes by position (e.g. row[0]) or by name (using dot notation, e.g. row.id). They are immutable, so cannot be modified directly in transform functions. Insert statements based on named tuples must use positional placeholders for parameters (e.g. ?, :1, %s).
- etlhelper.row_factories.tuple_row_factory(cursor)[source]¶
Return function to convert output row to a tuple.
Tuples allow access to attributes via position (e.g. row[0]). They are immutable, so cannot be modified directly in transform functions. Insert statements based on tuples must use positional placeholders for parameters (e.g. ?, :1, %s).
As the DBAPI default is already to return rows as tuples, using the tuple row factory minimises processing overhead.
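The pairing of row shape and placeholder style described in the entries above can be demonstrated with stdlib sqlite3, which accepts both positional and named parameters:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")

# Tuple/list rows pair with positional placeholders...
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b")])

# ...while dict rows pair with named placeholders.
conn.executemany(
    "INSERT INTO t VALUES (:id, :name)",
    [{"id": 3, "name": "c"}],
)

print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # 3
```

Mixing the two (e.g. dict rows with positional placeholders) raises a driver error, which is why the row factory and insert query must be chosen together.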
DB Helpers¶
DbHelper classes are used behind-the-scenes for customising the behaviour of different database drivers. They are not normally called directly.
For more details, see the source code in each module.
Exceptions¶
ETLHelper exceptions are derived from the ETLHelperError base class.
- class etlhelper.exceptions.ETLHelperConnectionError[source]¶
Exception raised for bad database connections
- class etlhelper.exceptions.ETLHelperQueryError[source]¶
Exception raised for SQL query errors and similar
- class etlhelper.exceptions.ETLHelperDbParamsError[source]¶
Exception raised for bad database parameters