DBHose

DBHose(
table_dest,
connection_dest,
connection_src=None,
dq_skip_check=[],
filter_by=[],
drop_temp_table=True,
move_method=MoveMethod.replace,
custom_move=None,
compress_method=CompressionMethod.ZSTD,
timeout=DBMS_DEFAULT_TIMEOUT_SEC,
)

Основной класс DBHose для переноса данных между СУБД.

Добавлено в версии 0.1.0.

Назначение:

Класс предоставляет унифицированный интерфейс для переноса данных между различными системами управления базами данных (СУБД) в Apache Airflow DAGs. Поддерживает ClickHouse, PostgreSQL и Greenplum.

Параметры инициализации

table_dest: str

Имя целевой таблицы для записи данных.

Примеры:

  • "target_table"

  • "schema.target_table"

  • "database.schema.table"

connection_dest: str

ID соединения Airflow для целевой СУБД.

Примеры:

  • "target_postgres"

  • "clickhouse_prod"

  • "greenplum_warehouse"

connection_src: str | None = None

ID соединения Airflow для исходной СУБД. Если None, используется только целевое соединение (например, для выполнения SQL запросов в целевой БД).

Примеры:

  • "source_postgres"

  • None (если источник не нужен)

dq_skip_check: List[str] = []

Список проверок качества данных, которые нужно пропустить. См. DQCheck для доступных проверок.

Примеры:

# Пропустить проверку на дубликаты и будущие даты
dq_skip_check = ["uniq", "future"]

# Пропустить все проверки
dq_skip_check = list(DQCheck.__members__.keys())
filter_by: List[str] = []

Список колонок для фильтрации данных при использовании метода delete.

Примеры:

# Фильтрация по дате
filter_by = ["date_column"]

# Фильтрация по нескольким колонкам
filter_by = ["date_column", "region_id", "user_id"]

# Без фильтрации
filter_by = []

Примечание: Для метода delete фильтрация обязательна.

drop_temp_table: bool = True

Удаление промежуточной таблицы после завершения операции.

Значения:

  • True: Промежуточная таблица удаляется

  • False: Промежуточная таблица остается (для отладки)

move_method: MoveMethod = MoveMethod.replace

Метод перемещения данных из промежуточной таблицы в целевую. См. MoveMethod для всех доступных методов.

Примеры:

from dbhose_airflow import MoveMethod

# Заменить данные полностью (по умолчанию)
move_method = MoveMethod.replace

# Добавить новые данные
move_method = MoveMethod.append

# Удалить старые и добавить новые
move_method = MoveMethod.delete
custom_move: str | None = None

Пользовательский SQL запрос для перемещения данных. Используется только с move_method=MoveMethod.custom.

Примеры:

# Пользовательский INSERT
custom_move = """
INSERT INTO {target_table}
SELECT * FROM {temp_table}
WHERE date >= '2024-01-01'
"""

# Пользовательский MERGE
custom_move = """
MERGE INTO {target_table} AS target
USING {temp_table} AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET
  target.value = source.value,
  target.updated_at = NOW()
WHEN NOT MATCHED THEN INSERT VALUES
  (source.id, source.value, NOW())
"""
compress_method: CompressionMethod = CompressionMethod.ZSTD

Метод сжатия данных при передаче. См. CompressionMethod для всех методов.

Примеры:

from light_compressor import CompressionMethod

# Сбалансированное сжатие (по умолчанию)
compress_method = CompressionMethod.ZSTD

# Быстрое сжатие
compress_method = CompressionMethod.LZ4

# Без сжатия (не рекомендуется)
compress_method = CompressionMethod.NONE
timeout: int = DBMS_DEFAULT_TIMEOUT_SEC

Таймаут операций с СУБД в секундах. См. DBMS_DEFAULT_TIMEOUT_SEC (300 секунд).

Примеры:

# Стандартный таймаут (5 минут)
timeout = 300

# Для больших таблиц (10 минут)
timeout = 600

# Для очень больших таблиц (30 минут)
timeout = 1800

Атрибуты экземпляра

