Режим инкрементального обновления строк

Режим инкрементального обновления строк

Общая информация

Инкрементальная загрузка в режиме обновления строк – это процесс добавления новых или обновления существующих строк без необходимости полной перезагрузки всех данных. Помогает ускорить загрузку данных и сэкономить ресурсы на обновлении. Данный режим инкрементальной загрузки отличается тем, что не происходит полной перезагрузки инкрементального периода, как в обычном режиме, а из источника извлекаются только новые и обновленные строки.

Для настройки этого режима пользователь переключает тумблер Обнаруживать изменения в положение Да, после чего необходимо выполнить следующие шаги:

  1. Указать столбец с датой
    В источнике данных должен присутствовать столбец, содержащий дату последнего обновления каждой строки. Часто он называется, например, "Последнее обновление". Этот столбец используется для отслеживания изменений: система анализирует его значения, чтобы определить, какие строки были изменены или добавлены с момента предыдущей загрузки.

  2. Указать уникальные ключи для обновления данных
    Для корректного обновления данных необходимо указать ключ, который однозначно идентифицирует каждую строку. Если в источнике данных присутствует столбец с уникальными идентификаторами строк (например, ID), укажите его - система будет использовать этот столбец для точного сопоставления строк между источником и целевой таблицей. Это гарантирует обновление существующих записей вместо создания дубликатов. Если такой столбец отсутствует в источнике, вы можете выбрать несколько столбцов, комбинация значений которых будет уникальна для каждой записи. На их основе система сформирует составной ключ.

    После настройки ключа система автоматически сравнит данные. Все новые и изменённые записи из источника будут актуализированы в целевой таблице.

    rows.png

Ограничения у режима инкрементального обновления строк, совпадают с ограничениями режима инкрементальной загрузки. Остальные различия в режимах описаны далее.

Данный режим хорошо подходит для источника данных в котором часто появляются новые строки и периодически изменяются данные, относящиеся к недавнему периоду, при этом данные в источнике никогда не удаляются.

Более подробная информация о настройке и работе этого режима представлена в разделах ниже.

Требование к источнику данных

  1. Запрет на удаление записей

    • Записи в источнике не должны физически или логически удаляться (запрещены методы hard delete и soft delete). Разрешены только операции добавления и обновления.

  2. Стабильный идентификатор записи (RowID)

    • Каждая запись должна иметь уникальный и неизменяемый идентификатор. Идентификатор может быть как одиночным полем (например, первичный ключ), так и составным (комбинация нескольких полей). При этом значение идентификатора для существующей записи не должно изменяться в течение всего времени её существования. Этот идентификатор должен присутствовать в исходной базе данных, т.к. служит ключом для однозначного распознавания записей.

Условия для работы режима

  • Для работы режима требуется указать:

    • Поле с датой обновления (например, last_updated): Определяет, какие записи были изменены.

    • Поле идентификатора (row_id): Обеспечивает точное обновление нужных строк.

Логика работы режима инкрементального обновления строк

  • Первая загрузка происходит полностью, как и в режиме инкрементального обновления:

    • Разбиение данных на партиции:

      • На основе указанной в настройках гранулярности (год, месяц, день) данные разбиваются на партиции (функционал ClickHouse).

      • Выделяется также архивный период (который никогда не меняется и не обновляется).

      • Выделяются партиции для инкрементального периода.

  • Все последующие загрузки следуют логике инкрементального обновления строк:

    • Генерация запросов к источнику данных:

      • Отправляется несколько запросов на источник для извлечения только новых/обновленных строк для каждого.

    • Работа с ключом партиции:

      • Для каждой таблицы в системе добавляется служебная колонка partitionKey, которая используется для управления партициями в процессе загрузки.

      • Эта колонка обеспечивает корректную идентификацию и обработку данных.

    • Управление уже загруженными данными:

      • Данные, которые уже были загружены не удаляются.

      • Архивный период присоединяется в новую таблицу как есть (_ATTACH PARTITION в Clickhouse_)

      • Инкрементальный период также присоединяется к новой таблице, после чего к ним добавляются новые строки, а строки которые были обновлены заменяются по RowId.

      • Когда новая таблица полностью готова, она подменяет собой новую.

Если в источнике были удалены некоторые строки (hard delete), на платформе эти строки останутся даже после перезагрузки, так как больше нет никакой информации о LastUpdate и RowId удаленной строки. Если в источнике планируется удаление строк, то следует выбрать другой режим загрузки данных.

Параметр LastUpdate

В обычном режиме инкрементальной загрузки (с помощью SQL-запроса) в запросе присутствует два параметра:

  • @{RangeStart}

  • @{RangeEnd}

Параметр LastUpdate настраивается в окне настроек инкрементального обновления:

row.png

Ограничения

Для корректной работы механизма инкрементальной загрузки данных в режиме обновления строк необходимо выполнение следующих обязательных требований к пользовательскому запросу:

  • Метаданные обновления

    • Поле last_updated: Запрос должен включать в блок SELECT поле, содержащее метку времени последнего обновления строки.

    • Тип данных: Указанное поле должно иметь тип DateTime для корректного определения последовательности изменений.

  • Идентификация записей

    • Поле row_id: Запрос обязательно должен включать в блок SELECT поле или набор полей, составляющих уникальный идентификатор строки (RowID). Это гарантирует точное сопоставление записей между источником и приемником.

  • Фильтрация по дате

    • Параметр @{LastUpdated}: В условии WHERE запроса необходимо использовать системный параметр @{LastUpdated}, который автоматически подставляет дату и время предыдущего успешного запуска. Это обеспечивает выборку только тех данных, которые были изменены после указанного момента.

  • Поддержка баз данных

    • На данный момент база данных Oracle не поддерживается в режиме инкрементального обновления строк из-за особенностей конвертации дат из источника Oracle через JDBC Bridge.

    • На данный момент источник данных FirebirdSQL не поддерживается в режиме инкрементального обновления строк из-за особенностей преобразовании значений времени.

