Pandas, Functions and Data Integration

There are times when everything you need is in the box, and there are times when you need to get creative. Let’s look at some cases where you might want to push the envelope with dataflows in OCI Data Integration. Since the initial release the list of operators has kept growing, with new operators released continually. These operators support many data transformations: you can operate on sets of attributes, change the shape of the data, and enrich it. What if you need to do more?

Recently OCI Data Integration introduced the Function Operator, which lets you plug an OCI Function into the dataflow as a mid-stream operator. Some examples? Well, with Python functions it’s very easy to use libraries like pandas to unpivot data, which lets you build custom transformations to solve business problems. I’ve used Python here, but there are a lot of languages available. The function operator has 4 items of information that need to be configured:

  • the function itself
  • the input parameters
  • the output parameters
  • the function configuration attributes. By default there is one named BATCH_SIZE (value 10000), which controls how many rows each function invocation is given; you can also add your own attributes to control how the function behaves.

The example I’ll illustrate is an unpivot. The unpivot transformation can be represented graphically as in the image below: columns are transposed to values in multiple rows.

To drive this transformation and make it generic, we need some information on the role of the attributes. This is defined via attributes in the function configuration: they say which columns play the role of the “identity” columns and which play the role of the unpivoted data key and value. The pandas library in Python makes this kind of transformation quite easy, and OCI Data Integration can use it through a midstream OCI Function containing the Python code, which takes the input payload, unpivots the data and returns the output. Essentially it’s using the melt function in pandas (the pandas reference documentation for melt and introductory blog posts on it are good starting points).
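To make the melt step concrete, here is a minimal sketch of the pandas call on its own, outside of any function. The sample values and the staff_no identity column are made up for illustration; only pd.melt and its id_vars, var_name and value_name parameters come from pandas itself.

```python
import pandas as pd

# Hypothetical sample data: one row per staff member, one column per quarter.
wide = pd.DataFrame(
    {
        "staff_no": [9999, 8888],
        "Q1": [10, 50],
        "Q2": [20, 60],
        "Q3": [30, 70],
        "Q4": [40, 80],
    }
)

# Columns listed in id_vars are kept as they are; every other column
# is turned into one (quarter, sales) row per original row.
tall = pd.melt(wide, id_vars=["staff_no"], var_name="quarter", value_name="sales")

print(tall)
```

Two input rows with four quarter columns become eight output rows, each carrying the staff_no it came from.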

Let’s look at a specific example. Here is our source data, with columns for Q1, Q2, Q3 and Q4;

Viewing the upstream input data for the function

Let’s see how to unpivot this so that we get a row for each quarter. First we define the input attributes for the shape of the function. Note that we only pass the data the transformation requires, which here is the quarters;

Q1,Q2,Q3,Q4 were added as inputs to function.

Then in the function configuration we define the value for var_name, which is the name of the unpivot key column (quarter here; its values will be Q1, Q2, Q3 and Q4). We also need value_name, which is the name of the column holding the quarter’s value (sales);

The var_name (quarter) and value_name (sales) attributes were added as function configuration.

If for some reason we needed to pass additional attributes to the function that were not to be unpivoted, we can also specify an id_cols function configuration attribute, which is passed on to the pandas melt function. The function implemented for this example supports a comma-separated value. If, say, staff_no and name were also passed, you would add an id_cols attribute of type VARCHAR to the function configuration with the value “staff_no,name”. The function splits the string on “,” and creates a list that gets passed to id_vars in the melt function. Using the function configuration like this, we can write generic functions that can transform many different shapes.
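The split described above can be sketched in a few lines. The helper name parse_id_cols is hypothetical, and trimming whitespace and dropping empty entries are my additions for robustness rather than something the original function is known to do.

```python
def parse_id_cols(raw):
    """Turn a comma-separated configuration value into a list of column names."""
    if raw is None:
        # The attribute is optional, so a missing value means no extra identity columns.
        return []
    # Assumption: tolerate spaces around names and a stray trailing comma.
    return [name.strip() for name in raw.split(",") if name.strip()]


print(parse_id_cols("staff_no,name"))
print(parse_id_cols(None))
```

The resulting list is what would be handed to id_vars in melt.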

We also need the output attributes created by the function. In this case it’s the quarter and the sales value;

Define the output attributes — quarter and sales

When we preview, we can see that we now have more rows, one for each quarter. The input columns (Q1, Q2, Q3, Q4) are still projected after the function, so in this example I excluded them;

View the data

Here is the function definition for supporting the above example — you could take this code and modify the execute method to do other interesting things;

The function is a passive operator that should be idempotent. What do I mean by passive? I mean you only need to pass in the data that the function needs, and you can enrich the upstream data with additional information (there’s an identifier called secret_id_field that you can use to correlate the data you return with the upstream data).

The OCI Function integration in OCI Data Integration invokes the function in user-defined batches: a large data set is split according to the batch size the user configured, and one batch is what you get as input to the function.

By default the function operator has a function configuration attribute (BATCH_SIZE) whose value is 10,000, which means 10,000 rows for each function invocation. This system-defined attribute is not passed to the function at runtime; Data Integration uses it for the micro-batching described above. The input is represented as this JSON document:

{"data":"<base64 encoded JSON document>","parameters":{"parameter_name":"parameter_value", "parameter_name":"parameter_value", "parameter_name":"parameter_value"}}

Each input row within the base64 encoded JSON document has an identity column (secret_id_field). Depending on the function you are writing, you may or may not need to know about or use this column. In the unpivot case the function writer must know about it, because melt needs its id_vars parameter to list the columns that are not unpivoted and act as the identity columns in the unpivoted data. In this example, I as the function writer don’t want the data integration user of the function to have to know about this. They just tell me what their logical identity columns are (staff_no and name, for example) or pass none at all, and within the function I add secret_id_field to the identity list for the melt function.
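The handling of secret_id_field described above might look something like the sketch below. This is not the function from the original listing: unpivot_payload and IDENTITY_COLUMN are hypothetical names, the Fn handler wiring is left out, and two things are assumptions on my part: that the decoded document holds one JSON object per line, and that a list of records is an acceptable shape for the response.

```python
import base64
import json

import pandas as pd

IDENTITY_COLUMN = "secret_id_field"


def unpivot_payload(body):
    """Decode one batch, melt it, and return the unpivoted rows as dicts."""
    params = body.get("parameters") or {}
    raw_ids = params.get("id_cols") or ""
    id_cols = [c.strip() for c in raw_ids.split(",") if c.strip()]

    # Always keep the identity column, whether or not the user listed any id_cols.
    id_vars = [IDENTITY_COLUMN] + [c for c in id_cols if c != IDENTITY_COLUMN]

    # Assumption: the decoded document holds one JSON object per line.
    decoded = base64.b64decode(body["data"]).decode("utf-8")
    rows = [json.loads(line) for line in decoded.splitlines() if line.strip()]
    frame = pd.DataFrame(rows)

    melted = pd.melt(
        frame,
        id_vars=id_vars,
        var_name=params.get("var_name", "variable"),
        value_name=params.get("value_name", "value"),
    )
    # Round-trip through JSON so the result contains plain Python types.
    return json.loads(melted.to_json(orient="records"))


# Usage with a made-up single-record batch.
record = {"secret_id_field": 1, "staff_no": 9999, "2016": 349, "2017": 231}
sample = {
    "data": base64.b64encode((json.dumps(record) + "\n").encode("utf-8")).decode("ascii"),
    "parameters": {"id_cols": "staff_no", "var_name": "year", "value_name": "hours"},
}
for row in unpivot_payload(sample):
    print(row)
```

Every returned row still carries secret_id_field, even though the caller only named staff_no as an identity column.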

You can test your function from the command line as follows:

echo '{"data":"<base64 encoded JSON document>","parameters":{"parameter_name":"parameter_value", "parameter_name":"parameter_value", "parameter_name":"parameter_value"}}' | fn invoke <fnapplicationname> <fnname>

For example, this input record;

{"secret_id_field":1,"staff_no":9999, "name":"Dean McGrath", "2016":349, "2017":231, "2018":876, "2019":679}

is represented as the following base64 value;

eyJzZWNyZXRfaWRfZmllbGQiOjEsInN0YWZmX25vIjo5OTk5LCAibmFtZSI6IkRlYW4gTWNHcmF0aCIsICIyMDE2IjozNDksICIyMDE3IjoyMzEsICIyMDE4Ijo4NzYsICIyMDE5Ijo2Nzl9Cg==
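If you want to produce or check such a value yourself, a short Python sketch is enough. The base64 value above ends in Cg==, which is an encoded newline, so the sketch assumes the record was encoded together with a trailing newline (as happens when piping echo output into a base64 tool).

```python
import base64
import json

# The input record from above, with the trailing newline included.
record_text = (
    '{"secret_id_field":1,"staff_no":9999, "name":"Dean McGrath", '
    '"2016":349, "2017":231, "2018":876, "2019":679}\n'
)

# Encode the text to base64 for the "data" field of the payload.
encoded = base64.b64encode(record_text.encode("utf-8")).decode("ascii")
print(encoded)

# Decode it again to check that the round trip gives back the same record.
decoded = json.loads(base64.b64decode(encoded))
print(decoded["name"], decoded["2016"])
```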

I can then call my function as follows;

echo '{"data":"eyJzZWNyZXRfaWRfZmllbGQiOjEsInN0YWZmX25vIjo5OTk5LCAibmFtZSI6IkRlYW4gTWNHcmF0aCIsICIyMDE2IjozNDksICIyMDE3IjoyMzEsICIyMDE4Ijo4NzYsICIyMDE5Ijo2Nzl9Cg==","parameters":{"id_cols":"staff_no,name","var_name":"year","value_name":"hours"}}' | fn invoke diworkshopapp unpivot

The response is a JSON document which OCI Data Integration consumes and continues downstream processing. The response must also have the record’s identity field — secret_id_field.

Debugging

Use metrics to check for successful executions, errors and so on. For logging, I often add a debug function configuration attribute and then use print in the function, for example, to write to the OCI Functions log.

debugparam = input_parameters.get("debug")
debug = "false"
if debugparam is not None:
    debug = debugparam

# Example to print the input data to the OCI Functions log
if debug == "true":
    print(input_data, flush=True)

Common Issues

  1. Check your policies — ensure OCI Data Integration can invoke your service. Also check that your function can use the dependencies you have.
  2. If you see blank values or get java.lang.NumberFormatException check that the attributes you defined on the output match the response from your function — for example check case/spellings.
  3. When viewing data in your dataflow, if you see “com.oracle.bmc.model.BmcException: (-1, null, false) Processing exception while communicating to: https://functions.null.oci.oraclecloud.com (outbound opc-request-id: nnnnnnnn)”, check that you have actually selected an OCI Function in the Details panel of the Fn operator within the dataflow in OCI Data Integration.
  4. When viewing data in your dataflow, if you see something like “com.oracle.bmc.model.BmcException: (502, FunctionInvokeExecutionFailed, false) function failed (opc-request-id: nnnnnn)”, check that you have mapped the input attributes of the function.
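For the second issue in the list, a quick local check of the function’s response against the output attributes you defined can save a round trip. This is only a sketch: find_attribute_mismatches is a hypothetical helper, and it assumes you have the response rows available as a list of dicts, for example from a command-line test invocation.

```python
def find_attribute_mismatches(expected_attributes, response_rows):
    """Return (missing, unexpected) attribute names, compared case-sensitively."""
    expected = set(expected_attributes)
    missing, unexpected = set(), set()
    for row in response_rows:
        keys = set(row)
        # Attributes declared on the operator but absent from this row.
        missing |= expected - keys
        # Keys in the response that no declared attribute matches.
        unexpected |= keys - expected
    return sorted(missing), sorted(unexpected)


# Made-up response with a case mismatch: "Quarter" instead of "quarter".
rows = [{"secret_id_field": 1, "Quarter": "Q1", "sales": 10}]
print(find_attribute_mismatches(["secret_id_field", "quarter", "sales"], rows))
```

A name that shows up in both lists with different casing or spelling is the likely culprit.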

Summary

As you have seen, we can add arbitrary transformations such as unpivoting data, and there are plenty of other examples, from calling your own logic or microservices to OCI Language services such as Named Entity Recognition. We looked at unpivoting data using a generic function that had parameters to drive the role of the attributes in the unpivot. There are many more use cases for this, as you will soon discover. See the OCI Data Integration documentation for the out-of-the-box operators.

Architect at @Oracle developing cloud services for data. Connect on Twitter @i_m_dave