Source code for etlhelper.db_helpers.postgres

"""
Database helper for PostgreSQL
"""
import warnings
from textwrap import dedent

from etlhelper.db_helpers.db_helper import DbHelper


[docs] class PostgresDbHelper(DbHelper): """ Postgres db helper class """ table_info_query = dedent(""" SELECT pg_attribute.attname AS name, pg_catalog.format_type(pg_attribute.atttypid, pg_attribute.atttypmod) AS type, (case when pg_attribute.attnotnull then 1 else 0 end) as not_null, (case when pg_attribute.atthasdef then 1 else 0 end) as has_default FROM pg_catalog.pg_attribute INNER JOIN pg_catalog.pg_class ON pg_class.oid = pg_attribute.attrelid INNER JOIN pg_catalog.pg_namespace ON pg_namespace.oid = pg_class.relnamespace WHERE pg_attribute.attnum > 0 AND NOT pg_attribute.attisdropped AND pg_class.relname = %s AND pg_namespace.nspname ~ COALESCE(%s, '.*') ORDER BY attnum ASC; """).strip() def __init__(self): super().__init__() self.required_params = {'host', 'port', 'dbname', 'user'} self.missing_driver_msg = ( "Could not import psycopg2 module required for PostgreSQL connections. " "See https://github.com/BritishGeologicalSurvey/etlhelper for installation instructions") self.named_paramstyle = 'pyformat' self.positional_paramstyle = 'format' try: import psycopg2 self.sql_exceptions = (psycopg2.DatabaseError, psycopg2.InterfaceError) self.connect_exceptions = (psycopg2.DatabaseError, psycopg2.InterfaceError) self.paramstyle = psycopg2.paramstyle self._connect_func = psycopg2.connect except ImportError: warnings.warn(self.missing_driver_msg) def get_connection_string(self, db_params, password_variable): """ Return a connection string :param db_params: DbParams :param password_variable: str, password :return: str """ # Prepare connection string password = self.get_password(password_variable) return (f'host={db_params.host} port={db_params.port} ' f'dbname={db_params.dbname} ' f'user={db_params.user} password={password}') def get_sqlalchemy_connection_string(self, db_params, password_variable): """ Returns connection string for SQLAlchemy engine. """ password = self.get_password(password_variable) return (f'postgresql://{db_params.user}:{password}@' f'{db_params.host}:{db_params.port}/{db_params.dbname}') @staticmethod def executemany(cursor, query, chunk): """ Call execute_batch method for PostGres. :param cursor: Open database cursor. :param query: str, SQL query :param chunk: list, Rows of parameters. """ # Here we use execute_batch to send multiple inserts to db at once. # This is faster than executemany() because it results in fewer db # calls. execute_values() or preparing single statement with # mogrify() were not used because resulting input statement is less # clear and selective formatting of inputs for spatial vs non-spatial # tables adds significant code complexity. # See following for background: # https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038 # https://www.compose.com/articles/formatted-sql-in-python-with-psycopgs-mogrify/ from psycopg2.extras import execute_batch execute_batch(cursor, query, chunk, page_size=len(chunk))