Chapter 12. Pig and Other Members of the Hadoop Community

The community of applications that run on Hadoop has grown significantly as the adoption of Hadoop has increased. Many (but not all) of these applications are Apache projects, and some are quite similar in functionality. It can be confusing, especially for those new to Hadoop, to understand how these applications integrate with, complement, and overlap one another. In this chapter we will look at the different projects from a Pig perspective, focusing on how they complement, integrate with, or compete with Pig.

Pig and Hive

Apache Hive provides a SQL layer on top of Hadoop. It takes SQL queries and translates them into MapReduce jobs, much as Pig does with Pig Latin. It stores data in tables and keeps metadata about those tables, such as partitions and schemas. Many view Pig and Hive as competitors. Since both let users operate on data stored in Hadoop without writing Java code, this is a natural conclusion. However, as was discussed in “Comparing query and dataflow languages”, SQL and Pig Latin have different strengths and weaknesses. Because Hive provides SQL, it is the better tool for traditional data analytics: most data analysts already know SQL, and business intelligence tools expect to speak to data sources in SQL. Pig Latin is the better choice when building a data pipeline or doing research on raw data.

Cascading

Another data-processing framework available for Hadoop is Cascading, available at http://www.cascading.org. The goal of Cascading is similar to Pig's: it enables users to build data flows on Hadoop. However, its approach differs significantly. Rather than presenting a new language, Cascading has users write data flows in Java. It provides a library of operators that users can string together, and users can also implement their own operators. This gives users more control but requires more low-level coding.

NoSQL Databases

Over the last few years a number of NoSQL databases have arisen. These databases break one or more of the traditional rules of relational database systems. They do not expect data to be normalized. Instead, the data accessed by a single application lives in one large table so that few or no joins are necessary. Many of these databases do not implement full ACID semantics.[31]

Like MapReduce, these systems are built to manage terabytes of data. Unlike MapReduce, they are focused on random reads and writes of data. Where MapReduce and technologies built on top of it (such as Pig) are optimized for reading vast quantities of data very quickly, these NoSQL systems are optimized for finding a few records very quickly. This difference in focus does not prevent Pig from working with these systems. Users often want to analyze the data stored in them, and because these systems offer fast random lookups, certain types of joins could benefit from having the data stored there.

Two NoSQL databases have been integrated with Pig: HBase and Cassandra.

HBase

Apache HBase is a NoSQL database that uses HDFS to store its data. HBase presents its data to users in tables. Within each table, every row has a key. Reads in HBase are done by key, by a range of keys, or by a bulk scan. Users can also update or insert individual rows by key. In addition to a key, rows in HBase have column families, and all rows in a table share the same column families. Within each column family there are columns. There is no constraint that a row have the same columns as any other row in a given column family. Thus an HBase table T might have one column family F, which every row in that table would share, but a row with key x could have columns a, b, c in F, while another row with key y has columns a, b, d in F. Column values are also versioned; by default, a value's version is the timestamp at which it was written. HBase keeps a configurable number of versions, so users can access the most recent version or previous versions of a column value. All keys and column values in HBase are arrays of bytes.
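To make this data model concrete, here is a minimal in-memory sketch in Python. It is not HBase's implementation or API: `ToyHBaseTable` and its methods are hypothetical names. The model keeps row keys in sorted byte order (so range reads are cheap), gives every row the same column families while letting the columns inside a family vary per row, and retains a bounded number of timestamped versions per cell.

```python
from bisect import bisect_left


class ToyHBaseTable:
    """Hypothetical in-memory model of an HBase table (not the HBase API).

    Layout: row key -> column family -> column -> [(timestamp, value), ...].
    Every row gets the same column families, but the columns inside a
    family can differ from row to row. Keys, families, columns, and values
    are all bytes, as in HBase.
    """

    def __init__(self, families, max_versions=3):
        self.families = list(families)
        self.max_versions = max_versions
        self.rows = {}         # row key -> {family -> {column -> versions}}
        self.sorted_keys = []  # row keys in byte-lexicographic order

    def put(self, key, family, column, value, ts):
        """Insert or update one cell, keeping at most max_versions versions."""
        if family not in self.families:
            raise KeyError(f"unknown column family {family!r}")
        if key not in self.rows:
            self.rows[key] = {f: {} for f in self.families}
            self.sorted_keys.insert(bisect_left(self.sorted_keys, key), key)
        versions = self.rows[key][family].setdefault(column, [])
        versions.append((ts, value))
        versions.sort(reverse=True)       # newest timestamp first
        del versions[self.max_versions:]  # discard the oldest extra versions

    def get(self, key, family, column, version=0):
        """Read one cell by key; version 0 is the most recent value."""
        return self.rows[key][family][column][version][1]

    def scan(self, start=None, stop=None):
        """Return row keys in [start, stop), in sorted order (None = unbounded)."""
        lo = 0 if start is None else bisect_left(self.sorted_keys, start)
        hi = len(self.sorted_keys) if stop is None else bisect_left(self.sorted_keys, stop)
        return self.sorted_keys[lo:hi]


# Table T with one family F: rows x and y share F but hold different columns.
t = ToyHBaseTable([b"F"])
for col in (b"a", b"b", b"c"):
    t.put(b"x", b"F", col, b"v1", ts=1)
for col in (b"a", b"b", b"d"):
    t.put(b"y", b"F", col, b"v1", ts=1)
t.put(b"x", b"F", b"a", b"v2", ts=2)   # a newer version of x's column a

print(sorted(t.rows[b"x"][b"F"]))      # [b'a', b'b', b'c']
print(t.get(b"x", b"F", b"a"))         # b'v2'
print(t.get(b"x", b"F", b"a", 1))      # b'v1'
print(t.scan(b"x", b"y"))              # [b'x']
```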

Pig provides HBaseStorage to read data from and write data to HBase tables. All these reads and writes are bulk operations. Bulk reads from HBase are slower than scans in HDFS. However, if the data is already in HBase, it is faster to read it directly than it is to extract it, place it in HDFS, and then read it.

When loading from HBase, you must tell Pig what table to read from and what column families and columns to read. You can read individual columns or, beginning in version 0.9, whole column families. Because column families contain a variable set of columns and their values, they must be cast to Pig’s map type. As an example, let’s say we have an HBase table users that stores information on users and their links to other users. It has two column families: user_info and links. The key for the table is the user ID. The user_info column family has columns such as name, email, etc. The links column family has a column for each user that the user is linked to. The column name is the linked user’s ID, and the value of these columns is the type of the link—friend, relation, colleague, etc.:

user_links = load 'hbase://users'
                 using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
                 'user_info:name links:*', '-loadKey true -gt 10000')
                 as (id, name:chararray, links:map[]);

The load location string is the HBase table name, prefixed with hbase://. The appropriate HBase client configuration must be present on your machine so that the HBase client can determine how to connect to the HBase server. HBaseStorage takes two constructor arguments: the first tells it which column families and columns to read, and the second passes a set of options.

In HBase, columns are referenced as column_family:column. In the preceding example, user_info:name indicates the column name in the column family user_info. When you want to extract a whole column family, you give the column family and an asterisk, for example, links:*. You can also get a subset of the columns in a column family. For example, links:100* would result in a map having all columns that start with 100. The map that contains a column family has the HBase column names as keys and the column values as values.
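The three forms of column reference (a single column, a whole family, and a family restricted to a column-name prefix) can be modeled in a few lines of Python. `select_columns` is a hypothetical helper, not part of Pig or HBaseStorage; it works on one row represented as nested dicts that hold only the latest version of each cell, and it uses strings rather than byte arrays for readability.

```python
def select_columns(row, spec):
    """Apply one column reference to a single row.

    row:  {family: {column: value}}  (latest versions only)
    spec: "family:column"  -> that column's value (None if the row lacks it)
          "family:*"       -> dict of every column in the family
          "family:prefix*" -> dict of columns whose names start with prefix
    """
    family, _, column = spec.partition(":")
    cols = row.get(family, {})
    if column.endswith("*"):
        prefix = column[:-1]
        return {name: value for name, value in cols.items() if name.startswith(prefix)}
    return cols.get(column)


row = {
    "user_info": {"name": "alice", "email": "alice@example.com"},
    "links": {"1001": "friend", "1002": "colleague", "2001": "relation"},
}
print([select_columns(row, s) for s in ["user_info:name", "links:100*"]])
# ['alice', {'1001': 'friend', '1002': 'colleague'}]
```

Note how a whole-family or prefix reference yields a dict keyed by column name, which is why such fields arrive in Pig as maps.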

The options string allows you to configure HBaseStorage. This can be used to control whether the key is loaded, which rows are loaded, and other features. All of these options are placed in one string, separated by spaces. Table 12-1 describes each of these options.

Table 12-1. HBaseStorage options
Option | Valid values | Default | Description
loadKey | Boolean | false | If true, the row key is loaded as the first field of each input tuple.
gt | Row key | None | Only loads rows with a key greater than the provided value.
gte | Row key | None | Only loads rows with a key greater than or equal to the provided value.
lt | Row key | None | Only loads rows with a key less than the provided value.
lte | Row key | None | Only loads rows with a key less than or equal to the provided value.
caching | Integer | 100 | The number of rows the scanners should cache.
limit | Integer | None | Reads at most this many rows from each HBase region.
caster | Java class name | Utf8StorageConverter | The Java class used to cast between Pig types and the bytes that HBase stores. This class must implement Pig’s LoadCaster and StoreCaster interfaces. The default, Utf8StorageConverter, can be used when the data stored in HBase is in UTF-8 format and numbers are stored as strings (rather than in binary). HBaseBinaryConverter uses HBase’s Bytes.toInt, Bytes.toString, etc., methods. It cannot cast to maps, so you cannot read entire column families with it.
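Because HBase orders row keys as raw bytes, the gt, gte, lt, and lte bounds compare bytes, not numbers, and the outcome depends on how keys were encoded. The Python sketch below models this under stated assumptions: `parse_options` is a simplified, hypothetical stand-in for HBaseStorage's option parsing that assumes every option is written as `-name value`, and bound values are assumed to be encoded as UTF-8 text. `binary_int` mirrors the 4-byte big-endian layout that HBase's `Bytes.toBytes(int)` produces.

```python
import operator

# Hypothetical model of the key-range options; not HBaseStorage's code.
BOUNDS = {"gt": operator.gt, "gte": operator.ge, "lt": operator.lt, "lte": operator.le}


def parse_options(opt_string):
    """Parse '-loadKey true -gt 10000' into {'loadKey': 'true', 'gt': '10000'}.

    Assumes every option is a '-name value' pair separated by spaces.
    """
    tokens = opt_string.split()
    return {name.lstrip("-"): value for name, value in zip(tokens[0::2], tokens[1::2])}


def in_range(key, opts):
    """True if the row key (bytes) satisfies every bound present in opts.

    Bounds are encoded as UTF-8 and compared byte by byte, the way HBase
    orders row keys.
    """
    return all(cmp(key, opts[name].encode("utf-8"))
               for name, cmp in BOUNDS.items() if name in opts)


def utf8_int(n):
    """A number stored as text, the layout Utf8StorageConverter expects."""
    return str(n).encode("utf-8")


def binary_int(n):
    """A number stored as 4 big-endian two's-complement bytes, like Bytes.toBytes(int)."""
    return n.to_bytes(4, "big", signed=True)


opts = parse_options("-loadKey true -gt 10000")
keys = sorted(utf8_int(n) for n in (9, 10000, 10001, 20000, 100000))
print([k for k in keys if in_range(k, opts)])
# [b'100000', b'10001', b'20000', b'9']  (9 passes: b'9' > b'10000' as bytes)

print(binary_int(-1) > binary_int(1))  # True: negative ints sort after positive ones
```

With string-encoded numeric keys, zero-padding them to a fixed width makes byte order agree with numeric order for non-negative values.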

As of the time of this writing, Pig is able to read only the latest version of a column value. There have been discussions about what the best interface and data type mapping would be to enable Pig to read multiple versions. This feature will most likely be added at some point in the future.

HBaseStorage stores data into HBase as well. When storing data, you specify the table name as the location string, just as in load. The constructor arguments are also similar to the load case. The first describes the mapping of Pig fields to the HBase table, which uses the same column_family:column syntax as in load. Any Pig value can be mapped to a column. A Pig map can be mapped to a column family by saying column_family:* (again, only in 0.9 and later). The row key is not referenced in this argument, but it is assumed to be the first field in the Pig tuple. The only valid option in the optional second argument in the store case is -caster.

Assume that at the end of processing our Pig data has the schema (id:long, name:chararray, email:chararray, links:map[]). Storing it into the example HBase table we used earlier looks like this:

-- Schema of user_links is (id, name, email, links).
-- Notice how the id (key) field is omitted in the argument.
store user_links into 'hbase://users'
    using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
    'user_info:name user_info:email links:*');
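The store-side mapping rule (the first field becomes the row key and is left out of the column list, each remaining field lines up with one column reference, and a map expands into one cell per entry under family:*) can be sketched in Python as follows. `tuple_to_cells` is a hypothetical helper for illustration, not HBaseStorage's implementation.

```python
def tuple_to_cells(record, column_specs):
    """Map one Pig-style tuple to (row_key, [(family, column, value), ...]).

    Hypothetical model of the store mapping: record[0] is the row key and
    has no entry in column_specs; every other field pairs with one spec.
    """
    key, *fields = record
    if len(fields) != len(column_specs):
        raise ValueError("need exactly one column spec per non-key field")
    cells = []
    for spec, value in zip(column_specs, fields):
        family, _, column = spec.partition(":")
        if column == "*":
            # A map field writes one cell per entry, using the map keys as column names.
            cells.extend((family, name, v) for name, v in value.items())
        else:
            cells.append((family, column, value))
    return key, cells


record = (42, "alice", "alice@example.com", {"7": "friend", "9": "colleague"})
key, cells = tuple_to_cells(record, ["user_info:name", "user_info:email", "links:*"])
print(key)    # 42
for cell in cells:
    print(cell)
# ('user_info', 'name', 'alice')
# ('user_info', 'email', 'alice@example.com')
# ('links', '7', 'friend')
# ('links', '9', 'colleague')
```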

Cassandra

Apache Cassandra is another scalable database used for high-volume random reading and writing of data. It differs from HBase in its approach to distribution. Whereas HBase provides strong consistency (each region is served by a single server at a time), Cassandra uses an eventual consistency model, meaning that different servers might hold different values for the same data for some period of time. For more information about Cassandra, see Cassandra: The Definitive Guide, by Eben Hewitt (O’Reilly).
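A deliberately simplified Python model of eventual consistency: two hypothetical replicas accept writes independently, so reads can disagree until the replicas exchange data, after which the write with the newer timestamp wins on both. Cassandra does reconcile conflicting writes by timestamp, but its actual replication and repair machinery is not modeled here, and the class and function names are illustrative only.

```python
class Replica:
    """One hypothetical replica holding column -> (timestamp, value)."""

    def __init__(self):
        self.cells = {}

    def write(self, column, value, ts):
        # Accept the write only if it is newer than what this replica has.
        current = self.cells.get(column)
        if current is None or ts > current[0]:
            self.cells[column] = (ts, value)

    def read(self, column):
        cell = self.cells.get(column)
        return None if cell is None else cell[1]


def sync_replicas(a, b):
    """Copy every cell in both directions; the newer timestamp wins everywhere."""
    for column, (ts, value) in list(a.cells.items()):
        b.write(column, value, ts)
    for column, (ts, value) in list(b.cells.items()):
        a.write(column, value, ts)


r1, r2 = Replica(), Replica()
r1.write("email", "old@example.com", ts=1)
r2.write("email", "new@example.com", ts=2)
print(r1.read("email"), r2.read("email"))  # old@example.com new@example.com
sync_replicas(r1, r2)
print(r1.read("email"), r2.read("email"))  # new@example.com new@example.com
```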

Cassandra comes with support for Pig, which means that you can load data from and store data to Cassandra column families. This works just as it does with any other storage mechanism used with Pig, such as HDFS, including data locality for input splits.

Pig and Cassandra can be used together in a number of ways. Pig can do traditional analytics while Cassandra handles real-time operations. Because Pig and MapReduce can run on top of Cassandra, this can be done without moving data between Cassandra and HDFS, although HDFS is still required for storing intermediate results. Beyond analytics, Pig can be used for data exploration, research, testing, validation, and correction over Cassandra data. It can also be used to populate the data store as new tables or column families are added.

The Pygmalion project was written to ease development when using Pig with data stored in Cassandra. Among other things, it includes UDFs that extract column values from the results and marshal data back into a form that Cassandra accepts.
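The two kinds of conversion described above can be sketched as plain Python functions. This assumes, for illustration, that each Cassandra row arrives in Pig as a row key plus a bag of (column_name, value) tuples; the function names are hypothetical and are not Pygmalion's UDF names or signatures.

```python
def from_column_bag(wanted, bag):
    """Pull named columns out of a bag of (column_name, value) pairs.

    Values come back in the order requested; missing columns become None.
    """
    values = dict(bag)
    return tuple(values.get(name) for name in wanted)


def to_column_bag(names, values):
    """Marshal a flat tuple back into (column_name, value) pairs, skipping None."""
    return [(name, value) for name, value in zip(names, values) if value is not None]


row_key, columns = "u42", [("email", "a@example.com"), ("name", "alice")]
print(from_column_bag(["name", "email", "phone"], columns))
# ('alice', 'a@example.com', None)
print(to_column_bag(["name", "email", "phone"], ("alice", "a@example.com", None)))
# [('name', 'alice'), ('email', 'a@example.com')]
```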

To integrate Pig workloads properly with data stored in Cassandra, the Cassandra cluster needs to colocate the data with Hadoop task trackers. This allows the Hadoop job tracker to move the data processing to the nodes where the data resides. Traditionally, Cassandra is used for heavy writes and real-time, random-access queries. Heavy Hadoop analytic workloads can be run on Cassandra without degrading the performance of real-time queries by splitting the cluster by workload type: one set of nodes is dedicated to analytic batch processing and another to real-time queries. Cassandra’s cross-datacenter replication copies data transparently between these sections of the cluster, so no manual copying of data is required and the analytic section is kept up to date.

Metadata in Hadoop

Apache HCatalog provides a metadata and table management layer for Hadoop. It allows Hadoop users (whether they use MapReduce, Pig, Hive, or other tools) to view their data in HDFS as if it were in tables. These tables can be partitioned and have consistent schemas. As a consequence of this abstraction, Pig users do not need to be concerned with where a file is located, which load and store functions should be used, or whether the file is compressed. HCatalog also makes it much easier for Pig, MapReduce, and Hive users to share data, because it provides a single schema and data type model for all of these tools. That data type model, taken from Hive, differs slightly from Pig’s, but the load and store functions take care of mapping between the models. HCatalog uses Hive’s metastore to store metadata. For full details of HCatalog, see http://incubator.apache.org/hcatalog.

HCatalog includes the load function HCatLoader. The location string for HCatLoader is the name of the table. It implements LoadMetadata, so you do not need to specify the schema as part of your load statement; Pig will get it from HCatLoader. Also, because it implements this interface, Pig can work with HCatalog’s partitioning. If you place the filter statement that describes which partitions you want to read immediately after the load, Pig will push that into the load so that HCatalog returns only the relevant partitions.
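Partition pushdown amounts to splitting an AND-ed filter into the conditions that mention only partition keys, which the loader can use to skip whole partitions, and the rest, which Pig still evaluates record by record. The Python sketch below models that split; it is not Pig's or HCatalog's implementation, the helper names are hypothetical, and it assumes a table partitioned by a date key.

```python
def split_filter(conjuncts, partition_keys):
    """Split AND-ed (field, predicate) pairs into (pushed, residual).

    Only conditions on partition keys can be handed to the loader.
    """
    pushed = [(f, p) for f, p in conjuncts if f in partition_keys]
    residual = [(f, p) for f, p in conjuncts if f not in partition_keys]
    return pushed, residual


def prune_partitions(partitions, pushed):
    """Keep only partitions whose key values satisfy every pushed condition."""
    return [part for part in partitions
            if all(pred(part[field]) for field, pred in pushed)]


partitions = [{"date": "20110923"}, {"date": "20110924"}, {"date": "20110925"}]
conjuncts = [
    ("date", lambda d: d == "20110924"),
    ("userid", lambda u: not u.startswith("bot")),  # hypothetical stand-in for a bot check
]
pushed, residual = split_filter(conjuncts, partition_keys={"date"})
print(prune_partitions(partitions, pushed))  # [{'date': '20110924'}]
print([field for field, _ in residual])      # ['userid']
```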

HCatStorer is the store function for HCatalog. As with the load function, the location string indicates the table to store records to. The store function also requires a constructor argument to indicate the partition key values for this store. At this time (version 0.1) only one partition can be written to in a single store. There are plans to allow writing to multiple partitions in version 0.2. HCatStorer expects the schema of the alias being stored to match the schema of the table that records are being stored to.
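The two things HCatStorer needs from a store, a partition specification in its constructor argument and a relation schema that matches the table's, can be checked ahead of time with something like the following Python sketch. The helpers are hypothetical; the comma-separated form for multi-key partition specifications is an assumption, and the table schema used is the processed_logs schema from the example that follows.

```python
# processed_logs schema as (name, type) pairs, in table order.
PROCESSED_LOGS = [("userid", "chararray"), ("user_ref", "int"), ("date", "chararray"),
                  ("time", "chararray"), ("pageid", "int"), ("url", "chararray")]


def parse_partition_spec(spec):
    """Parse 'key=value' (assumed comma-separated for several keys) into a dict."""
    pairs = (item.split("=", 1) for item in spec.split(","))
    return {key.strip(): value.strip() for key, value in pairs}


def schema_mismatches(alias_schema, table_schema):
    """List every difference in field count, name, or type, position by position."""
    problems = []
    if len(alias_schema) != len(table_schema):
        problems.append(f"expected {len(table_schema)} fields, got {len(alias_schema)}")
    for pos, (got, want) in enumerate(zip(alias_schema, table_schema)):
        if got != want:
            problems.append(f"field {pos}: expected {want[0]}:{want[1]}, got {got[0]}:{got[1]}")
    return problems


print(parse_partition_spec("date=20110924"))  # {'date': '20110924'}
renamed = [("user_id", "chararray")] + PROCESSED_LOGS[1:]
print(schema_mismatches(renamed, PROCESSED_LOGS))
# ['field 0: expected userid:chararray, got user_id:chararray']
```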

As an example, let’s consider a very simple data pipeline that reads raw web logs from the table web_server_logs, does some processing, and stores the results back into HCatalog in a table named processed_logs. The schema of web_server_logs is (userid:chararray, date:chararray, time:chararray, url:chararray), and the schema of processed_logs is (userid:chararray, user_ref:int, date:chararray, time:chararray, pageid:int, url:chararray). A Pig Latin script to do this processing would look like the following:

logs   = load 'web_server_logs' using HCatLoader();
-- use parameter substitution so script doesn't have to be rewritten every day
-- filter will be split and date portion pushed to the loader
today  = filter logs by date == '$DATE' and NotABot(userid);
...
-- schema of result must exactly match HCatalog schema
-- of processed_logs, including field names
result = foreach rslvd generate userid, user_ref, date, time, pageid, url;
store result into 'processed_logs' using HCatStorer('date=$DATE');


[31] Atomicity, Consistency, Isolation, and Durability. See http://en.wikipedia.org/wiki/ACID for a discussion of these properties in relational databases.