Chapter 9. Embedding Pig Latin in Python

Pig Latin is a dataflow language. Unlike general-purpose programming languages, it does not include control flow constructs such as if and for. For many data-processing applications, the operators Pig provides are sufficient. But there are classes of problems that either require the data flow to be repeated an indefinite number of times or need to branch based on the results of an operator. Iterative processing, where a calculation needs to be repeated until the margin of error is within an acceptable limit, is one example. In such cases it is not possible to know, before processing begins, how many times the data flow will need to run.

Blending data flow and control flow in one language is difficult to do in a way that is useful and intuitive. Building a general-purpose language and all the associated tools, such as IDEs and debuggers, is a considerable undertaking; also, there is no lack of such languages already. If we turned Pig Latin into a general-purpose language, it would require users to learn a much bigger language to process their data. For these reasons, we decided to embed Pig in existing scripting languages. This avoids the need to invent a new language while still providing users with the features they need to process their data.[21]

As with UDFs, we chose to use Python for the initial release of embedded Pig in version 0.9. The embedding interface is a Java class, so a Jython interpreter is used to run the Python scripts that embed Pig. This means Python 2.5 features can be used but Python 3 features cannot. In the future we hope to extend the system to other scripting languages that can access Java objects, such as JavaScript[22] and JRuby. Of course, since the Pig infrastructure is all in Java, it is possible to use this same interface to embed Pig in Java programs.

This embedding is done in a JDBC-like style, where your Python script first compiles a Pig Latin script, then binds variables from Python to it, and finally runs it. It is also possible to do filesystem operations, register JARs, and perform other utility operations through the interface. The top-level class for this interface is org.apache.pig.scripting.Pig.

Throughout this chapter we will use an example of calculating page rank from a web crawl. You can find this example under examples/ch9 in the example code. This code iterates over a set of URLs and links to produce a page rank for each URL.[23] The input to this example is the webcrawl data set found in the examples. Each record in this input contains a URL, a starting rank of 1, and a bag with a tuple for each link found at that URL:

http://pig.apache.org/privacypolicy.html 1 {(http://www.google.com/privacy.html)}
http://www.google.com/privacypolicy.html 1 {(http://www.google.com/faq.html)}
http://desktop.google.com/copyrights.html 1 {}
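
To make the record layout concrete, the following plain-Python sketch splits one input line into its three fields. It runs without Pig. The function name parse_webcrawl_line is ours, and the sketch assumes that fields are separated by whitespace and that URLs contain no spaces or parentheses; it is an illustration of the format, not the loader Pig uses.

```python
import re

# Assumption: each tuple in the bag looks like (url), with no nested parentheses.
LINK_TUPLE = re.compile(r"\(([^()]*)\)")

def parse_webcrawl_line(line):
    """Split one input record into (url, rank, list_of_links)."""
    # Assumption: whitespace separates the URL, the rank, and the bag.
    url, rank, bag = line.strip().split(None, 2)
    links = LINK_TUPLE.findall(bag)
    return url, float(rank), links

record = parse_webcrawl_line(
    "http://pig.apache.org/privacypolicy.html 1 "
    "{(http://www.google.com/privacy.html)}")
empty = parse_webcrawl_line("http://desktop.google.com/copyrights.html 1 {}")
```

Here record holds a single link, while empty holds an empty list, matching the empty bag in the third sample record.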

Even though control flow is done via a Python script, it can still be run using Pig’s bin/pig script. bin/pig looks for the #! line and calls the appropriate interpreter. This allows you to use these scripts with systems that expect to invoke a Pig Latin script. It also allows Pig to include UDFs from this file automatically and to give correct line numbers for error messages.

In order to use the Pig class and related objects, the code must first import them into the Python script:

from org.apache.pig.scripting import *

Compile

Calling the static method Pig.compile causes Pig to do an initial compilation of the code. Because we have not bound the variables yet, this check cannot completely verify the script. Type checking and other semantic checks are not done at this phase; only the syntax is checked. compile returns a Pig object that can be bound to a set of variables:

# pagerank.py
P = Pig.compile("""
previous_pagerank = load '$docs_in' as (url:chararray, pagerank:float,
                      links:{link:(url:chararray)});
outbound_pagerank = foreach previous_pagerank generate
                      pagerank / COUNT(links) as pagerank,
                      flatten(links) as to_url;
cogrpd            = cogroup outbound_pagerank by to_url,
                      previous_pagerank by url;
new_pagerank      = foreach cogrpd generate group as url,
                      (1 - $d) + $d * SUM (outbound_pagerank.pagerank)
                      as pagerank,
                      flatten(previous_pagerank.links) as links,
                      flatten(previous_pagerank.pagerank) AS previous_pagerank;
store new_pagerank into '$docs_out';
nonulls           = filter new_pagerank by previous_pagerank is not null and
                        pagerank is not null;
pagerank_diff     = foreach nonulls generate ABS (previous_pagerank - pagerank);
grpall            = group pagerank_diff all;
max_diff          = foreach grpall generate MAX (pagerank_diff);
store max_diff into '$max_diff';
""")

The only pieces of this Pig Latin script that we have not seen before are the four parameters, marked in the script as $d, $docs_in, $docs_out, and $max_diff. The syntax for these parameters is the same as for parameter substitution. However, Pig expects these to be supplied by the control flow script when bind is called.
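
For readers who want to see what one pass of this data flow computes, here is a plain-Python sketch over an in-memory crawl. It runs without Pig. The function name pagerank_step and the dictionary layout are ours, and the handling of nulls reflects our reading of the script (SUM over an empty bag yields null, and flattening an empty bag yields no rows); treat it as an approximation of the script rather than a replacement for it.

```python
def pagerank_step(pages, d):
    """One pass of the data flow over an in-memory crawl.

    pages maps url -> (pagerank, list of outbound links).
    Returns (new_pages, max_diff).
    """
    # outbound_pagerank: each page splits its rank evenly among its links.
    inbound = {}
    for url, (rank, links) in pages.items():
        if rank is None:
            continue
        for to_url in links:
            inbound.setdefault(to_url, []).append(float(rank) / len(links))

    # cogrpd + new_pagerank: only URLs present in the previous data are kept.
    new_pages = {}
    for url, (rank, links) in pages.items():
        shares = inbound.get(url)
        if shares:
            new_rank = (1 - d) + d * sum(shares)
        else:
            new_rank = None  # no inbound links: the rank is null
        new_pages[url] = (new_rank, links)

    # nonulls, pagerank_diff, max_diff: largest absolute change in rank.
    diffs = [abs(pages[u][0] - new_pages[u][0]) for u in pages
             if pages[u][0] is not None and new_pages[u][0] is not None]
    if diffs:
        max_diff = max(diffs)
    else:
        max_diff = None
    return new_pages, max_diff

# A made-up three-page crawl, every page starting at rank 1.
pages = {"a": (1.0, ["b", "c"]), "b": (1.0, ["c"]), "c": (1.0, ["a"])}
new_pages, max_diff = pagerank_step(pages, 0.5)
```

With d set to 0.5, page b receives half of a's rank and ends at 0.75, page c receives shares from both a and b and ends at 1.25, and the largest change is 0.25.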

There are three other compilation methods in addition to the one shown in this example. compile(String name, String script) takes a name in addition to the Pig Latin to be compiled. This name can be used in other Pig Latin code blocks to import this block:

P1 = Pig.compile("initial", """
A = load 'input';
...
""")
P2 = Pig.compile("""
import initial;
B = load 'more_input';
...
""")

There are two compilation methods called compileFromFile. These take the same arguments as compile, but they expect the script argument to refer to a file containing the script, rather than the script itself.

Bind

Once your script has been compiled successfully, the next step is to bind variables in the control flow to variables in Pig Latin. In our example script this is done by providing a map to the bind call. The keys are the names of the variables in Pig Latin. The values in the following example are literal string values that are updated as the script progresses. They also could be references to Python variables:

# pagerank.py
params = { 'd': '0.5', 'docs_in': 'data/webcrawl' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    params["docs_out"] = out
    params["max_diff"] = max_diff
    Pig.fs("rmr " + out)
    Pig.fs("rmr " + max_diff)
    bound = P.bind(params)
    stats = bound.runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
    print "max_diff_value = " + str(mdv)
    if mdv < 0.01:
        print "done at iteration " + str(i)
        break
    params["docs_in"] = out

For the initial run, the Pig Latin $d will take on the value of 0.5, $docs_in the path data/webcrawl, $docs_out out/pagerank_data_1, and $max_diff out/max_diff_1.
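
The effect of these first-run values can be imitated in plain Python with string.Template, which happens to use the same $name notation. This is only an analogy: Pig does its own substitution, and nothing here calls Pig. The four lines in the template are taken from different places in the page rank script, so the fragment is not a complete script.

```python
from string import Template

# Four non-contiguous lines from the page rank script that carry parameters.
fragment = Template(
    "previous_pagerank = load '$docs_in' as (url:chararray, pagerank:float,\n"
    "(1 - $d) + $d * SUM (outbound_pagerank.pagerank)\n"
    "store new_pagerank into '$docs_out';\n"
    "store max_diff into '$max_diff';\n")

# The values bound on the first pass of the loop.
first_run = {'d': '0.5', 'docs_in': 'data/webcrawl',
             'docs_out': 'out/pagerank_data_1',
             'max_diff': 'out/max_diff_1'}
bound_text = fragment.substitute(first_run)

# Leaving a parameter out is an error, much as an incomplete bind is.
try:
    fragment.substitute({'d': '0.5'})
    missing_detected = False
except KeyError:
    missing_detected = True
```

After substitution, bound_text contains no remaining $ markers, and missing_detected is True because docs_in was not supplied in the second call.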

bind returns a BoundScript object. This object can be run, explained, described, or illustrated. As is shown in this script, a single Pig object can be bound multiple times. The script needs to be compiled only once; different values are then bound to it on each pass.

In our example, bind is given a mapping of the variables to bind. If all of your Python variables and Pig Latin variables have the same name, you can call bind with no arguments. This will cause bind to look in the Python context for variables of the same name as the parameters in Pig and use them. If it cannot find appropriate variables, it will throw an error. We could change our example script to look like this:

# pagerankbindnoarg.py
d = 0.5
docs_in = 'data/webcrawl'

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    bound = P.bind()
    stats = bound.runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
    print "max_diff_value = " + str(mdv)
    if mdv < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
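
The name matching that a no-argument bind performs can be pictured with the following plain-Python sketch. It is not how Pig implements the lookup; the function resolve_params, the regular expression, and the explicit namespace dictionary are our own illustration. In a real script the namespace would be the Python context of the calling script.

```python
import re

# Assumption: a parameter is a $ followed by a Python-style identifier.
PARAM = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")

def resolve_params(script, namespace):
    """Build a parameter map by looking up each $name in namespace."""
    params = {}
    for name in PARAM.findall(script):
        if name not in namespace:
            raise KeyError("no Python variable named " + name)
        params[name] = str(namespace[name])
    return params

# Two lines from the page rank script, and a stand-in for the Python context.
script_text = ("previous_pagerank = load '$docs_in' as (url:chararray, "
               "pagerank:float,\n"
               "(1 - $d) + $d * SUM (outbound_pagerank.pagerank)\n")
namespace = {'d': 0.5, 'docs_in': 'data/webcrawl', 'i': 3}
found = resolve_params(script_text, namespace)
```

Variables that the script does not mention, such as i, are ignored, while a parameter with no matching variable raises an error.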

Binding Multiple Sets of Variables

Our example page rank script binds its compiled Pig Latin to different variables multiple times in order to iterate over the data. Each of these jobs is run separately, as is required by the iterative nature of calculating page rank. However, sometimes you want to run a set of jobs together; for example, consider calculating census data from countries all over the world. You want to run the same Pig Latin for each country, but you do not want to run them separately. There is no point in having a massively parallel system such as Hadoop if you are going to run jobs one at a time. You want to tell Pig to take your script and run it against input from all the countries at the same time.

There is a form of bind that provides this capability. Instead of taking a map of parameters, it takes a list of maps of parameters. It still returns a single BoundScript object, but when run is called on this object, all of the separate instantiations of the script will be run together:

#!/usr/bin/python
from org.apache.pig.scripting import *
pig = Pig.compile("""
    input = load '$country' using CensusLoader();
    ...
    store output into '$country_out';
""")

params = [{'country': 'Afghanistan', 'country_out': 'af.out'},
          ...
          {'country': 'Zimbabwe', 'country_out': 'zw.out'}]

bound = pig.bind(params)
stats = bound.run()
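
With many countries, the list of maps need not be typed by hand. The sketch below builds it from a table of names and output prefixes. The countries list is a made-up stand-in containing only the two entries shown above; a real script would read the full list from a file or another source.

```python
# Hypothetical (country, output prefix) pairs, for illustration only.
countries = [('Afghanistan', 'af'), ('Zimbabwe', 'zw')]

# One parameter map per country, in the same order as the table.
params = [{'country': name, 'country_out': prefix + '.out'}
          for name, prefix in countries]
```

The resulting params list has the same shape as the literal list in the example and can be passed to bind in the same way.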

Run

Once we have our BoundScript object, we can call runSingle to run it. This tells Pig to run a single Pig Latin script. This is appropriate when you have bound your script to just one set of variables. runSingle returns a PigStats object. This object allows you to get your results and examine what happened in your script, including status, error codes and messages if there was an error, and statistics about the run itself. Table 9-1 summarizes the more important methods available for PigStats.

Table 9-1. PigStats methods
Method | Returns
result(String alias) | Given an alias, returns an OutputStats object that describes the output stored from that alias. You can get a results iterator from OutputStats.
isSuccessful() | Returns true if all went well, and false otherwise.
getReturnCode() | Gets the return code from running Pig. See Table 2-1 for return code details.
getErrorMessage() | Returns the error message if the run failed. This will try to pick the most relevant error message that was returned, most likely the last.
getAllErrorMessages() | Returns a list of all of the error messages if the run failed.
getOutputLocations() | Returns a list of location strings that were stored in the script. For example, if you wrote output to a file on HDFS, this will return the filename.
getOutputNames() | Returns a list of aliases that were stored in the script.
getRecordWritten() | Returns the total number of records written by the script.
getBytesWritten() | Returns the total number of bytes written by the script.
getNumberRecords(String location) | Given an output location, returns the number of records written to that location.
getNumberBytes(String location) | Given an output location, returns the number of bytes written to that location.
getDuration() | Returns the wall clock time it took the script to run.
getNumberJobs() | Returns the number of MapReduce jobs run by this script.

As seen in the example, the OutputStats object returned by result() can be used to get an iterator on the result set. With this you can iterate through the tuples of your data, processing them in your Python script. Standard Tuple methods such as get() can be used to inspect the contents of each record. See “Interacting with Pig values” for a discussion of working with Tuples. Based on the results read in the iterator, your Python script can decide whether to cease iteration and declare success, raise an error, or continue with another iteration.
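
The three possible decisions (stop, fail, or continue) can be isolated in a small driver that is testable without a cluster. In the plain-Python sketch below, run_once is a hypothetical stand-in for binding the script, running it, and reading max_diff back through the iterator; the function names and the fake runner are ours.

```python
def iterate_until_converged(run_once, first_input, threshold=0.01,
                            max_iterations=10):
    """Repeat a data flow until its reported change is small enough.

    run_once(params) is a hypothetical stand-in for bind, runSingle, and
    reading max_diff; it returns (succeeded, max_diff).
    Returns (iterations_run, last_output_path).
    """
    params = {'d': '0.5', 'docs_in': first_input}
    iterations = 0
    for i in range(max_iterations):
        iterations = i + 1
        params['docs_out'] = "out/pagerank_data_" + str(iterations)
        params['max_diff'] = "out/max_diff_" + str(iterations)
        succeeded, mdv = run_once(dict(params))  # copy: params changes below
        if not succeeded:
            raise RuntimeError("iteration %d failed" % iterations)
        if mdv < threshold:
            break  # converged: declare success
        # Otherwise the output of this pass becomes the next input.
        params['docs_in'] = params['docs_out']
    return iterations, params.get('docs_out')

# A fake runner that reports shrinking differences, for demonstration only.
fake_diffs = [0.4, 0.1, 0.005]
seen_inputs = []

def fake_run(params):
    seen_inputs.append(params['docs_in'])
    return True, fake_diffs[len(seen_inputs) - 1]

iterations, final_output = iterate_until_converged(fake_run, 'data/webcrawl')
```

With the fake runner, the loop stops on the third pass, and seen_inputs shows each pass reading the output of the one before it.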

For this iterator to work, the store function you use to store results from the alias must also be a load function. Pig attempts to use the same class to load the results as was used to store them. The default PigStorage works well for this.

Running Multiple Bindings

If you bound your Pig object to a list of maps of parameters, call run rather than runSingle. This causes Pig to start a thread for each binding and run it. All of these jobs are submitted to Hadoop at the same time, making use of Hadoop’s parallelism. run returns a list of PigStats objects, guaranteed to be in the same order as the maps in the list passed to bind. Thus the results for the first binding map are in the first position of the PigStats list, and so on.
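
One way to picture how results can stay in binding order even when jobs finish out of order is the following plain-Python sketch. It is not Pig's implementation; run_all and run_one are hypothetical names, and run_one stands in for executing a single bound script.

```python
import threading

def run_all(bindings, run_one):
    """Run run_one(binding) in one thread per binding; keep results in order."""
    results = [None] * len(bindings)

    def worker(index, binding):
        # Each thread writes only its own slot, so the order of results
        # does not depend on which job finishes first.
        results[index] = run_one(binding)

    threads = [threading.Thread(target=worker, args=(i, b))
               for i, b in enumerate(bindings)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every job before returning
    return results

bindings = [{'country': 'Afghanistan', 'country_out': 'af.out'},
            {'country': 'Zimbabwe', 'country_out': 'zw.out'}]
outputs = run_all(bindings, lambda b: b['country_out'])
```

Because each result is stored by position rather than appended on completion, outputs lines up with bindings entry for entry.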

Utility Methods

In addition to the compile, bind, and run methods presented so far, there are also utility methods provided by Pig and BoundScript.

Filesystem operations can be done by calling the static method Pig.fs. The string passed to it should be a valid string for use in the Grunt shell (see Chapter 3). Pig.fs returns the return code of the shell command.

You can use register, define, and set in your compiled Pig Latin statements as you do in nonembedded Pig Latin. However, you might wish to register a JAR, define a function alias, or set a value that you want to be effective through all your Pig Latin code blocks. In these cases you can use the static methods of Pig described in Table 9-2. The registers, defines, and sets done by these methods will affect all Pig Latin code compiled after they are called:

# register etc. will not affect this block.
p1 = Pig.compile("...")

Pig.registerJar("acme.jar")
Pig.registerUDF("acme_python.py", "acme")
Pig.define("d_to_e", "com.acme.financial.CurrencyConverter('dollar', 'euro')")
Pig.set("default_parallel", "100")

# register etc. will affect p2 and p3
p2 = Pig.compile("...")
p3 = Pig.compile("...")

Table 9-2. Pig utility methods
Method | Arguments | Pig Latin equivalent
registerJar(String jarfile) | jarfile is the JAR to register. | register jarfile;
registerUDF(String udffile, String namespace) | udffile is the UDF file to register. namespace is the namespace to place the UDF in. | register udffile using jython as namespace;
define(String alias, String definition) | alias is the name of the definition. definition is the string being aliased. | define alias definition;
set(String variable, String value) | variable is the variable to set. value is the value to set the variable to. | set variable value;

Once a script has been bound and a BoundScript returned, in addition to running the script you can also call describe, explain, or illustrate. These do exactly what they would if they were in a nonembedded Pig Latin script. However, they do not return the resulting output to your script; instead, it is written to standard output. (These operators are intended for use in debugging rather than for returning data directly to your script.)



[21] In some of the documentation, wiki pages, and issues on JIRA, embedded Pig is referred to as Turing Complete Pig. This was what the project was called when it first started, even though we did not make Pig itself Turing complete.

[22] There is already an experimental version of JavaScript in 0.9.

[23] The example code was graciously provided by Julien Le Dem.