Incremental Extract using OCI Data Integration

This solution makes incremental extract and load as simple as possible for the user AND not only for a single source entity but with a metadata driven approach to support many source entities. There’s plenty of users just now asking how do I do this on a single table, how do I store the state, how do I orchestrate it. This makes it effortless for a single or for many tables. The example uses Oracle database source as the illustration but can be used effortlessly to support extract from Fusion applications using BICC connectivity also. This can then be scheduled and executed on a regular basis, this can also handle changes in source (new columns for example) and make them available in the target so you do not need to worry about schema drift.

The incremental driver procedure is driven from a metadata config table which has the source/target pairs and a task designed by the customer that is parameterized. The user designs a dataflow that filters the source data using a date field like last updated and the value is parameterized as are the source and entity information. This task is then called for each source/target pair. The procedure maintains the watermark for each extract. Upon successful execution the watermark advances — any error run will be reprocessed on subsequent executions. The metadata configuration table has the last run key in OCI DI, the last extract date and the last run date. These are used to orchestrate the parameter values upon execution.

The current design supports insert and overwrite modes on the target.

  • get privileges in the database to utilize OCI — this is best done using resource principal, see below (needs OCI administrator and database admin to do) — it can also be done by using an OCI user principal, this is done by creating a credential in the ADW with your OCI user, you will need the OCID, key, fingerprint etc.

The solution currently supports a fixed set of parameters that you must define in the dataflow— you must follow the naming of these exactly in your dataflow;

  • P_EXTRACT_SCHEMA — the source schema (parameterize the source schema in the source operator)

When the procedure is executed, the workspace is provided along with values for the credential for the procedure to invoke the task using and the task to run along with whether it is FULL or INCREMENTAL mode. Typically customers will schedule the INCREMENTAL load on a regular cadence, the extract of any updated data will be done and loaded to the target.

Database as Source

For a database as source, the dataflow design looks like below, the source schema and entity are parameterized using P_EXTRACT_SCHEMA and P_EXTRACT_OBJECT respectively;

The filter is parameterized (all of my sources have a column named LOAD_DATE) as are the source and target entities. In the filter condition ensure you parameterize the value with a parameter named P_EXTRACT_DATE that is of datatype TIMESTAMP.

On the target the data entity name is parameterized — rather than the entire data entity;

Fusion as Source

If this was a Fusion source, there is no filter operator needed, the Fusion source operator has a property for incremental and an extract date property. Ensure you parameterize this with a parameter named P_EXTRACT_DATE that is of datatype TIMESTAMP, the source schema and entity are parameterized using P_EXTRACT_SCHEMA and P_EXTRACT_OBJECT respectively;

When the incremental load executes, the procedure in the solution checks the status of each run using LASTRUNKEY. If the status was SUCCESSFUL, then the last run date is used as the extract date — so only records updated after that date are extracted. If the last run was in ERROR state, then the last extract date is used as the extract date — so records are reprocessed.

Object Storage as Target

Your dataflow design can target whatever you desire. The above examples were targeting databases, its also possible to have this target Object Storage. When the driver is executed you can pass the connection key for your OCI Object Storage data asset and set the bucket name and object name in the driver table. This may need extending if you also want to include some path information to include the year/month/day for example.

Sample Data

The example below uses 2 simple tables to represent our source, they have different number and names of columns but do have a common LOAD_DATE column. We will use this to see how we can incrementally extract from these. The example dataflow I used had a database as a source and a database as target.

create table srctab1 (
C1 VARCHAR2(10),
C2 VARCHAR2(10),
LOAD_DATE TIMESTAMP(6) WITH TIME ZONE);
create table srctab2 (
X VARCHAR2(10),
Y VARCHAR2(10),
Z VARCHAR2(10),
LOAD_DATE TIMESTAMP(6) WITH TIME ZONE);
insert into srctab1
SELECT 'a'||LEVEL, 'a'||LEVEL,CURRENT_TIMESTAMP
FROM DUAL CONNECT BY LEVEL <= 100;
insert into srctab2
SELECT 'a'||LEVEL, 'a'||LEVEL,'a'||LEVEL,CURRENT_TIMESTAMP
FROM DUAL CONNECT BY LEVEL <= 300;
commit;

Populate the metadata configuration

Populate the driver table with a row for each source/target pair replace with your application name, task name and connection key source source/target, schema name and table name. The source, the last 3 NULL values are the last extract date, the last run date and the last run key — these will be populated during the procedure execution.

insert into adrivertable values(
'YourApplicationName',
'YourTaskName',
'your_source_connection_key',
'YOURSRCSCHEMA',
'SRCTAB1',
'your_target_connection_key',
'YOURTGTSCHEMA',
'TGTTAB1',
NULL, NULL,NULL);
insert into adrivertable values(
'YourApplicationName',
'YourTaskName',
'your_source_connection_key',
'YOURSRCSCHEMA',
'SRCTAB2',
'your_target_connection_key',
'YOURTGTSCHEMA',
'TGTTAB2',
NULL, NULL,NULL);
commit;

Execute the Procedure

Initial load execution pass the value FULL;

exec INCREMENTAL_UTILS.execute_tasks(
'OCI$RESOURCE_PRINCIPAL',
'us-ashburn-1',
'your_workspace_ocid',
'YourApplicationName',
'YourTaskName',
'FULL')

After you have executed this, go to the OCI Data Integration console and look at the runs for the tasks, you should see 2 runs for example (that’s what I was using above).

When this is completed, try out the incremental extract. If you try before the initial load is completed, the procedure will return.

Incremental extract pass the value INCREMENTAL, first insert some new rows;

insert into srctab1 
SELECT 'b'||LEVEL, 'b'||LEVEL,CURRENT_TIMESTAMP
FROM DUAL CONNECT BY LEVEL <= 100;
exec INCREMENTAL_UTILS.execute_tasks(
'OCI$RESOURCE_PRINCIPAL',
'us-ashburn-1',
'your_workspace_ocid',
'YourApplicationName',
'YourTaskName',
'INCREMENTAL')

Summary

Here we have seen a solution that makes incremental extract and load as simple as possible for the user AND not only for a single source entity but with a metadata driven approach to support many source entities. This makes it effortless. The example uses Oracle database source as the illustration but can be used effortlessly to support extract from Fusion applications using BICC connectivity also. This can then be scheduled and executed on a regular basis. Check out earlier posts such as this one Executing Data Integration Tasks using PLSQL and see what else you can do.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
David Allan

Architect at @Oracle developing cloud services for data. Connect on Twitter @i_m_dave