# Source code for etlhelper
"""
Library to simplify data transfer between databases
"""
import logging
import sys
from importlib.metadata import (
PackageNotFoundError,
version,
)
from typing import TextIO
# Import helper functions here for more convenient access
from etlhelper.abort import abort_etlhelper_threads
from etlhelper.db_params import DbParams
from etlhelper.etl import (
copy_rows,
execute,
executemany,
fetchone,
fetchall,
generate_insert_sql,
iter_chunks,
iter_rows,
load,
copy_table_rows,
)
from etlhelper.connect import (
connect,
get_connection_string,
get_sqlalchemy_connection_string,
)
from etlhelper.utils import (
table_info,
)
from etlhelper import (
row_factories,
)
# Placeholder version; it is kept when the metadata lookup below raises
# PackageNotFoundError.
__version__ = "0.0.0"
try:
    __version__ = version(__name__)
except PackageNotFoundError:
    # No distribution metadata was found under this module's name,
    # so the placeholder assigned above stays in place.
    pass

# Fetch the "etlhelper" logger and remove every handler attached to it, so
# the logger starts out with no handlers of its own. `log_to_console`
# attaches a stream handler when console output is requested.
logging.getLogger("etlhelper").handlers.clear()
# [docs]
def log_to_console(
    level: int = logging.INFO,
    output: TextIO = sys.stderr,
) -> None:
    """
    Send ETL Helper log messages to a stream.

    Every handler already attached to the ``etlhelper`` logger is removed
    before the new one is added, so repeated calls do not duplicate output.

    :param level: level set on the ``etlhelper`` logger
    :param output: stream that the formatted messages are written to
    """
    # Formatter that switches layout on the record level. See this
    # StackOverflow answer for details:
    # https://stackoverflow.com/a/27835318/3508733
    class CleanDebugMessageFormatter(logging.Formatter):
        """Prefix records at INFO and above; print lower levels bare."""

        default_fmt = logging.Formatter('%(asctime)s %(funcName)s: %(message)s')
        debug_fmt = logging.Formatter('%(message)s')

        def format(self, record: logging.LogRecord) -> str:
            below_info = record.levelno < logging.INFO
            chosen = self.debug_fmt if below_info else self.default_fmt
            return chosen.format(record)

    # Build the stream handler up front
    stream_handler = logging.StreamHandler(output)
    stream_handler.setFormatter(CleanDebugMessageFormatter())

    # Swap the logger's handlers for the new one and apply the level
    etl_logger = logging.getLogger('etlhelper')
    etl_logger.handlers.clear()
    etl_logger.addHandler(stream_handler)
    etl_logger.setLevel(level=level)
# Public API of the package: the names exported by `from etlhelper import *`.
# Every entry is either imported at the top of this module or defined in it
# (`log_to_console`).
__all__ = [
    "DbParams",
    "abort_etlhelper_threads",
    "connect",
    "copy_rows",
    "copy_table_rows",
    "execute",
    "executemany",
    "fetchall",
    "fetchone",
    "generate_insert_sql",
    "get_connection_string",
    "get_sqlalchemy_connection_string",
    "iter_chunks",
    "iter_rows",
    "load",
    "log_to_console",
    "row_factories",
    "table_info",
]