Sqoop data import performance tuning techniques

Sqoop is a data ingestion tool widely used to transfer data between Hadoop and RDBMS databases. Using Sqoop, we can import data from an RDBMS into Hadoop and export data from Hadoop back to an RDBMS. Sqoop uses the MapReduce model to import and export data, which provides parallel processing as well as fault tolerance. The official Sqoop documentation is sufficient for learning the data transfer techniques between Hadoop and an RDBMS. In this blog, I share Sqoop performance tuning techniques from my experience building a high-performing data ingestion framework with Sqoop.


Sqoop import performance depends largely on the following Sqoop arguments.

--num-mappers – Use n map tasks to import in parallel.

Sqoop can import data in parallel from database sources. You can specify the number of map tasks (parallel processes) to use for the import with the -m or --num-mappers argument. Each of these arguments takes an integer value corresponding to the degree of parallelism to employ. By default, four map tasks are used.

To optimize performance, set the number of map tasks to a value lower than the maximum number of connections that the database supports and allows for your user.

Usage: --num-mappers 10

Note: Controlling the degree of parallelism is the main way to control the load Sqoop places on your database. Using more mappers leads to more concurrent data transfer tasks, which can result in faster job completion. However, it also increases the load on the database, since Sqoop executes more concurrent queries; choose the value carefully based on the connections available to your user and the network bandwidth you have.
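For illustration, a complete import command with ten mappers might look like the following; the JDBC URL, credentials, and table name are placeholders, not values from any real system:

sqoop import \
  --connect jdbc:mysql://db.example.com/sales \
  --username sqoop_user \
  -P \
  --table orders \
  --num-mappers 10

Here -P prompts for the password interactively, which avoids exposing it in the shell history.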

--split-by – The column of the table used to split work units across the mappers.

When performing parallel imports, Sqoop needs a criterion by which it can split the workload across the mappers. Sqoop uses a splitting column for this. By default, Sqoop identifies the primary key column (if present) of the table and uses it as the splitting column. If a table has no primary key, use the --split-by argument with the Sqoop import command to explicitly choose a column; an evenly distributed integer column generally gives the best performance. If the table has no primary key and --split-by <col> is not provided, the import will fail unless the number of mappers is explicitly set to one with --num-mappers 1.

Usage: --split-by col1
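As a sketch, importing a table that lacks a primary key by splitting on a hypothetical integer column customer_id could look like this (connection details are placeholders):

sqoop import \
  --connect jdbc:mysql://db.example.com/sales \
  --username sqoop_user \
  -P \
  --table customers \
  --split-by customer_id \
  --num-mappers 8

A column with evenly distributed values works best; splitting on a skewed column leaves some mappers with far more rows to move than others.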

--boundary-query – Boundary query to use for creating splits.

To partition the data across multiple independent executors, Sqoop needs to find the minimum and maximum values of the column specified in the --split-by argument. In a table-based import, Sqoop uses the column specified with --split-by and generates a query of the form select min(col), max(col) from table to identify the boundaries. In the case of a free-form query import, it uses the entire query specified on the command line as a subquery in place of the table name, resulting in select min(col), max(col) from (query). This can be highly inefficient, as it may require materializing the full result set before moving any data, just to determine the import boundaries.

To avoid this overhead, Sqoop offers the --boundary-query parameter, which lets a custom query override the generated one. The only requirement is that the query return exactly one row with exactly two columns: the first column is taken as the lower bound and the second as the upper bound. Both columns must be of the same type as the column used in the --split-by parameter.

Usage: --boundary-query "select min(id), max(id) from table where 1 = 1"

The query used for fetching the boundaries can indeed be arbitrary. Let's walk through a few examples. If you happen to know the boundaries prior to running Sqoop, you can supply them directly, without touching a single table, using a constant boundary query like SELECT 1, 500.

If you store the minimum and maximum values in a separate table for accounting purposes, you can fetch the boundaries from there as well. There is no requirement that the boundary query reference any table used in the --query parameter. Since the output of the boundary query serves as the basis for importing data, it is imperative that the returned values not skew the import process.
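As a minimal sketch, assuming a hypothetical bookkeeping table import_stats that tracks the current minimum and maximum id per source table, the boundary query could read from it instead of scanning the source table:

sqoop import \
  --connect jdbc:mysql://db.example.com/sales \
  --username sqoop_user \
  -P \
  --table orders \
  --split-by id \
  --boundary-query "SELECT min_id, max_id FROM import_stats WHERE table_name = 'orders'"

This turns the boundary lookup into a cheap single-row read, at the cost of keeping import_stats accurate.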

--fetch-size – Number of entries to read from the database at once.

This argument controls the number of records read from the database at a time. By default, Sqoop reads 1,000 records at a time. The default is fine for development and testing, but in production it can be increased for large tables. Increase the fetch-size value based on the volume of data to be read, and set it according to the available memory and bandwidth of your environment.

Usage: --fetch-size 10000

Here 10000 is the number of entries that Sqoop fetches at a time.

For a MySQL database, the following JDBC properties can be used as well.

dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true

These properties are supplied as part of the JDBC connection string.

Usage: --connect jdbc:mysql://host/db?dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true
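Note that in most shells the & characters in the URL would be interpreted as background operators, so the whole connection string needs to be quoted. A combined example (host, database, and table names are placeholders):

sqoop import \
  --connect "jdbc:mysql://db.example.com/sales?dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true" \
  --username sqoop_user \
  -P \
  --table orders \
  --fetch-size 10000

The useCursorFetch=true property makes the MySQL JDBC driver stream rows in batches of defaultFetchSize instead of loading the entire result set into memory.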

--direct – Use the direct connector if one exists for the database.

Direct mode is currently supported only by MySQL and PostgreSQL. For MySQL, the mysqldump and mysqlimport utilities are used for retrieving data from the database server and moving data back. For PostgreSQL, Sqoop takes advantage of the pg_dump utility to import data. Using these native utilities greatly improves performance, as they are optimized to provide the best possible transfer speed while putting less load on the database server.

Usage: sqoop import --connect <JDBC URL> --username <USER_NAME> --password <PASSWORD> --table <TABLE_NAME> --direct
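A concrete MySQL example might look like the following (connection details are placeholders); keep in mind that direct mode requires the native utility, here mysqldump, to be available in the shell path of every node that runs a map task:

sqoop import \
  --connect jdbc:mysql://db.example.com/sales \
  --username sqoop_user \
  -P \
  --table orders \
  --direct \
  --num-mappers 4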