Режим инкрементального обновления строк

Режим инкрементального обновления строк

Общая информация

Инкрементальная загрузка в режиме обновления строк – это процесс добавления новых или обновления существующих строк без необходимости полной перезагрузки всех данных. Помогает ускорить загрузку данных и сэкономить ресурсы на обновлении. Данный режим инкрементальной загрузки отличается тем, что не происходит полной перезагрузки инкрементального периода, как в обычном режиме, а из источника извлекаются только новые и обновленные строки.

Ограничения у режима инкрементального обновления строк, совпадают с ограничениями режима инкрементальной загрузки. Остальные различия в режимах описаны далее.

Данный режим хорошо подходит для источника данных в котором часто появляются новые строки и периодически изменяются данные, относящиеся к недавнему периоду, при этом данные в источнике никогда не удаляются.

Параметр LastUpdated

В обычном режиме инкрементальной загрузки в запросе присутствует два параметра:

Данный режим добавляет новый параметр, который должен быть включен в запрос к источнику @{LastUpdated}. Этот параметр используется для подстановки в запрос даты последнего обновления строк в платформе.

Пример простого запроса с указанием параметров:

select * from table as t where t.created_date_row >= @{RangeStart} and t.created_date_row < @{RangeEnd} and t.last_updated_row > @{LastUpdated}

В данном примере t.last_updated_row строго больше подставляемого параметра @{LastUpdated}, что позволит извлечь из источника только последние обновления строк.

Поле в источнике last_updated_row должно иметь тип DateTime.

Любые изменения в конфигурации загрузчика или полная перезагрузка таблицы приведут к сбросу параметра @{LastUpdated}. Поскольку настройка этого параметра через UI временно недоступна, загрузка будет возвращена в стандартный режим инкремента.

RowId для идентификации строк

Чтобы активировать режим обновления строк, необходимо задать уникальный идентификатор каждой строки. Этот идентификатор должен присутствовать в исходной базе данных, т.к. служит ключом для однозначного распознавания записей. Он может представлять собой одно или несколько полей различного типа, формирующих составной идентификатор, обеспечивающий уникальность каждой записи.

Требование к источнику

  1. Запрет на удаление записей

    • Записи в источнике не должны физически или логически удаляться (запрещены методы hard delete и soft delete). Разрешены только операции добавления и обновления.

  2. Стабильный идентификатор записи (RowID)

    • Каждая запись должна иметь уникальный и неизменяемый идентификатор. Идентификатор может быть как одиночным полем (например, первичный ключ), так и составным (комбинация нескольких полей). При этом значение идентификатора для существующей записи не должно изменяться в течение всего времени её существования.

  3. Метка времени обновления записи

    • Обязательное наличие поля, фиксирующего дату и время последнего изменения записи (условное именование: last_update). Его значение должно автоматически обновляться до текущих даты и времени при каждом изменении строки.

Ограничения

Для корректной работы механизма инкрементальной загрузки данных в режиме обновления строк необходимо выполнение следующих обязательных требований к пользовательскому запросу:

  • Метаданные обновления

    • Поле last_update: Запрос должен включать в блок SELECT поле, содержащее метку времени последнего обновления строки.

    • Тип данных: Указанное поле должно иметь тип DateTime для корректного определения последовательности изменений.

  • Идентификация записей

    • Поле row_id: Запрос обязательно должен включать в блок SELECT поле или набор полей, составляющих уникальный идентификатор строки (RowID). Это гарантирует точное сопоставление записей между источником и приемником.

  • Фильтрация по дате

    • Параметр @{LastUpdated}: В условии WHERE запроса необходимо использовать системный параметр @{LastUpdated}, который автоматически подставляет дату и время предыдущего успешного запуска. Это обеспечивает выборку только тех данных, которые были изменены после указанного момента.

Остальные ограничения и требования совпадают с режимом инкрементальной загрузки.

Условия для запуска режима

  • Для работы режима требуется указать:

    • Поле с датой обновления (last_update): Определяет, какие записи были изменены.

    • Поле идентификатора (row_id): Обеспечивает точное обновление нужных строк.

Логика работы режима инкрементального обновления строк

  • Первая загрузка происходит полностью, как и в режиме инкрементального обновления:

    • Разбиение данных на партиции:

      • На основе указанной в настройках гранулярности (год, месяц, день) данные разбиваются на партиции (функционал ClickHouse).

      • Выделяется также архивный период (который никогда не меняется и не обновляется).

      • Выделяются партиции для инкрементального периода.

  • Все последующие загрузки следуют логике инкрементального обновления строк:

    • Генерация запросов к источнику данных:

      • Отправляется несколько запросов на источник для извлечения только новых/обновленных строк для каждого.

    • Работа с ключом партиции:

      • Для каждой таблицы в системе добавляется служебная колонка partitionKey, которая используется для управления партициями в процессе загрузки.

      • Эта колонка обеспечивает корректную идентификацию и обработку данных.

    • Управление уже загруженными данными:

      • Данные, которые уже были загружены не удаляются.

      • Архивный период присоединяется в новую таблицу как есть (_ATTACH PARTITION в Clickhouse_)

      • Инкрементальный период также присоединяется к новой таблице, после чего к ним добавляются новые строки, а строки которые были обновлены заменяются по RowId.

      • Когда новая таблица полностью готова, она подменяет собой новую.

