Executing Tasks using Python SDK in Oracle Cloud Infrastructure Data Integration
Here we will use the Oracle Cloud Infrastructure Data Integration Python SDK to execute a task that has been published to an application. To execute a task you’ll need the OCID of the workspace, the application key and the task key. Everything else can be defaulted and overridden when needed; I’ll demonstrate both cases here. In this post you will see how to pass a CSV file, a JSON file or a database table into a task. Calling this API has the same effect as invoking “Run” on a task within an application in the OCI Console;
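As a minimal sketch of the setup (assuming a standard ~/.oci/config file; the workspace OCID and the key values below are placeholders you would replace with your own, and they are reused in the examples later in the post):

import oci

# Load credentials from the default OCI config file and profile.
config = oci.config.from_file()

# Client for the OCI Data Integration service.
di_client = oci.data_integration.DataIntegrationClient(config)

# Identifiers needed to run a published task (placeholder values).
workspace_id = "ocid1.disworkspace.oc1..exampleworkspaceocid"
application_key = "your-application-key"
task_key = "your-published-task-key"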
Tasks can have parameters for overriding the data asset, connection, schema or entity of any source or target operator. Tasks can also have parameters for filter conditions and join conditions. Parameters are really useful in OCI Data Integration dataflows; they are all about reusing logic. Let’s look at the different places parameters can be defined and some use cases that illustrate the capabilities. What can be parameterized?
- Data Entity — the source and target operators in dataflows can have the data entity parameterized. You can design using a CSV, then run passing a CSV, a compressed CSV or many compressed CSVs (patterns of objects)! For tables you may want to change the table name, or even have the SQL query representing the entity passed into the execution; the data entity can be a reference to an existing table/object/query or a new table/object. A use case for this could be a generic dataflow with a source table in, say, an Oracle database and a target Parquet object in Object Storage; each of these can be parameterized and the task used to export a table specified at runtime along with the folder/object name where the data is written. We can take this even further and parameterize the data asset and connection representing the database, and export from an ADW, an ATP or any of the other data asset types!
- Schema/Bucket — the source and target operators in dataflows can have the schema parameterized. Many applications take a cookie-cutter approach, with tenants in separate schemas or the same logic applied to different buckets; the schema contents/entities are the same, so simply switching the schema allows the dataflow to be reused many times. The schema can be a database schema or even a bucket.
- Connection — the source and target operators in dataflows can have the connection parameterized. Sometimes the connection has to be switched at runtime, for example to use different authentication/authorization.
- Data Asset — the source and target operators in dataflows can have the data asset parameterized. Many customers have cookie-cutter applications with tenants in different databases, for example. If you parameterize the data asset you will also have to parameterize the connection.
- Filter/Join conditions — as parameters these allow you to control what data is processed. The entire filter or join condition can be parameterized.
- Expression text can use parameters — this allows you to control how expressions are evaluated at runtime. This can be configured in many operators within a dataflow, such as filter, join, lookup, pivot, split, or expression/aggregate expressions. For example, you could parameterize a filter to get orders created after a value passed as a parameter (FILTER_1.ORDER_TXNS.ORDER_DATE > $LAST_LOAD_DATE). This supports an incremental extract use case: we pass the date of the last extract into the dataflow and the parameter is used in the filter condition.
When you run a task with parameters from the OCI Console, the console prompts you to confirm whether you wish to override the default values.
In the examples below we will see how to execute a task with its default values, and then how to execute it passing a different object name to be used during execution.
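Here is a minimal sketch of running the task with its default parameter values, reusing the client and identifiers from the setup above; it uses the SDK’s create_task_run operation, with the task key supplied as the aggregator key in the registry metadata.

# Run the published task with its default parameter values.
registry_metadata = oci.data_integration.models.RegistryMetadata(
    aggregator_key=task_key)

task_run_details = oci.data_integration.models.CreateTaskRunDetails(
    registry_metadata=registry_metadata)

response = di_client.create_task_run(
    workspace_id, application_key, task_run_details)

# The response contains the task run key and its current status.
print(response.data.key, response.data.status)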
Data Entity Parameters
When you want to override parameters, pass the values in the config provider property. This is a JSON document containing bindings for the parameters. If it’s a database table you can simply pass in the table name; if it’s a file you’ll need to pass in the data format for the data. For CSV, for example, the format attribute has a model type of CSV_FORMAT along with the encoding (UTF-8), the delimiter (,), the quote character (“), the timestamp format (yyyy-MM-dd HH:mm:ss.SSS) and the escape character (\);
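Here is a sketch of what the dataFormat fragment of the binding might look like for a CSV file; the attribute names are illustrative, based on the format attributes listed above.

"dataFormat": {
  "formatAttribute": {
    "modelType": "CSV_FORMAT",
    "encoding": "UTF-8",
    "delimiter": ",",
    "quoteCharacter": "\"",
    "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
    "escapeCharacter": "\\"
  }
}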
If the file is JSON, the data format is much simpler; you just need the model type of JSON_FORMAT and the encoding, UTF-8 for example;
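A sketch of the equivalent fragment for a JSON file, again with illustrative attribute names:

"dataFormat": {
  "formatAttribute": {
    "modelType": "JSON_FORMAT",
    "encoding": "UTF-8"
  }
}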
The file properties are as follows;
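Here is a sketch of a full binding for a file entity; the parameter name INPUT_OBJECT, the connection key and the bucket/object names are placeholders, and the surrounding rootObjectValue/ENRICHED_ENTITY structure is illustrative.

"bindings": {
  "INPUT_OBJECT": {
    "rootObjectValue": {
      "modelType": "ENRICHED_ENTITY",
      "entity": {
        "modelType": "FILE_ENTITY",
        "key": "dataref:connectionKey/bucketName/FILE_ENTITY:myfile.csv",
        "name": "myfile.csv"
      },
      "dataFormat": {
        "formatAttribute": {
          "modelType": "CSV_FORMAT",
          "encoding": "UTF-8",
          "delimiter": ","
        }
      }
    }
  }
}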
If it’s a database table, it’s even simpler;
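A sketch of the binding for a table entity; the parameter name INPUT_TABLE, the connection key, schema and table name are placeholders.

"bindings": {
  "INPUT_TABLE": {
    "rootObjectValue": {
      "modelType": "ENRICHED_ENTITY",
      "entity": {
        "modelType": "TABLE_ENTITY",
        "key": "dataref:connectionKey/schemaName/TABLE_ENTITY:EMPLOYEES",
        "name": "EMPLOYEES"
      }
    }
  }
}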
In each of these you can see the key has a special format;
dataref:connectionKey/bucketName/FILE_ENTITY:fileName
or
dataref:connectionKey/schemaName/TABLE_ENTITY:tableName
Below you can see an example where the file and its format are passed in as parameters to the execution; the JSON string cpjson is passed into the config_provider property.
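Here is a sketch of how this could be wired together in Python. The parameter name INPUT_OBJECT, the connection key, bucket and object name are placeholders, the binding structure follows the fragments above, and it assumes the bindings can be passed as a plain dict (via json.loads) into config_provider.

import json
import oci

config = oci.config.from_file()
di_client = oci.data_integration.DataIntegrationClient(config)

workspace_id = "ocid1.disworkspace.oc1..exampleworkspaceocid"
application_key = "your-application-key"
task_key = "your-published-task-key"

# Bind the file entity and its format to the task parameter at run time
# (quote/escape characters omitted here to keep the string readable).
cpjson = """{
  "bindings": {
    "INPUT_OBJECT": {
      "rootObjectValue": {
        "modelType": "ENRICHED_ENTITY",
        "entity": {
          "modelType": "FILE_ENTITY",
          "key": "dataref:connectionKey/bucketName/FILE_ENTITY:newfile.csv",
          "name": "newfile.csv"
        },
        "dataFormat": {
          "formatAttribute": {
            "modelType": "CSV_FORMAT",
            "encoding": "UTF-8",
            "delimiter": ",",
            "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
          }
        }
      }
    }
  }
}"""

task_run_details = oci.data_integration.models.CreateTaskRunDetails(
    registry_metadata=oci.data_integration.models.RegistryMetadata(
        aggregator_key=task_key),
    config_provider=json.loads(cpjson))

response = di_client.create_task_run(
    workspace_id, application_key, task_run_details)
print(response.data.key, response.data.status)

The same pattern applies when passing a JSON object or a table; only the entity and dataFormat fragments of the bindings change.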
You can then monitor the task run in the OCI Console.
It’s simple to execute tasks, and to build wrapper commands around them, using the Python SDK!
Data Integration also supports processing directories of objects in a bucket, based on the naming convention of the objects. So if the objects are named ‘20200707/meterx10934.csv’, ‘20200707/meterx10935.csv’ and so on, then using ‘20200707/’ as the file entity name when executing as above will process all objects with that prefix, i.e. in that logical directory. Next up we will take this a step further and see Oracle Cloud Infrastructure Fn in action.
Data Asset / Connection / Schema Parameters
When you want to override parameters for the data asset, connection and schema, you can, just as with the entity, pass the values in the config provider property. Here is an example that passes in the data asset, connection and schema for Object Storage;
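A sketch of what those bindings might look like; the parameter names and key values are placeholders, the modelType values shown are the Object Storage ones (other data asset and connection types have their own model types), and the schema key reusing the dataref pattern shown earlier is an assumption.

"bindings": {
  "DATAASSET_PARAM": {
    "rootObjectValue": {
      "modelType": "ORACLE_OBJECT_STORAGE_DATA_ASSET",
      "key": "dataAssetKey"
    }
  },
  "CONNECTION_PARAM": {
    "rootObjectValue": {
      "modelType": "ORACLE_OBJECT_STORAGE_CONNECTION",
      "key": "connectionKey"
    }
  },
  "SCHEMA_PARAM": {
    "rootObjectValue": {
      "modelType": "SCHEMA",
      "key": "dataref:connectionKey/bucketName"
    }
  }
}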
Filter / Join / Literal Parameters
When you want to override parameters for a filter condition, join condition or literal, you can, just as with the entity, pass the values in the config provider property. Here is an example that passes in this information;
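A sketch of scalar bindings for a filter condition, a join condition and a literal value; the parameter names and condition text are placeholders, and simpleValue is assumed to be how scalar values are bound in the config provider.

"bindings": {
  "FILTER_CONDITION_PARAM": {
    "simpleValue": "FILTER_1.ORDER_TXNS.ORDER_STATUS = 'CLOSED'"
  },
  "JOIN_CONDITION_PARAM": {
    "simpleValue": "JOIN_1_1.ORDERS.ORDER_ID = JOIN_1_2.ORDER_ITEMS.ORDER_ID"
  },
  "LAST_LOAD_DATE": {
    "simpleValue": "2020-07-07 00:00:00.000"
  }
}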
Check out the Oracle Cloud Infrastructure Python SDK, along with details of the new Oracle Cloud Infrastructure Data Integration service, here;
https://www.oracle.com/middleware/data-integration/oracle-cloud-infrastructure-data-integration