Creatio

Creatio

Ноутбуки, используемые в примере:

Тип статьи

Инструкция

Компетенции

JupyterLab, Python, Petl, Pandas, Creatio API

Необходимые права

Доступ к JupyterLab

Версии компонентов

Jupyter core - 4.7.1, pandas - 1.2.3, Sqlalchemy - 1.3.23,

Статус

БЕТА

Сложность

ЛЕГКО

Полезные ссылки

Requests, Pandas, Creatio API OData 4, SQLAlchemy

Дополнительные сведения

ОС Ubuntu 18.04

 

В статье рассматривается пример обработки данных, полученных с помощью Creatio API OData 4.

Для выгрузки данных в PostgreSQL используется библиотека Pandas.

Цель - получение данных с помощью Creatio API OData 4 для загрузки их в платформу и анализа на дашбордах.

 

Стратегия.


  • Последовательно получить записи из сущностей Creatio в требуемом временном промежутке и загрузить эти данные в соответствующие таблицы промежуточной базы PostgreSQL. Финальным этапом загрузить данные из этой базы в ViQube через SQL-загрузчик.

В этой статье не рассматривается:

  • Данные загружаются разово. Для регулярной автоматический выгрузки необходимо настроить планировщик Chronicle.

  • Для дальнейшей загрузки в платформу можно воспользоваться стандартными средствами в платформе: “создание загрузчика” и “планы загрузки”. Всё описано в разделе документации (ссылка для версии 2.20):
    Загрузка данных и формирование структуры в аналитической базе данных ViQube

  • Получение наименований и полей сущностей Creatio.

  • Подробности работы подключенных Python библиотек.


Нам понадобятся такие библиотеки, как: Requests, Pandas, SQLAlchemy.

Если они у вас не установлены, то вы можете воспользоваться статьей по установке библиотек:

Установка Python библиотек

Для работы скрипта необходима созданная база PostgreSQL и пользователь в ней с необходимыми правами для создания сущностей.


#Подключаем библиотеки

import requests import pandas as pd import sys import sqlalchemy from datetime import datetime

#Фиксируем момент запуска скрипта

starttime = datetime.now()

#Авторизационные данные

postgreHost = '*****' postgreDB = '*****' postgreSchema = 'creatio' postgreUser = '*****' postgrePort = '5432' postgrePass = '*****' baseCreatioUser = "*****" baseCreatioPass = "*****" baseCreatioURL = '*****'

#Пути

loginCreatioJSON = {"UserName": baseCreatioUser, "UserPassword": baseCreatioPass } sqlalchemy_DB_URI = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}" loginCreatioURL = baseCreatioURL + "ServiceModel/AuthService.svc/Login" selectCreatioURL = baseCreatioURL +"0/odata/"

#GET фильтры для Creatio

creatioAccountSelect = '$select=CountryId,Id,Name,Web' creatioCurrencySelect = '$select=Id,ShortName' creatioDefaultSelect = '$select=Id,Name' creatioContactSelect = '$select=id,Name,JobTitle,MobilePhone,Phone,Email' creatioOrderProductSelect = '$select=Amount,Id,Name,OrderId' creatioOrderSelect = '$select=AccountId,Amount,Comment,ContactId,CreatedOn,\ CurrencyId,Date,Id,ModifiedOn,Number,OwnerId,StatusId,\ UsrEndUser,UsrEndUserLookupId,UsrNameOfProject,\ UsrOrderResellerId,UsrPO,UsrPaidOn,UsrPipelineperiodId,\ UsrProductLineId,UsrString1,UsrTypeOfProjectId'

#Признак первого запуска

firstTimeStart=True

#Крайняя дата начала выгрузки

lastDate = datetime(2000,1,10,0,0,0)

#Подключение к базе Postgre

engine = sqlalchemy.create_engine(sqlalchemy_DB_URI) try: connection=engine.connect() except: sys.exit('Database connecting error')

#Проверка наличия схемы, если отсутствует, создаём

if not engine.dialect.has_schema(engine, postgreSchema): engine.execute(sqlalchemy.schema.CreateSchema(postgreSchema))

