from_rows¶
- NativeDumper.from_rows(
- dtype_data,
- table_name,
- source=None,
- )
- Параметры:
dtype_data (Iterable[Any]) – Итерируемый объект с данными для записи
table_name (str) – Имя таблицы в ClickHouse для записи данных
source (DBMetadata | None) – Метаданные источника данных
- Исключение:
NativeDumperValueError – Если не указано имя таблицы
ClickhouseServerError – Ошибка сервера ClickHouse при загрузке
NativeDumperWriteError – Другие ошибки записи данных
TypeError – Если данные имеют неправильный формат
Запись данных из Python итерируемого объекта в таблицу ClickHouse.
Описание:
Метод выполняет загрузку данных из Python итерируемых объектов (списков, генераторов, итераторов) непосредственно в таблицу ClickHouse с автоматическим преобразованием типов и валидацией.
Метод особенно полезен для: * Загрузки данных из Python структур в ClickHouse * Интеграции с другими Python библиотеками * Постепенной загрузки данных из генераторов * Миграции данных из памяти Python в ClickHouse
Основные этапы работы:
Валидация параметров - проверка имени таблицы
Получение метаданных таблицы - структура колонок через
cursor.metadata()Создание NativeWriter - для преобразования Python данных в Native формат
Преобразование данных -
writer.from_rows()+ сжатиеЛогирование диаграммы передачи -
transfer_diagram()Загрузка в ClickHouse -
cursor.upload_data()Обновление сессии -
refresh()
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
|
Обязательный. Итерируемый объект с данными для загрузки. Поддерживаются списки, кортежи, генераторы. |
|
|
Обязательный. Имя таблицы в ClickHouse. Формат: |
|
|
Метаданные источника данных. Если |
Форматы входных данных:
Список словарей (рекомендуется):
[
{"id": 1, "name": "Alice", "age": 25},
{"id": 2, "name": "Bob", "age": 30},
]
Список кортежей:
[
(1, "Alice", 25),
(2, "Bob", 30),
]
Генератор (для больших объемов):
def data_generator():
for i in range(1000000):
yield {"id": i, "value": i * 2}
Примеры использования:
# Пример 1: Загрузка списка словарей
from native_dumper import NativeDumper, CHConnector
connector = CHConnector(host="localhost", port=8123)
dumper = NativeDumper(connector)
data = [
{"user_id": 1, "username": "alice", "score": 95.5},
{"user_id": 2, "username": "bob", "score": 88.0},
{"user_id": 3, "username": "charlie", "score": 92.3}
]
dumper.from_rows(
dtype_data=data,
table_name="analytics.user_scores"
)
# Пример 2: Загрузка с кастомными метаданными источника
from native_dumper import DBMetadata
source_meta = DBMetadata(
name="postgresql",
version="15.0",
columns={"id": "integer", "name": "text", "created_at": "timestamp"}
)
dumper.from_rows(
dtype_data=data,
table_name="migrated.users",
source=source_meta
)
# Пример 3: Загрузка из генератора (большие данные)
def generate_large_dataset(count):
for i in range(count):
yield {
"timestamp": f"2024-01-{i%30+1:02d} 10:00:00",
"sensor_id": i % 100,
"value": i * 0.5,
"status": "ok" if i % 1000 != 0 else "error"
}
# Загрузка 1 миллиона записей
dumper.from_rows(
dtype_data=generate_large_dataset(1_000_000),
table_name="iot.sensor_readings"
)
# Пример 4: Загрузка из CSV файла
import csv
with open("data.csv", "r") as f:
reader = csv.DictReader(f)
dumper.from_rows(
dtype_data=reader,
table_name="imported.csv_data"
)
# Пример 5: Загрузка списка кортежей
# Важно: порядок кортежей должен соответствовать порядку колонок в таблице
tuple_data = [
(1, "Product A", 19.99, 100),
(2, "Product B", 29.99, 50),
(3, "Product C", 9.99, 200)
]
dumper.from_rows(
dtype_data=tuple_data,
table_name="warehouse.products"
)
Логирование диаграммы передачи:
Метод генерирует информационную диаграмму передачи данных:
INFO: python [iterable object] → clickhouse [23.8.1.2473]
INFO: Start write into localhost.analytics.user_scores.
INFO: Write into localhost.analytics.user_scores done.
Преобразование типов данных:
Метод автоматически преобразует Python типы в типы ClickHouse:
Python тип |
ClickHouse тип |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Обработка ошибок:
# Пример 1: Ошибка валидации параметров
try:
dumper.from_rows(
dtype_data=[{"id": 1}],
table_name="" # Пустое имя таблицы
)
except NativeDumperValueError as e:
print(f"Ошибка валидации: {e}")
# NativeDumperValueError: Table name not defined.
# Пример 2: Ошибка типа данных
try:
# Неправильный формат данных
dumper.from_rows(
dtype_data="not an iterable", # Ошибка
table_name="test.table"
)
except TypeError as e:
print(f"Ошибка типа данных: {e}")
# Пример 3: Ошибка несоответствия структуры
try:
# Данные не соответствуют структуре таблицы
data = [{"wrong_column": 1}] # Колонка не существует
dumper.from_rows(data, "existing.table")
except ClickhouseServerError as e:
print(f"Ошибка ClickHouse: {e.message}")
Производительность и оптимизация:
Используйте генераторы для больших объемов данных
Минимизируйте преобразования - передавайте данные в правильном формате
Пакетная обработка - для лучшей производительности используйте пакеты по 1000-10000 записей
Память - метод использует потоковую обработку, не загружая все данные в память
Внутренняя архитектура:
Python данные → NativeWriter.from_rows() → Native формат →
define_writer() → Сжатие → cursor.upload_data() → ClickHouse
Примечания:
Таблица должна существовать в ClickHouse перед загрузкой
Порядок колонок в кортежах должен соответствовать порядку в таблице
Для словарей порядок ключей не важен
Метод автоматически обновляет сессию после загрузки (
refresh())
Расширенные сценарии:
# Пример 1: Загрузка с кастомной обработкой
class DataProcessor:
def __init__(self, source_data):
self.source_data = source_data
def __iter__(self):
for item in self.source_data:
# Кастомная трансформация
yield {
"id": item["id"],
"processed_value": item["value"] * 2,
"timestamp": datetime.now().isoformat()
}
processor = DataProcessor(raw_data)
dumper.from_rows(processor, "processed.data")
# Пример 2: Параллельная загрузка
from concurrent.futures import ThreadPoolExecutor
def load_chunk(chunk_data, table_suffix):
dumper_local = NativeDumper(connector) # Новый экземпляр
dumper_local.from_rows(chunk_data, f"data.chunk_{table_suffix}")
dumper_local.close()
# Разделение данных на чанки
chunks = [data[i:i+10000] for i in range(0, len(data), 10000)]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i, chunk in enumerate(chunks):
future = executor.submit(load_chunk, chunk, i)
futures.append(future)
# Ожидание завершения
for future in futures:
future.result()
См. также:
from_pandas - Загрузка данных из pandas DataFrame
from_polars - Загрузка данных из polars DataFrame
nativewriter - Класс для преобразования Python данных в Native формат
dbmetadata - Метаданные источников данных