Sqoop In Depth

In an earlier post we saw why Sqoop is needed, what the Sqoop commands are, and which file formats Sqoop supports.
Now let's dive into the functional side of Sqoop: how does Sqoop import data from, and export data to, an RDBMS?

Sqoop Import


Sqoop’s import tool runs a MapReduce job that connects to the MySQL database and reads the table. By default, this uses four map tasks in parallel. Each task writes its imported results to a different file.

If a distributed Hadoop cluster is being used, localhost (as in jdbc:mysql://localhost/dbname) should not be specified in the connect string, because map tasks that are not running on the same machine as the database will fail to connect. For example, suppose we have 3 nodes (machines) in a cluster with the IPs 192.168.0.1, 192.168.0.2 and 192.168.0.3 (these addresses are for illustration only), and MySQL is installed on 192.168.0.2. If we run sqoop import with the connection string "jdbc:mysql://localhost/dbname", map tasks running on node 192.168.0.2 can reach the MySQL database, since it is installed on the same machine. Map tasks running on 192.168.0.1 or 192.168.0.3, however, resolve localhost to their own machine, and since MySQL is not installed there, they fail with a connection error. Use the database server's hostname or IP address instead, for example jdbc:mysql://192.168.0.2/dbname.
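
The localhost pitfall above is easy to check for before submitting a job. Here is a minimal Python sketch of such a check. It is not part of Sqoop; the helper names jdbc_host and is_cluster_safe are made up for this post, and the list of "local" host names is an assumption.

```python
from urllib.parse import urlparse

# Host names that every node resolves to itself (assumed list, extend as needed).
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def jdbc_host(jdbc_url: str) -> str:
    """Return the host part of a URL such as jdbc:mysql://host:3306/dbname."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError("not a JDBC URL: " + jdbc_url)
    # Strip the "jdbc:" prefix so urlparse sees mysql://host/dbname.
    return urlparse(jdbc_url[len("jdbc:"):]).hostname or ""


def is_cluster_safe(jdbc_url: str) -> bool:
    """True if map tasks on other nodes would reach the same database host."""
    return jdbc_host(jdbc_url).lower() not in LOCAL_HOSTS


print(is_cluster_safe("jdbc:mysql://localhost/dbname"))
print(is_cluster_safe("jdbc:mysql://192.168.0.2/dbname"))
```

The first URL is flagged as unsafe for a multi-node cluster, while the second one names the machine that actually hosts MySQL.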

The default field separator for Sqoop import is a comma (,). You can override it with --fields-terminated-by.
Sqoop can import into a few different file formats. The default is a text file.

Text files - By default, Sqoop produces its output as text files, which offer a human-readable representation of data, platform independence, and the simplest structure. However, they cannot hold binary fields (such as database columns of type VARBINARY), and distinguishing between null values and String-based fields containing the value "null" can be problematic (we can control this behaviour using --null-string while importing).
Other supported file formats - SequenceFiles, Avro datafiles, and Parquet files
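
To see why null values are awkward in text files, here is a small Python sketch that mimics how a row could be written as a delimited line. It is only an illustration, not Sqoop's own code; to_text_record is a hypothetical helper, and its delimiter and null_string parameters merely stand in for what --fields-terminated-by and --null-string control.

```python
def to_text_record(row, delimiter=",", null_string="null"):
    """Render one row as a delimited line, writing null_string for None."""
    return delimiter.join(null_string if value is None else str(value) for value in row)


# A real NULL and the literal string "null" produce the same line by default.
with_null = to_text_record([1, None])
with_text = to_text_record([1, "null"])
print(with_null == with_text)

# Choosing a different null marker keeps the two cases apart.
print(to_text_record([1, None], null_string="\\N"))
print(to_text_record([1, "null"], null_string="\\N"))
```

With the default marker the two rows are indistinguishable once written, which is exactly the ambiguity described above. Picking a marker that cannot occur in the data removes it.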

Sqoop Java Code Generation -
Along with transferring data to HDFS, Sqoop also generates a Java source file, which is written to the current local directory. Sqoop can use the generated code to handle the deserialization of table-specific data from the database source before writing it to HDFS. By default the Java file is named after the table (tablename.java). You can change this with the --class-name argument, which lets you give the class, and therefore the file, any name you like.

sqoop codegen --connect jdbc:mysql://localhost/website --table USERS --class-name CodeUsers


In the above command, we specified that we’d like it to generate a class named CodeUsers; this will be written to CodeUsers.java. We could also have specified --class-name and other code-generation arguments during the import process we performed earlier.


So far we have seen that Sqoop imports a table from a database by running a MapReduce job and writing the records to HDFS. Now let's look at these steps in detail.







The above diagram shows how Sqoop interacts with both the database source and Hadoop. Sqoop uses JDBC to communicate with the RDBMS. Before the import can start, Sqoop uses JDBC to examine the table that will be imported and retrieves a list of all its columns and their SQL data types. These SQL types are mapped to appropriate Java data types (for example, VARCHAR to String and INTEGER to Integer). Sqoop’s code generator uses this information to create the table-specific class we saw above (CodeUsers.java), which holds a record extracted from the table.


The class generated by Sqoop contains methods that retrieve each column from an extracted record:
public return_type get_column_name();
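
To make the column-to-getter idea concrete, here is a rough Python sketch that turns a list of columns and their SQL types into getter signatures of the shape shown above. The SQL_TO_JAVA table is only a small illustrative subset chosen for this post, and getter_signatures is a hypothetical helper, not something Sqoop exposes.

```python
# Illustrative subset of a SQL type -> Java type mapping (assumed for this example).
SQL_TO_JAVA = {
    "INTEGER": "Integer",
    "BIGINT": "Long",
    "VARCHAR": "String",
    "DECIMAL": "java.math.BigDecimal",
    "DATE": "java.sql.Date",
    "TIMESTAMP": "java.sql.Timestamp",
}


def getter_signatures(columns):
    """Build one 'public <type> get_<column>();' line per (name, sql_type) pair."""
    return [
        f"public {SQL_TO_JAVA[sql_type.upper()]} get_{name}();"
        for name, sql_type in columns
    ]


# Hypothetical columns of the USERS table used in this post.
for line in getter_signatures([("user_id", "integer"), ("name", "varchar")]):
    print(line)
```

A column type that is missing from the table raises a KeyError here, which is good enough for a sketch. The real code generator covers many more types.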

JDBC’s ResultSet interface provides a cursor that retrieves records from a query.
The MapReduce job launched by Sqoop uses an InputFormat that can read sections of
a table from a database via JDBC. The DataDrivenDBInputFormat provided with Hadoop partitions a query’s results over several map tasks.
Reading a table is typically done with a simple query such as:
SELECT col1,col2,col3,... FROM tableName

For better performance, we can divide this query across multiple nodes. This is done using a splitting column. Using metadata about the table, Sqoop will guess a good column to use for splitting the table (typically the primary key for the table, if one exists). The minimum and maximum values for the primary key column are retrieved, and then these are used in conjunction with a target number of tasks to determine the queries that each map task should issue.
For example, let's say you have 100 records in a source table that you want to bring onto HDFS, and you want to run 10 map tasks in parallel so that the overall load time is shorter. Sqoop first determines whether the table has a primary key column (user_id in our example, written as primary_id_column in the queries below). When starting the MapReduce job, the DataDrivenDBInputFormat used to perform the import issues a statement such as
SELECT MIN(primary_id_column), MAX(primary_id_column) FROM source_table
This range decides which map task gets which set of data from the source. Each map task then runs a query for its own range of the data, roughly as follows.
The first map task would fire a query like
SELECT col1,col2,col3 FROM source_table WHERE primary_id_column >= 1 AND primary_id_column < 11
The second map task would fire a query like
SELECT col1,col2,col3 FROM source_table WHERE primary_id_column >= 11 AND primary_id_column < 21
and so on.
The ranges are decided based on the number of parallel tasks. In our case we ran 10 map tasks (using -m 10), and, assuming the 100 records have primary_id_column values running from 1 to 100, MIN(primary_id_column) is 1 and MAX(primary_id_column) is 100, so each map task gets a range of 100/10 = 10 values to copy. The choice of splitting column is essential to parallelizing work efficiently. If the values of the splitting column are not uniformly distributed, some map tasks would have little or no work to perform, whereas others would have a great deal. In that case you can specify a different column using the --split-by column_name argument.
With -m 1, i.e. a single mapper, no split operation is performed.
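
The range arithmetic is easier to follow in code. Below is a simplified Python model of how the MIN/MAX values and the number of map tasks could be turned into one WHERE condition per task. It is a sketch of the idea only; the real splitter may place its boundaries differently, and split_conditions is a name invented for this post.

```python
import math


def split_conditions(column, lo, hi, num_mappers):
    """Return WHERE conditions that together cover every value in [lo, hi]."""
    if num_mappers < 1:
        raise ValueError("num_mappers must be at least 1")
    if hi < lo:
        raise ValueError("hi must not be smaller than lo")
    # Width of each range, rounded up so the ranges cover the whole interval.
    size = math.ceil((hi - lo + 1) / num_mappers)
    conditions = []
    start = lo
    while start <= hi:
        end = start + size
        if end > hi:
            # Last range: include the maximum value itself.
            conditions.append(f"{column} >= {start} AND {column} <= {hi}")
        else:
            conditions.append(f"{column} >= {start} AND {column} < {end}")
        start = end
    return conditions


for condition in split_conditions("primary_id_column", 1, 100, 10):
    print(condition)
```

For values 1 to 100 and 10 mappers this yields ten ranges of ten values each, and with a single mapper it yields one condition covering the whole table. Note that the ranges are built from the values of the column, not from row counts, so a column with large gaps in its values gives some tasks far fewer rows than others.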
To maintain data consistency, it is advisable not to update the source data while an import is in progress.

If you want to filter data while importing, you can do so with the --where argument.
Let's say you want only the users from Maharashtra. In this case you could use --where like this -

sqoop import --connect jdbc:mysql://localhost/website --table USERS --where "state='mh'"
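
When a filter and a split are both in play, each map task has to satisfy both. The Python sketch below shows one plausible way the per-task query could be assembled, with the filter and the split range joined by AND. The exact SQL text Sqoop generates is an assumption here, and task_query is a hypothetical helper written for this post.

```python
def task_query(table, columns, split_condition, where=None):
    """Build the SELECT for one map task from its split range and an optional filter."""
    conditions = []
    if where:
        # User-supplied filter, as passed to --where.
        conditions.append(f"({where})")
    # Range assigned to this particular map task.
    conditions.append(f"({split_condition})")
    return f"SELECT {', '.join(columns)} FROM {table} WHERE " + " AND ".join(conditions)


print(task_query(
    "USERS",
    ["user_id", "name", "state"],
    "user_id >= 1 AND user_id < 11",
    where="state='mh'",
))
```

Wrapping each part in parentheses keeps an OR inside the user's filter from changing the meaning of the range condition.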


