DBHose¶
- DBHose(
- table_dest,
- connection_dest,
- connection_src=None,
- dq_skip_check=[],
- filter_by=[],
- drop_temp_table=True,
- move_method=MoveMethod.replace,
- custom_move=None,
- compress_method=CompressionMethod.ZSTD,
- timeout=DBMS_DEFAULT_TIMEOUT_SEC,
- )
Основной класс DBHose для переноса данных между СУБД.
Добавлено в версии 0.1.0.
Назначение:
Класс предоставляет унифицированный интерфейс для переноса данных между различными системами управления базами данных (СУБД) в Apache Airflow DAGs. Поддерживает ClickHouse, PostgreSQL и Greenplum.
Параметры инициализации
- table_dest: str¶
Имя целевой таблицы для записи данных.
Примеры:
"target_table""schema.target_table""database.schema.table"
- connection_dest: str¶
ID соединения Airflow для целевой СУБД.
Примеры:
"target_postgres""clickhouse_prod""greenplum_warehouse"
- connection_src: str | None = None¶
ID соединения Airflow для исходной СУБД. Если
None, используется только целевое соединение (например, для выполнения SQL запросов в целевой БД).Примеры:
"source_postgres"None(если источник не нужен)
- dq_skip_check: List[str] = []¶
Список проверок качества данных, которые нужно пропустить. См.
DQCheckдля доступных проверок.Примеры:
# Пропустить проверку на дубликаты и будущие даты dq_skip_check = ["uniq", "future"] # Пропустить все проверки dq_skip_check = list(DQCheck.__members__.keys())
- filter_by: List[str] = []¶
Список колонок для фильтрации данных при использовании метода
delete.Примеры:
# Фильтрация по дате filter_by = ["date_column"] # Фильтрация по нескольким колонкам filter_by = ["date_column", "region_id", "user_id"] # Без фильтрации filter_by = []
Примечание: Для метода
deleteфильтрация обязательна.
- drop_temp_table: bool = True¶
Удаление промежуточной таблицы после завершения операции.
Значения:
True: Промежуточная таблица удаляетсяFalse: Промежуточная таблица остается (для отладки)
- move_method: MoveMethod = MoveMethod.replace¶
Метод перемещения данных из промежуточной таблицы в целевую. См.
MoveMethodдля всех доступных методов.Примеры:
from dbhose_airflow import MoveMethod # Заменить данные полностью (по умолчанию) move_method = MoveMethod.replace # Добавить новые данные move_method = MoveMethod.append # Удалить старые и добавить новые move_method = MoveMethod.delete
- custom_move: str | None = None¶
Пользовательский SQL запрос для перемещения данных. Используется только с
move_method=MoveMethod.custom.Примеры:
# Пользовательский INSERT custom_move = """ INSERT INTO {target_table} SELECT * FROM {temp_table} WHERE date >= '2024-01-01' """ # Пользовательский MERGE custom_move = """ MERGE INTO {target_table} AS target USING {temp_table} AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.value = source.value, target.updated_at = NOW() WHEN NOT MATCHED THEN INSERT VALUES (source.id, source.value, NOW()) """
- compress_method: CompressionMethod = CompressionMethod.ZSTD¶
Метод сжатия данных при передаче. См.
CompressionMethodдля всех методов.Примеры:
from light_compressor import CompressionMethod # Сбалансированное сжатие (по умолчанию) compress_method = CompressionMethod.ZSTD # Быстрое сжатие compress_method = CompressionMethod.LZ4 # Без сжатия (не рекомендуется) compress_method = CompressionMethod.NONE
- timeout: int = DBMS_DEFAULT_TIMEOUT_SEC¶
Таймаут операций с СУБД в секундах. См.
DBMS_DEFAULT_TIMEOUT_SEC(300 секунд).Примеры:
# Стандартный таймаут (5 минут) timeout = 300 # Для больших таблиц (10 минут) timeout = 600 # Для очень больших таблиц (30 минут) timeout = 1800
Атрибуты экземпляра
После инициализации создаются следующие атрибуты:
- logger: Logger¶
Логгер для записи событий и ошибок.
- dumper_dest: NativeDumper | PGPackDumper¶
Объект дампера для целевой СУБД. Создается функцией
dbhose_dumper().
- dumper_src: NativeDumper | PGPackDumper | None¶
Объект дампера для исходной СУБД (если указан
connection_src).
- ddl: str | None¶
DDL (Data Definition Language) целевой таблицы. Заполняется при вызове методов класса.
- temp_ddl: str | None¶
DDL промежуточной таблицы. Заполняется при вызове методов класса.
- table_temp: str | None¶
Имя промежуточной таблицы. Генерируется автоматически.
- filter_by: str¶
Строковое представление колонок фильтрации (преобразуется из списка).
Примеры инициализации
from dbhose_airflow import DBHose, MoveMethod from light_compressor import CompressionMethod # Пример 1: Простая инициализация для PostgreSQL → PostgreSQL dbhose = DBHose( table_dest="public.target_table", connection_dest="target_postgres", connection_src="source_postgres", move_method=MoveMethod.replace, ) # Пример 2: Инициализация с фильтрацией для ClickHouse dbhose = DBHose( table_dest="default.analytics_data", connection_dest="clickhouse_prod", connection_src="postgres_source", filter_by=["event_date", "user_id"], # Фильтрация по дате и пользователю move_method=MoveMethod.delete, # Удаление с фильтрацией compress_method=CompressionMethod.LZ4, # Быстрое сжатие timeout=600, # 10 минут таймаут ) # Пример 3: Только целевое соединение (без источника) dbhose = DBHose( table_dest="staging.temp_data", connection_dest="staging_postgres", connection_src=None, # Только целевая БД drop_temp_table=False, # Оставить таблицу для отладки dq_skip_check=["future", "nan"], # Пропустить некоторые проверки ) # Пример 4: Пользовательский метод перемещения dbhose = DBHose( table_dest="data_warehouse.fact_sales", connection_dest="dw_postgres", connection_src="oltp_postgres", move_method=MoveMethod.custom, # Пользовательский метод custom_move=""" INSERT INTO {target_table} SELECT s.*, NOW() as loaded_at FROM {temp_table} s WHERE s.sale_date >= CURRENT_DATE - INTERVAL '7 days' """, filter_by=["sale_date"], # Фильтрация для оптимизации )
Рекомендации по инициализации
Валидация параметров
def validate_dbhose_params(params: dict) -> bool: """Валидация параметров DBHose.""" # Проверка обязательных параметров required = ['table_dest', 'connection_dest'] for param in required: if param not in params or not params[param]: raise ValueError(f"Обязательный параметр {param} отсутствует") # Проверка move_method и custom_move if params.get('move_method') == MoveMethod.custom: if not params.get('custom_move'): raise ValueError("custom_move обязателен для MoveMethod.custom") # Проверка filter_by для delete метода if params.get('move_method') == MoveMethod.delete: if not params.get('filter_by'): raise ValueError("filter_by обязателен для MoveMethod.delete") return True # Использование params = { 'table_dest': 'my_table', 'connection_dest': 'my_conn', 'move_method': MoveMethod.delete, 'filter_by': ['date_column'], } if validate_dbhose_params(params): dbhose = DBHose(**params)
Логирование
При инициализации класса выводится логотип DBHose и информация о подключенных СУБД.
Примечание
Для метода
deleteобязательна фильтрация (filter_by)Промежуточная таблица имеет название ``имя основной таблицы``_temp
Предупреждение
Не используйте
drop_temp_table=Falseв production без необходимостиДля долгих запросов в Clickhouse требуется изменить
timeoutпри инициализацииИспользуйте пропуск проверок качества (
dq_skip_check) если данная проверка не требуется для конкретной таблицы
См. также
MoveMethod- Методы перемещения данныхDQCheck- Проверки качества данныхCompressionMethod- Методы сжатияDBMS_DEFAULT_TIMEOUT_SEC- Стандартный таймаутdbhose_dumper - Создание дамперов из соединений