Режим инкрементального обновления строк
Общая информация
Инкрементальная загрузка в режиме обновления строк – это процесс добавления новых или обновления существующих строк без необходимости полной перезагрузки всех данных. Помогает ускорить загрузку данных и сэкономить ресурсы на обновлении. Данный режим инкрементальной загрузки отличается тем, что не происходит полной перезагрузки инкрементального периода, как в обычном режиме, а из источника извлекаются только новые и обновленные строки.
Ограничения у режима инкрементального обновления строк, совпадают с ограничениями режима инкрементальной загрузки. Остальные различия в режимах описаны далее.
Данный режим хорошо подходит для источника данных в котором часто появляются новые строки и периодически изменяются данные, относящиеся к недавнему периоду, при этом данные в источнике никогда не удаляются.
Параметр LastUpdated
В обычном режиме инкрементальной загрузки в запросе присутствует два параметра:
@{RangeStart}@{RangeEnd}
(смотрите информацию об этих параметрах в описании инкрементальной загрузки).
Данный режим добавляет новый параметр, который должен быть включен в запрос к источнику @{LastUpdated}. Этот параметр используется для подстановки в запрос даты последнего обновления строк в платформе.
Пример простого запроса с указанием параметров:
select * from table as t where t.created_date_row >= @{RangeStart} and t.created_date_row < @{RangeEnd} and t.last_updated_row > @{LastUpdated}
В данном примере t.last_updated_row строго больше подставляемого параметра @{LastUpdated}, что позволит извлечь из источника только последние обновления строк.
Поле в источнике last_updated_row должно иметь тип DateTime.
Любые изменения в конфигурации загрузчика или полная перезагрузка таблицы приведут к сбросу параметра @{LastUpdated}. Поскольку настройка этого параметра через UI временно недоступна, загрузка будет возвращена в стандартный режим инкремента.
RowId для идентификации строк
Чтобы активировать режим обновления строк, необходимо задать уникальный идентификатор каждой строки. Этот идентификатор должен присутствовать в исходной базе данных, т.к. служит ключом для однозначного распознавания записей. Он может представлять собой одно или несколько полей различного типа, формирующих составной идентификатор, обеспечивающий уникальность каждой записи.
Требование к источнику
Запрет на удаление записей
Записи в источнике не должны физически или логически удаляться (запрещены методы
hard deleteиsoft delete). Разрешены только операции добавления и обновления.
Стабильный идентификатор записи (RowID)
Каждая запись должна иметь уникальный и неизменяемый идентификатор. Идентификатор может быть как одиночным полем (например, первичный ключ), так и составным (комбинация нескольких полей). При этом значение идентификатора для существующей записи не должно изменяться в течение всего времени её существования.
Метка времени обновления записи
Обязательное наличие поля, фиксирующего дату и время последнего изменения записи (условное именование:
last_update). Его значение должно автоматически обновляться до текущих даты и времени при каждом изменении строки.
Ограничения
Для корректной работы механизма инкрементальной загрузки данных в режиме обновления строк необходимо выполнение следующих обязательных требований к пользовательскому запросу:
Метаданные обновления
Поле
last_update: Запрос должен включать в блокSELECTполе, содержащее метку времени последнего обновления строки.Тип данных: Указанное поле должно иметь тип
DateTimeдля корректного определения последовательности изменений.
Идентификация записей
Поле
row_id: Запрос обязательно должен включать в блокSELECTполе или набор полей, составляющих уникальный идентификатор строки (RowID). Это гарантирует точное сопоставление записей между источником и приемником.
Фильтрация по дате
Параметр
@{LastUpdated}: В условииWHEREзапроса необходимо использовать системный параметр@{LastUpdated}, который автоматически подставляет дату и время предыдущего успешного запуска. Это обеспечивает выборку только тех данных, которые были изменены после указанного момента.
Остальные ограничения и требования совпадают с режимом инкрементальной загрузки.
Условия для запуска режима
Для работы режима требуется указать:
Поле с датой обновления (
last_update): Определяет, какие записи были изменены.Поле идентификатора (
row_id): Обеспечивает точное обновление нужных строк.
Логика работы режима инкрементального обновления строк
Первая загрузка происходит полностью, как и в режиме инкрементального обновления:
Разбиение данных на партиции:
На основе указанной в настройках гранулярности (год, месяц, день) данные разбиваются на партиции (функционал ClickHouse).
Выделяется также архивный период (который никогда не меняется и не обновляется).
Выделяются партиции для инкрементального периода.
Все последующие загрузки следуют логике инкрементального обновления строк:
Генерация запросов к источнику данных:
Отправляется несколько запросов на источник для извлечения только новых/обновленных строк для каждого.
Работа с ключом партиции:
Для каждой таблицы в системе добавляется служебная колонка
partitionKey, которая используется для управления партициями в процессе загрузки.Эта колонка обеспечивает корректную идентификацию и обработку данных.
Управление уже загруженными данными:
Данные, которые уже были загружены не удаляются.
Архивный период присоединяется в новую таблицу как есть (_ATTACH PARTITION в Clickhouse_)
Инкрементальный период также присоединяется к новой таблице, после чего к ним добавляются новые строки, а строки которые были обновлены заменяются по
RowId.Когда новая таблица полностью готова, она подменяет собой новую.
Если в источнике были удалены некоторые строки (hard delete), на платформе эти строки останутся даже после перезагрузки, так как больше нет никакой информации о LastUpdate и RowId удаленной строки. Если в источнике планируется удаление строк, то следует выбрать другой режим загрузки данных.
Включение режима инкрементального обновления строк через API
В данный момент управление инкрементальным обновлением строк пока не выведено в интерфейс. Включить этот режим можно только через API.
В ситуации, когда таблицы еще нет
Настройте обычную инкрементальную загрузку, добавив условие
LastUpdate > @{LastUpdated}, и нажмите Добавить:Дождитесь завершения первой полной загрузки и убедитесь, что все необходимые данные загружены. Например:
Откройте инструменты разработчика (
F12), обновите страницу, найдите токен авторизации в любом заголовке запроса и скопируйте его:Перейдите в интерфейс API (Swagger) по адресу:
<domain>/v3/formula-engine/swagger/index.html
где<domain>– доменное имя или IP-адрес вашего сервера:Нажмите кнопку Authorize в правой стороне окна.
В открывшемся диалоговом окне в поле Value введите слово
Bearer, затем добавьте пробел и вставьте скопированный ранее токен авторизации:Перейдите в раздел Model и раскройте следующий эндпоинт:
GET /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/modelНажмите Try it out в правой стороне окна:
На вкладке Network инструментов разработчика (F12) перейдите в query, скопируйте значения параметров
workspacesиdatasetsв окно эндпоинта и выполните запрос, нажав на кнопку Execute:Скопируйте модель из ответа:
В текстовом редакторе найдите таблицу, которую вы собираетесь обновить. В нашем примере это source_table:
Скопируете таблицу для которой хотите включить режим инкрементального обновления строк. Например:
Вставьте скопированную таблицу в текстовый редактор.
Найдите поле
sourceв объекте таблицы и в самом конце скопируйте объектIncrementalRefreshPolicyВставьте объект
IncrementalRefreshPolicyв конец объекта таблицы.Скопируйте объект и удалите экранирующие символы
\\Добавьте недостающие поля:
где
IncrementalRefreshMode– режим загрузки.0– инкрементальная загрузка;1– инкрементальное обновление строк.
LastUpdateColumnId- идентификатор столбца с датой последнего изменения (тип GUID).RowIdentityColumnIds– список идентификаторов колонок (может быть одна) которые являются идентификатором строки (массив GUID).IncrementalPeriodType– тип инкрементального периода (гранулярность):Year
Month
Day
IncrementalPeriods– количество дней, лет, месяцев (число).ArchivePeriodType- тип архивного периода (гранулярность).Year
Month
Day
ArchivePeriods– количество дней, лет, месяцев (число).Вы получите объект, который должен быть вставлен в конец объекта таблицы:
"IncrementalRefreshPolicy": { "IncrementalRefreshMode": 1, "IncrementalPeriods": 2, "ArchivePeriodType": "Month", "ArchivePeriods": 3, "IncrementalPeriodType": "Month", "LastUpdateColumnId": "a640806e-2046-4b7f-95fd-d2ce1d513d72", "RowIdentityColumnIds": [ "efc77201-af39-432c-8ae3-ae57c6408a83" ] }
Вернитесь на страницу в Swagger и перейдите в раздел Tables:
Раскройте эндпоинт
PUT /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/tables/{tableId}
и нажмите Try it out:Вставьте в тело запроса модифицированную модель таблицы и укажите
tableIdиз этой же таблицы:Выполните запрос, нажав Execute. Код
200означает, что запрос прошел успешно:Вернитесь на платформу на страницу набора данных и обновите страницу, нажав кнопку F5 на клавиатуре.
Обновите таблицу, нажав напротив ее названия и выбрав пункт меню Обновить таблицу:
В ситуации, когда таблица уже есть
Нажмите напротив названия таблицы и выберите пункт меню Редактировать загрузчик:
Выполните все шаги, описанные в инструкции “В ситуации, когда таблицы еще нет” (см. выше), начиная с пункта 1.
Смотрите также
На этой странице
Нужна дополнительная помощь?