to_table¶
- DBHose.to_table()¶
Переносит данные из промежуточной таблицы в целевую.
Описание:
Метод выполняет финальный этап переноса данных - перемещение данных из промежуточной таблицы (
table_temp) в целевую (table_dest). В зависимости от выбранногоmove_methodприменяется различная логика переноса данных.Сигнатура:
def to_table(self) -> None: """Move data to destination table."""
Типы методов переноса:
Типы методов переноса данных¶ Метод
SQL
Фильтр
Логика выполнения
append
❌ Нет
❌ Нет
Прямой перенос
write_betweendelete
✅ Да
✅ Да
Удаление по фильтру + вставка
replace
✅ Да
❌ Нет
Полная замена таблицы
rewrite
❌ Нет
❌ Нет
TRUNCATE + прямой перенос
custom
❌ Нет
❌ Нет
Пользовательские SQL запросы
Валидация параметров:
Метод выполняет проверки перед выполнением:
# 1. Проверка фильтрации для методов delete if self.move_method.need_filter and not self.filter_by: raise ValueError("You must specify columns in filter_by") # 2. Проверка custom запроса для custom метода if self.move_method.is_custom and not self.custom_move: raise ValueError("You must specify custom query") # 3. Проверка количества колонок для ClickHouse delete if (self.move_method is MoveMethod.delete and self.dumper_dest.__class__ is NativeDumper and len(self.filter_by.split(", ")) > 4): raise ValueError("Too many columns in filter_by (> 4)")
Примеры использования:
Базовый пример - метод replace¶from dbhose_airflow import DBHose, MoveMethod dbhose = DBHose( table_dest="public.users", connection_dest="postgres_target", connection_src="postgres_source", move_method=MoveMethod.replace, # Полная замена ) dbhose.create_temp() # ... загрузка данных ... dbhose.to_table() # Логи: # INFO:root:Move data with method replace # INFO:root:Data moved into public.users
Пример с фильтрацией - метод delete¶dbhose = DBHose( table_dest="analytics.daily_sales", connection_dest="clickhouse_analytics", connection_src="postgres_sales", move_method=MoveMethod.delete, # Удаление с фильтрацией filter_by=["sale_date", "region"], # Обязательно для delete ) dbhose.create_temp() # ... загрузка данных ... dbhose.to_table() # Удалит старые данные по фильтру и вставит новые
Пример с пользовательским запросом¶dbhose = DBHose( table_dest="data_warehouse.fact_orders", connection_dest="dw_postgres", connection_src="oltp_postgres", move_method=MoveMethod.custom, custom_move=""" -- Удалить старые данные DELETE FROM {table_dest} WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'; -- Вставить новые данные INSERT INTO {table_dest} SELECT *, NOW() as loaded_at FROM {table_temp}; """, ) dbhose.create_temp() # ... загрузка данных ... dbhose.to_table() # Выполнит пользовательские запросы
Пример с методом rewrite (пересоздание)¶dbhose = DBHose( table_dest="cache.report_data", connection_dest="postgres_cache", connection_src="postgres_source", move_method=MoveMethod.rewrite, # Полная перезапись ) dbhose.create_temp() # ... загрузка данных ... dbhose.to_table() # TRUNCATE таблицы и прямой перенос
Особенности для разных СУБД:
Логирование:
Метод детально логирует процесс:
# Начало операции INFO:root:╔══════════════════════════════════════════════════════════╗ INFO:root:║ Move data with method replace ║ INFO:root:╚══════════════════════════════════════════════════════════╝ # Для метода rewrite INFO:root:Clear table operation start INFO:root:Clear table operation done # Успешное завершение INFO:root:╔══════════════════════════════════════════════════════════╗ INFO:root:║ Data moved into public.users ║ INFO:root:╚══════════════════════════════════════════════════════════╝
Обработка ошибок:
Метод выбрасывает исключения при обнаружении проблем:
Типы ошибок¶ Исключение
Условие возникновения
ValueErrormove_method.need_filter=True, ноfilter_byпустойValueErrormove_method.is_custom=True, ноcustom_moveне указанValueErrorClickHouse +
delete+ >4 колонок вfilter_byValueErrorМетод недоступен для таблицы (проверка
is_avaliable)Исключения СУБД
Ошибки выполнения SQL запросов
SQL шаблоны методов:
Методы с
have_sql=Trueиспользуют шаблоны из файлов:# Формат: {dbname}/{method_name}.sql mv_path = "path/to/sql/templates/{}/{}.sql" # Пример для PostgreSQL delete: move_query = read_text("postgres/delete.sql")
Примеры SQL шаблонов:
Метод `write_between`:
Для методов без SQL (
append,rewriteпосле TRUNCATE) используется:self.dumper_dest.write_between(self.table_dest, self.table_temp)
Эта операция:
Читает данные из промежуточной таблицы
Конвертирует в нативный формат СУБД
Записывает в целевую таблицу
Оптимизирована для больших объемов данных
Завершающие операции:
После успешного переноса данных:
# Удаление промежуточной таблицы self.drop_temp()
Рекомендации по выбору метода:
def select_move_method(table_type: str, data_freshness: str) -> MoveMethod: """Автоматический выбор метода переноса.""" if table_type == 'fact_table' and data_freshness == 'incremental': return MoveMethod.delete # Инкрементальное обновление elif table_type == 'dimension_table': return MoveMethod.replace # Полная замена elif table_type == 'cache_table': return MoveMethod.rewrite # Пересоздание elif table_type == 'staging_table': return MoveMethod.append # Простое добавление else: return MoveMethod.replace # По умолчанию
Связанные методы:
Создание промежуточной таблицы
Удаление промежуточной таблицы
Проверка качества данных
Примечания:
Производительность: Методы с SQL обычно медленнее, но точнее
Безопасность: Все операции выполняются в рамках транзакций (для PostgreSQL)
Атомарность: При ошибке данные не переносятся
Очистка: Промежуточная таблица удаляется автоматически
См. также:
MoveMethod - Объект перечислений MoveMethod
create_temp - Создание промежуточной таблицы
drop_temp - Удаление промежуточной таблицы
dq_check - Проверка качества данных