Executing Tasks using OCI CLI in Oracle Cloud Infrastructure Data Integration

Here we will use the Oracle Cloud Infrastructure CLI to execute an OCI Data Integration task that has been published to an application. To execute a task you need the OCID of the workspace, the application key and the task key. Everything else can be left at its default and overridden when needed.

Run a task from the OCI Console

Tasks can have parameters for overriding the data asset, connection, schema and entity of any source or target operator. Tasks can also have parameters for filter conditions and join conditions. Parameters are really useful in OCI Data Integration dataflows because they are all about reusing logic. Let’s see the different places parameters can be defined and some use cases to illustrate the capabilities. What can be parameterized?

  • Data Entity: the source and target operators in dataflows can have the data entity parameterized. You can design using a CSV, then run passing a CSV, a compressed CSV or many compressed CSVs (patterns of objects)! For tables you may want to modify the table name or even pass the SQL query representing the entity into the execution; a data entity can be a reference to an existing table/object/query or a new table/object. One use case is a generic dataflow with a source table in, say, an Oracle database and a target Parquet object in, say, Object Storage. Each of these can be parameterized, and the task can then be used to export a table specified at runtime, along with the folder/object name where the data is written. We can take this even further and parameterize the data asset and connection representing the database, and export from an ADW or ATP or any of the other data asset types!
  • Schema/Bucket: the source and target operators in dataflows can have the schema parameterized. Many applications take a cookie-cutter approach with a tenant in each schema, or the same logic for different buckets. The schema content/entities are the same, so just switching the schema allows one dataflow to be reused for many purposes. The schema can be a database schema or even a bucket.
  • Connection: sometimes the connection has to be switched at runtime, for example to use different authentication/authorization. The source and target operators in dataflows can have the connection parameterized.
  • Data Asset: many customers will have cookie-cutter applications with tenants in different databases, for example. If you parameterize the data asset you also have to parameterize the connection. The source and target operators in dataflows can have the data asset parameterized.
  • Filter/Join conditions: parameters allow controlling what data is processed. The entire filter or join condition can be parameterized.
  • Expression text: expressions can use parameters, which allows controlling how expressions are defined at runtime. This can be configured in many dataflow operators, such as filter, join, lookup, pivot, split or expression/aggregate expressions. For example, you could parameterize a filter to get orders created after a value passed as a parameter (FILTER_1.ORDER_TXNS.ORDER_DATE > $LAST_LOAD_DATE). This is an incremental extract use case: the date of the last extract is passed into the dataflow and the parameter is used in a filter condition.

Below you can see what happens in the console when you run a task with parameters; the console prompts you to override the defaults if you wish:

Run the task and pass runtime parameters.

In the code example below we will see how to execute with default values and then how to execute and pass a different object name to be used during execution.

oci data-integration task-run create --workspace-id YOUR_WORKSPACE_OCID --application-key YOUR_APPLICATION_KEY --registry-metadata '{"aggregator-key":"YOUR_TASK_KEY"}'

We can execute this from within the OCI Console as below:

Execute the create task run command

You can then refresh the runs and see the new task run in the OCI Console:

Data Entity Parameters

When you want to override parameters, pass in values in the config provider property. This is a JSON snippet that has bindings for the parameters. If it’s a database table you can simply pass in the table name; if it’s a file you’ll need to pass in the data format, compression and so on. For example, below you can see dataFormat, where the format attribute passes in a model type of CSV_FORMAT, the encoding (UTF-8), the delimiter (,), the quote character ("), the timestamp format (yyyy-MM-dd HH:mm:ss.SSS) and the escape character (\):

Example of a CSV data file parameter named INVENTORY_DATA

If it were JSON, the data format would be much simpler; you just need the model type of JSON_FORMAT and the encoding of UTF-8 (for example):

Example of a JSON data file parameter named INVENTORY_DATA.

If it’s a database table it’s even simpler; here the parameter is named SRCDATAPARAM:

In each of these you can see the key has a special format:

dataref:connectionKey/bucketName/FILE_ENTITY:fileName

or

dataref:connectionKey/schemaName/TABLE_ENTITY:tableName
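The two key formats above can be assembled with a small shell helper. This is only a sketch: the function name dataref_key is hypothetical (it is not part of the OCI CLI), and the connection key, bucket, schema and entity names are placeholders to replace with your own.

```shell
#!/bin/sh
# Hypothetical helper (not part of the OCI CLI): prints an entity key in the
# dataref:connectionKey/container/ENTITY_TYPE:entityName format shown above.
#   $1 = connection key
#   $2 = bucket name (files) or schema name (tables)
#   $3 = FILE_ENTITY or TABLE_ENTITY
#   $4 = file or table name
dataref_key() {
  printf 'dataref:%s/%s/%s:%s\n' "$1" "$2" "$3" "$4"
}

# Placeholder values only.
dataref_key "YOUR_CONNECTION_KEY" "your_bucket" "FILE_ENTITY" "inventory.csv"
dataref_key "YOUR_CONNECTION_KEY" "YOUR_SCHEMA" "TABLE_ENTITY" "ORDERS"
```

Keeping the key construction in one place makes it harder to mistype the separators (a slash between connection and container, a colon before the entity name).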

Below you can see an example where the file and format are passed in as a parameter to the execution; the bindings are passed in the --config-provider option.

oci data-integration task-run create --workspace-id YOUR_WORKSPACE_OCID --application-key YOUR_APPLICATION_KEY --registry-metadata '{"aggregator-key":"YOUR_TASK_KEY"}' --config-provider '{"bindings":{"INVENTORY_DATA":{"rootObjectValue":{"modelType":"ENRICHED_ENTITY","entity":{"key":"dataref:0a5ae9ef-5b74-447a-9d5b-49511f3b7600/a_supplierx_stage/FILE_ENTITY:supplierx_inventory2.csv","name":"supplierx_inventory2.csv","modelType":"FILE_ENTITY","resourceName":"FILE_ENTITY:supplierx_inventory2.csv"},"dataFormat":{"formatAttribute":{"modelType":"CSV_FORMAT","encoding":"UTF-8","delimiter":",","quoteCharacter":"\"","hasHeader":true,"timestampFormat":"yyyy-MM-dd HH:mm:ss.SSS","isFilePattern":false,"escapeCharacter":"\\"},"type":"CSV","compressionConfig":{"codec":"NONE"},"isFilePattern":false}}}}}'

You can then see the task running in the console:

Monitor the tasks running from within the OCI Console
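Long JSON on the command line is easy to break with shell quoting, especially the quote and escape characters in the CSV format. One way around this, sketched below, is to write the bindings to a file and hand it to the CLI with the file:// prefix that the OCI CLI accepts for complex-type options. The file location, the function name run_inventory_task and the placeholder connection key and bucket are assumptions for illustration; the JSON layout is the one from the CSV example above.

```shell
#!/bin/sh
# Write the bindings JSON to a file so the shell never has to quote it.
# The quoted 'EOF' delimiter stops the shell from expanding \ or $ inside.
config_file="${TMPDIR:-/tmp}/inventory_binding.json"   # hypothetical location
cat > "$config_file" <<'EOF'
{"bindings":{"INVENTORY_DATA":{"rootObjectValue":{
  "modelType":"ENRICHED_ENTITY",
  "entity":{
    "key":"dataref:YOUR_CONNECTION_KEY/your_bucket/FILE_ENTITY:supplierx_inventory2.csv",
    "name":"supplierx_inventory2.csv",
    "modelType":"FILE_ENTITY",
    "resourceName":"FILE_ENTITY:supplierx_inventory2.csv"},
  "dataFormat":{
    "formatAttribute":{"modelType":"CSV_FORMAT","encoding":"UTF-8","delimiter":",",
      "quoteCharacter":"\"","hasHeader":true,
      "timestampFormat":"yyyy-MM-dd HH:mm:ss.SSS",
      "isFilePattern":false,"escapeCharacter":"\\"},
    "type":"CSV","compressionConfig":{"codec":"NONE"},"isFilePattern":false}}}}}
EOF

# Hypothetical wrapper: $1 = workspace OCID, $2 = application key, $3 = task key.
# It is only defined here, not called, so nothing is submitted by this script.
run_inventory_task() {
  oci data-integration task-run create \
    --workspace-id "$1" \
    --application-key "$2" \
    --registry-metadata "{\"aggregator-key\":\"$3\"}" \
    --config-provider "file://$config_file"
}
# Usage: run_inventory_task YOUR_WORKSPACE_OCID YOUR_APPLICATION_KEY YOUR_TASK_KEY
```

The file can also be kept under version control, which makes it easier to review what a scheduled run will pass in.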

It’s quite simple to execute commands using the CLI; you can put them in a cron job or your favorite scheduler. There are some other useful capabilities.

Process Directories/Folders

Data Integration also supports processing directories of objects in a bucket. This is based on the naming convention for the objects: if the objects have names such as ‘20200707/meterx10934.csv’, ‘20200707/meterx10935.csv’ and so on, then using ‘20200707/’ as the file entity name when executing above will process all objects with that prefix, that is, in that logical directory.

Process File Patterns

Data Integration also supports processing patterns of objects, so you can process ‘supplierx*json’ for example. Enter this as the resource name and set isFilePattern to true above.
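For directories and patterns, only the entity part of the binding changes. The sketch below prints that entity object for a given object name, prefix or pattern. The helper name file_entity_json is hypothetical, and using the prefix or pattern in the key, name and resourceName fields alike is an assumption made by analogy with the CSV example earlier in this post, so check it against a binding exported from your own task.

```shell
#!/bin/sh
# Hypothetical helper: prints the "entity" JSON object for a file entity,
# mirroring the field layout of the CSV example earlier in this post.
#   $1 = connection key, $2 = bucket, $3 = object name, prefix or pattern
# Note: $3 is inserted as-is, so it must not contain " or \ characters.
file_entity_json() {
  printf '{"key":"dataref:%s/%s/FILE_ENTITY:%s","name":"%s","modelType":"FILE_ENTITY","resourceName":"FILE_ENTITY:%s"}\n' \
    "$1" "$2" "$3" "$3" "$3"
}

# A logical directory: every object whose name starts with 20200707/
file_entity_json YOUR_CONNECTION_KEY your_bucket '20200707/'

# A file pattern: isFilePattern must also be set to true in the data format.
file_entity_json YOUR_CONNECTION_KEY your_bucket 'supplierx*json'
```

The single quotes around the pattern matter: without them the shell may expand the asterisk against local file names before the helper sees it.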

Process Zipped Data

Data Integration also supports processing zipped objects, so you can process ‘supplierx*csv.gz’ for example. Enter this as the resource name and set the compression algorithm to use above. GZIP, BZIP2, DEFLATE, LZ4 and SNAPPY are supported; you set the value in the codec property within the compressionConfig property:

The compression options are defined when selecting the entity.
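A typo in the codec name is easy to make when the JSON is edited by hand. The sketch below checks a codec against the values named in this post (plus NONE, which appears in the CSV example earlier) before printing the compressionConfig fragment. Both function names are hypothetical, and the list is taken from this post rather than from the service itself.

```shell
#!/bin/sh
# Codecs named in this post as supported, plus NONE from the CSV example.
is_supported_codec() {
  case "$1" in
    NONE|GZIP|BZIP2|DEFLATE|LZ4|SNAPPY) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical helper: prints the compressionConfig JSON fragment for a codec,
# or returns non-zero with a message on stderr if the codec is not in the list.
compression_config() {
  if ! is_supported_codec "$1"; then
    echo "unsupported codec: $1" >&2
    return 1
  fi
  printf '"compressionConfig":{"codec":"%s"}\n' "$1"
}

compression_config GZIP
```

The printed fragment replaces the "compressionConfig":{"codec":"NONE"} part of the dataFormat object in the earlier example.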

The above are cool capabilities that let you design small and process much more via the parameterization of these properties.

Data Asset / Connection / Schema Parameters

When you want to override parameters for the data asset, connection and schema, you can pass in the values in the config provider property, just as for an entity. Here is an example that passes in the data asset, connection and schema, using the ORACLE_ADWC_DATA_ASSET and ORACLE_ADWC_CONNECTION model types (note that you must specify the key, the name and the type):

oci data-integration task-run create --workspace-id YOUR_WORKSPACE_OCID --application-key YOUR_APPLICATION_KEY --registry-metadata '{"aggregator-key":"YOUR_TASK_KEY"}' \
--config-provider '{"bindings":{
"DATA_ASSET_PARAM":{"rootObjectValue": {"modelType":"ORACLE_ADWC_DATA_ASSET","key":"your_data_asset_key", "name":"your_data_asset_name"}},
"CONNECTION_PARAM":{"rootObjectValue":{"modelType":"ORACLE_ADWC_CONNECTION","key":"your_connection_key", "name":"your_connection_name"}},
"SCHEMA_PARAM":{"rootObjectValue":{"modelType":"SCHEMA","key":"dataref:your_connection_key/your_schema_name", "name":"your_schema_name"}}}}'

Filter / Join / Literal Parameters

When you want to override parameters for a filter condition, join condition or literal, you can pass in the values in the config provider property, just as for an entity. Here is an example that passes in the information:

oci data-integration task-run create --workspace-id YOUR_WORKSPACE_OCID --application-key YOUR_APPLICATION_KEY --registry-metadata '{"aggregator-key":"YOUR_TASK_KEY"}' \
--config-provider '{"bindings":{
"FILTER_PARAMETER":{"simpleValue": "FILTER_1.USER1OBJECTS_CSV.PID is not null"},
"JOIN_PARAMETER":{"simpleValue":""},
"LITERAL_PARAMETER":{"simpleValue":"777"}}}'
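When the values come from a script (for example a filter condition that itself contains quotes), building the simpleValue bindings by hand gets fiddly. The sketch below assembles the same bindings structure from NAME=VALUE arguments and escapes backslashes and double quotes in the values. The function names json_escape and simple_bindings are hypothetical, and the escaping covers only those two characters, not control characters or newlines.

```shell
#!/bin/sh
# Escape backslashes and double quotes so a value can sit inside a JSON string.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Hypothetical helper: builds {"bindings":{"NAME":{"simpleValue":"VALUE"},...}}
# from NAME=VALUE arguments. Parameter names are inserted unescaped.
simple_bindings() {
  out='{"bindings":{'
  sep=''
  for pair in "$@"; do
    name=${pair%%=*}     # text before the first '='
    value=${pair#*=}     # text after the first '='
    out="$out$sep\"$name\":{\"simpleValue\":\"$(json_escape "$value")\"}"
    sep=','
  done
  printf '%s}}\n' "$out"
}

simple_bindings \
  'FILTER_PARAMETER=FILTER_1.USER1OBJECTS_CSV.PID is not null' \
  'LITERAL_PARAMETER=777'
```

The output can be passed straight to the CLI, for example as --config-provider "$(simple_bindings 'LITERAL_PARAMETER=777')" on the task-run create command shown above.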

I hope you found this post useful. It was a quick flyover of using the Oracle Cloud Infrastructure CLI to execute a task that has been published to an application.

Check out Oracle Cloud Infrastructure here, along with details of the Oracle Cloud Infrastructure Data Integration service:

https://www.oracle.com/middleware/data-integration/oracle-cloud-infrastructure-data-integration