После инициализации создаются следующие атрибуты:

logger: Logger

Логгер для записи событий и ошибок.

dumper_dest: NativeDumper | PGPackDumper

Объект дампера для целевой СУБД. Создается функцией dbhose_dumper().

dumper_src: NativeDumper | PGPackDumper | None

Объект дампера для исходной СУБД (если указан connection_src).

ddl: str | None

DDL (Data Definition Language) целевой таблицы. Заполняется при вызове методов класса.

temp_ddl: str | None

DDL промежуточной таблицы. Заполняется при вызове методов класса.

table_temp: str | None

Имя промежуточной таблицы. Генерируется автоматически.

filter_by: str

Строковое представление колонок фильтрации (преобразуется из списка).

Примеры инициализации

from dbhose_airflow import DBHose, MoveMethod
from light_compressor import CompressionMethod

# Пример 1: Простая инициализация для PostgreSQL → PostgreSQL
dbhose = DBHose(
    table_dest="public.target_table",
    connection_dest="target_postgres",
    connection_src="source_postgres",
    move_method=MoveMethod.replace,
)

# Пример 2: Инициализация с фильтрацией для ClickHouse
dbhose = DBHose(
    table_dest="default.analytics_data",
    connection_dest="clickhouse_prod",
    connection_src="postgres_source",
    filter_by=["event_date", "user_id"],  # Фильтрация по дате и пользователю
    move_method=MoveMethod.delete,  # Удаление с фильтрацией
    compress_method=CompressionMethod.LZ4,  # Быстрое сжатие
    timeout=600,  # 10 минут таймаут
)

# Пример 3: Только целевое соединение (без источника)
dbhose = DBHose(
    table_dest="staging.temp_data",
    connection_dest="staging_postgres",
    connection_src=None,  # Только целевая БД
    drop_temp_table=False,  # Оставить таблицу для отладки
    dq_skip_check=["future", "nan"],  # Пропустить некоторые проверки
)

# Пример 4: Пользовательский метод перемещения
dbhose = DBHose(
    table_dest="data_warehouse.fact_sales",
    connection_dest="dw_postgres",
    connection_src="oltp_postgres",
    move_method=MoveMethod.custom,  # Пользовательский метод
    custom_move="""
    INSERT INTO {target_table}
    SELECT
        s.*,
        NOW() as loaded_at
    FROM {temp_table} s
    WHERE s.sale_date >= CURRENT_DATE - INTERVAL '7 days'
    """,
    filter_by=["sale_date"],  # Фильтрация для оптимизации
)

Рекомендации по инициализации

Валидация параметров

def validate_dbhose_params(params: dict) -> bool:
    """Валидация параметров DBHose."""

    # Проверка обязательных параметров
    required = ['table_dest', 'connection_dest']
    for param in required:
        if param not in params or not params[param]:
            raise ValueError(f"Обязательный параметр {param} отсутствует")

    # Проверка move_method и custom_move
    if params.get('move_method') == MoveMethod.custom:
        if not params.get('custom_move'):
            raise ValueError("custom_move обязателен для MoveMethod.custom")

    # Проверка filter_by для delete метода
    if params.get('move_method') == MoveMethod.delete:
        if not params.get('filter_by'):
            raise ValueError("filter_by обязателен для MoveMethod.delete")

    return True

# Использование
params = {
    'table_dest': 'my_table',
    'connection_dest': 'my_conn',
    'move_method': MoveMethod.delete,
    'filter_by': ['date_column'],
}

if validate_dbhose_params(params):
    dbhose = DBHose(**params)

Логирование

При инициализации класса выводится логотип DBHose и информация о подключенных СУБД.

Примечание

  • Для метода delete обязательна фильтрация (filter_by)

  • Промежуточная таблица имеет название ``имя основной таблицы``_temp

Предупреждение

  • Не используйте drop_temp_table=False в production без необходимости

  • Для долгих запросов в Clickhouse требуется изменить timeout при инициализации

  • Используйте пропуск проверок качества (dq_skip_check) если данная проверка не требуется для конкретной таблицы

См. также

Методы класса