Если в источнике были удалены некоторые строки (hard delete), на платформе эти строки останутся даже после перезагрузки, так как больше нет никакой информации о LastUpdate и RowId удаленной строки. Если в источнике планируется удаление строк, то следует выбрать другой режим загрузки данных.

Включение режима инкрементального обновления строк через API

В данный момент управление инкрементальным обновлением строк пока не выведено в интерфейс. Включить этот режим можно только через API.

В ситуации, когда таблицы еще нет

  1. Настройте обычную инкрементальную загрузку, добавив условие LastUpdate > @{LastUpdated}, и нажмите Добавить:

    image-20250522-100733.png

     

  2. Дождитесь завершения первой полной загрузки и убедитесь, что все необходимые данные загружены. Например:

    image-20250514-124310.png

     

  3. Откройте инструменты разработчика (F12), обновите страницу, найдите токен авторизации в любом заголовке запроса и скопируйте его:

    image-20250514-124343 (1).png

     

  4. Перейдите в интерфейс API (Swagger) по адресу:
    <domain>/v3/formula-engine/swagger/index.html
    где <domain> – доменное имя или IP-адрес вашего сервера:

    authorize.png
  5. Нажмите кнопку Authorize в правой стороне окна.

  6. В открывшемся диалоговом окне в поле Value введите слово Bearer, затем добавьте пробел и вставьте скопированный ранее токен авторизации:

    bearer.png

     

  7. Перейдите в раздел Model и раскройте следующий эндпоинт:
    GET /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/model

    endpoint.png

     

  8. Нажмите Try it out в правой стороне окна:

    try-out.png
  9. На вкладке Network инструментов разработчика (F12) перейдите в query, скопируйте значения параметров workspaces и datasets в окно эндпоинта и выполните запрос, нажав на кнопку Execute:

    query.png

     

  10. Скопируйте модель из ответа:

    response.png

     

  11. В текстовом редакторе найдите таблицу, которую вы собираетесь обновить. В нашем примере это source_table:

    value.png

     

  12. Скопируете таблицу для которой хотите включить режим инкрементального обновления строк. Например:

    copy.png
  13. Вставьте скопированную таблицу в текстовый редактор.

  14. Найдите поле source в объекте таблицы и в самом конце скопируйте объект IncrementalRefreshPolicy

  15. Вставьте объект IncrementalRefreshPolicy в конец объекта таблицы.

  16. Скопируйте объект и удалите экранирующие символы \\

    image-20250522-044812.png
    image-20250522-044832.png

     

  17. Добавьте недостающие поля:

    image-20250522-044916.png

    где

    • IncrementalRefreshMode – режим загрузки.

      • 0 – инкрементальная загрузка;

      • 1 – инкрементальное обновление строк.

    • LastUpdateColumnId - идентификатор столбца с датой последнего изменения (тип GUID).

    • RowIdentityColumnIds – список идентификаторов колонок (может быть одна) которые являются идентификатором строки (массив GUID).

    • IncrementalPeriodType – тип инкрементального периода (гранулярность):

      • Year

      • Month

      • Day

    • IncrementalPeriods – количество дней, лет, месяцев (число).

    • ArchivePeriodType - тип архивного периода (гранулярность).

      • Year

      • Month

      • Day

    • ArchivePeriods  – количество дней, лет, месяцев (число).

      image-20250522-045119.png

      Вы получите объект, который должен быть вставлен в конец объекта таблицы:

      "IncrementalRefreshPolicy": {                 "IncrementalRefreshMode": 1,                 "IncrementalPeriods": 2,                 "ArchivePeriodType": "Month",                 "ArchivePeriods": 3,                 "IncrementalPeriodType": "Month",                 "LastUpdateColumnId": "a640806e-2046-4b7f-95fd-d2ce1d513d72",                 "RowIdentityColumnIds": [                                "efc77201-af39-432c-8ae3-ae57c6408a83"                 ] }
  18. Вернитесь на страницу в Swagger и перейдите в раздел Tables:

    tables.png

     

  19. Раскройте эндпоинт
    PUT /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/tables/{tableId}
    и нажмите Try it out:

    put-tryitout.png

     

  20. Вставьте в тело запроса модифицированную модель таблицы и укажите tableId из этой же таблицы:

    image-20250514-124938.png

     

  21. Выполните запрос, нажав Execute. Код 200 означает, что запрос прошел успешно:

    image-20250514-125006.png

     

  22. Вернитесь на платформу на страницу набора данных и обновите страницу, нажав кнопку F5 на клавиатуре.

  23. Обновите таблицу, нажав напротив ее названия и выбрав пункт меню Обновить таблицу:

    image-20250514-125025.png

     

В ситуации, когда таблица уже есть

  1. Нажмите напротив названия таблицы и выберите пункт меню Редактировать загрузчик:

    image-20250520-061453.png

     

  2. Выполните все шаги, описанные в инструкции “В ситуации, когда таблицы еще нет” (см. выше), начиная с пункта 1.


Смотрите также

Инкрементальная загрузка данных