#Проверка наличия служебной таблицы TransferControl

sqlstring = f"select 1 from information_schema.tables where table_schema =\ '{postgreSchema}' and table_name = 'TransferControl'" res = engine.execute(sqlstring).fetchall()

#Если таблицы нет, создаем

if not len(res): metadata = sqlalchemy.MetaData(engine,schema=postgreSchema) sqlalchemy.Table('TransferControl', metadata, sqlalchemy.Column('Status', sqlalchemy.types.String), sqlalchemy.Column('TransferPeriod', sqlalchemy.types.Time), sqlalchemy.Column('Id', sqlalchemy.types.Integer, primary_key=True, nullable=False), sqlalchemy.Column('TransferTimeStamp', sqlalchemy.types.DateTime)) metadata.create_all()

#Иначе берём последнюю дату удачной выгрузки

else: # df = pd.read_sql_query(f'select * from {postgreSchema}."TransferControl" \ order by "TransferTimeStamp" DESC limit 1',con=engine) if not df.empty: if df.at[0,'Status'] == 'true': firstTimeStart=False lastDate = df.at[0,'TransferTimeStamp']

#Подготовка строки фильтра по датам

datestring = lastDate.isoformat()+'Z' creatioDateFilter = f"$filter=ModifiedOn gt {datestring}"

#Удаляем VIEW из базы, если он есть

engine.execute('DROP VIEW if exists creatio.creatioOrders')

#Заголовок для запросов

headersGET = { "Content-Type": "application/json; charset=utf-8", "ForceUseSession":"true", "Accept":"application/json" }

#Получаем токен от Creatio

session = requests.post(loginCreatioURL, headers={"Content-Type": "application/json"}, json=loginCreatioJSON) headersGET['BPMCSRF'] = session.cookies.get_dict()['BPMCSRF']

#Далее следует основная функция, получающая необходимые данные

def getCollection(collectionName,collectionSelect): #Посылаем GET запрос на коллекцию с фильтром response = requests.get(selectCreatioURL+collectionName+'?'+ collectionSelect+'&'+creatioDateFilter, headers=headersGET,cookies=session.cookies) if not response.ok : storeStatus("FALSE") sys.exit(collectionName+' collection transfer error'+response.text) json_response = response.json() if not len(json_response['value']): return # Преобразуем ответ в DataFrame df = pd.DataFrame(data=json_response['value'], index=None) # преобразование типов к datetime в данном примере необходимо только у Order # это необходимо, чтобы исключить ошибочные даты до 1970 года , зануляя их if collectionName == "Order": df['CreatedOn'] = pd.to_datetime(df['CreatedOn'], errors='coerce') df['Date'] = pd.to_datetime(df['Date'], errors='coerce') df['ModifiedOn'] = pd.to_datetime(df['ModifiedOn'], errors='coerce') df['UsrPaidOn'] = pd.to_datetime(df['UsrPaidOn'], errors='coerce') # errors='coerce' некорректные значения заменяет на NaT df.replace({pd.NaT: None}) # Если первый запуск, то загружаем DataFrame в одноимённую таблицу и # назначаем PRIMARY KEY if firstTimeStart: df.to_sql(collectionName, engine, schema=postgreSchema, index=False, index_label='Id', if_exists="replace") with engine.connect() as con: con.execute('ALTER TABLE "'+postgreSchema+'"."'+collectionName+ '" ADD PRIMARY KEY ("Id");') # Иначе обновляем имеющиеся данные else: upsert_postgresql(collectionName,df)

#Функция записи статуса в служебную таблицу TransferControl, которая позволяет при повторных запусках выбрать только записи, модифицированные с момента последнего удачного запуска. Сохраняем время завершения скрипта, статус и время исполнения скрипта

def storeStatus(status): now = datetime.now() timeperiod = now - starttime sqlstring = f'insert into {postgreSchema}."TransferControl"("TransferTimeStamp",\ "Status","TransferPeriod") values(\''+ now.strftime("%Y-%m-%d %H:%M:%S")+ f'\',{status},\'{timeperiod}\')' engine.execute(sqlstring)

