dbhose_dumper¶
- dbhose_dumper(
- airflow_connection,
- compress_method=CompressionMethod.ZSTD,
- timeout=DBMS_DEFAULT_TIMEOUT_SEC,
- )
Создает объект дампера из строки соединения Airflow.
- Параметры:
airflow_connection (str) – ID соединения Airflow
compress_method (CompressionMethod) – Метод сжатия, по умолчанию ZSTD
timeout (int) – Таймаут в секундах, по умолчанию 300 (5 минут)
- Результат:
Объект
NativeDumperилиPGPackDumper- Тип результата:
Union[NativeDumper, PGPackDumper]
- Исключение:
KeyError – Если тип соединения не поддерживается
AirflowNotFoundException – Если соединение не найдено
Описание:
Функция создает объект дампера для работы с СУБД на основе конфигурации соединения Airflow. Автоматически определяет тип СУБД (ClickHouse или PostgreSQL/Greenplum) и создает соответствующий дампер с указанными параметрами сжатия и таймаута.
Примеры использования:
from dbhose_airflow import dbhose_dumper, CompressionMethod
# Простое использование со стандартными параметрами
dumper = dbhose_dumper("my_postgres_connection")
# С кастомным сжатием и таймаутом
dumper = dbhose_dumper(
airflow_connection="my_clickhouse_connection",
compress_method=CompressionMethod.LZ4, # Быстрое сжатие
timeout=600, # 10 минут
)
Детали реализации:
Получает объект соединения Airflow по ID
Определяет тип СУБД по
conn_typeСоздает соответствующий коннектор и дампер
Применяет параметры сжатия и таймаута
Возвращает готовый к использованию объект дампера
Практические примеры:
# Пример 1: Создание дамперов для миграции данных
def create_migration_dumpers(source_conn_id: str, target_conn_id: str):
"""Создает пару дамперов для миграции."""
source_dumper = dbhose_dumper(
airflow_connection=source_conn_id,
compress_method=CompressionMethod.ZSTD,
timeout=300
)
target_dumper = dbhose_dumper(
airflow_connection=target_conn_id,
compress_method=CompressionMethod.NONE, # ClickHouse не поддерживает сжатие
timeout=600 # Увеличенный таймаут для ClickHouse
)
return source_dumper, target_dumper
# Пример 2: Фабрика дамперов с валидацией
class DumperFactory:
"""Фабрика для создания и валидации дамперов."""
@staticmethod
def create_validated_dumper(conn_id: str, **kwargs):
"""Создает дампер с валидацией соединения."""
try:
dumper = dbhose_dumper(conn_id, **kwargs)
# Дополнительная валидация
if hasattr(dumper, 'test_connection'):
dumper.test_connection()
return dumper
except KeyError as e:
raise ValueError(f"Неизвестный тип соединения: {conn_id}") from e
except Exception as e:
raise ConnectionError(f"Ошибка создания дампера для {conn_id}: {e}") from e
# Пример 3: Пул дамперов для многопоточной работы
from concurrent.futures import ThreadPoolExecutor
def create_dumper_pool(conn_ids: list[str], max_workers: int = 5):
"""Создает пул дамперов в многопоточном режиме."""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
dumpers = list(executor.map(dbhose_dumper, conn_ids))
return dict(zip(conn_ids, dumpers))
Примечание
Для ClickHouse соединений используется порт 8123 либо 443
Порт 9000 автоматически меняется на 8123
Таймаут нужен только для Clickhouse соединения
См. также
DBHoseDumpParams- Перечисление конфигураций дамперовCompressionMethod- Методы сжатия данныхDBMS_DEFAULT_TIMEOUT_SEC- Стандартный таймаутNativeDumper - Дампер для ClickHouse
PGPackDumper - Дампер для PostgreSQL/Greenplum