Event-driven transformation of files to the cloud using Data Integration
This use case illustrates an OCI Data Integration solution in which the integration supports many consumers staging product inventory information; the files are posted to buckets in Object Storage. There is a bucket per supplier, and different file formats are supported: some suppliers work with JSON, some with TSV, others with CSV (Avro and Parquet are also supported, with more to come), and some post compressed data. The suppliers don't have to worry about how any of this is done; they post their files and the Data Integration tasks take care of staging the data into Autonomous Databases, Object Storage buckets or other database systems.
How does it work? When an object is uploaded, a task run is triggered to process the data: there is one rule listening for object-creation events on the buckets. When the event is matched, it triggers a function which invokes the Data Integration task; the Fn function does some data binding for the execute task API and is just some simple Python glue code.
Here we can see the data flow: it takes the inventory data as input, which is enriched with the supplier information. We could do all kinds of transformations here, such as deduplication, data preparation, aggregation and so on. The data flow has three parameters: one for the inventory data (INVENTORY_DATA), one for the bucket the inventory data is in (SUPPLIER_BUCKET) and one for the supplier reference data (SUPPLIER_REF).
We've designed a single, parameterized data flow that is used for loading JSON, CSV, gzipped data and so on. It was designed with a small sample CSV representative of the shape of the data. The parameters then allow us to pass in much more: multiple files, CSV data, TSV data, gzipped data and so on.
Here is the task run dialog in the OCI Console, where we can see how the parameters are defined. Different file types or file patterns can be specified, and the data may be compressed.
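When the task is invoked via the API rather than the Console, the same parameter values are supplied as bindings on the task run request. A minimal, illustrative sketch of such a binding payload is below; the parameter names match the data flow above, but the bucket and file names are hypothetical and the exact binding structure depends on the parameter types (entity parameters generally take a richer binding than the simple values shown here).

```python
# Illustrative only: parameter bindings passed when creating a task run via the API.
# SUPPLIER_BUCKET takes a simple string value; entity parameters such as
# INVENTORY_DATA normally take a richer binding describing the data entity
# (file name or pattern, format, compression), so this is a simplification.
task_run_bindings = {
    "bindings": {
        "SUPPLIER_BUCKET": {"simpleValue": "supplier-a-bucket"},        # hypothetical bucket name
        "INVENTORY_DATA": {"simpleValue": "supplier_inventory_1.json"},  # hypothetical object name
    }
}
```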
Let's see this in action by uploading some data into Object Storage: gzipped data plus JSON and TSV data.
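If you prefer to script the upload rather than use the Console, the Object Storage client in the OCI Python SDK can do it. The sketch below assumes a standard ~/.oci/config profile, and the bucket and file names are hypothetical.

```python
import oci

# Standard config-file authentication is assumed (~/.oci/config).
config = oci.config.from_file()
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data

# Hypothetical supplier buckets and files: a gzipped CSV, a JSON file and a TSV file.
uploads = [
    ("supplier-a-bucket", "supplier_inventory_1.csv.gz"),
    ("supplier-b-bucket", "supplier_inventory_2.json"),
    ("supplier-c-bucket", "supplier_inventory_3.tsv"),
]

for bucket_name, file_name in uploads:
    with open(file_name, "rb") as f:
        # Each upload raises an Object Storage create-object event.
        object_storage.put_object(namespace, bucket_name, file_name, f)
```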
In OCI Data Integration we will then see a task run in the application's task run monitoring page.
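Task runs can also be monitored programmatically. A small sketch using the Data Integration client is below; the workspace OCID and application key are placeholders for your own values.

```python
import oci

config = oci.config.from_file()
di_client = oci.data_integration.DataIntegrationClient(config)

# Placeholders: supply your own workspace OCID and Application key.
workspace_id = "ocid1.disworkspace.oc1..exampleuniqueID"
application_key = "your-application-key"

# List recent task runs in the application and print their status.
task_runs = di_client.list_task_runs(workspace_id, application_key).data
for run in task_runs.items:
    print(run.name, run.status)
```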
Let's look under the covers and see what has been defined in the Events service: for the Object Create event type, we are matching on the compartment ID and the bucket names.
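The rule's matching condition looks something like the JSON below; the compartment OCID and bucket names are placeholders, and the condition in your own tenancy will differ.

```json
{
  "eventType": "com.oraclecloud.objectstorage.createobject",
  "data": {
    "compartmentId": "ocid1.compartment.oc1..exampleuniqueID",
    "additionalDetails": {
      "bucketName": ["supplier-a-bucket", "supplier-b-bucket", "supplier-c-bucket"]
    }
  }
}
```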
On the creation of a new object, the rule triggers the Fn function.
The Fn function is a simple Python script which passes the Object Storage object name through to the task. We can also do things like check the file suffix before calling Data Integration, so we can have one function rather than one per file type. You can see the kind of information produced for objects within Object Storage in the example below.
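To give an idea of what the function receives, an Object Storage create-object event payload looks roughly like the JSON below (the IDs and names are placeholders); the fields the function cares about are data.additionalDetails.bucketName and data.resourceName.

```json
{
  "eventType": "com.oraclecloud.objectstorage.createobject",
  "source": "ObjectStorage",
  "eventTime": "2021-01-01T00:00:00Z",
  "data": {
    "compartmentId": "ocid1.compartment.oc1..exampleuniqueID",
    "resourceName": "supplier_inventory_1.json",
    "additionalDetails": {
      "namespace": "mytenancynamespace",
      "bucketName": "supplier-a-bucket"
    }
  }
}
```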
The code for the Fn function used in the example is outlined below.
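What follows is a minimal sketch of such a handler rather than the exact code from the example. It assumes the function runs with a resource principal; the workspace OCID, application key, task key and the parameter-binding structure are placeholders and assumptions, so adapt them to your own task.

```python
import io
import json
import logging

import oci
from fdk import response

# Placeholders: supply your own workspace OCID, Application key and task key.
WORKSPACE_ID = "ocid1.disworkspace.oc1..exampleuniqueID"
APPLICATION_KEY = "your-application-key"
TASK_KEY = "your-task-key"


def handler(ctx, data: io.BytesIO = None):
    # Parse the event (or Notifications message) delivered to the function.
    event = json.loads(data.getvalue())
    bucket_name = event["data"]["additionalDetails"]["bucketName"]
    object_name = event["data"]["resourceName"]
    logging.getLogger().info("Triggering task for %s in %s", object_name, bucket_name)

    # Optionally filter on suffix so one function can handle all file types.
    if not object_name.endswith((".json", ".csv", ".tsv", ".gz")):
        return response.Response(ctx, response_data="Ignored: " + object_name)

    # The function runs with a resource principal; no config file is needed.
    signer = oci.auth.signers.get_resource_principals_signer()
    di_client = oci.data_integration.DataIntegrationClient(config={}, signer=signer)

    # Bind the bucket and object name to the data flow parameters.
    # Note: the binding shape for entity parameters is simplified here.
    task_run_details = oci.data_integration.models.CreateTaskRunDetails(
        registry_metadata=oci.data_integration.models.RegistryMetadata(aggregator_key=TASK_KEY),
        config_provider={
            "bindings": {
                "SUPPLIER_BUCKET": {"simpleValue": bucket_name},
                "INVENTORY_DATA": {"simpleValue": object_name},
            }
        },
    )
    run = di_client.create_task_run(WORKSPACE_ID, APPLICATION_KEY, task_run_details).data
    return response.Response(ctx, response_data="Started task run: " + run.name)
```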
Now that we have seen events trigger the jobs, we can also drive this via a push notification: we define a topic and subscribe the same function to it.
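With the push-notification path, anything that can publish to the topic can kick off the same processing. A small sketch publishing a message with the OCI Python SDK is below; the topic OCID and message body are hypothetical, and the body simply mimics the event fields the function above expects.

```python
import json

import oci

config = oci.config.from_file()
ons_client = oci.ons.NotificationDataPlaneClient(config)

# Hypothetical topic OCID; the Fn function above is subscribed to this topic.
topic_id = "ocid1.onstopic.oc1..exampleuniqueID"

# The message body mimics the fields the function reads from an event payload.
message = oci.ons.models.MessageDetails(
    title="New inventory file",
    body=json.dumps({
        "data": {
            "resourceName": "supplier_inventory_2.json",
            "additionalDetails": {"bucketName": "supplier-b-bucket"},
        }
    }),
)
ons_client.publish_message(topic_id, message)
```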
Below we're going to use a pattern (supplier_inventory*.json) for the files and process all of the JSON files!
This will then cause the function to be executed and the task kicked off.
That's it, thanks for reading this far! Above we have seen an interesting use case that illustrated an OCI Data Integration solution where the integration supported many consumers staging product inventory information; the files were posted to buckets in Object Storage and then loaded into Autonomous Databases.
If you want an intro to creating functions in Fn you can check out the OBE here.
Check out all the blogs related to Oracle Cloud Infrastructure Data Integration — https://blogs.oracle.com/dataintegration/oracle-cloud-infrastructure-data-integration. To learn more, check out the Oracle Cloud Infrastructure Data Integration Tutorials and Documentation.