#Функция UPSERT, аналог MERGE. Добавляет новые или обновляет старые, если таковые уже имеются. Признак для этого - первичный ключ Id

def upsert_postgresql(collectionName,dataf): metadata = sqlalchemy.MetaData(bind=engine, schema=postgreSchema) tab = sqlalchemy.Table(collectionName,metadata,autoload=True) insert_statement = sqlalchemy.dialects.postgresql.insert(tab).values(dataf.to_dict(orient='records')) upsert_statement = insert_statement.on_conflict_do_update(index_elements=['Id'],set_={c.key: c for c in insert_statement.excluded if c.key != 'Id'}) engine.execute(upsert_statement)

#Далее следует основная рутина

Получение интересующих коллекций по фильтрам

getCollection('Account', creatioAccountSelect) getCollection('Contact', creatioContactSelect) getCollection('OrderProduct', creatioOrderProductSelect) getCollection('Order', creatioOrderSelect) getCollection('Country', creatioDefaultSelect) getCollection('Currency', creatioCurrencySelect) getCollection('OrderStatus', creatioDefaultSelect) getCollection('UsrTyoeOfProject', creatioDefaultSelect) getCollection('UsrProductLine', creatioDefaultSelect) getCollection('UsrPipelineperiod', creatioDefaultSelect)

#Запись успешного состояния

storeStatus("TRUE")

#Создаём VIEW на основе полученных данных для загрузки таблицы фактов через SQL-загрузчик

createViewString='create or replace view creatio.creatioOrders \ as select acc."Name" "AccountName",acc."Web" "AccountWeb",cac."Name" "AccountCountry",\ ord."Amount",ord."Comment",con."Name" "ContactName",con."JobTitle" "ContactJobTitle",con."MobilePhone" "ContactMobilePhone",con."Phone" "ContactPhone",con."Email" "ContactEmail",\ ord."CreatedOn", cur."ShortName" "Currency", ord."Date", ord."ModifiedOn",ord."Number", own."Name" "OwnerName",own."JobTitle" "OwnJobTitle",own."MobilePhone" "OwnerMobilePhone",own."Phone" "OwnerPhone",own."Email" "OwnerEmail",\ sts."Name" "Status", orp."Amount" "ProductAmount",orp."Name" "ProductName", ord."UsrEndUser",enu."Name" "inCRMName",enu."Web" "inCRMWeb",cen."Name" "inCRMCountry", ord."UsrNameOfProject",\ reu."Name" "ResellerName",reu."Web" "ResellerWeb",cre."Name" "ResellerCountry", ord."UsrPO",ord."UsrPaidOn",ppl."Name" "Pipeline",upl."Name" "ProductLine",ord."UsrString1" "Invoice",prt."Name" "ProjectTypeName"\ from "creatio"."Order" ord left join "creatio"."Account" acc on ord."AccountId"=acc."Id" left join "creatio"."Country" cac on acc."CountryId"=cac."Id" left join "creatio"."Contact" con on ord."ContactId"=con."Id" left join "creatio"."Currency" cur on ord."CurrencyId"=cur."Id" left join "creatio"."Contact" own on ord."OwnerId"=own."Id" left join "creatio"."OrderStatus" sts on ord."StatusId"=sts."Id"\ left join "creatio"."OrderProduct" orp on orp."OrderId"=ord."Id" left join "creatio"."Account" enu on ord."UsrEndUserLookupId"=enu."Id" left join "creatio"."Country" cen on enu."CountryId"=cen."Id"\ left join "creatio"."Account" reu on ord."UsrOrderResellerId"=reu."Id" left join "creatio"."Country" cre on reu."CountryId"=cre."Id" left join "creatio"."UsrPipelineperiod" ppl on ord."UsrPipelineperiodId"=ppl."Id"\ left join "creatio"."UsrProductLine" upl on ord."UsrProductLineId"=upl."Id" left join "creatio"."UsrTyoeOfProject" prt on ord."UsrTypeOfProjectId"=prt."Id"' engine.execute(createViewString)