from_polars¶
- NativeDumper.from_polars(data_frame, table_name)¶
- Параметры:
data_frame (polars.DataFrame) – DataFrame polars для загрузки в ClickHouse
table_name (str) – Имя таблицы в ClickHouse для записи данных
- Исключение:
NativeDumperValueError – Если не указано имя таблицы
ClickhouseServerError – Ошибка сервера ClickHouse при загрузке
NativeDumperWriteError – Ошибки записи данных
ImportError – Если polars не установлен
Загрузка данных из polars DataFrame в таблицу ClickHouse.
Описание:
Метод выполняет загрузку данных из polars DataFrame непосредственно в таблицу ClickHouse.
Внутренне использует метод from_rows(), преобразуя DataFrame в итерируемый объект
с использованием iter_rows() и сохранением информации о типах данных колонок.
Преимущества polars:
Высокая производительность - polars оптимизирован для работы с большими данными
Потоковая обработка - эффективная работа с данными, не помещающимися в память
Богатая система типов - строгая типизация и поддержка сложных типов данных
Ленивые вычисления - возможность оптимизации запросов перед выполнением
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
|
Обязательный. DataFrame polars, содержащий данные для загрузки. |
|
|
Обязательный. Имя таблицы в ClickHouse. Формат: |
Примеры использования:
# Пример 1: Базовая загрузка DataFrame
import polars as pl
from native_dumper import NativeDumper, CHConnector
# Создание подключения
connector = CHConnector(host="localhost", port=8123)
dumper = NativeDumper(connector)
# Создание тестового DataFrame
df = pl.DataFrame({
'id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 32],
'salary': [50000.0, 60000.5, 75000.0, 55000.0, 65000.0],
'active': [True, True, False, True, False]
})
# Загрузка данных
dumper.from_polars(
data_frame=df,
table_name="analytics.employees"
)
# Пример 2: Загрузка LazyFrame с оптимизацией
import polars as pl
# Создание LazyFrame (ленивого вычисления)
lazy_df = (
pl.scan_csv('large_data.csv')
.filter(pl.col('value') > 100)
.group_by('category')
.agg([
pl.mean('value').alias('avg_value'),
pl.count().alias('count')
])
)
# Выполнение ленивого запроса и загрузка результата
result_df = lazy_df.collect()
dumper.from_polars(
data_frame=result_df,
table_name="analytics.aggregated_data"
)
# Пример 3: Загрузка данных с временными типами
import polars as pl
df_temporal = pl.DataFrame({
'event_id': range(1, 1001),
'event_time': pl.datetime_range(
start=pl.datetime(2024, 1, 1),
end=pl.datetime(2024, 1, 10),
interval='15m',
eager=True
)[:1000],
'metric_value': pl.Series(range(1000)).cast(pl.Float64) * 1.5,
'status': pl.Series(['ok'] * 800 + ['warning'] * 150 + ['error'] * 50)
})
dumper.from_polars(
data_frame=df_temporal,
table_name="monitoring.events"
)
# Пример 4: Загрузка с использованием сложных типов polars
import polars as pl
df_complex = pl.DataFrame({
'id': [1, 2, 3],
'tags': [['python', 'data'], ['rust', 'systems'], ['go', 'backend']],
'metadata': [
{'version': '1.0', 'enabled': True},
{'version': '2.0', 'enabled': False},
{'version': '1.5', 'enabled': True}
],
'scores': [
[85.5, 90.0, 78.5],
[92.0, 88.5, 95.0],
[76.0, 82.5, 79.0]
]
})
dumper.from_polars(
data_frame=df_complex,
table_name="analytics.complex_data"
)
# Пример 5: Загрузка больших данных с потоковой обработкой
import polars as pl
# Генерация большого DataFrame с потоковой обработкой
large_df = (
pl.LazyFrame({
'id': range(1_000_000),
'value': pl.arange(0, 1_000_000, eager=False) * 0.01,
'group': pl.Series(range(1_000_000)) % 100
})
.filter(pl.col('value') < 5000)
.group_by('group')
.agg([
pl.mean('value').alias('avg_value'),
pl.std('value').alias('std_value'),
pl.count().alias('count')
])
.collect(streaming=True) # Использование streaming API
)
dumper.from_polars(
data_frame=large_df,
table_name="analytics.streaming_results"
)
Логирование:
Метод генерирует информационные сообщения о загрузке, включая диаграмму передачи с указанием polars как источника данных:
INFO: polars [DataFrame] → clickhouse [23.8.1.2473]
INFO: Start write into localhost.analytics.employees.
INFO: Write into localhost.analytics.employees done.
Преобразование типов polars → ClickHouse:
Метод автоматически преобразует типы данных polars в соответствующие типы ClickHouse
Обработка специальных значений:
null значения - преобразуются в NULL ClickHouse
NaN/Inf - требуют специальной обработки в зависимости от типа
пустые списки/структуры - сохраняются как значения по умолчанию
Обработка ошибок:
# Пример 1: Ошибка валидации параметров
try:
dumper.from_polars(
data_frame=df,
table_name="" # Пустое имя таблицы
)
except NativeDumperValueError as e:
print(f"Ошибка валидации: {e}")
# Пример 2: Polars не установлен
try:
# Попытка использовать без polars
from native_dumper import NativeDumper
dumper = NativeDumper(connector)
# Метод вызовет ошибку при использовании
except ImportError as e:
print(f"Требуется установка polars: {e}")
# Пример 3: Ошибка несоответствия типов
try:
# DataFrame с неподдерживаемым типом
df_problematic = pl.DataFrame({
'binary_col': [b'data1', b'data2'] # Бинарные данные
})
dumper.from_polars(df_problematic, "test.table")
except Exception as e:
print(f"Ошибка преобразования типов: {e}")
Оптимизация производительности с polars:
# Пример 1: Использование правильных типов данных
import polars as pl
df_optimized = pl.DataFrame({
'id': pl.Series(range(1000000), dtype=pl.UInt32), # Экономия памяти
'name': pl.Series(['user'] * 1000000),
'score': pl.Series(range(1000000), dtype=pl.Float32), # Float32 вместо Float64
'category': pl.Series(['A', 'B', 'C'] * 333334, dtype=pl.Categorical), # Категории
'timestamp': pl.datetime_range(
start=pl.datetime(2024, 1, 1),
end=pl.datetime(2024, 1, 12),
interval='1s',
eager=True
)[:1000000]
})
dumper.from_polars(df_optimized, "optimized.table")
# Пример 2: Пакетная обработка очень больших данных
import polars as pl
def load_large_data_in_batches(dumper, lazy_frame, table_name, batch_size=100000):
"""Загрузка больших данных пакетами."""
total_rows = lazy_frame.select(pl.count()).collect().item()
batches = total_rows // batch_size + (1 if total_rows % batch_size > 0 else 0)
for batch_num in range(batches):
offset = batch_num * batch_size
batch_df = (
lazy_frame
.slice(offset, batch_size)
.collect()
)
if len(batch_df) > 0:
dumper.from_polars(
data_frame=batch_df,
table_name=table_name
)
print(f"Загружен пакет {batch_num + 1}/{batches} "
f"({len(batch_df)} строк)")
print(f"Всего загружено {total_rows} строк")
# Использование
large_lazy = pl.scan_parquet('huge_data.parquet')
load_large_data_in_batches(dumper, large_lazy, "huge.table", batch_size=50000)
Сравнение с pandas:
Производительность: * polars обычно быстрее pandas, особенно на больших данных * polars использует многопоточность по умолчанию * polars эффективнее работает с памятью
Потоковая обработка: * polars имеет встроенную поддержку streaming API * pandas требует ручной реализации чанкирования
API: * polars API более функциональный и экспрессивный * pandas API более императивный и знакомый
Использование iter_rows():
Метод использует data_frame.iter_rows() для эффективной итерации по строкам:
# Эквивалент того, что происходит внутри метода
for row in data_frame.iter_rows():
# Каждая строка как tuple значений
# Автоматическое преобразование типов
process_row(row)
Рекомендации по использованию:
Оптимизируйте типы данных - выбирайте минимально достаточные типы
Используйте streaming=True для очень больших данных
Обрабатывайте null значения - убедитесь в корректности преобразования
Проверяйте совместимость типов - сложные типы могут требовать дополнительной обработки
Ограничения:
Сложные вложенные типы - требуют дополнительного преобразования
Десериализация - некоторые типы polars не имеют прямых аналогов в ClickHouse
Производительность iter_rows() - для некоторых сценариев могут быть более эффективные подходы
Примечания:
Для лучшей производительности используйте
LazyFrameсовместно сcollect(streaming=True)для больших данныхТипы данных сохраняются в метаданных, что помогает в отладке
Рекомендуется использовать ленивые вычисления для предобработки данных
См. также:
from_rows - Базовый метод для загрузки итерируемых объектов
from_pandas - Загрузка данных из pandas DataFrame