Остальные ограничения и требования совпадают с режимом инкрементальной загрузки.

Включение режима инкрементального обновления строк через API

В ситуации, когда таблицы еще нет

  1. Настройте обычную инкрементальную загрузку (с помощью SQL-запроса), добавив условие LastUpdate > @{LastUpdated}, и нажмите Добавить:

    api.png
  2. Дождитесь завершения первой полной загрузки и убедитесь, что все необходимые данные загружены. Например:

    image-20250514-124310.png

     

  3. Откройте инструменты разработчика (F12), обновите страницу, найдите токен авторизации в любом заголовке запроса и скопируйте его:

    image-20250514-124343 (1).png

     

  4. Перейдите в интерфейс API (Swagger) по адресу:
    <domain>/v3/formula-engine/swagger/index.html
    где <domain> – доменное имя или IP-адрес вашего сервера:

    authorize.png
  5. Нажмите кнопку Authorize в правой стороне окна.

  6. В открывшемся диалоговом окне в поле Value введите слово Bearer, затем добавьте пробел и вставьте скопированный ранее токен авторизации:

    bearer.png

     

  7. Перейдите в раздел Model и раскройте следующий эндпоинт:
    GET /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/model

    endpoint.png

     

  8. Нажмите Try it out в правой стороне окна:

    try-out.png
  9. На вкладке Network инструментов разработчика (F12) перейдите в query, скопируйте значения параметров workspaces и datasets в окно эндпоинта и выполните запрос, нажав на кнопку Execute:

    query.png

     

  10. Скопируйте модель из ответа:

    response.png

     

  11. В текстовом редакторе найдите таблицу, которую вы собираетесь обновить. В нашем примере это source_table:

    value.png

     

  12. Скопируете таблицу для которой хотите включить режим инкрементального обновления строк. Например:

    copy.png
  13. Вставьте скопированную таблицу в текстовый редактор.

  14. Найдите поле source в объекте таблицы и в самом конце скопируйте объект IncrementalRefreshPolicy

  15. Вставьте объект IncrementalRefreshPolicy в конец объекта таблицы.

  16. Скопируйте объект и удалите экранирующие символы \\

    image-20250522-044812.png
    image-20250522-044832.png

     

  17. Добавьте недостающие поля:

    image-20250522-044916.png

    где

    • IncrementalRefreshMode – режим загрузки.

      • 0 – инкрементальная загрузка;

      • 1 – инкрементальное обновление строк.

    • LastUpdateColumnId - идентификатор столбца с датой последнего изменения (тип GUID).

    • RowIdentityColumnIds – список идентификаторов колонок (может быть одна) которые являются идентификатором строки (массив GUID).

    • IncrementalPeriodType – тип инкрементального периода (гранулярность):

      • Year

      • Month

      • Day

    • IncrementalPeriods – количество дней, лет, месяцев (число).

    • ArchivePeriodType - тип архивного периода (гранулярность).

      • Year

      • Month

      • Day

    • ArchivePeriods  – количество дней, лет, месяцев (число).

      image-20250522-045119.png

      Вы получите объект, который должен быть вставлен в конец объекта таблицы:

      "IncrementalRefreshPolicy": {                 "IncrementalRefreshMode": 1,                 "IncrementalPeriods": 2,                 "ArchivePeriodType": "Month",                 "ArchivePeriods": 3,                 "IncrementalPeriodType": "Month",                 "LastUpdateColumnId": "a640806e-2046-4b7f-95fd-d2ce1d513d72",                 "RowIdentityColumnIds": [                                "efc77201-af39-432c-8ae3-ae57c6408a83"                 ] }
  18. Вернитесь на страницу в Swagger и перейдите в раздел Tables:

    tables.png

     

  19. Раскройте эндпоинт
    PUT /api/v{version}/workspaces/{workspaceId}/datasets/{datasetId}/tables/{tableId}
    и нажмите Try it out:

    put-tryitout.png

     

  20. Вставьте в тело запроса модифицированную модель таблицы и укажите tableId из этой же таблицы:

    image-20250514-124938.png

     

  21. Выполните запрос, нажав Execute. Код 200 означает, что запрос прошел успешно:

    image-20250514-125006.png

     

  22. Вернитесь на платформу на страницу набора данных и обновите страницу, нажав кнопку F5 на клавиатуре.

  23. Обновите таблицу, нажав напротив ее названия и выбрав пункт меню Обновить таблицу:

    image-20250514-125025.png

     

В ситуации, когда таблица уже есть

  1. Нажмите напротив названия таблицы и выберите пункт меню Редактировать загрузчик:

    image-20250520-061453.png

     

  2. Выполните все шаги, описанные в инструкции “В ситуации, когда таблицы еще нет” (см. выше), начиная с пункта 1.


Смотрите также

Инкрементальная загрузка данных