Executing Data Integration Tasks using Oracle PL/SQL
One of the coolest stories for the OCI SDKs is their availability as PL/SQL APIs from within the OCI Autonomous Database family, both ADW and ATP. This lets you do all kinds of interesting things with OCI Data Integration, Data Catalog and all the other services! An example dear to me is executing OCI Data Integration tasks from the Oracle database; these tasks can be any of the supported task types: data pipelines orchestrating all kinds of tasks, data integration tasks, data loader tasks and so on.
OCI Data Integration lets you build simple designs for complex activities using data pipelines, custom dataflows, data loader tasks, integration tasks and so on.
Executing a data pipeline task that defines concurrent tasks, dependencies and REST tasks (such as forecasting or anomaly detection) is simple from PL/SQL, like this sample pipeline;
Just use the CREATE_TASK_RUN function, passing the workspace, application and task along with your credential for authenticating (a full sketch of the call follows the credential setup below);
Let’s look in more detail at an example dataflow that we will execute via PL/SQL.
As well as using the OCI Console to execute tasks, you can use any of the SDKs, including PL/SQL. Let’s check out executing from the Oracle database!
Firstly you need to prepare how you authenticate. You can use resource principal access, which involves some DBA admin (see the Use Resource Principal to Access Oracle Cloud Infrastructure Resources documentation for details), or you can use your user credential. To create a credential that you will use in the OCI API to execute the data integration task, use the API below. When you use a user principal, beware that your keys should be rotated as a security good practice, but this route is useful for quickly trying things out;
begin
  DBMS_CLOUD.CREATE_CREDENTIAL(
    credential_name => 'CREDENTIAL_NAME_HERE',
    user_ocid       => 'ENTER_OCI_USER_OCID',
    tenancy_ocid    => 'ENTER_OCI_TENANCY_OCID',
    private_key     => 'ENTER_PRIVATE_KEY_HERE',
    fingerprint     => 'ENTER_FINGERPRINT_HERE');
end;
You’ll need a few things from OCI Data Integration; the workspace OCID, the application key and the task key are the main ones. If you have parameterized your task, you can also pass those parameter values via the SDK. Then set the region and credential name to use, and that’s it! Execute the PL/SQL from within an ADB and you’ll launch tasks in OCI Data Integration.
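As a minimal sketch, the call can look like the block below. The package name (DBMS_CLOUD_OCI_DI_DATA_INTEGRATION) and the generated type names are taken from the OCI PL/SQL SDK reference linked at the end of this post; the constructor usage for the details object is an assumption (the generated constructors may expect every attribute), so verify against the type documentation;
declare
  -- Type names and constructor usage below are assumptions based on the
  -- OCI PL/SQL SDK reference; verify the generated signatures before use.
  l_details  dbms_cloud_oci_data_integration_create_task_run_details_t;
  l_response dbms_cloud_oci_di_data_integration_create_task_run_response_t;
begin
  -- The registry metadata carries the key of the task to execute.
  l_details := dbms_cloud_oci_data_integration_create_task_run_details_t(
    registry_metadata => dbms_cloud_oci_data_integration_registry_metadata_t(
      aggregator_key => 'ENTER_TASK_KEY_HERE'));
  l_response := dbms_cloud_oci_di_data_integration.create_task_run(
    workspace_id            => 'ENTER_WORKSPACE_OCID_HERE',
    application_key         => 'ENTER_APPLICATION_KEY_HERE',
    create_task_run_details => l_details,
    region                  => 'us-ashburn-1',          -- your OCI region
    credential_name         => 'CREDENTIAL_NAME_HERE'); -- from CREATE_CREDENTIAL above
  -- The response body carries the task run key; keep it for monitoring.
  dbms_output.put_line('Task run key: ' || l_response.response_body.key);
end;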
In each of these you can see the key has a special format depending on whether it is a reference to an existing object like this (there is a simpler way of parameterizing entities shown later);
dataref:connectionKey/bucketName/FILE_ENTITY:fileName
or
dataref:connectionKey/schemaName/TABLE_ENTITY:tableName
or whether we are creating a new entity/object; see the OUTPUT_OBJECT example, which also has foldering defined in the object name (xpdata/exppart1).
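As an illustration, concrete values for these patterns could look like the lines below (the connection key, bucket, schema, file and table names are all hypothetical);
dataref:myConnectionKey/myBucket/FILE_ENTITY:sales.csv
dataref:myConnectionKey/mySchema/TABLE_ENTITY:SALES_TARGET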
The simplest way to invoke tasks with parameters for the entities is to use a scalar string value for the entity name. For example, in the source or target operator, when selecting the entity you can add a scalar parameter;
Then you can enter the parameter name, for example ${PVO_NAME} as shown below, and select this as the entity. At runtime you can pass a scalar string value for the PVO name, which makes it easy to execute the same task many times. The same goes for the target table.
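Under the covers, a task run carries a config provider with a binding for each parameter. As a sketch of the shape for a scalar string binding (the parameter name PVO_NAME is from the example above and the value is hypothetical; the PL/SQL SDK wraps the same structure in its generated config provider types, so check the type reference for the exact construction), the REST payload looks like;
{
  "configProvider": {
    "bindings": {
      "PVO_NAME": { "simpleValue": "MY_PVO_NAME" }
    }
  }
}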
You can then use other APIs to monitor the task run (see this post here for monitoring task execution using the PL/SQL SDK), store the task run key in your own system tables, and so on. You can also monitor the runs in the OCI Console. A quick status check from PL/SQL is sketched below;
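This sketch assumes the generated names for GET_TASK_RUN in the OCI PL/SQL SDK reference (package DBMS_CLOUD_OCI_DI_DATA_INTEGRATION and its response type); verify them against the documentation, and pass the task run key returned by CREATE_TASK_RUN;
declare
  -- Assumed generated names from the OCI PL/SQL SDK reference; verify before use.
  l_run dbms_cloud_oci_di_data_integration_get_task_run_response_t;
begin
  l_run := dbms_cloud_oci_di_data_integration.get_task_run(
    workspace_id    => 'ENTER_WORKSPACE_OCID_HERE',
    application_key => 'ENTER_APPLICATION_KEY_HERE',
    task_run_key    => 'ENTER_TASK_RUN_KEY_HERE',  -- key returned by CREATE_TASK_RUN
    region          => 'us-ashburn-1',
    credential_name => 'CREDENTIAL_NAME_HERE');
  -- status reflects the task run lifecycle (for example SUCCESS or ERROR)
  dbms_output.put_line('Status: ' || l_run.response_body.status);
end;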
The PL/SQL OCI Data Integration functions are defined here and the types are defined here. In this example I used SQL Developer Web from within the OCI Console to execute this.
Very cool, and what makes it even better is that as new functionality is added to the OCI Data Integration service and any other service, the ADBs are automatically updated.
See here for more information on OCI Data Integration;
There’s a post here on using the Python SDK to execute these tasks too.