In a world where new data processing languages appear every day, it can be helpful to have tutorials explaining language characteristics in detail from the ground up. This blog post is not such a tutorial. It also isn’t a tutorial on getting started with MySQL or Hadoop, nor is it a list of best practices for the various languages I’ll reference here – there are bound to be better ways to accomplish certain tasks, and where a choice was required, I’ve emphasized clarity and readability over performance. Finally, this isn’t meant to be a quickstart for SQL experts to access Hadoop – there are a number of SQL interfaces to Hadoop such as Impala or Hive that make Hadoop incredibly accessible to those with existing SQL skills.
Instead, this post is a pale equivalent of the Rosetta Stone – examples of identical concepts expressed in three different languages: SQL (for MySQL), Pig and Spark. These are the exercises I’ve worked through in order to help think in Pig and Spark as fluently as I think in SQL, and I’m recording this experience in a blog post for my own benefit. I expect to reference it periodically in my own future work in Pig and Spark, and if it benefits anybody else, great.
Setting up
To make life easy, I’m using the Cloudera Quickstart VM to execute all of these examples. This is easy to download and run using Oracle VirtualBox, and the image provided also conveniently includes MySQL. For the most accessible Hadoop experience, I’m using Cloudera Manager, though my examples will be from the command-line clients, and you can stand up CDH without Cloudera Manager if RAM is limited.
MySQL Data
I’m using the employees sample database, which is available from Launchpad here. This database was cultivated by Giuseppe Maxia (thanks!) and is familiar to many MySQL users. To download and install it in the Cloudera VM, you can use the following commands:
wget https://launchpadlibrarian.net/24493586/employees_db-full-1.0.6.tar.bz2
tar -xjf employees_db-full-1.0.6.tar.bz2
cd employees_db
mysql -uroot -pcloudera < employees.sql
Hadoop Data
Once the data is in MySQL, getting it copied into Hadoop is simple with Sqoop:
sqoop import-all-tables \
  --connect jdbc:mysql://localhost/employees \
  --username root \
  --password cloudera \
  --warehouse-dir /example/employees \
  --fields-terminated-by "\t"
The above Sqoop command imports all the tables from the employees database into HDFS using tab delimiters. With the data in both MySQL and HDFS, we’re ready to get started.
Example Scripts
The examples shown below can be downloaded here. This .ZIP file contains the SQL, Pig and Spark examples in separate text files for easy copy-and-paste.
Basic data retrieval
This is the most fundamental data processing function: retrieving data from storage and printing it out in full:
MySQL
mysql> select * from departments;
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+
Pig
To do the same thing in Pig, you need to issue multiple statements. Pig is procedural – while SQL describes the output you’d like to see, a Pig script walks through the steps required to produce it. While the underlying engines are naturally different, I like having to think through the steps that a query engine has to execute in order to process a query. Pig makes you do that to a large extent, while MySQL hides those steps.
It’s worth noting that in both Pig and Spark examples shown, I’ve left out extensive logging messages which get displayed during execution. I originally used ellipses to indicate what I cut, but that’s also the continuation prompt for multi-line Spark statements in the pyspark shell, so I’ve just left a blank line instead.
grunt> A = LOAD '/example/employees/departments'
>>     USING PigStorage('\t');
grunt> DUMP A;
(d007 Sales)
(d008 Research)
(d009 Customer Service)
(d005 Development)
(d006 Quality Management)
(d003 Human Resources)
(d004 Production)
(d001 Marketing)
(d002 Finance)
Spark
In my examples, I’ll be using Python, though Scala is an equally viable alternative. Those new to Python may need an explanation of lambda functions, but a simple working definition is that they are anonymous functions where the argument is the identifier after “lambda”. Like Pig, multiple statements are used to execute the operation:
Using Python version 2.6.6 (r266:84292, Feb 22 2013 00:00:18)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t')) \
...     .collect()
>>> for dept in results: print dept
... 
[u'd001', u'Marketing']
[u'd002', u'Finance']
[u'd003', u'Human Resources']
[u'd004', u'Production']
[u'd005', u'Development']
[u'd006', u'Quality Management']
[u'd007', u'Sales']
[u'd008', u'Research']
[u'd009', u'Customer Service']
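To make the lambda shorthand concrete: the anonymous function passed to map() above could just as well be a small named function. A quick illustration (the name split_line is mine, not part of the session):

# A conventional named function...
def split_line(line):
    return line.split('\t')

# ...and the equivalent anonymous form used in the map() call above:
# lambda line: line.split('\t')

Either can be passed to map(); the lambda simply avoids naming a one-off helper.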
It’s certainly possible to express the above Spark script as a single chained command – I think it’s a bit more readable when broken up.
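For reference, here is the same retrieval collapsed into a single chained statement (functionally equivalent to the script above, with the printed output omitted):

>>> for dept in sc.textFile(dept_files).map(lambda line: line.split('\t')).collect(): print dept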
Projections
We often need to limit the columns returned from a command, and this is done as a projection. The following examples return the same rows as the first, but only the department name (not the ID):
MySQL
mysql> select dept_name from departments;
+--------------------+
| dept_name          |
+--------------------+
| Customer Service   |
| Development        |
| Finance            |
| Human Resources    |
| Marketing          |
| Production         |
| Quality Management |
| Research           |
| Sales              |
+--------------------+
Pig
This is a good example of Hadoop schema-on-read – we define the data columns when we do the LOAD statement. The previous example simply broke the columns apart by the tab separator; this example defines the column name and type, making subsequent reference easier.
grunt> A = LOAD '/example/employees/departments'
>>     USING PigStorage('\t')
>>     AS (deptid:chararray, dept:chararray);
grunt> B = FOREACH A GENERATE dept;
grunt> DUMP B;
(Sales)
(Research)
(Customer Service)
(Development)
(Quality Management)
(Human Resources)
(Production)
(Marketing)
(Finance)
Spark
Unlike the Pig script above, there are no column aliases here – splitting each line simply produces a list, and those lists can become nested when operations like joins are applied (shown later).
>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t')) \
...     .map(lambda depts: depts[1]) \
...     .collect()
>>> for dept in results: print dept
... 
Marketing
Finance
Human Resources
Production
Development
Quality Management
Sales
Research
Customer Service
Filtering
Just like projections eliminate columns we don’t need, filtering eliminates rows that don’t match specific criteria.
MySQL
mysql> SELECT * FROM departments
    -> WHERE dept_name NOT LIKE '%a%';
+---------+------------------+
| dept_no | dept_name        |
+---------+------------------+
| d009    | Customer Service |
| d005    | Development      |
| d004    | Production       |
+---------+------------------+
Pig
grunt> A = LOAD '/example/employees/departments'
>>     USING PigStorage('\t')
>>     AS (deptid:chararray, dept:chararray);
grunt> B = FILTER A BY NOT (dept MATCHES '.*a.*');
grunt> DUMP B;
(d009,Customer Service)
(d005,Development)
(d004,Production)
Spark
>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t')) \
...     .filter(lambda depts: "a" not in depts[1]) \
...     .collect()
>>> for dept in results: print dept
... 
[u'd004', u'Production']
[u'd005', u'Development']
[u'd009', u'Customer Service']
Ordering and Limiting
Two concepts in one! Ordering and limiting result sets are often paired together to get answers to questions like, “what are the top 5 selling products?” This example set is a bit more contrived, asking for the first five departments, sorted alphabetically by name.
MySQL
mysql> SELECT * FROM departments
    -> ORDER BY dept_name LIMIT 5;
+---------+------------------+
| dept_no | dept_name        |
+---------+------------------+
| d009    | Customer Service |
| d005    | Development      |
| d002    | Finance          |
| d003    | Human Resources  |
| d001    | Marketing        |
+---------+------------------+
Pig
grunt> A = LOAD '/example/employees/departments'
>>     USING PigStorage('\t')
>>     AS (deptid:chararray, dept:chararray);
grunt> B = ORDER A BY dept;
grunt> C = LIMIT B 5;
grunt> DUMP C;
(d009,Customer Service)
(d005,Development)
(d002,Finance)
(d003,Human Resources)
(d001,Marketing)
Spark
>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t')) \
...     .takeOrdered(5, key=lambda dept: dept[1])
>>> for dept in results: print dept
... 
[u'd009', u'Customer Service']
[u'd005', u'Development']
[u'd002', u'Finance']
[u'd003', u'Human Resources']
[u'd001', u'Marketing']
Aggregation
Counting stuff! At its most basic, finding the distinct values for a given field and the number of times each value appears.
MySQL
mysql> SELECT COUNT(*), gender
    -> FROM employees
    -> GROUP BY gender;
+----------+--------+
| COUNT(*) | gender |
+----------+--------+
|   179973 | M      |
|   120051 | F      |
+----------+--------+
Pig
I sometimes lose track of how Pig transforms bags during aggregation or join operations – for example, it was originally difficult for me to remember that GROUP BY produces a bag with a key named “group”, and a value which is a list of bags matching the group value. Doing DESCRIBE B shows the structure of the B bag. I’ve left this useful debugging step in to demonstrate.
grunt> A = LOAD '/example/employees/employees'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         birth_date:chararray,
>>         first_name:chararray,
>>         last_name:chararray,
>>         gender:chararray,
>>         hire_date:chararray
>>     );
grunt> B = GROUP A BY gender;
grunt> DESCRIBE B;
B: {group: chararray,A: {(empid: int,birth_date: chararray,first_name: chararray,last_name: chararray,gender: chararray,hire_date: chararray)}}
grunt> C = FOREACH B GENERATE group, COUNT(A);
grunt> DUMP C;
(F,120051)
(M,179973)
Spark
>>> emp_files = "/example/employees/employees/"
>>> results = sc.textFile(emp_files) \
...     .map(lambda line: (line.split('\t')[4])) \
...     .countByKey().items()
>>> for gender in results: print gender
... 
(u'M', 179973)
(u'F', 120051)
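If the implicit keying on the raw gender string above feels opaque, a variant that builds explicit (gender, 1) pairs before counting should produce the same totals – a sketch of my own, not from the original script:

>>> results = sc.textFile(emp_files) \
...     .map(lambda line: (line.split('\t')[4], 1)) \
...     .countByKey().items()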
Joining Data Sets
We’re using data stored in relational formats here, but the same principles apply to less-structured, bigger data. Whether joining account data to web logs, or working from RDBMS-sourced data as shown here, combining data sets makes data powerful. In the example below, we’re looking for the 5 highest-paid current employees, and the department in which they work.
MySQL
mysql> SELECT e.first_name, e.last_name, d.dept_name, s.salary
    -> FROM employees e
    -> JOIN dept_emp de ON (e.emp_no = de.emp_no)
    -> JOIN departments d ON (de.dept_no = d.dept_no)
    -> JOIN salaries s ON (s.emp_no = e.emp_no)
    -> WHERE s.to_date = '9999-01-01'
    -> AND de.to_date = '9999-01-01'
    -> ORDER BY s.salary DESC
    -> LIMIT 5;
+------------+-----------+-----------+--------+
| first_name | last_name | dept_name | salary |
+------------+-----------+-----------+--------+
| Tokuyasu   | Pesch     | Sales     | 158220 |
| Honesty    | Mukaidono | Sales     | 156286 |
| Xiahua     | Whitcomb  | Sales     | 155709 |
| Sanjai     | Luders    | Sales     | 155513 |
| Tsutomu    | Alameldin | Sales     | 155190 |
+------------+-----------+-----------+--------+
Pig
grunt> EMP = LOAD '/example/employees/employees'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         birth_date:chararray,
>>         first_name:chararray,
>>         last_name:chararray,
>>         gender:chararray,
>>         hire_date:chararray
>>     );
grunt> DEPT = LOAD '/example/employees/departments'
>>     USING PigStorage('\t')
>>     AS (deptid:chararray, dept:chararray);
grunt> DE = LOAD '/example/employees/dept_emp'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         deptid:chararray,
>>         from_date:chararray,
>>         to_date:chararray
>>     );
grunt> SAL = LOAD '/example/employees/salaries'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         salary:int,
>>         from_date:chararray,
>>         to_date:chararray
>>     );
grunt> SAL_CUR = FILTER SAL BY (to_date == '9999-01-01');
grunt> EMP_CUR = FILTER DE BY (to_date == '9999-01-01');
grunt> SAL_CUR_P = FOREACH SAL_CUR GENERATE empid, salary;
grunt> EMP_CUR_P = FOREACH EMP_CUR GENERATE empid, deptid;
grunt> EMP_P = FOREACH EMP GENERATE
>>     empid,
>>     first_name,
>>     last_name;
grunt> EMP_SAL = JOIN SAL_CUR_P BY empid, EMP_P BY empid;
grunt> DEPT_EMP = JOIN EMP_CUR_P BY deptid, DEPT BY deptid;
grunt> DEPT_EMP_P = FOREACH DEPT_EMP GENERATE
>>     EMP_CUR_P::empid AS empid,
>>     DEPT::dept AS dept;
grunt> EMP_SAL_P = FOREACH EMP_SAL GENERATE
>>     SAL_CUR_P::empid AS empid,
>>     SAL_CUR_P::salary AS salary,
>>     EMP_P::first_name AS first_name,
>>     EMP_P::last_name AS last_name;
grunt> JOINED = JOIN EMP_SAL_P BY empid, DEPT_EMP_P BY empid;
grunt> ORDERED = ORDER JOINED BY EMP_SAL_P::salary DESC;
grunt> TOP_EARNERS = LIMIT ORDERED 5;
grunt> RESULTS = FOREACH TOP_EARNERS GENERATE
>>     EMP_SAL_P::first_name AS first_name,
>>     EMP_SAL_P::last_name AS last_name,
>>     DEPT_EMP_P::dept AS dept,
>>     EMP_SAL_P::salary AS salary;
grunt> DUMP RESULTS;
(Tokuyasu,Pesch,Sales,158220)
(Honesty,Mukaidono,Sales,156286)
(Xiahua,Whitcomb,Sales,155709)
(Sanjai,Luders,Sales,155513)
(Tsutomu,Alameldin,Sales,155190)
Spark
This is where the nested lists referenced earlier come into play. It can be a bit of a headache to remember how the RDDs are constructed from previous transformations – using RDD.take(1) to print out a sample can prove useful in determining their structure, in the same way DESCRIBE <bag> was used earlier for Pig (a short illustration follows the script below).
>>> dept_files = "/example/employees/departments/"
>>> de_files = "/example/employees/dept_emp/"
>>> emp_files = "/example/employees/employees/"
>>> salary_files = "/example/employees/salaries/"
>>> current_salaries = sc.textFile(salary_files) \
...     .map(lambda line: line.split('\t')) \
...     .filter(lambda row: row[3] == '9999-01-01') \
...     .map(lambda row: (int(row[0]), int(row[1])))
>>> employees = sc.textFile(emp_files) \
...     .map(lambda line: line.split('\t')) \
...     .map(lambda row: (int(row[0]), (row[2], row[3]))) \
...     .join(current_salaries)
>>> departments = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t'))
>>> dept_emps = sc.textFile(de_files) \
...     .map(lambda line: line.split('\t')) \
...     .filter(lambda row: row[3] == '9999-01-01') \
...     .map(lambda row: (row[1], int(row[0]))) \
...     .join(departments) \
...     .values()
>>> results = dept_emps.join(employees) \
...     .takeOrdered(5, key=lambda row: -row[1][1][1])
>>> for emp in results:
...     print "%15s %15s %15s %15d" % \
...         (emp[1][1][0][0], \
...          emp[1][1][0][1], \
...          emp[1][0], \
...          emp[1][1][1])
... 
       Tokuyasu           Pesch           Sales          158220
        Honesty       Mukaidono           Sales          156286
         Xiahua        Whitcomb           Sales          155709
         Sanjai          Luders           Sales          155513
        Tsutomu       Alameldin           Sales          155190
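As an example of the inspection mentioned above, calling take(1) on the intermediate RDDs shows how each join nests the values. The shapes in the comments follow from the transformations above; actual values will vary:

>>> employees.take(1)
# one element shaped like (empid, ((first_name, last_name), salary))
>>> dept_emps.join(employees).take(1)
# one element shaped like (empid, (dept_name, ((first_name, last_name), salary)))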
Aggregated Joins
This is a little more involved than counting – we’re joining different data sources to compute the average salary of current employees by department.
MySQL
mysql> SELECT d.dept_name, AVG(s.salary)
    -> FROM departments d
    -> JOIN dept_emp de ON (d.dept_no = de.dept_no)
    -> JOIN salaries s ON (s.emp_no = de.emp_no)
    -> WHERE s.to_date = '9999-01-01'
    -> AND de.to_date = '9999-01-01'
    -> GROUP BY d.dept_name;
+--------------------+---------------+
| dept_name          | AVG(s.salary) |
+--------------------+---------------+
| Customer Service   |    67285.2302 |
| Development        |    67657.9196 |
| Finance            |    78559.9370 |
| Human Resources    |    63921.8998 |
| Marketing          |    80058.8488 |
| Production         |    67843.3020 |
| Quality Management |    65441.9934 |
| Research           |    67913.3750 |
| Sales              |    88852.9695 |
+--------------------+---------------+
Pig
grunt> DEPT = LOAD '/example/employees/departments'
>>     USING PigStorage('\t')
>>     AS (deptid:chararray, dept:chararray);
grunt> DE = LOAD '/example/employees/dept_emp'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         deptid:chararray,
>>         from_date:chararray,
>>         to_date:chararray
>>     );
grunt> SAL = LOAD '/example/employees/salaries'
>>     USING PigStorage('\t')
>>     AS (
>>         empid:int,
>>         salary:int,
>>         from_date:chararray,
>>         to_date:chararray
>>     );
grunt> SAL_CUR = FILTER SAL BY (to_date == '9999-01-01');
grunt> EMP_CUR = FILTER DE BY (to_date == '9999-01-01');
grunt> SAL_CUR_P = FOREACH SAL_CUR GENERATE empid, salary;
grunt> EMP_CUR_P = FOREACH EMP_CUR GENERATE empid, deptid;
grunt> DEPT_EMP = JOIN EMP_CUR_P BY deptid, DEPT BY deptid;
grunt> DEPT_EMP_P = FOREACH DEPT_EMP GENERATE
>>     EMP_CUR_P::empid AS empid,
>>     DEPT::dept AS dept;
grunt> JOINED = JOIN SAL_CUR_P BY empid, DEPT_EMP_P BY empid;
grunt> GROUPED = GROUP JOINED BY DEPT_EMP_P::dept;
grunt> RESULTS = FOREACH GROUPED GENERATE
>>     group AS department,
>>     AVG(JOINED.SAL_CUR_P::salary) AS avg_salary;
grunt> DUMP RESULTS;
(Sales,88852.96947030583)
(Finance,78559.93696228994)
(Research,67913.374975714)
(Marketing,80058.84880743835)
(Production,67843.30198484166)
(Development,67657.91955820545)
(Human Resources,63921.89982943092)
(Customer Service,67285.2301781547)
(Quality Management,65441.99340024749)
Spark
In the following example, I define a function, avg(), which takes a list as an argument and calculates the average of the values in the list. This highlights how easy it is to write transformation functions – it would be relatively easy to calculate the median value as well (something that’s far harder in MySQL); a sketch of such a function follows the script below.
>>> dept_files = "/example/employees/departments/"
>>> de_files = "/example/employees/dept_emp/"
>>> salary_files = "/example/employees/salaries/"
>>> current_salaries = sc.textFile(salary_files) \
...     .map(lambda line: line.split('\t')) \
...     .filter(lambda row: row[3] == '9999-01-01') \
...     .map(lambda row: (int(row[0]), int(row[1])))
>>> departments = sc.textFile(dept_files) \
...     .map(lambda line: line.split('\t'))
>>> dept_emps = sc.textFile(de_files) \
...     .map(lambda line: line.split('\t')) \
...     .filter(lambda row: row[3] == '9999-01-01') \
...     .map(lambda row: (row[1], int(row[0]))) \
...     .join(departments) \
...     .values()
>>> def avg(l):
...     return float(sum(l))/len(l)
... 
>>> result = dept_emps.join(current_salaries) \
...     .map(lambda data: (data[1][0], data[1][1])) \
...     .groupByKey() \
...     .map(lambda r: (r[0], avg(r[1]))) \
...     .sortBy(lambda row: row[0]) \
...     .collect()
>>> for row in result:
...     print "%20s %5.4f" % row
... 
    Customer Service 67285.2302
         Development 67657.9196
             Finance 78559.9370
     Human Resources 63921.8998
           Marketing 80058.8488
          Production 67843.3020
  Quality Management 65441.9934
            Research 67913.3750
               Sales 88852.9695
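To illustrate the point about medians, here is a sketch of a median() function that could be swapped in for avg() in the script above – my own addition, not part of the original example:

>>> def median(l):
...     s = sorted(l)              # groupByKey() yields an iterable; sorted() returns a list
...     n = len(s)
...     mid = n // 2
...     if n % 2 == 1:
...         return float(s[mid])   # odd count: the middle value
...     return (s[mid - 1] + s[mid]) / 2.0   # even count: mean of the two middle values
... 

Replacing avg with median in the .map(lambda r: (r[0], avg(r[1]))) step would then report per-department median salaries instead.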
Conclusion
These are just some basic examples to illustrate core expressions in SQL, Pig and Spark. It helps me to see Pig and Spark alongside the same operation expressed in SQL, and provides a reference point to a language I’m very familiar with. Maybe others will find it handy as well.