Automate loading data to a data lake or data warehouse using OCI Data Integration and Fn
Oracle Cloud Infrastructure Data Integration is an Oracle-managed service that provides extract, transform, and load (ETL) capabilities to target data lake and data mart use cases on Oracle Cloud Infrastructure (OCI). Oracle Functions is a fully managed, multi-tenant, highly scalable, on-demand, Functions-as-a-Service platform.
This post shows how you can use Data Integration and serverless functions to automate the process of extracting data from files generated by various databases or applications and loading that data into a data warehouse or data lake for analysis. An earlier post showed a simple hello world example of using Data Integration and Fn together; the same approach is used here to execute tasks.
Oracle Cloud Infrastructure services emit events, which are structured messages that describe changes in resources. Events are emitted for many kinds of activity, including create, update, and delete operations, resource lifecycle state changes, and system events that affect cloud resources. In this example, when a file is uploaded to a bucket in Oracle Cloud Infrastructure Object Storage, an event is emitted. The event invokes a Data Integration task which extracts data from the file and loads it into Oracle Autonomous Data Warehouse.
The diagram illustrates the flow. When CSV files are uploaded to a specific bucket in Oracle Cloud Infrastructure Object Storage, the Events service is triggered. The emitted event invokes a function that wraps a Data Integration task, which extracts the data from the uploaded files and loads it into, for example, an Oracle Autonomous Data Warehouse instance.
To create a new Python-based function to invoke the task, we can simply do:
fn init --runtime python execute-task-input
cd execute-task-input/
This generates a stub. Change the generated func.py to have the content below, substituting whatever workspace ID, application, and task you want to use:
import io
import json
import time

import oci
from fdk import response
from oci.data_integration.data_integration_client import DataIntegrationClient


def handler(ctx, data: io.BytesIO = None):
    # Authenticate as the function's resource principal and pull the
    # uploaded object's name out of the event payload.
    signer = oci.auth.signers.get_resource_principals_signer()
    body = json.loads(data.getvalue())
    resource_name = body["data"]["resourceName"]
    resp = do(signer, resource_name)
    return response.Response(
        ctx, response_data=resp, headers={"Content-Type": "application/json"}
    )


def do(signer, objectName):
    dip = DataIntegrationClient(config={}, signer=signer)
    # Workspace OCID plus the application and task keys of the Data Integration task to run.
    wsid = "ocid1.disworkspace.oc1.iad.anuwcljt2ow634yaab6eqwuaweacfbsb2zvj5uftarsfabvyv6peeko4jkeq"
    application = "33521450-ac88-4405-92d6-f3f8e5f03774"
    task = "a9494475-4f8b-4fb7-b4d8-b07961245fd2"
    md = oci.data_integration.models.RegistryMetadata(aggregator_key=task)
    trkey = str(int(time.time()))
    # Bind the uploaded object to the task's SRCDATAPARAM parameter.
    cp = {"bindings": {"SRCDATAPARAM": {"rootObjectValue": {
        "modelType": "ENRICHED_ENTITY",
        "entity": {
            "modelType": "FILE_ENTITY",
            "key": "dataref:37bed599-997d-4da2-b0a9-cccccc46ec5b/disdemodata/FILE_ENTITY:" + objectName,
            "externalKey": "https://objectstorage.us-ashburn-1.oraclecloud.com/myosnamespace/disdemodata/" + objectName,
            "objectStatus": 1},
        "dataFormat": {"formatAttribute": {"modelType": "JSON_FORMAT", "encoding": "UTF-8"}, "type": "JSON"}}}}}
    task_run_details = oci.data_integration.models.CreateTaskRunDetails(
        key=trkey, registry_metadata=md, config_provider=cp)
    tsk = dip.create_task_run(wsid, application, create_task_run_details=task_run_details)
    return tsk.data
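One practical note: the stub generated by fn init typically lists only fdk in requirements.txt, and the handler above also imports the oci SDK, so it needs to be added there. A minimal requirements.txt for this function might look like:

fdk
oci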
The config provider value is specific to my connection, bucket, and so on, so you would need to change it for your environment. This example is also loading JSON, so change the data format to match whatever you have. Then deploy the function to the application (distools in my case):
fn -v deploy --app distools
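Rather than hard coding the workspace, application, and task identifiers in func.py, you could pass them in as function configuration once the function is deployed. Below is a minimal sketch of that idea; the configuration key names (DIS_WORKSPACE_ID, DIS_APPLICATION_KEY, DIS_TASK_KEY) are my own and not from the original example, and would be set with fn config function distools execute-task-input DIS_WORKSPACE_ID <workspace-ocid> and so on:

import io
import json

from fdk import response


def handler(ctx, data: io.BytesIO = None):
    # The Python FDK exposes the function's configuration on the invoke context.
    # The key names below are illustrative and must match whatever you configure.
    cfg = dict(ctx.Config())
    wsid = cfg.get("DIS_WORKSPACE_ID")
    application = cfg.get("DIS_APPLICATION_KEY")
    task_key = cfg.get("DIS_TASK_KEY")
    return response.Response(
        ctx,
        response_data=json.dumps({"workspace": wsid, "application": application, "task": task_key}),
        headers={"Content-Type": "application/json"},
    )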
You can invoke the function from the command line to test it, for example:
echo '{"data":{"resourceName":"meterdata071820.json"}}' | fn invoke distools execute-dataflow-input
This passes the JSON (a simplified version of an event payload; see https://docs.cloud.oracle.com/en-us/iaas/Content/Events/Reference/eventsproducers.htm for details of event producers) to the standard input of the function; the function is invoked and the data in the object named meterdata071820.json is processed.
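For reference, a real Object Storage create event delivered by the Events service carries more fields than this stripped-down test payload. Its shape is roughly as follows (values here are illustrative and trimmed for brevity; the handler above only reads data.resourceName):

{
  "eventType": "com.oraclecloud.objectstorage.createobject",
  "source": "ObjectStorage",
  "eventTime": "2020-07-18T10:15:30Z",
  "data": {
    "compartmentName": "mycompartment",
    "resourceName": "meterdata071820.json",
    "additionalDetails": {
      "namespace": "myosnamespace",
      "bucketName": "disdemodata"
    }
  }
}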
Now we get to the fun part! Let's create a rule in the OCI Events service that is triggered when an object is created in a bucket: when an object is created in a bucket in a specific compartment, the rule executes the Data Integration function to process the data.
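If you prefer the CLI to the console, a rule along these lines could be created with something like the command below. The display name, compartment OCID, and function OCID are placeholders, and the exact condition syntax (in particular the bucketName filter) should be checked against the Events documentation on matching events with filters:

oci events rule create \
  --display-name "run-dis-task-on-upload" \
  --compartment-id ocid1.compartment.oc1..exampleuniqueid \
  --is-enabled true \
  --condition '{"eventType":["com.oraclecloud.objectstorage.createobject"],"data":{"additionalDetails":{"bucketName":"disdemodata"}}}' \
  --actions '{"actions":[{"actionType":"FAAS","isEnabled":true,"functionId":"ocid1.fnfunc.oc1.iad.exampleuniqueid"}]}'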
Notice the rule is enabled, so it is now ready to process data. Let's see that in action: below you can see a new object uploaded, the rule triggered, and the task executed in Data Integration.
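To confirm a run actually kicked off, you can also list the task runs in the application from the OCI Python SDK. A minimal sketch, reusing the workspace and application identifiers from above and assuming the Data Integration client's list_task_runs operation (check the SDK reference for your version):

import oci
from oci.data_integration.data_integration_client import DataIntegrationClient

# Run locally with the default CLI/SDK config file (~/.oci/config) rather than
# the resource principal used inside the function.
config = oci.config.from_file()
dip = DataIntegrationClient(config)

wsid = "ocid1.disworkspace.oc1.iad.anuwcljt2ow634yaab6eqwuaweacfbsb2zvj5uftarsfabvyv6peeko4jkeq"
application = "33521450-ac88-4405-92d6-f3f8e5f03774"

# Print the name and status (e.g. RUNNING, SUCCESS, ERROR) of recent task runs.
runs = dip.list_task_runs(wsid, application).data
for run in runs.items:
    print(run.name, run.status)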
Here you have seen multiple services from Oracle Cloud Infrastructure working together to load data into a data lake or ADW, leveraging Data Integration, Fn, and the Events service to automate the load. Check out Data Integration in OCI and try it out along with all the other services, including Autonomous Data Warehouse, Functions, the Events service, and much more!