to_table

DBHose.to_table()

Переносит данные из промежуточной таблицы в целевую.

Описание:

Метод выполняет финальный этап переноса данных - перемещение данных из промежуточной таблицы (table_temp) в целевую (table_dest). В зависимости от выбранного move_method применяется различная логика переноса данных.

Сигнатура:

def to_table(self) -> None:
    """Move data to destination table."""

Типы методов переноса:

Типы методов переноса данных

Метод

SQL

Фильтр

Логика выполнения

append

❌ Нет

❌ Нет

Прямой перенос write_between

delete

✅ Да

✅ Да

Удаление по фильтру + вставка

replace

✅ Да

❌ Нет

Полная замена таблицы

rewrite

❌ Нет

❌ Нет

TRUNCATE + прямой перенос

custom

❌ Нет

❌ Нет

Пользовательские SQL запросы

Валидация параметров:

Метод выполняет проверки перед выполнением:

# 1. Проверка фильтрации для методов delete
if self.move_method.need_filter and not self.filter_by:
    raise ValueError("You must specify columns in filter_by")

# 2. Проверка custom запроса для custom метода
if self.move_method.is_custom and not self.custom_move:
    raise ValueError("You must specify custom query")

# 3. Проверка количества колонок для ClickHouse delete
if (self.move_method is MoveMethod.delete and
    self.dumper_dest.__class__ is NativeDumper and
    len(self.filter_by.split(", ")) > 4):
    raise ValueError("Too many columns in filter_by (> 4)")

Примеры использования:

Базовый пример - метод replace
from dbhose_airflow import DBHose, MoveMethod

dbhose = DBHose(
    table_dest="public.users",
    connection_dest="postgres_target",
    connection_src="postgres_source",
    move_method=MoveMethod.replace,  # Полная замена
)
dbhose.create_temp()
# ... загрузка данных ...
dbhose.to_table()
# Логи:
# INFO:root:Move data with method replace
# INFO:root:Data moved into public.users
Пример с фильтрацией - метод delete
dbhose = DBHose(
    table_dest="analytics.daily_sales",
    connection_dest="clickhouse_analytics",
    connection_src="postgres_sales",
    move_method=MoveMethod.delete,  # Удаление с фильтрацией
    filter_by=["sale_date", "region"],  # Обязательно для delete
)
dbhose.create_temp()
# ... загрузка данных ...
dbhose.to_table()  # Удалит старые данные по фильтру и вставит новые
Пример с пользовательским запросом
dbhose = DBHose(
    table_dest="data_warehouse.fact_orders",
    connection_dest="dw_postgres",
    connection_src="oltp_postgres",
    move_method=MoveMethod.custom,
    custom_move="""
    -- Удалить старые данные
    DELETE FROM {table_dest}
    WHERE order_date >= CURRENT_DATE - INTERVAL '7 days';

    -- Вставить новые данные
    INSERT INTO {table_dest}
    SELECT *, NOW() as loaded_at
    FROM {table_temp};
    """,
)
dbhose.create_temp()
# ... загрузка данных ...
dbhose.to_table()  # Выполнит пользовательские запросы
Пример с методом rewrite (пересоздание)
dbhose = DBHose(
    table_dest="cache.report_data",
    connection_dest="postgres_cache",
    connection_src="postgres_source",
    move_method=MoveMethod.rewrite,  # Полная перезапись
)
dbhose.create_temp()
# ... загрузка данных ...
dbhose.to_table()  # TRUNCATE таблицы и прямой перенос

Особенности для разных СУБД:

Логирование:

Метод детально логирует процесс:

# Начало операции
INFO:root:╔══════════════════════════════════════════════════════════╗
INFO:root:║            Move data with method replace                 ║
INFO:root:╚══════════════════════════════════════════════════════════╝

# Для метода rewrite
INFO:root:Clear table operation start
INFO:root:Clear table operation done

# Успешное завершение
INFO:root:╔══════════════════════════════════════════════════════════╗
INFO:root:║            Data moved into public.users                  ║
INFO:root:╚══════════════════════════════════════════════════════════╝

Обработка ошибок:

Метод выбрасывает исключения при обнаружении проблем:

Типы ошибок

Исключение

Условие возникновения

ValueError

move_method.need_filter=True, но filter_by пустой

ValueError

move_method.is_custom=True, но custom_move не указан

ValueError

ClickHouse + delete + >4 колонок в filter_by

ValueError

Метод недоступен для таблицы (проверка is_avaliable)

Исключения СУБД

Ошибки выполнения SQL запросов

SQL шаблоны методов:

Методы с have_sql=True используют шаблоны из файлов:

# Формат: {dbname}/{method_name}.sql
mv_path = "path/to/sql/templates/{}/{}.sql"
# Пример для PostgreSQL delete:
move_query = read_text("postgres/delete.sql")

Примеры SQL шаблонов:

Метод `write_between`:

Для методов без SQL (append, rewrite после TRUNCATE) используется:

self.dumper_dest.write_between(self.table_dest, self.table_temp)

Эта операция:

  1. Читает данные из промежуточной таблицы

  2. Конвертирует в нативный формат СУБД

  3. Записывает в целевую таблицу

  4. Оптимизирована для больших объемов данных

Завершающие операции:

После успешного переноса данных:

# Удаление промежуточной таблицы
self.drop_temp()

Рекомендации по выбору метода:

def select_move_method(table_type: str, data_freshness: str) -> MoveMethod:
    """Автоматический выбор метода переноса."""

    if table_type == 'fact_table' and data_freshness == 'incremental':
        return MoveMethod.delete  # Инкрементальное обновление

    elif table_type == 'dimension_table':
        return MoveMethod.replace  # Полная замена

    elif table_type == 'cache_table':
        return MoveMethod.rewrite  # Пересоздание

    elif table_type == 'staging_table':
        return MoveMethod.append  # Простое добавление

    else:
        return MoveMethod.replace  # По умолчанию

Связанные методы:

    • create_temp()

    • Создание промежуточной таблицы

    • drop_temp()

    • Удаление промежуточной таблицы

    • dq_check()

    • Проверка качества данных

Примечания:

  1. Производительность: Методы с SQL обычно медленнее, но точнее

  2. Безопасность: Все операции выполняются в рамках транзакций (для PostgreSQL)

  3. Атомарность: При ошибке данные не переносятся

  4. Очистка: Промежуточная таблица удаляется автоматически

См. также:

  • MoveMethod - Объект перечислений MoveMethod

  • create_temp - Создание промежуточной таблицы

  • drop_temp - Удаление промежуточной таблицы

  • dq_check - Проверка качества данных