Spark Parallel Export from Databases to Object Storage
Are you looking to export large tables quickly? Do you need an approach that exports data in multiple chunks in parallel, using a robust, open source mechanism? These are typical questions when customers want to move data from hot to cold storage or into a data lake such as OCI Object Storage. The need comes up more and more often, and being able to scale the export matters just as much.
In this blog we will look at an OCI Data Flow application that uses Apache Spark to export data from databases in parallel and write it as partitioned data in Object Storage. When you execute the application you configure the number of workers, the source to extract from, how that source is split into partitions (parallel chunks), and optionally a target partition column. The target partition column lets you write data into Object Storage with the partition value, such as the date, in the object path.
This can be visualized as follows: chunks of the source table or query result are extracted in parallel and written in parallel to OCI Object Storage in partitioned form:
The number of workers is configurable, so you control how much parallelism Spark applies, along with the number of partitions you want to process concurrently from the source database.
The number of workers, the number of partitions, the source and target partition columns, and the shape of the workers all influence the performance you will get, and all of them are configurable. OCI Data Flow also has an autoscaling capability built on Spark dynamic allocation, which helps you save resources and reduce the time spent on management. Resource planning for data processing is a complex task and depends heavily on the volume of the data.
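As a rough illustration, autoscaling builds on standard Spark dynamic allocation. The sketch below shows the relevant Spark properties; the values are placeholders, and in a Data Flow application you would normally supply these as Spark configuration on the run rather than hard-coding them.

```python
# Illustrative only: standard Spark dynamic allocation properties.
# The min/max values below are placeholders, not recommendations.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("parallel-export")
    # Let Spark grow and shrink the executor count with the workload
    # instead of fixing a static number of executors.
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "16")
    .getOrCreate()
)
```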
The table below details the mandatory arguments for the application to export from a database to Object Storage; there are also optional arguments for fetch size and batch size:
The target partition column is optional; define it to partition the data on the target. Another option is the Spark configuration property spark.sql.files.maxRecordsPerFile, which limits the number of records written to each file. This is useful when you need to constrain file sizes so that downstream dependencies are met.
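As a minimal sketch (the DataFrame df, the column name, and the bucket path are hypothetical), the two options look like this in PySpark:

```python
# Cap the number of records written to each output file (hypothetical value).
spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")

# Partition the target layout by a date column so the partition value
# (for example the load date) appears in the Object Storage path.
(df.write
   .mode("overwrite")
   .partitionBy("LOAD_DATE")
   .parquet("oci://my-bucket@my-namespace/exports/my_table/"))
```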
The sample script for exporting from an Oracle database to OCI Object Storage is here:
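A minimal PySpark sketch of what such an export looks like is shown below; the connection details, table, partition column, bounds, and bucket path are placeholders, and in the real application they are passed in as the arguments described above.

```python
# Hedged sketch of a parallel Oracle-to-Object-Storage export.
# All connection details, names, and bounds below are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("oracle-parallel-export").getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1")
      .option("dbtable", "SALES.ORDERS")             # a table or a query
      .option("user", "export_user")
      .option("password", "********")
      .option("driver", "oracle.jdbc.OracleDriver")
      # Parallel read: Spark issues one query per partition of this column.
      .option("partitionColumn", "ORDER_ID")
      .option("lowerBound", "1")
      .option("upperBound", "10000000")
      .option("numPartitions", "16")
      .option("fetchsize", "10000")                   # optional tuning
      .load())

# Write in parallel to Object Storage, partitioned by the target column.
(df.write
   .mode("overwrite")
   .partitionBy("ORDER_DATE")
   .parquet("oci://my-bucket@my-namespace/exports/orders/"))
```

Here numPartitions controls how many concurrent queries are issued against the source, and fetchsize controls how many rows each round trip pulls back.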
There is a different variant for exporting from an autonomous database such as ADW; this one uses the wallet file, which you can download from the OCI Console.
The sample script for exporting from ADW to OCI Object Storage is here:
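Again only as a hedged sketch, the ADW variant differs mainly in the JDBC URL: the connection uses a TNS alias from the wallet, and the wallet downloaded from the OCI Console has to be available to the driver and executors. The alias, wallet path, and credentials below are placeholders.

```python
# Hedged sketch of the ADW variant. Assumes the wallet downloaded from the
# OCI Console is unpacked at /opt/wallet and visible to driver and executors;
# the TNS alias, credentials, and paths are placeholders.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@adw_high?TNS_ADMIN=/opt/wallet")
      .option("dbtable", "SALES.ORDERS")
      .option("user", "ADMIN")
      .option("password", "********")
      .option("driver", "oracle.jdbc.OracleDriver")
      .option("partitionColumn", "ORDER_ID")
      .option("lowerBound", "1")
      .option("upperBound", "10000000")
      .option("numPartitions", "16")
      .load())

(df.write
   .mode("overwrite")
   .partitionBy("ORDER_DATE")
   .parquet("oci://my-bucket@my-namespace/exports/orders/"))
```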
When the application is executed you can control the shape of the driver and executors and the number of executors. This helps scale large executions: adding more executors provides more concurrency. You can set these when the application is created or when it is run, selecting the number of executors and the shape for both the driver and the executors:
One very useful capability in OCI Data Flow is the autoscaling feature. You can see above that it was disabled, so I explicitly defined the number of executors.
Below I have selected Enable Autoscaling, and you can see that I can now define the minimum and maximum number of executors.
This is useful for the parallel export capability because not all exports are the same; with autoscaling enabled, I only use what I actually need. Spark materializes each partition's query result set in executor memory, so each partition should only fetch data small enough to be held in memory. In OCI Data Flow you can configure the executor size (above I was using 30 GB of memory). More partitions combined with a higher number of executors can further improve speed by increasing parallelism.
Hopefully there is some useful information for you here; let me know what you think, I would love to hear from you. These examples used Oracle, but any other JDBC-accessible source can be used. Let me know if there are others you are interested in.