Apache Pig : Hadoop Notes

HOW TO RUN PIG PROGRAMS - EXAMPLES

Pig programs can be run in three ways, all of which work in both local and MapReduce mode. They are:

  • Script Mode
  • Grunt Mode
  • Embedded Mode
Let's see each mode in detail.

Script Mode or Batch Mode: In script mode, Pig runs the commands specified in a script file. The following example shows how to run a Pig program from a script file:
> cat scriptfile.pig
A = LOAD 'script_file';
DUMP A;
> pig scriptfile.pig


Grunt Mode or Interactive Mode: Grunt mode can also be called interactive mode. Grunt is Pig's interactive shell. It is started when no file is specified for Pig to run.
> pig
grunt> A = LOAD 'grunt_file';
grunt> DUMP A;


You can also run Pig scripts from grunt using the run and exec commands. The difference is that exec runs the script in a separate batch context, so its aliases are not visible in the shell afterwards, while run executes the script as if you had typed it at the prompt, leaving its aliases available in grunt.
grunt> run scriptfile.pig
grunt> exec scriptfile.pig

Embedded Mode: You can embed Pig programs in Java and run them from Java.
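As a minimal sketch of embedded mode using Pig's PigServer API (the file path, alias, and output directory below are placeholders, not real paths):

import java.io.IOException;
import org.apache.pig.PigServer;

public class PigEmbedded {
    public static void main(String[] args) throws IOException {
        // Start Pig in local mode; pass "mapreduce" instead to run on the cluster.
        PigServer pig = new PigServer("local");

        // Register Pig Latin statements one at a time.
        pig.registerQuery("A = LOAD 'script_file';");

        // Equivalent to: STORE A INTO 'output_dir'; (placeholder path)
        pig.store("A", "output_dir");
    }
}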

CREATING SCHEMA, READING AND WRITING DATA - PIG TUTORIAL

The first step in processing a data set using Pig is to define a schema for it. A schema is a representation of the data set in terms of fields. Let's see how to define a schema with an example.

Consider the following products data set in Hadoop as an example:
10, iphone,  1000
20, samsung, 2000
30, nokia,   3000

Here the first field is the product id, the second is the product name, and the third is the product price.

Defining Schema

LOAD operator

The LOAD operator is used to define a schema for a data set. Let's look at different usages of the LOAD operator for defining the schema of the above dataset.

1. Creating a schema without specifying any fields.

In this method, we don't specify any field names for creating the schema. An example is shown below:
grunt> A = LOAD '/user/hadoop/products';

Pig is a data flow language. Each operational statement in Pig consists of a relation and an operation: the left side of the statement is the relation, and the right side is the operation. Pig statements must be terminated with a semicolon. Here A is a relation, and /user/hadoop/products is the input file in HDFS.

DESCRIBE

To view the schema of a relation, use the describe statement which is shown below:
grunt> describe A;
Schema for A unknown.

As no fields were defined, the above describe statement on A reports "Schema for A unknown." To display the contents of A on the console, use the DUMP operator.
grunt> DUMP A;
(10,iphone,1000)
(20,samsung,2000)
(30,nokia,3000)

STORE operator
To write a data set into HDFS, use the STORE operator as shown below:
grunt> STORE A INTO 'hadoop directory name';

2. Defining a schema without specifying any data types.

We can create a schema just by specifying the field names without any data types. An example is shown below:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id, product_name, price);

grunt> describe A;
A: {id: bytearray,product_name: bytearray,price: bytearray}

grunt> STORE A INTO '/user/hadoop/products_pipe' USING PigStorage('|'); -- Writes the data with pipe as delimiter into a new HDFS directory (the output directory must not already exist).

PigStorage is used to specify the field delimiter. The default field delimiter is tab; if your data is tab-separated, you can omit the USING PigStorage clause. In the STORE operation, you can likewise use PigStorage to specify the output separator.

You specify the field names in the AS clause. As we didn't specify any data types, Pig assigned the default bytearray type to each field.

3. Defining a schema with field names and data types.

To specify a data type, put a colon and the type after the field name. Take a look at the example below:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id:int, product_name:chararray, price:int);

grunt> describe A;
A: {id: int,product_name: chararray,price: int}

Accessing the Fields

So far, we have seen how to define a schema, print the contents of a relation on the console, and write data to HDFS. Now we will see how to access the fields.

The fields can be accessed in two ways:

  • Field names: We can use a field's name to access the values of that field.
  • Positional parameters: Fields are numbered starting from 0: $0 refers to the first field, $1 to the second, and so on.

Example:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id:int, product_name:chararray, price:int);
grunt> B = FOREACH A GENERATE id;
grunt> C = FOREACH A GENERATE $1,$2;
grunt> DUMP B;
(10)
(20)
(30)
grunt> DUMP C;
(iphone,1000)
(samsung,2000)
(nokia,3000)

FOREACH is like a for loop used to iterate over the records of a relation. The GENERATE keyword specifies what to produce for each record. In the above example, GENERATE is used to project fields from relation A.

Note: It is always good practice to look at the schema of a relation using the describe statement before performing an operation on it. Knowing the schema tells you how to access its fields.

HOW TO FILTER RECORDS - PIG TUTORIAL EXAMPLES

Pig allows you to remove unwanted records based on a condition. The FILTER functionality is similar to the WHERE clause in SQL. The FILTER operator in Pig is used to remove unwanted records from the data file. Its syntax is shown below:
new_relation = FILTER relation BY condition;

Here relation is the data set on which the filter is applied, condition is the filter condition, and new_relation is the relation created after filtering the rows.

Pig Filter Examples:

Let's consider the following sales data set as an example:
year,product,quantity
---------------------
2000, iphone, 1000
2001, iphone, 1500 
2002, iphone, 2000
2000, nokia,  1200
2001, nokia,  1500
2002, nokia,  900

1. select products whose quantity is greater than or equal to 1000.
grunt> A = LOAD '/user/hadoop/sales' USING PigStorage(',') AS (year:int,product:chararray,quantity:int);
grunt> B = FILTER A BY quantity >= 1000;
grunt> DUMP B;
(2000,iphone,1000)
(2001,iphone,1500)
(2002,iphone,2000)
(2000,nokia,1200)
(2001,nokia,1500)

2. select products whose quantity is greater than 1000 and year is 2001
grunt> C = FILTER A BY quantity > 1000 AND year == 2001;
grunt> DUMP C;
(2001,iphone,1500)
(2001,nokia,1500)

3. select products with year not in 2000
grunt> D = FILTER A BY year != 2000;
grunt> DUMP D;
(2001,iphone,1500)
(2002,iphone,2000)
(2001,nokia,1500)
(2002,nokia,900)

You can use all the logical operators (NOT, AND, OR) and relational operators (<, >, ==, !=, >=, <=) in filter conditions, as sketched below.
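For example, a sketch combining several of these operators on the same sales relation A from above:

grunt> E = FILTER A BY NOT (product == 'nokia') AND (quantity >= 1000 OR year == 2002);
grunt> DUMP E;
(2000,iphone,1000)
(2001,iphone,1500)
(2002,iphone,2000)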

WORD COUNT EXAMPLE - PIG SCRIPT

Q) How do you find the number of occurrences of each word in a file using a Pig script?

You can find the famous word count example written as a MapReduce program on the Apache website. Here we will write a simple Pig script for the word count problem.

The following Pig script finds the number of times each word is repeated in a file:

Word Count Example Using Pig Script:

lines = LOAD '/user/hadoop/HDFS_File.txt' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) as word;
grouped = GROUP words BY word;
wordcount = FOREACH grouped GENERATE group, COUNT(words);
DUMP wordcount;

In the above Pig script, the TOKENIZE function splits each line into words and returns them as a bag of words. FLATTEN then un-nests that bag, producing one record per word. In the third statement, the words are grouped together so that the count can be computed, which is done in the fourth statement.
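If you want to see how the data changes shape at each step, describe the intermediate relations; assuming the script above, the output should look roughly like this:

grunt> describe words;
words: {word: chararray}
grunt> describe grouped;
grouped: {group: chararray,words: {(word: chararray)}}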

With just five lines of Pig, we have solved the word count problem.

Please comment here if you have any problems running the above Pig script or you are not getting the expected word counts.

RELATIONS, BAGS, TUPLES, FIELDS

In this article, we will see what a relation, a bag, a tuple, and a field are. Let's look at each of these in detail.

Let's consider the following mobile services dataset as an example:

Id, MobileService_name
-----------------------
101, Airtel
102, Tata
103, Reliance

  • Field: A field is a piece of data. In the above data set, MobileService_name is a field.
  • Tuple: A tuple is an ordered set of fields. Here Id and MobileService_name together form a tuple. Tuples are written with parentheses. Example: (101, Airtel).
  • Bag: A bag is a collection of tuples. Bags are written with curly braces. Example: {(101,Airtel),(102,Tata),(103,Reliance)}.
  • Relation: A relation represents the complete dataset. A relation is a bag; to be precise, it is an outer bag. We can call a relation a bag of tuples.
To compare with an RDBMS: a relation is a table, and the tuples in the bag correspond to the rows in the table. Note, however, that tuples in Pig are not required to contain the same number of fields, and fields in the same position are not required to have the same data type.
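A short grunt sketch tying these terms together (the HDFS path is an assumption for illustration):

grunt> services = LOAD '/user/hadoop/services' USING PigStorage(',') AS (id:int, name:chararray);
grunt> DUMP services; -- the relation 'services' is an outer bag of tuples
(101,Airtel)
(102,Tata)
(103,Reliance)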

PIG DATA TYPES - PRIMITIVE AND COMPLEX

Pig has a very limited set of data types. Pig data types are classified into two types. They are:
  • Primitive
  • Complex

Primitive Data Types: The primitive datatypes are also called simple datatypes. The simple data types that Pig supports are:
  • int : a signed 32-bit integer, similar to Java's Integer.
  • long : a signed 64-bit integer, similar to Java's Long.
  • float : a 32-bit floating-point number, similar to Java's Float.
  • double : a 64-bit floating-point number, similar to Java's Double.
  • chararray : a character array in Unicode UTF-8 format; corresponds to Java's String.
  • bytearray : used to represent bytes. It is the default data type: if you don't specify a data type for a field, bytearray is assigned to it.
  • boolean : represents true/false values.

Complex Types: Pig supports three complex data types. They are listed below:
  • Tuple : an ordered set of fields. A tuple is written with parentheses. Example: (1,2)
  • Bag : a set of tuples. A bag is written with curly braces. Example: {(1,2),(3,4)}
  • Map : a set of key-value pairs. A map is written with square brackets, and # separates key from value. Example: [key#value]

Pig allows nesting of complex data structures. For example, you can nest a tuple inside another tuple, a bag, or a map.
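As a sketch (the path and field names are invented for illustration), here is a LOAD schema that declares all three complex types at once:

grunt> X = LOAD '/user/hadoop/complex_data' AS (t:tuple(a:int,b:int), bg:bag{tp:tuple(x:int)}, m:map[]);
grunt> describe X;
X: {t: (a: int,b: int),bg: {tp: (x: int)},m: map[]}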

Null: Null is not a datatype. A null is an undefined or corrupted value. For example, say you have declared a field as int, but the field actually contains character values. When reading that field, Pig converts those corrupted character values into nulls. Any operation on a null results in a null. Null in Pig is similar to NULL in SQL.
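For example, a sketch that uses FILTER with IS NOT NULL to drop such corrupted rows, reusing the products relation from earlier:

grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id:int, product_name:chararray, price:int);
grunt> good = FILTER A BY price IS NOT NULL; -- keeps only rows where price parsed as an int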

A Simple Explanation of COGROUP in Apache Pig


The COGROUP command in Apache Pig is somewhat confusing because it is sort of both a GROUP and a JOIN.

COGROUP One Table

In its simplest form, COGROUP is exactly the same as GROUP. It groups rows based on a column, and creates bags for each group.
For example, assume we have a data set of animal owners:
$ cat > owners.csv
adam,cat
adam,dog
alex,fish
alice,cat
steve,dog
We could COGROUP on animal using the Pig code:
owners = LOAD 'owners.csv' 
    USING PigStorage(',')
    AS (owner:chararray,animal:chararray);

grouped = COGROUP owners BY animal;
DUMP grouped;
This returns a list of animals. For each animal, Pig groups the matching rows into bags. The resulting table grouped is:
group   owners
------------------------------
cat     {(adam,cat),(alice,cat)}
dog     {(adam,dog),(steve,dog)}
fish    {(alex,fish)}

COGROUP Two Tables

Where COGROUP gets fancy is that you can COGROUP on two tables at once. Pig will group the two tables and then join the two tables on the grouped column. For example, assume we also had a data set of pet names:
$ cat > pets.csv
nemo,fish
fido,dog
rex,dog
paws,cat
wiskers,cat
Given this table, we could compare, for example, all the people who own a given animal to all the names of that animal. The COGROUP command is:
owners = LOAD 'owners.csv' 
    USING PigStorage(',')
    AS (owner:chararray,animal:chararray);

pets = LOAD 'pets.csv' 
    USING PigStorage(',')
    AS (name:chararray,animal:chararray);

grouped = COGROUP owners BY animal, pets by animal;
DUMP grouped;
This will group each table based on the animal column. For each animal, it will create a bag of matching rows from both tables. For this example, we get:
group   owners                      pets
--------------------------------------------------------------
cat     {(adam,cat),(alice,cat)}    {(paws,cat),(wiskers,cat)}
dog     {(adam,dog),(steve,dog)}    {(fido,dog),(rex,dog)}
fish    {(alex,fish)}               {(nemo,fish)}
In summary, you can use COGROUP when you need to group two tables by a column and then join on the grouped column.
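To flatten the COGROUP result into an actual join, you can FLATTEN the bags; a sketch continuing the example above (row order may vary):

joined = FOREACH grouped GENERATE FLATTEN(owners), FLATTEN(pets.name) AS pet_name;
DUMP joined;
(adam,cat,paws)
(adam,cat,wiskers)
(alice,cat,paws)
(alice,cat,wiskers)
(adam,dog,fido)
(adam,dog,rex)
(steve,dog,fido)
(steve,dog,rex)
(alex,fish,nemo)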
UNDERSTANDING THE GROUP OPERATOR IN PIG

I've been doing a fair amount of helping people get started with Apache Pig. One common stumbling block is the GROUP operator. Although familiar, as it serves a similar function to SQL's GROUP BY, it is just different enough in the Pig Latin language to be confusing. Hopefully this brief post will shed some light on what exactly is going on.

Basic Usage

Let us start by loading up some data:
my_data = LOAD '/data/customers' using PigStorage()
  as (name:chararray, age:int, eye_color:chararray, height:int);
If we want to compute some aggregates from this data, we might want to group the rows into buckets over which we will run the aggregate functions:
by_age = GROUP my_data BY age;
by_age_and_color = GROUP my_data BY (age, eye_color);

Resulting Schema

When you group a relation, the result is a new relation with two columns: “group” and the name of the original relation. The group column has the schema of what you grouped by. If you grouped by an integer column, for example, as in the first example, the type will be int. If you grouped by a tuple of several columns, as in the second example, the “group” column will be a tuple with two fields, “age” and “eye_color”.
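As a sketch, describing the two grouped relations above should show schemas roughly like these:

grunt> describe by_age;
by_age: {group: int,my_data: {(name: chararray,age: int,eye_color: chararray,height: int)}}
grunt> describe by_age_and_color;
by_age_and_color: {group: (age: int,eye_color: chararray),my_data: {(name: chararray,age: int,eye_color: chararray,height: int)}}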
They can be retrieved by flattening “group”, or by directly accessing them: “group.age, group.eye_color”:
-- using FLATTEN
age_and_color = FOREACH by_age_and_color GENERATE FLATTEN(group) as (age, color);
 
-- or using explicit projections
age_and_color = FOREACH by_age_and_color GENERATE group.age, group.eye_color;
Note that using the FLATTEN operator is preferable since it allows algebraic optimizations to work — but that’s a subject for another post.
The second column will be named after the original relation, and contain a bag of all the rows in the original relation that match the corresponding group. The rows are unaltered — they are the same as they were in the original table that you grouped.
As a side note, Pig also provides a handy operator called COGROUP, which essentially performs a join and a group at the same time. The syntax is as follows:
cogrouped_data = COGROUP data1 BY id, data2 BY user_id;
The resulting schema will be the group as described above, followed by two columns — data1 and data2, each containing bags of tuples with the given group key. This is very useful if you intend to join and group on the same key, as it saves you a whole Map-Reduce stage.

Processing the results

To work on the results of the group operator, you will want to use a FOREACH. This is a simple loop construct that works on a relation one row at a time. You can apply it to any relation, but it’s most frequently used on results of grouping, as it allows you to apply aggregation functions to the collected bags.
Referring to somebag.some_field in a FOREACH operator essentially means “for each tuple in the bag, give me some_field in that tuple”. So you can do things like
age_counts = FOREACH by_age GENERATE
  group as age,        -- the key you grouped on
  COUNT(my_data),      -- the number of people with this age
  MAX(my_data.height); -- the maximum height of people with this age
Note that all the functions in this example are aggregates. That’s because they are things we can do to a collection of values. Folks sometimes try to apply single-item operations in a foreach — like transforming strings or checking for specific values of a field. Remember, my_data.height doesn’t give you a single height element — it gives you all the heights of all people in a given age group.
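If you do need a per-row transformation, apply it before grouping, or FLATTEN the bag back out first. A small sketch (assuming Pig's built-in TRIM function is available):

-- wrong: TRIM expects a single chararray, not a bag of all names in the group
-- bad = FOREACH by_age GENERATE group, TRIM(my_data.name);

-- right: flatten back to one row per person, then transform each row
flat = FOREACH by_age GENERATE group AS age, FLATTEN(my_data.name) AS name;
trimmed = FOREACH flat GENERATE age, TRIM(name);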

Multiple dimensions

It is common to need counts by multiple dimensions; in our running example, we might want to get not just the maximum or the average height of all people in a given age category, but also the number of people in each age category with a certain eye color. There are a few ways to achieve this, depending on how you want to lay out the results.
The simplest is to just group by both age and eye color:
by_age_color = GROUP my_data BY (age, eye_color);
 
-- count colors separately
by_age_color_counts = FOREACH by_age_color GENERATE
    FLATTEN(group) AS (age, eye_color),
    AVG(my_data.height) as age_color_height_avg,
    COUNT(my_data) AS age_color_count;
From there, you can group by_age_color_counts again and get your by-age statistics.
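For example, a sketch of rolling those per-(age, eye_color) counts up into per-age totals:

by_age_only = GROUP by_age_color_counts BY age;
age_totals = FOREACH by_age_only GENERATE
    group AS age,
    SUM(by_age_color_counts.age_color_count) AS num_people;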
If you have a set list of eye colors, and you want the eye color counts to be columns in the resulting table, you can do the following:
-- break out the counts
my_data = FOREACH my_data
               GENERATE name, age, height,
               (eye_color == 'brown' ? 1 : 0) AS brown_eyes,
               (eye_color == 'blue'  ? 1 : 0) AS blue_eyes,
               (eye_color == 'green' ? 1 : 0) AS green_eyes;
 
-- group and generate
by_age = group my_data by age;
final_data = FOREACH by_age GENERATE
    group as age,
    COUNT(my_data) as num_people,
    AVG(my_data.height) as avg_height,
    SUM(my_data.brown_eyes) as num_brown_eyes,
    SUM(my_data.blue_eyes) as num_blue_eyes,
    SUM(my_data.green_eyes) as num_green_eyes;

Advanced Topics

A few notes on more advanced topics, which perhaps should warrant a more extensive treatment in a separate post.
The GROUP operator in Pig is a 'blocking' operator, and forces a Hadoop Map-Reduce job. All the data is shuffled, so that rows in different partitions (or "slices", if you prefer the pre-Pig 0.7 terminology) that have the same grouping key wind up together. Therefore, grouping has non-trivial overhead, unlike operations like filtering or projecting. Consider this when putting together your pipelines.
If you need to calculate statistics on multiple different groupings of the data, it behooves you to take advantage of Pig's multi-query (multi-store) optimization, wherein it will find opportunities to share work between the multiple calculations.
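A sketch of what that looks like in practice: two STOREs in one script let Pig share the single LOAD (and any common work) across both pipelines. The output paths are placeholders:

my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

by_age = GROUP my_data BY age;
age_counts = FOREACH by_age GENERATE group AS age, COUNT(my_data) AS n;

by_eye = GROUP my_data BY eye_color;
eye_counts = FOREACH by_eye GENERATE group AS eye_color, COUNT(my_data) AS n;

-- both STOREs run in one script, so the multi-query optimization can kick in
STORE age_counts INTO '/output/by_age';
STORE eye_counts INTO '/output/by_eye_color';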
When groups grow too large, they can cause significant memory issues on reducers; they can lead to hot spots, and all kinds of other badness. Look up the algebraic and accumulative EvalFunc interfaces in the Pig documentation, and try to use them to avoid this problem when possible. Check the execution plan (using the 'explain' command) to make sure the algebraic and accumulative optimizations are used.
Pig 0.7 introduces an option to group on the map side, which you can invoke when you know that all of your keys are guaranteed to be on the same partition. Consider it when this condition applies.

Cleanup()

So there you have it, a somewhat ill-structured brain dump about the GROUP operator in Pig. I hope it helps folks — if something is confusing, please let me know in the comments!
Project for Pig: http://blog.cloudera.com/blog/2012/08/process-a-million-songs-with-apache-pig/
