Wednesday, August 20, 2014

Apache Pig - Hadoop Fast Track

Apache Pig is a tool for analyzing large amounts of data by expressing the processing as data flows. With the Pig Latin scripting language, operations such as ETL (Extract, Transform and Load), ad hoc data analysis and iterative processing can be achieved easily.
Pig is an abstraction over MapReduce. In other words, all Pig scripts are internally converted into Map and Reduce tasks to get the work done. Pig was built to make programming MapReduce applications easier. Before Pig, processing the data stored on HDFS usually meant writing MapReduce programs in Java.
Pig was first built at Yahoo! and later became a top-level Apache project. In this series of posts we will walk through the different features of Pig using a sample dataset.

Dataset

The dataset that we are using here is from one of my projects called Flicksery. Flicksery is a Netflix search engine. The dataset is a simple text file (movies_data.csv) that lists movie names and their details: release year, rating and runtime in seconds.
A sample of the dataset is as follows:
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1932,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1991,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1985,3.8,5333
7,Muriel's Wedding,1994,3.5,6323
8,Mother's Boys,1994,3.4,5733
9,Nosferatu: Original Version,1929,3.5,5651
10,Nick of Time,1995,3.4,5333
All code and data for this post can be downloaded from github. The file has a total of 49590 records.

Installing Pig

Download Pig
$ wget http://mirror.symnds.com/software/Apache/pig/pig-0.12.0/pig-0.12.0.tar.gz
Untar
$ tar xvzf pig-0.12.0.tar.gz
Rename the folder for easier access:
$ mv pig-0.12.0 pig
Update .bashrc to add the following:
export PATH=$PATH:/home/hduser/pig/bin
Pig can be started in one of the following two modes:
  1. Local Mode
  2. Cluster Mode
Using the '-x local' option starts Pig in local mode, whereas executing the pig command without any options starts Pig in cluster mode (which Pig itself calls mapreduce mode). In local mode, Pig accesses files on the local file system. In cluster mode, Pig accesses files on HDFS.
Restart your terminal and execute the pig command as follows:
To start in Local Mode:
$ pig -x local
2013-12-25 20:16:26,258 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0 (r1529718) compiled Oct 07 2013, 12:20:14
2013-12-25 20:16:26,259 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hduser/pig/myscripts/pig_1388027786256.log
2013-12-25 20:16:26,281 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hduser/.pigbootup not found
2013-12-25 20:16:26,381 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>
To start in Cluster Mode:
$ pig
2013-12-25 20:19:42,274 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0 (r1529718) compiled Oct 07 2013, 12:20:14
2013-12-25 20:19:42,274 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hduser/pig/myscripts/pig_1388027982272.log
2013-12-25 20:19:42,300 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hduser/.pigbootup not found
2013-12-25 20:19:42,463 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:54310
2013-12-25 20:19:42,672 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: hdfs://localhost:9001
grunt>
Either command presents you with the grunt shell. The grunt shell allows you to execute Pig Latin statements and quickly test data flows on your data step by step, without having to execute complete scripts. Pig is now installed and we can go ahead and start using it to play with data.

Pig Latin

To learn Pig Latin, let’s question the data. Before we start asking questions, we need the data to be accessible in Pig.
Use the following command to load the data:
grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);
The above statement is made up of two parts. The part to the left of “=” is called the relation or alias. It looks like a variable, but it is not one: it is a name for the result of the operation on the right-hand side, here a LOAD. When this statement is executed, no MapReduce task is run.
Since our dataset has records with fields separated by commas, we use the clause USING PigStorage(',').
We have also given names to the fields using the 'as' keyword.
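To make the roles of the delimiter and the field names concrete, here is a rough plain-Python model of what the LOAD statement describes. It is not Pig code; the names FIELD_NAMES and parse_line are made up for this sketch, and representing an empty field as None is an assumption meant to mirror how Pig treats it as null.

```python
# Plain-Python model of LOAD ... USING PigStorage(',') AS (...); not Pig code.
FIELD_NAMES = ("id", "name", "year", "rating", "duration")  # names from the 'as' clause


def parse_line(line, delimiter=","):
    """Split one text line on the delimiter and attach the declared field names."""
    values = line.rstrip("\n").split(delimiter)
    # An empty field is modelled as None, standing in for a Pig null.
    values = [value if value != "" else None for value in values]
    return dict(zip(FIELD_NAMES, values))


record = parse_line("2,The Mummy,1932,3.5,4388")
print(record["name"])   # The Mummy
print(record["year"])   # 1932, still text because no types were declared
```

Note that every value stays a piece of text here, just as the fields stay untyped in Pig when only names are given.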
Now, let’s test to see if the alias has the data we loaded.
grunt> DUMP movies;
Once you execute the above statement, you should see a lot of text on the screen (partial text shown below).
2013-12-25 23:03:04,550 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2013-12-25 23:03:04,633 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2013-12-25 23:03:04,748 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2013-12-25 23:03:04,853 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job

................

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.1.2 0.12.0 hduser 2013-12-25 23:03:04 2013-12-25 23:03:05 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 movies MAP_ONLY file:/tmp/temp-1685410826/tmp1113990343,

Input(s):
Successfully read records from: "/home/hduser/pig/myscripts/movies_data.csv"

Output(s):
Successfully stored records in: "file:/tmp/temp-1685410826/tmp1113990343"

Job DAG:
job_local_0001

................

(49586,Winter Wonderland,2013,2.8,1812)
(49587,Top Gear: Series 19: Africa Special,2013,,6822)
(49588,Fireplace For Your Home: Crackling Fireplace with Music,2010,,3610)
(49589,Kate Plus Ei8ht,2010,2.7,)
(49590,Kate Plus Ei8ht: Season 1,2010,2.7,)
It is only after the DUMP statement that a MapReduce job is initiated. As we see our data in the output we can confirm that the data has been loaded successfully.
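The deferred execution described above resembles a Python generator: defining it does no work, and consuming it does. The sketch below is only an analogy under that assumption, not a description of Pig's internals; the load function and the log list are hypothetical names.

```python
# Plain-Python analogy for Pig's deferred execution; not Pig code.
log = []


def load(lines):
    """Generator: nothing is read until a consumer iterates over it."""
    for line in lines:
        log.append("read " + line.split(",")[0])
        yield tuple(line.split(","))


lines = [
    "1,The Nightmare Before Christmas,1993,3.9,4568",
    "2,The Mummy,1932,3.5,4388",
]

movies = load(lines)   # like LOAD: only defines the data flow
print(log)             # [] because no work has happened yet

rows = list(movies)    # like DUMP: forces the pipeline to run
print(log)             # ['read 1', 'read 2']
```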
Now, since we have the data in Pig, let’s start with the questions.
List the movies that have a rating greater than 4
grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;
grunt> DUMP movies_greater_than_four;
The above statements filter the alias movies and store the results in a new alias, movies_greater_than_four. The movies_greater_than_four alias will contain only the records of movies whose rating is greater than 4.
The DUMP command is only used to display information onto the standard output. If you need to store the data to a file you can use the following command:
grunt> store movies_greater_than_four into '/user/hduser/movies_greater_than_four';
In this post we got a good feel of Apache Pig. We loaded some data and executed some basic commands to query it. The next post will dive deeper into Pig Latin where we will learn some advanced techniques to do data analysis.

grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;
Here, we see a (float) cast placed before the column 'rating'. This tells Pig to treat the column as a float. Pig was not informed about the type of the column when the data was loaded, so it treats each field as a bytearray by default.
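Some records in the DUMP output shown earlier have an empty rating (for example id 49587). The following plain-Python sketch models how the cast and the filter treat such records, assuming an empty field becomes null and a comparison against null never passes the filter. The helper names to_float and rating_above are made up, and the third row is a hypothetical record added for illustration.

```python
# Plain-Python model of FILTER movies BY (float)rating > 4.0; not Pig code.
def to_float(raw):
    """Model of the (float) cast: a missing or unparsable field becomes None."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def rating_above(rows, threshold):
    """Keep rows whose rating is known and greater than the threshold."""
    kept = []
    for row in rows:
        rating = to_float(row[3])
        # A comparison against a missing value is not true, so the row is dropped.
        if rating is not None and rating > threshold:
            kept.append(row)
    return kept


rows = [
    ("1", "The Nightmare Before Christmas", "1993", "3.9", "4568"),
    ("49587", "Top Gear: Series 19: Africa Special", "2013", None, "6822"),
    ("0", "Hypothetical Film", "2000", "4.5", "6000"),  # made-up row
]
print(rating_above(rows, 4.0))   # only the hypothetical 4.5-rated row remains
```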
Following is the command we used to load the data:
grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);
The load command specified only the column names. We can modify the statement as follows to include the data type of the columns:
grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
In the above statement, name is a chararray (string), rating is a double, and the fields id, year and duration are integers. If the data had been loaded using the above statement, we would not need to cast the column during filtering.
The data types used in the above statement are called scalar data types. The other scalar types include long, float and bytearray.
To get better at using filters, let’s ask the data a few more questions:
List the movies that were released between 1950 and 1960 (both years excluded)
grunt> movies_between_50_60 = FILTER movies by year>1950 and year<1960;
List the movies that start with the letter A
grunt> movies_starting_with_A = FILTER movies by name matches 'A.*';
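Two details of these filters are easy to miss: the matches operator has to match the whole value (which is why the pattern is 'A.*' and not just 'A'), and the > and < comparisons exclude both endpoints. A small Python model, with made-up helper names and illustrative titles that are not claimed to be in the dataset:

```python
import re


def matches(value, pattern):
    """Model of Pig's matches: the pattern must cover the entire string."""
    return re.fullmatch(pattern, value) is not None


def between_exclusive(year, low, high):
    """Model of year > low AND year < high: both endpoints are excluded."""
    return low < year < high


print(matches("Alien", "A.*"))       # True
print(matches("The Abyss", "A.*"))   # False, the title does not start with A
print(matches("Alien", "A"))         # False, "A" alone does not cover the title

print(between_exclusive(1950, 1950, 1960))   # False
print(between_exclusive(1955, 1950, 1960))   # True
```

To include the endpoint years in Pig, the comparisons would use >= and <= instead.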
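List the movies that have a duration greater than 1 hour (durations are in seconds, so the threshold is 3600; despite the alias name, 2 hours would be 7200)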
grunt> movies_duration_2_hrs = FILTER movies by duration > 3600;
List the movies that have a rating between 3 and 4
grunt> movies_rating_3_4 = FILTER movies BY rating>3.0 and rating<4.0;
DESCRIBE
The schema of a relation/alias can be viewed using the DESCRIBE command:
grunt> DESCRIBE movies;
movies: {id: int,name: chararray,year: int,rating: double,duration: int}
ILLUSTRATE
To view the step-by-step execution of a sequence of statements you can use the ILLUSTRATE command:
grunt> ILLUSTRATE movies_duration_2_hrs;

------------------------------------------------------------------------------------------------------------------------
| movies | id:int | name:chararray | year:int | rating:double | duration:int |
------------------------------------------------------------------------------------------------------------------------
| | 1567 | Barney: Sing & Dance with Barney | 2004 | 2.7 | 3244 |
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
------------------------------------------------------------------------------------------------------------------------

---------------------------------------------------------------------------------------------------------------------
| movies_duration_2_hrs | id:int | name:chararray | year:int | rating:double | duration:int |
---------------------------------------------------------------------------------------------------------------------
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
---------------------------------------------------------------------------------------------------------------------
DESCRIBE and ILLUSTRATE are really useful for debugging.
Complex Types
Pig supports three different complex types to handle data. It is important that you understand these types properly as they will be used very often when working with data.
Tuples
A tuple is just like a row in a table. It is an ordered, comma-separated list of fields.
(49539,'The Magic Crystal',2013,3.7,4561)
The above tuple has five fields. A tuple is enclosed in parentheses.
Bags
A bag is an unordered collection of tuples.
{ (49382, 'Final Offer'), (49385, 'Delete') }
The above bag has two tuples. Each tuple has two fields, id and movie name. A bag is enclosed in curly braces.
Maps
A map is a key/value store. The key and value are joined together using #.
['name'#'The Magic Crystal', 'year'#2013]
The above map has two keys, name and year, with the values 'The Magic Crystal' and 2013. The first value is a chararray and the second one is an integer.
We will be using the above complex types quite often in our future examples.
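If you know Python, the three complex types map loosely onto familiar containers. The sketch below is only an analogy with variable names chosen for illustration; in particular, a Python list keeps its order while a Pig bag is unordered.

```python
# Python stand-ins for Pig's complex types (an analogy, not Pig syntax).
movie_tuple = (49539, "The Magic Crystal", 2013, 3.7, 4561)   # tuple: ordered fields
movie_bag = [(49382, "Final Offer"), (49385, "Delete")]        # bag: collection of tuples
movie_map = {"name": "The Magic Crystal", "year": 2013}        # map: key to value

print(len(movie_tuple))    # 5 fields
print(movie_tuple[1])      # fields are addressed by position, starting at 0
print(len(movie_bag))      # 2 tuples
print(movie_map["year"])   # in Pig a map lookup is written with #, as in m#'year'
```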
FOREACH
FOREACH gives a simple way to apply transformations based on columns. Let’s understand this with an example.
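List the movie names and their durations in minutes
grunt> movie_duration = FOREACH movies GENERATE name, duration/60.0;
The above statement generates a new alias that has the list of movies and their durations in minutes. Dividing by 60.0 rather than 60 matters: duration is an int, so duration/60 would be an integer division and the fraction of a minute would be lost before any cast to double.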
You can check the results using the DUMP command.
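One thing to watch for when converting seconds to minutes is integer division: if both operands are integers, the fractional part is discarded before any later cast can recover it. The plain-Python sketch below illustrates the difference using the duration of the first record in the sample data; Python's // operator stands in for integer division here.

```python
# Plain-Python model of the minutes calculation; not Pig code.
duration_seconds = 4568   # The Nightmare Before Christmas, from the sample data

truncated = float(duration_seconds // 60)   # integer division first, then a cast
exact = duration_seconds / 60.0             # floating-point division

print(truncated)         # 76.0
print(round(exact, 2))   # 76.13
```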
GROUP
The GROUP keyword is used to group the tuples of a relation by one or more fields.
List the years and the number of movies released each year.
grunt> grouped_by_year = group movies by year;
grunt> count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);
You can check the result by dumping the count_by_year relation on the screen.
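It helps to picture what the grouped relation looks like: one tuple per year, holding the group key and a bag of all the movie tuples for that year. COUNT then measures the size of each bag. The following is a rough Python model using four rows from the sample data; group_by is a made-up helper, and the sorting is there only to make the printed output repeatable.

```python
from collections import defaultdict

movies = [
    (6, "One Magic Christmas", 1985, 3.8, 5333),
    (7, "Muriel's Wedding", 1994, 3.5, 6323),
    (8, "Mother's Boys", 1994, 3.4, 5733),
    (10, "Nick of Time", 1995, 3.4, 5333),
]


def group_by(rows, index):
    """Return (key, bag) pairs, where each bag holds the full rows sharing that key."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[index]].append(row)
    return sorted(groups.items())


grouped_by_year = group_by(movies, 2)   # like GROUP movies BY year
count_by_year = [(year, len(bag)) for year, bag in grouped_by_year]   # like COUNT(movies)
print(count_by_year)   # [(1985, 1), (1994, 2), (1995, 1)]
```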
We know in advance that the total number of movies in the dataset is 49590. We can check whether our GROUP operation is correct by verifying the total of the COUNT field. If the sum of the count field is 49590, we can be confident that our grouping has worked correctly.
grunt> group_all = GROUP count_by_year ALL;
grunt> sum_all = FOREACH group_all GENERATE SUM(count_by_year.$1);
grunt> DUMP sum_all;
Of the above three statements, the first, GROUP ALL, puts all the tuples into one group. This is very useful when we need to perform aggregation operations on the entire set.
The next statement performs a FOREACH on the grouped relation group_all and applies the SUM function to the field in position 1 (positions start from 0). Here the field in position 1 holds the counts of movies for each year. On execution of the DUMP statement, the MapReduce program kicks off and gives us the following result:
(49590)
The above value matches the known fact that the dataset has 49590 movies. So we can conclude that our GROUP operation worked successfully.
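The same sanity check can be sketched in plain Python on the ten years from the sample shown at the start: the per-year counts must add up to the number of records. This models the idea only and uses Counter as a stand-in for GROUP plus COUNT.

```python
from collections import Counter

# Release years of the ten sample records listed in the Dataset section.
years = [1993, 1932, 1921, 1991, 1963, 1985, 1994, 1994, 1929, 1995]

count_by_year = list(Counter(years).items())            # (year, count) pairs
total = sum(count for _year, count in count_by_year)    # like SUM(count_by_year.$1)

print(total)                 # 10
print(total == len(years))   # True
```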
ORDER BY
Let us question the data to illustrate the ORDER BY operation.
List all the movies in the ascending order of year.
grunt> asc_movies_by_year = ORDER movies BY year ASC;
grunt> DUMP asc_movies_by_year;
List all the movies in the descending order of year.
grunt> desc_movies_by_year = ORDER movies by year DESC;
grunt> DUMP desc_movies_by_year;
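For comparison, the two orderings correspond to sorting on the year field with and without reversal. A short Python model with three rows from the sample data (shortened to id, name and year for this sketch):

```python
rows = [
    (2, "The Mummy", 1932),
    (1, "The Nightmare Before Christmas", 1993),
    (3, "Orphans of the Storm", 1921),
]

ascending = sorted(rows, key=lambda row: row[2])                  # like ORDER ... BY year ASC
descending = sorted(rows, key=lambda row: row[2], reverse=True)   # like ORDER ... BY year DESC

print([row[2] for row in ascending])    # [1921, 1932, 1993]
print([row[2] for row in descending])   # [1993, 1932, 1921]
```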
DISTINCT
The DISTINCT statement is used to remove duplicated records. It works only on entire records, not on individual fields.
Let’s illustrate this with an example:
grunt> movies_with_dups = LOAD '/home/hduser/pig/myscripts/movies_with_duplicates.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
grunt> DUMP movies_with_dups;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
(9,Nosferatu: Original Version,1929,3.5,5651)
You can see that there are duplicates in this data set. Now let us list the distinct records present in movies_with_dups:
grunt> no_dups = DISTINCT movies_with_dups;
grunt> DUMP no_dups;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
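Because DISTINCT compares entire records, two rows that differ in even one field both survive. The Python sketch below models this with a set of tuples; the last row is a hypothetical record added to show the point, and the sort is there only to make the printed order repeatable.

```python
rows = [
    (1, "The Nightmare Before Christmas", 1993, 3.9, 4568),
    (1, "The Nightmare Before Christmas", 1993, 3.9, 4568),   # exact duplicate
    (2, "The Mummy", 1932, 3.5, 4388),
    (2, "The Mummy", 1932, 3.5, 9999),   # hypothetical: same movie, different duration
]

no_dups = sorted(set(rows))   # whole-record comparison, like DISTINCT

print(len(no_dups))   # 3
for row in no_dups:
    print(row)
```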
LIMIT
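Use the LIMIT keyword to get only a limited number of results from a relation. Unless the relation has been ordered first, Pig makes no guarantee about which tuples are returned.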
grunt> top_10_movies = LIMIT movies 10;
grunt> DUMP top_10_movies;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
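The name top_10_movies suggests a ranking, but LIMIT by itself only caps the number of tuples. To get a real "top N", the relation would be ordered first and then limited. A plain-Python model of the difference, using four rows from the sample data shortened to id, name and rating:

```python
rows = [
    (1, "The Nightmare Before Christmas", 3.9),
    (2, "The Mummy", 3.5),
    (3, "Orphans of the Storm", 3.2),
    (6, "One Magic Christmas", 3.8),
]

first_two = rows[:2]   # like LIMIT alone: just some two rows

by_rating = sorted(rows, key=lambda row: row[2], reverse=True)   # like ORDER ... BY rating DESC
top_two = by_rating[:2]                                          # then LIMIT

print([row[1] for row in first_two])   # ['The Nightmare Before Christmas', 'The Mummy']
print([row[1] for row in top_two])     # ['The Nightmare Before Christmas', 'One Magic Christmas']
```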
SAMPLE
Use the SAMPLE keyword to get a random sample of your data.
grunt> sample_10_percent = sample movies 0.1;
grunt> dump sample_10_percent;
Here, 0.1 = 10%
We already know that the file has 49590 records, so we can check the count of records in the sampled relation.
grunt> sample_group_all = GROUP sample_10_percent ALL;
grunt> sample_count = FOREACH sample_group_all GENERATE COUNT(sample_10_percent.$0);
grunt> dump sample_count;
The output is (4937), which is approximately 10% of 49590. Because the sample is random, the exact count will differ from run to run.
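A count that is close to, but not exactly, 10% is what you would expect if each record is kept independently with probability 0.1. The Python sketch below models sampling under that assumption; the sample function is a made-up helper, and the fixed seed is there only so the sketch is repeatable.

```python
import random


def sample(rows, fraction, rng):
    """Keep each row independently with probability `fraction`."""
    return [row for row in rows if rng.random() < fraction]


rng = random.Random(42)   # fixed seed only to make this sketch repeatable
ids = range(1, 49591)     # stand-in for the 49590 records in the file
picked = sample(ids, 0.1, rng)

print(len(picked))        # close to 4959, but usually not exactly that
```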
In this post we have touched upon some important operations used in Pig. I suggest that you try out all the samples as you go through this tutorial, as it is the doing that registers and not the reading. In the next post we will learn a few more operations dealing with data transformation.
All code and data for this post can be downloaded from github.
http://www.hadoopscreencasts.com/
