Chapter 10. Writing Evaluation and Filter Functions

It is time to turn our attention to how you can extend Pig. So far we have looked at the operators and functions Pig provides. But Pig also makes it easy for you to add your own processing logic via User Defined Functions (UDFs). These are written in Java and, starting with version 0.8, in Python.[24] This chapter will walk through how you can build evaluation functions, UDFs that operate on single elements of data or collections of data. It will also cover how to write filter functions, UDFs that can be used as part of filter statements.

UDFs are powerful tools, and thus the interfaces are somewhat complex. In designing Pig, our goal was to make easy things easy and hard things possible. So, the simplest UDFs can be implemented in a single method, but you will have to implement a few more methods to take advantage of more advanced features. We will cover both cases in this chapter.

Throughout this chapter we will use several running examples of UDFs. Some of these are built-in Pig UDFs, which can be found in your Pig distribution at src/org/apache/pig/builtin/. The others can be found on GitHub with the other example UDFs, in the directory udfs.

Writing an Evaluation Function in Java

Pig and Hadoop are implemented in Java, and so it is natural to implement UDFs in Java. This allows UDFs access to the Hadoop APIs and to many of Pig’s facilities.

Before diving into the details, it is worth considering names. Pig locates a UDF by looking for a Java class that exactly matches the UDF name in the script. For details on where it looks, see “Registering UDFs” and “define and UDFs”. There is not an accepted standard on whether UDF names should be all uppercase, camelCased (e.g., MyUdf), or all lowercase. Even the built-in UDFs provided by Pig vary in this regard. Keep in mind that, whatever you choose, you and all of the users of your UDF will have a better user experience if you make the name short, easy to remember, and easy to type.

Where Your UDF Will Run

Writing code that will run in a parallel system presents challenges. A separate instance of your UDF will be constructed and run in each map or reduce task. It is not possible to share state across these instances because they may not all be running at the same time. There will be only one instance of your UDF per map or reduce task, so you can share state within that context.[25]

When writing code for a parallel system, you must remember the power of parallelism. Operations that are acceptable in serial programs may no longer be advisable. Consider a UDF that, when it first starts, connects to a database server to download a translation table. In a serial or low-parallelism environment, this is a reasonable approach. But if you have 10,000 map tasks in your job and they all connect to your database at once, you will most likely hear from your DBA, and the conversation is unlikely to be pleasant.

In addition to an instance in each task, Pig will construct an instance of your UDF on the frontend during the planning stage. It does this for a couple of reasons. One, it wants to test early that it can construct your UDF; it would rather fail during planning than at runtime. Two, as we will cover later in this chapter, it will ask your UDF some questions about schemas and types it accepts as part of the execution planning. It will also give your UDF a chance to store information it wants to make available to the instances of itself that will be run in the backend.

Evaluation Function Basics

All evaluation functions extend the Java class org.apache.pig.EvalFunc. This class uses Java generics: it is parameterized by the return type of your UDF. The core method in this class is exec, which takes one record and returns one result; Pig invokes it for every record that passes through your execution pipeline. Its input is a tuple containing all of the fields the script passes to your UDF, and it returns the type by which you parameterized EvalFunc. For simple UDFs, this is the only method you need to implement. The following code gives an example of a UDF that raises an integer to an integral power and returns a long result:

// java/com/acme/math/Pow.java
package com.acme.math;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.Tuple;

/**
 * A simple UDF that takes a value and raises it to the power of a second
 * value.  It can be used in a Pig Latin script as Pow(x, y), where x and y
 * are both expected to be ints.
 */
public class Pow extends EvalFunc<Long> {

    public Long exec(Tuple input) throws IOException {
        try {
            /* Rather than give you explicit arguments, UDFs are always handed
             * a tuple.  The UDF must know the arguments it expects and pull
             * them out of the tuple.  These next two lines get the first and
             * second fields out of the input tuple that was handed in.  Since
             * Tuple.get returns Objects, we must cast them to Integers.  If
             * the cast fails, an exception will be thrown.
             */
            int base = (Integer)input.get(0);
            int exponent = (Integer)input.get(1);
            long result = 1;

            /* Probably not the most efficient method...*/
            for (int i = 0; i < exponent; i++) {
                long preresult = result;
                result *= base;
                if (preresult > result) {
                    // We overflowed.  Give a warning, but do not throw an
                    // exception.
                    warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
                    // Returning null will indicate to Pig that we failed but
                    // we want to continue execution.
                    return null;
                }
            }
            return result;
        } catch (Exception e) {
            // Throwing an exception will cause the task to fail.
            throw new IOException("Something bad happened!", e);
        }
    }
}
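The loop in Pow multiplies once per unit of the exponent, and its overflow test (preresult > result) can misfire: for a negative base, as in Pow(-2, 1), the result goes from 1 to -2 and is wrongly reported as an overflow. The following plain-Java sketch shows one alternative, swapped in here rather than taken from Pig: exponentiation by repeated squaring, with overflow detected by Math.multiplyExact (which requires Java 8 or later). The class name PowSketch is hypothetical, and the Pig wiring (EvalFunc, warn) is left out so the arithmetic can be read and tested on its own; like Pow, it returns null on overflow.

```java
/**
 * Hypothetical helper: computes base^exponent by repeated squaring and
 * returns null on overflow, mirroring Pow's warn-and-return-null convention.
 * Math.multiplyExact requires Java 8 or later.
 */
class PowSketch {
    static Long pow(long base, int exponent) {
        if (exponent < 0) {
            // Assumption: negative exponents are treated as invalid input.
            return null;
        }
        long result = 1;
        long square = base;        // holds base^(2^k) for the current bit k
        int remaining = exponent;  // bits of the exponent not yet consumed
        try {
            while (remaining > 0) {
                if ((remaining & 1) == 1) {
                    // This bit is set, so multiply in base^(2^k).
                    result = Math.multiplyExact(result, square);
                }
                remaining >>= 1;
                // Square only if more bits remain, so we never overflow on
                // a square that would not be used.
                if (remaining > 0) {
                    square = Math.multiplyExact(square, square);
                }
            }
        } catch (ArithmeticException overflow) {
            // multiplyExact throws on overflow regardless of the sign of base.
            return null;
        }
        return result;
    }
}
```

Inside Pow.exec you could call such a helper and, when it returns null, issue the same warn call before returning null. The loop runs once per bit of the exponent rather than once per unit of it.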

EvalFunc is also used to implement aggregation functions. Because the group operator returns a record for each group, with a bag containing all the records in that group, your eval func still takes one record and returns one record. As an example of this, let’s take a look at the implementation of exec in Pig’s COUNT function. Some of the error-handling code has been removed for ease of reading:

// src/org/apache/pig/builtin/COUNT.java
public Long exec(Tuple input) throws IOException {
    try {
        // The data bag is passed to the UDF as the first element of the
        // tuple.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        long cnt = 0;
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            // Don't count nulls or empty tuples
            if (t != null && t.size() > 0 &&
                t.get(0) != null) {
              cnt++;
            }
        }
        return cnt;
    } catch (Exception e) {
        ...
    }
}

Just as UDFs can take complex types as input, they also can return complex types as output. You could, for example, create a SetIntersection UDF that took two bags as input and returned a bag as output.
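As a sketch of what the core of such a SetIntersection UDF might compute, the following plain-Java code models each bag as a List of tuples and each tuple as a List<Object>. That modeling is an assumption made so the logic can run without Pig on the classpath; a real UDF would read two DataBags from its input tuple, build its result with BagFactory (covered in the next section), and implement outputSchema to describe the returned bag. SetIntersectionSketch is a hypothetical name. The sketch treats its inputs as sets, so duplicates are dropped, and it holds the second bag in memory.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical core of a SetIntersection UDF; bags and tuples are modeled as Lists. */
class SetIntersectionSketch {
    static List<List<Object>> intersect(List<List<Object>> bagA,
                                        List<List<Object>> bagB) {
        // Hash the second bag's tuples for constant-time membership checks.
        // List.equals and List.hashCode compare element by element.
        Set<List<Object>> inB = new HashSet<List<Object>>(bagB);
        // LinkedHashSet removes duplicates while keeping bagA's order.
        Set<List<Object>> result = new LinkedHashSet<List<Object>>();
        for (List<Object> tuple : bagA) {
            if (inB.contains(tuple)) {
                result.add(tuple);
            }
        }
        return new ArrayList<List<Object>>(result);
    }
}
```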

UDFs can also be handed the entire record by passing * to the UDF. You might expect that in this case the input Tuple argument passed to the UDF would contain all the fields passed into the operator the UDF is in. But it does not. Instead, it contains one field, which is a tuple that contains all those fields. Consider a Pig Latin script like this:

data      = load 'input' as (x, y, z);
processed = foreach data generate myudf(*);

In this case, myudf.exec will get a tuple with one field, which will itself be a tuple with three fields: x, y, and z. To access the y field of data, you will need to call ((Tuple)t.get(0)).get(1), where t is the input tuple; the cast is required because Tuple.get returns an Object.

Interacting with Pig values

Evaluation functions and other UDFs are exposed to the internals of how Pig represents data types. This means that when you read a field and expect it to be an integer, you need to know that it will be an instance of java.lang.Integer. For a complete list of Pig types and how they are represented in Java, see “Types”. For most of these types, you construct the appropriate Java objects in the normal way. However, this is not the case for tuples and bags. These are interfaces, and they do not have direct constructors. Instead, you must use factory classes for each of these. This was done so that users and developers could build their own implementations of tuple and bag and instruct Pig to use them.

TupleFactory is an abstract singleton class that you must use to create tuples. You can also configure which TupleFactory is used, since users who provide their own tuples will need to provide their own factory to produce them. To get an instance of TupleFactory to construct tuples, call the static method TupleFactory.getInstance().

You can now create new tuples with either newTuple() or newTuple(int size). Whenever possible you should use the second method, which preallocates the tuple with the right number of fields. This avoids the need to dynamically grow the tuple later and is much more efficient. The method creates a tuple with size number of fields, all of which are null. You can now set the fields using the Tuple’s set(int fieldNum, Object val) method. As an example, we can look at how the example load function we will build in the next chapter creates tuples:

// JsonLoader.java
private TupleFactory tupleFactory = TupleFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
        ResourceSchema s = field.getSchema();
        ResourceFieldSchema[] fs = s.getFields();
        Tuple t = tupleFactory.newTuple(fs.length);

        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }
    ...
}

If you do not know the number of fields in the tuple when it is constructed, you can use newTuple(). You can then add fields using Tuple’s append(Object val) method, which will append the field to the end of the tuple.
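To see why Pig routes tuple creation through a factory, here is a minimal plain-Java model of the pattern. SimpleTuple, SimpleTupleFactory, ListTuple, and ListTupleFactory are hypothetical names, not Pig classes; they mimic the newTuple(), newTuple(int size), set, and append calls described above, and show how an abstract singleton lets a different implementation be installed without changing the code that creates tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for Pig's Tuple interface. */
interface SimpleTuple {
    Object get(int fieldNum);
    void set(int fieldNum, Object val);
    void append(Object val);
    int size();
}

/** Hypothetical stand-in for TupleFactory: an abstract, swappable singleton. */
abstract class SimpleTupleFactory {
    private static SimpleTupleFactory instance = new ListTupleFactory();

    static SimpleTupleFactory getInstance() {
        return instance;
    }

    // Lets a user install a factory that produces their own tuple class.
    static void setInstance(SimpleTupleFactory factory) {
        instance = factory;
    }

    abstract SimpleTuple newTuple();
    abstract SimpleTuple newTuple(int size);
}

class ListTupleFactory extends SimpleTupleFactory {
    SimpleTuple newTuple() {
        return new ListTuple(new ArrayList<Object>());
    }

    SimpleTuple newTuple(int size) {
        // Preallocate `size` null fields so set() never has to grow the list.
        return new ListTuple(new ArrayList<Object>(Arrays.asList(new Object[size])));
    }
}

class ListTuple implements SimpleTuple {
    private final List<Object> fields;

    ListTuple(List<Object> fields) {
        this.fields = fields;
    }

    public Object get(int fieldNum) { return fields.get(fieldNum); }
    public void set(int fieldNum, Object val) { fields.set(fieldNum, val); }
    public void append(Object val) { fields.add(val); }
    public int size() { return fields.size(); }
}
```

Code that only ever calls SimpleTupleFactory.getInstance() keeps working unchanged if someone installs a different factory, which is the flexibility the factory indirection buys.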

To read data from tuples, use the get(int fieldNum) method. This returns a Java Object because the tuple does not have a schema instance and does not know what type this field is. You must either cast the result to the appropriate type or use the utility methods in org.apache.pig.data.DataType to determine the type.
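The following plain-Java sketch shows the check-before-cast approach: it maps a value read from a tuple to the name of the Pig type whose Java representation it uses, and widens int or long fields to a long without risking a ClassCastException. The class FieldTypes and its methods are hypothetical; in a UDF you would normally rely on the DataType utilities mentioned above, and the classes listed here cover only a subset of Pig's types.

```java
import java.util.Map;

/** Hypothetical helpers for inspecting an Object returned by Tuple.get. */
class FieldTypes {
    // Name of the Pig type represented by this Java object (subset only).
    static String pigTypeName(Object o) {
        if (o == null) return "null";
        if (o instanceof Integer) return "int";
        if (o instanceof Long) return "long";
        if (o instanceof Float) return "float";
        if (o instanceof Double) return "double";
        if (o instanceof String) return "chararray";
        if (o instanceof Map) return "map";
        return "unknown (" + o.getClass().getName() + ")";
    }

    // Widens an int or long field to Long; returns null for anything else,
    // including null, so the caller can warn and skip the record.
    static Long asLong(Object o) {
        if (o instanceof Integer) return Long.valueOf(((Integer) o).longValue());
        if (o instanceof Long) return (Long) o;
        return null;
    }
}
```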

Similar to tuples, BagFactory must be used to construct bags. You can get an instance using BagFactory.getInstance(). To get a new, empty bag, call newDefaultBag(). You can then add tuples to it as you construct them using DataBag’s add(Tuple t) method. You should do this rather than constructing a list of tuples and then passing it using newDefaultBag(List<Tuple> listOfTuples), because bags know how to spill to disk when they grow so large that they cannot fit into memory. Again we can look at JsonLoader to see an example of constructing bags:

// JsonLoader.java
private BagFactory bagFactory = BagFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
        DataBag bag = bagFactory.newDefaultBag();

        JsonToken innerTok;
        while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {

            t = tupleFactory.newTuple(fs.length);
            for (int j = 0; j < fs.length; j++) {
                t.set(j, readField(p, fs[j], j));
            }

            p.nextToken(); // read end of object
            bag.add(t);
        }
    ...
}

To read data from a bag, use the iterator provided by iterator(). This also implements Java’s Iterable, so you can use the construct for (Tuple t : bag).

Bags make the assumption that once data is being read from them, no new data will be written to them. Their implementation of how they spill and reread data depends on this assumption. So once you call iterator, you should never call add again on the same bag.
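To make the rule concrete, here is a hypothetical plain-Java bag that enforces it by throwing if add is called after iterator. Pig's own bags are not described as performing such a check; as explained above, they simply rely on you to follow the rule, so a violation is not guaranteed to be caught. WriteOnceBag is our name for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical bag that enforces "no add() after iterator()"; not a Pig class. */
class WriteOnceBag<T> implements Iterable<T> {
    private final List<T> items = new ArrayList<T>();
    private boolean reading = false;  // becomes true once reading starts

    void add(T item) {
        if (reading) {
            throw new IllegalStateException("add() called after iterator()");
        }
        items.add(item);
    }

    @Override
    public Iterator<T> iterator() {
        reading = true;
        // Read-only view, so callers cannot modify the bag through the iterator.
        return Collections.unmodifiableList(items).iterator();
    }
}
```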

Input and Output Schemas

Pig typechecks a script before running it. EvalFunc includes a method to allow you to turn on type checking for your UDF as well, both for input and output.

When your UDF returns a simple type, Pig uses Java reflection to determine the return type. However, because exec takes a tuple, Pig has no way to determine what input you expect your UDF to take. You can check this at runtime, of course, but your development and testing will go more smoothly if the check happens when Pig compiles the script instead. For example, suppose we mistakenly invoked the Pow UDF from the previous section like this:

register 'acme.jar';
A = load 'input' as (x:chararray, y:int);
B = foreach A generate y, com.acme.math.Pow(x, 2);
dump B;

Pig will start a job and run your tasks. All the tasks will fail, and you will get an error message ERROR 2078: Caught error from UDF: com.acme.math.Pow [Something bad happened!]. Runtime exceptions like this are particularly expensive in Hadoop, both because scheduling can take a while on a busy cluster and because Hadoop attempts each task several times (four by default) before declaring the whole job a failure. Let’s fix this UDF so it checks up front that it was given reasonable input.

The method that declares the input your UDF expects is outputSchema. It is named this way because it returns the schema that describes the UDF’s output. If your UDF does not override this method, Pig will attempt to ascertain your return type from the type parameter of your EvalFunc implementation, and it will pass your UDF whatever input the script indicates. If your UDF does override this method, Pig will pass it the schema of the input that the script has indicated to pass into the UDF. This is also your UDF’s opportunity to throw an error if it receives an input schema that does not match its expectations. An implementation of this method for Pow looks like this:

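// java/com/acme/math/Pow.java
// This method needs the following imports:
//   import org.apache.pig.data.DataType;
//   import org.apache.pig.impl.logicalLayer.FrontendException;
//   import org.apache.pig.impl.logicalLayer.schema.Schema;
//   import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
public Schema outputSchema(Schema input) {
    // Check that we were passed two fields.  Also guard against a null
    // schema, which Pig may pass when it does not know the input schema.
    if (input == null || input.size() != 2) {
        throw new RuntimeException(
            "Expected (int, int), input does not have 2 fields");
    }

    try {
        // Get the types for both columns and check them.  If they are
        // wrong, figure out what types were passed and give a good error
        // message.
        if (input.getField(0).type != DataType.INTEGER ||
                input.getField(1).type != DataType.INTEGER) {
            String msg = "Expected input (int, int), received schema (";
            msg += DataType.findTypeName(input.getField(0).type);
            msg += ", ";
            msg += DataType.findTypeName(input.getField(1).type);
            msg += ")";
            throw new RuntimeException(msg);
        }
    } catch (FrontendException e) {
        // getField declares FrontendException.  Catch only that, so the
        // RuntimeException above reaches the user with its message intact.
        throw new RuntimeException(e);
    }

    // Construct our output schema, which is one field that is a long
    return new Schema(new FieldSchema(null, DataType.LONG));
}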

With this method added to Pow, when we invoke the previous script that mistakenly tries to pass a chararray to Pow, it now fails almost immediately with java.lang.RuntimeException: Expected input (int, int), received schema (chararray, int).

Pig’s Schema is a complicated class, and we will not delve into all its complexities here. The following summary will be enough to help you build your own schemas for outputSchema. At its core, Schema is a list of FieldSchemas and a mapping of aliases to FieldSchemas. Each FieldSchema contains an alias and a type. The types are stored as Java bytes, with constants for each type defined in the class org.apache.pig.data.DataType. Schema is a recursive structure. Each FieldSchema also has a Schema member. This member is nonnull only when the type is complex. In the case of tuples, it defines the schema of the tuple. In the case of bags, it defines the schema of the tuples in the bag. Starting in 0.9, if a schema is present for a map, it indicates the data type of values in the map. Before 0.9, maps did not have schemas:

public class Schema implements Serializable, Cloneable {

    // List of all fields in the schema.
    private List<FieldSchema> mFields;

    // Map of alias names to field schemas, so that lookup can be done by alias.
    private Map<String, FieldSchema> mAliases;

    // A FieldSchema represents a schema for one field.
    public static class FieldSchema implements Serializable, Cloneable {

        // Alias for this field.
        public String alias;

        // Datatype, using codes from org.apache.pig.data.DataType.
        public byte type;

        // If this is a tuple, a bag, or (starting in 0.9) a map, it can have
        // a schema.  Otherwise, this field must be null.
        public Schema schema;

        /**
         * Constructor for any type.
         * @param a Alias, if known. If unknown, leave null.
         * @param t Type, using codes from org.apache.pig.data.DataType.
         */
        public FieldSchema(String a, byte t) { ... }
    }

    /**
     * Create a schema with more than one field.
     * @param fields List of field schemas that describes the fields.
     */
    public Schema(List<FieldSchema> fields) { ... }

    /**
     * Create a schema with only one field.
     * @param fieldSchema field to put in this schema.
     */
    public Schema(FieldSchema fieldSchema) { ... }
 
    /**
     * Given an alias name, find the associated FieldSchema.
     * @param alias Alias to look up.
     * @return FieldSchema, or null if no such alias is in this schema.
     */
    public FieldSchema getField(String alias) throws FrontendException {
        // some error checking omitted.
        return mAliases.get(alias);
    }

    /**
     * Given a field number, find the associated FieldSchema.
     *
     * @param fieldNum Field number to look up.
     * @return FieldSchema for this field.
     */
    public FieldSchema getField(int fieldNum) throws FrontendException {
        // some error checking omitted.
        return mFields.get(fieldNum);
    }
}

As mentioned earlier, when your UDF returns a scalar type, Pig can use reflection to figure out that return type. When your UDF returns a bag or a tuple, however, you will need to implement outputSchema if you want Pig to understand the contents of that bag or tuple.

Error Handling and Progress Reporting

Our previous examples have given some hints of how to deal with errors. When your UDF encounters an error, you have a couple of choices on how to handle it. The most common case is to issue a warning and return a null. This tells Pig that your UDF failed and its output should be viewed as unknown.[26] We saw an example of this when the Pow function detected overflow:

for (int i = 0; i < exponent; i++) {
    long preresult = result;
    result *= base;
    if (preresult > result) {
        // We overflowed.  Give a warning, but do not throw an
        // exception.
        warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
        // Returning null will indicate to Pig that we failed but
        // we want to continue execution.
        return null;
    }
}

warn, a method of EvalFunc, takes a message that you provide as well as a warning code. The warning codes are in org.apache.pig.PigWarning, including several user-defined codes that you can use if none of the provided codes matches your situation. These warnings are aggregated by Pig and reported to the user at the end of the job.

Warning and returning null is convenient because it allows your job to continue. When you are processing billions of records, you do not want your job to fail because one record out of all those billions had a chararray where you expected an int. Given enough data, the odds are overwhelming that a few records will be bad, and most calculations will be fine if a few data points are missing.
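The warn-and-continue pattern can be sketched in plain Java as follows. The Warning enum stands in for PigWarning, and the EnumMap tally stands in for the per-code counts Pig aggregates and reports at the end of the job; both, along with the class name WarnAndContinue, are hypothetical. A malformed field produces a null result and a counted warning instead of an exception.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch of warn-and-return-null; Warning stands in for PigWarning. */
class WarnAndContinue {
    enum Warning { BAD_INPUT, OVERFLOW }

    // Per-code counts, analogous to the totals Pig reports at the end of a job.
    final Map<Warning, Long> counts = new EnumMap<Warning, Long>(Warning.class);

    void warn(Warning code) {
        Long c = counts.get(code);
        counts.put(code, c == null ? 1L : c + 1);
    }

    // Parses a field that should hold an int.  A bad value yields null plus
    // a counted warning rather than an exception, so processing continues.
    Integer parseIntField(String field) {
        if (field == null) {
            return null;  // a missing value stays null without a warning
        }
        try {
            return Integer.valueOf(field.trim());
        } catch (NumberFormatException e) {
            warn(Warning.BAD_INPUT);
            return null;
        }
    }
}
```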

For errors that are not tolerable, your UDF can throw an exception. If Pig catches an exception, it will assume that you are asking to stop everything, and it will cause the task to fail. Hadoop will then restart your task. If a particular task fails on every allowed attempt (four by default), Hadoop will not restart it again. Instead, it will kill all the other tasks and declare the job a failure.

When you have concluded that you do need an exception, you should also issue a log message so that you can read the task logs later and get more context to determine what happened. EvalFunc has a member log that is an instance of org.apache.commons.logging.Log. Hadoop prints any log messages into logfiles on the task machine, which are available from the JobTracker UI. See “MapReduce Job Status” for details. You can also print info messages into the log to help you with debugging.

In addition to error reporting, some UDFs will need to report progress. Hadoop listens to its tasks to make sure they are making progress. If it does not hear from a task within a timeout (ten minutes by default), it concludes that the task died or went into an infinite loop. It then kills the task if it is still running, cleans up its resources, and restarts the task elsewhere. Pig reports progress to Hadoop on a regular basis. However, if you have a UDF that is very compute-intensive and a single invocation of it might run longer than that timeout, you should also report progress. To do this, EvalFunc provides a member reporter. By invoking reporter.progress() or reporter.progress(String msg) (where msg can say whatever you want) more often than the timeout interval, your UDF will avoid being viewed as timed out.
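Here is a sketch of periodic progress reporting in a long-running loop. The Progress interface and the LongComputation class are hypothetical; Progress stands in for the reporter member, whose progress() method you would call in a real UDF. Reporting every fixed number of iterations is one simple policy; you would choose the interval so that calls arrive well within the timeout while adding negligible overhead.

```java
/** Hypothetical stand-in for the reporter member's progress() method. */
interface Progress {
    void progress();
}

class LongComputation {
    // A deliberately simple long-running loop (it sums 1..n) that calls
    // reporter.progress() every `interval` iterations.
    static long sumWithProgress(long n, long interval, Progress reporter) {
        long sum = 0;
        for (long i = 1; i <= n; i++) {
            sum += i;
            // The modulus check is cheap, so it adds little per-iteration cost.
            if (reporter != null && i % interval == 0) {
                reporter.progress();
            }
        }
        return sum;
    }
}
```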

Constructors and Passing Data from Frontend to Backend

Our discussion so far assumes that your UDF knows everything it needs to know at development time. This is not always the case. Consider a UDF that needs to read a lookup table from HDFS. You would like to be able to declare the filename when you use the UDF. You can do that by defining a nondefault constructor for your UDF.

By default, EvalFuncs have a no-argument constructor, but you can provide a constructor that takes one or more String arguments. This alternate constructor is then referenced in Pig Latin by using the define statement to define the UDF; see “define and UDFs” for details.

As an example, we will look at a new UDF, MetroResolver. This UDF takes a city name as input and returns the name of the larger metropolitan area that city is part of. For example, given Pasadena, it will return Los Angeles. Based on which country the input cities are in, a different lookup table will be needed. The name of a file in HDFS that contains this lookup table can be provided as a constructor argument. The class declaration, members, and constructor for our UDF look like this:

// java/com/acme/marketing/MetroResolver.java
package com.acme.marketing;

import java.util.HashMap;

import org.apache.pig.EvalFunc;

/**
 * A lookup UDF that maps cities to metropolitan areas.
 */
public class MetroResolver extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File that contains a lookup table mapping cities to metro
     * areas.  The file must be located on the filesystem where this UDF will
     * run.
     */
    public MetroResolver(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }
}

The UDF can now be invoked in a Pig Latin script like this:

register 'acme.jar';
define MetroResolver com.acme.marketing.MetroResolver('/user/you/cities/us');
A = load 'input' as (city:chararray);
B = foreach A generate city, MetroResolver(city);
dump B;

The filename /user/you/cities/us will be passed to MetroResolver every time Pig constructs it. However, our UDF is not yet complete because we have not constructed the lookup table. In fact, we explicitly set it to null. It does not make sense to construct it in the constructor, because the constructor will be invoked on both the frontend and backend. There are forms of dark magic that will allow the UDF to figure out whether it is being invoked on the frontend or backend, but I cannot recommend them, because they are not guaranteed to work the same between releases. It is much better to do the lookup table construction in a method that we know will be called only in the backend.

EvalFunc does not provide an initialize method that it calls on the backend before it begins processing. You can work around this by keeping a flag to determine whether you have initialized your UDF in a given task. The exec function for MetroResolver does this by tracking whether lookup is null:

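// Besides EvalFunc, Tuple, IOException, and HashMap, exec needs:
//   import java.io.BufferedReader;
//   import java.io.InputStreamReader;
//   import org.apache.hadoop.fs.FileSystem;
//   import org.apache.hadoop.fs.Path;
//   import org.apache.pig.impl.util.UDFContext;
public String exec(Tuple input) throws IOException {
    if (lookup == null) {
        // We have not been initialized yet, so do it now.  Build the table
        // in a local variable and publish it only once it is complete.
        HashMap<String, String> table = new HashMap<String, String>();

        // Get an instance of the HDFS FileSystem class so we can read a
        // file from HDFS.  We need a copy of our configuration to do that,
        // which we read from the UDFContext.
        FileSystem fs =
            FileSystem.get(UDFContext.getUDFContext().getJobConf());
        // Wrap the stream in a BufferedReader (assuming the file is UTF-8);
        // DataInputStream.readLine is deprecated because it does not decode
        // characters properly.
        BufferedReader in = new BufferedReader(
            new InputStreamReader(fs.open(new Path(lookupFile)), "UTF-8"));
        try {
            String line;
            while ((line = in.readLine()) != null) {
                // Each line has the form city:metro; skip malformed lines.
                String[] toks = line.split(":", 2);
                if (toks.length == 2) {
                    table.put(toks[0], toks[1]);
                }
            }
        } finally {
            in.close();
        }
        lookup = table;
    }
    return lookup.get((String)input.get(0));
}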

This initialization section handles opening the file and reading it. In order to open the file, it must first connect to HDFS. This is accomplished by FileSystem.get. This method in turn needs a JobConf object, which is where Hadoop stores all its job information. The JobConf object can be obtained using UDFContext, which we will cover in more detail later. Note that obtaining JobConf in this way works only on the backend, as no job configuration exists on the frontend.

Once we are connected to HDFS, we open the file and read it as we would any other file. It is parsed into two fields and put into the hash table. All subsequent calls to exec will just be lookups in the hash table.
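The lazy-initialization pattern used by MetroResolver can be isolated and tested in plain Java. In this sketch, the hypothetical Source interface stands in for opening the HDFS file, so a StringReader can take its place in a test; LazyLookup is likewise a hypothetical name, and the try-with-resources statement requires Java 7 or later. As in the UDF, the table is built on the first call and reused afterward. It is filled into a local map and published only once it is complete, and lines without a colon are skipped instead of causing an ArrayIndexOutOfBoundsException.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java version of MetroResolver's lazy initialization. */
class LazyLookup {
    // Stands in for opening the lookup file (fs.open in MetroResolver).
    interface Source {
        Reader open() throws IOException;
    }

    private final Source source;
    private Map<String, String> lookup = null;  // null until the first call
    int loads = 0;  // how many times the table was read (for demonstration)

    LazyLookup(Source source) {
        this.source = source;
    }

    String resolve(String city) {
        if (lookup == null) {
            Map<String, String> table = new HashMap<String, String>();
            try (BufferedReader in = new BufferedReader(source.open())) {
                String line;
                while ((line = in.readLine()) != null) {
                    // Expected form: city:metro.  Skip malformed lines.
                    String[] toks = line.split(":", 2);
                    if (toks.length == 2) {
                        table.put(toks[0], toks[1]);
                    }
                }
            } catch (IOException e) {
                // Without the table no lookup can succeed, so fail loudly.
                throw new IllegalStateException("could not load lookup table", e);
            }
            lookup = table;  // publish only the fully built table
            loads++;
        }
        return lookup.get(city);
    }
}
```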

Loading the distributed cache

Our MetroResolver UDF opens and reads its lookup file from HDFS, which you will often want. However, having hundreds or thousands of map tasks open the same file on HDFS at the same time puts significant load on the NameNode and the DataNodes that host the file’s blocks. To avoid this situation, Hadoop provides the distributed cache, which allows users to preload HDFS files locally onto the nodes their tasks will run on. For details, see “Distributed Cache”.

Let’s write a second version of MetroResolver that uses the distributed cache. Beginning in version 0.9, EvalFunc provides a method getCacheFiles that is called on the frontend. From this method, your UDF returns a list of the files it wants in the distributed cache. Each entry has the format hdfs_file#task_file, where hdfs_file is the path to the file in HDFS and task_file is the name the file will be given on your task node. task_file is relative to your UDF’s working directory on the backend; you should place any files in your working directory rather than using an absolute path. task_file will be a local file on the task node and should be read using standard Java file utilities, not HDFS’s FileSystem:

// java/com/acme/marketing/MetroResolverV2.java
package com.acme.marketing;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
 * A lookup UDF that maps cities to metropolitan areas, this time using the
 * distributed cache.
 */
public class MetroResolverV2 extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File in HDFS that contains a lookup table mapping cities
     * to metro areas.  It is shipped to each task node through the
     * distributed cache.
     */
    public MetroResolverV2(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }

    public String exec(Tuple input) throws IOException {
        if (lookup == null) {
            // We have not been initialized yet, so do it now.  Build the
            // table in a local variable and publish it once it is complete.
            HashMap<String, String> table = new HashMap<String, String>();

            // Open the file as a local file.  The distributed cache placed
            // it in our working directory under the name from getCacheFiles.
            BufferedReader d = new BufferedReader(new FileReader("./mrv2_lookup"));
            try {
                String line;
                while ((line = d.readLine()) != null) {
                    // Each line has the form city:metro; skip malformed lines.
                    String[] toks = line.split(":", 2);
                    if (toks.length == 2) {
                        table.put(toks[0], toks[1]);
                    }
                }
            } finally {
                // Closing the BufferedReader also closes the FileReader.
                d.close();
            }
            lookup = table;
        }
        return lookup.get((String)input.get(0));
    }

    public List<String> getCacheFiles() {
        List<String> list = new ArrayList<String>(1);
        // We were passed the name of the file on HDFS.  Append a
        // name for the file on the task node.
        list.add(lookupFile + "#mrv2_lookup");
        return list;
    }
}

UDFContext

Constructor arguments work as a way to pass information into your UDF, if you know the data at the time the script is written. You can extend this using parameter substitution (see “Parameter Substitution”) so that data can be passed when the script is run. But some information you want to pass from frontend to backend cannot be known when the script is run, or it might not be accessible in String form on the command line. An example is collecting properties from the environment and passing them.

To allow UDFs to pass data from the frontend to the backend, starting in version 0.8, Pig provides a singleton class, UDFContext. Your UDF obtains a reference to it by calling getUDFContext. We have already seen that UDFs can use UDFContext to obtain a copy of the JobConf. Beginning in version 0.9, UDFContext also captures the System properties on the client and carries them to the backend. Your UDF can then obtain them by calling getClientSystemProperties.

UDFContext also provides mechanisms for you to pass a properties object explicitly for your UDF. You can either pass a properties object for all UDFs of the same class or pass a specific object for each instance of your UDF. To use the same one for all instances of your UDF, call getUDFProperties(this.getClass()). This will return a Properties object that is a reference to a properties object kept by UDFContext. UDFContext will capture and transmit to the backend any changes made in this object. You can call this in outputSchema, which is guaranteed to be called in the frontend. When you want to read the data, call the same method again in your exec method. When using the object in the exec method, keep in mind that any changes made to the returned Properties will not be transmitted to other instances of the UDF on the backend, unless you happen to have another instance of the same UDF in the same task. This is a mechanism for sending information from the frontend to the backend, not between instances in the backend.

Sometimes you will want to transmit different data to different instances of the same UDF. By different instances I mean different invocations in your Pig Latin script, not different instantiations in various map and reduce tasks. To support this, UDFContext provides getUDFProperties(Class, String[]). The constructor arguments to your UDF are a good candidate to be passed as the array of String. This allows each instance of the UDF to differentiate itself. If your UDF does not take constructor arguments, or all arguments have the same value, you can add one unused argument that is solely to distinguish separate instances of the UDF.
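The following plain-Java model illustrates why passing constructor arguments distinguishes instances. PropertiesRegistry and its methods forClass and forInstance are hypothetical, and this is not how UDFContext is implemented; it only shows the keying idea. A lookup by class alone returns one shared Properties object, while a lookup by class plus an argument array returns a separate object for each distinct set of arguments. The second lookup-table path used in the checks (/user/you/cities/uk) is made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical model of keying UDF properties by class, or by class plus
 * distinguishing arguments.  Not UDFContext's actual implementation.
 */
class PropertiesRegistry {
    private final Map<String, Properties> byKey = new HashMap<String, Properties>();

    // One Properties object shared by every instance of the class.
    Properties forClass(Class<?> c) {
        return lookup(c.getName());
    }

    // A separate Properties object for each distinct argument array,
    // for example the UDF's constructor arguments.
    Properties forInstance(Class<?> c, String[] args) {
        return lookup(c.getName() + Arrays.toString(args));
    }

    private Properties lookup(String key) {
        Properties p = byKey.get(key);
        if (p == null) {
            p = new Properties();
            byKey.put(key, p);
        }
        return p;
    }
}
```

Because arrays are compared here by their contents (via Arrays.toString), two invocations defined with the same file name share properties, which is why the text suggests adding an otherwise unused argument when you need them kept apart.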

Consider a UDF that has its own properties file, which might be useful if you want to pass different properties to different UDFs, or if you have many UDF-specific properties that you want to change without changing your Pig properties file. Let’s write a second version of the stock analyzer UDF that we used in Chapter 6:

// java/com/acme/financial/AnalyzeStockV2.java
/**
 * This UDF takes a bag of information about a stock and
 * produces a floating-point score between 1 and 100, 
 * 1 being sell, 100 being buy.
 */
public class AnalyzeStockV2 extends EvalFunc<Float> {

    Random r = new Random();
    Properties myProperties = null;

    @Override
    public Float exec(Tuple input) throws IOException {
        if (myProperties == null) {
            // Retrieve our class-specific properties from UDFContext.
            myProperties =
                UDFContext.getUDFContext().getUDFProperties(this.getClass());
        }

        // Make sure the input isn't null and is of the right size.
        if (input == null || input.size() != 1) return null;

        DataBag b = (DataBag)input.get(0);
        for (Tuple t : b) {
            // Do some magic analysis, using properties from myProperties to
            // decide how ...
        }
        return r.nextFloat() * 100;
    }
    @Override
    public Schema outputSchema(Schema input) {
        try {
            // Read our properties file.
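            Properties prop = new Properties();
            FileInputStream in = new FileInputStream("/tmp/stock.properties");
            try {
                prop.load(in);
            } finally {
                // Always release the file handle, even if load fails.
                in.close();
            }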
            // Get a properties object specific to this UDF class.
            UDFContext context = UDFContext.getUDFContext();
            Properties udfProp = context.getUDFProperties(this.getClass());
            // Copy our properties into it.  There is no need to pass it
            // back to UDFContext.
            for (Map.Entry<Object, Object> e : prop.entrySet()) {
                udfProp.setProperty((String)e.getKey(), (String)e.getValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
    }

}

Overloading UDFs

Sometimes you want different UDF implementations depending on the data type the UDF is processing. For example, MIN(long) should return a long, whereas MIN(int) should return an int. To enable this, EvalFunc provides the method getArgToFuncMapping. If this method returns null, Pig will use the current UDF. To provide a list of alternate UDFs based on the input types, this method returns a list of FuncSpecs. A FuncSpec is a Pig class that describes a UDF. Each of these FuncSpecs describes a set of expected input arguments and the UDF, as a Java class, that should be used to handle them. Pig’s typechecker will use this list to determine which Java class to place in the execution pipeline (more on this later). The getArgToFuncMapping of Pig’s built-in MIN function looks like this:

// src/org/apache/pig/builtin/MIN.java
public List<FuncSpec> getArgToFuncMapping()
throws FrontendException {
  List<FuncSpec> funcList = new ArrayList<FuncSpec>();

  // The first element in the list is this class itself, which is built to
  // handle the case where the input is a bytearray.  So we return our own
  // classname and a schema that indicates this function expects a BAG with 
  // tuples that have one field, which is a bytearray.  generateNestedSchema is a
  // helper method that generates schemas of bags that have tuples with one
  // field.
  funcList.add(new FuncSpec(this.getClass().getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));

  // If our input schema is a bag with tuples with one field that is a double,
  // then we use the class DoubleMin instead of MIN to implement min.
  funcList.add(new FuncSpec(DoubleMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));

  // and so on...
  funcList.add(new FuncSpec(FloatMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));

  funcList.add(new FuncSpec(IntMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));

  funcList.add(new FuncSpec(LongMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));

  funcList.add(new FuncSpec(StringMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));

  return funcList;
}

Pig’s typechecker goes through a set of steps to determine which FuncSpec is the closest match, and thus which Java class it should place in this job’s execution pipeline. At each step, if it finds a match, it uses that match. If it finds more than one match at a given step, it will return an error that gives all the matching possibilities. If it finds no match in the whole list, it will also give an error. As an example of this, let’s consider another version of the Pow UDF we built above. We will call this one PowV2. It takes either two longs or two doubles as input. Its getArgToFuncMapping looks like the following:

// java/com/acme/math/PowV2.java
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
    Schema s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    funcList.add(new FuncSpec(this.getClass().getName(), s));
    s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    funcList.add(new FuncSpec(LongPow.class.getName(), s));
    return funcList;
}

In the typechecker’s search for the best UDF to use, step one is to look for an exact match, where all of the expected input declared by the UDF is matched by the actual input passed in Pig Latin. Pow(2.0, 3.1415) passes two doubles, so Pig will choose PowV2. Pow(2L, 3L) passes two longs, so LongPow will be used.

Step two is to look for bytearrays that are passed into the UDF and see whether a match can be made by inserting casts for those bytearrays. For example, Pig will rewrite Pow(x, 2L), where x is a bytearray, as Pow((long)x, 2L) and use LongPow. This rule can confuse Pig when all arguments are bytearrays, because bytearrays can be cast to any type. Pow(x, y), where both x and y are bytearrays, results in an error message:

Multiple matching functions for com.acme.math.PowV2 with input schema: 
    ({double,double}, {long,long}). Please use an explicit cast.

Step three is to look for an implicit cast that will match one of the provided schemas. The implicit cast that is closest will be used. Implicit casting of numeric types goes from int to long to float to double, and by closest I mean the cast that requires the fewest steps in that list. So, Pow(2, 2) will use LongPow (one step per argument, versus three to reach double), whereas Pow(2.0, 2) will use PowV2 (a double cannot be implicitly cast to a long).

Step four is to look for a working combination of steps two and three, bytearray casts plus implicit casts. Pow(x, 3.14f), where x is a bytearray, will use PowV2 by promoting 3.14f to a double and casting x to a double.

If after all these steps Pig still has not found a suitable method, it will fail and say it cannot determine which method to use. Pow('hello', 2) gives an error message:

Could not infer the matching function for com.acme.math.PowV2 as multiple or none of 
them fit. Please use an explicit cast.
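The four steps can be modeled in a few dozen lines of plain Java. This is a simplified sketch, not Pig’s typechecker: FuncSpecMatcher is a hypothetical class, input types are plain strings, and scoring a candidate by the total number of widening steps is an assumption about what "closest" means. It does reproduce each example above for the PowV2 and LongPow mapping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the typechecker's four matching steps. */
public class FuncSpecMatcher {
    // Numeric widening order for implicit casts: int -> long -> float -> double.
    static final List<String> WIDENING = Arrays.asList("int", "long", "float", "double");

    // Widening steps from 'from' to 'to'; 0 if equal, -1 if not an implicit cast.
    static int wideningSteps(String from, String to) {
        if (from.equals(to)) return 0;
        int f = WIDENING.indexOf(from);
        int t = WIDENING.indexOf(to);
        return (f >= 0 && t > f) ? t - f : -1;
    }

    // Cost of matching actual argument types to one FuncSpec, or -1 for no match.
    static int cost(List<String> actual, List<String> expected,
                    boolean castBytearrays, boolean widenNumbers) {
        if (actual.size() != expected.size()) return -1;
        int total = 0;
        for (int i = 0; i < actual.size(); i++) {
            String a = actual.get(i);
            String e = expected.get(i);
            if (a.equals(e)) continue;                                // exact
            if (castBytearrays && a.equals("bytearray")) continue;    // bytearray cast
            int steps = widenNumbers ? wideningSteps(a, e) : -1;      // implicit cast
            if (steps < 0) return -1;
            total += steps;
        }
        return total;
    }

    // Tries the four steps in order; returns the chosen class name or throws.
    static String choose(Map<String, List<String>> specs, List<String> actual) {
        boolean[][] steps = {
            {false, false},  // step one: exact match
            {true, false},   // step two: bytearray casts only
            {false, true},   // step three: implicit numeric casts only
            {true, true}     // step four: both
        };
        for (boolean[] step : steps) {
            List<String> best = new ArrayList<>();
            int bestCost = Integer.MAX_VALUE;
            for (Map.Entry<String, List<String>> spec : specs.entrySet()) {
                int c = cost(actual, spec.getValue(), step[0], step[1]);
                if (c < 0) continue;
                if (c < bestCost) {
                    best.clear();
                    bestCost = c;
                }
                if (c == bestCost) best.add(spec.getKey());
            }
            if (best.size() == 1) return best.get(0);
            if (best.size() > 1) {
                throw new IllegalArgumentException(
                    "Multiple matching functions " + best + ". Please use an explicit cast.");
            }
        }
        throw new IllegalArgumentException(
            "Could not infer the matching function. Please use an explicit cast.");
    }
}
```

With PowV2 mapped to two doubles and LongPow to two longs, choose returns LongPow for (int, int), PowV2 for (double, int) and (bytearray, float), and throws for (bytearray, bytearray) and (chararray, int).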

Memory Issues in Eval Funcs

Some operations you will perform in your UDFs will require more memory than is available. As an example, you might want to build a UDF that calculates the cumulative sum of a set of inputs. This will return a bag of values because, for each input, it needs to return the intermediate sum at that input.
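As a plain-Java sketch of that cumulative sum, with a List standing in for Pig’s bag (so this is not a working UDF), note that the output holds one entry per input value, which is why such a UDF can exhaust memory on large groups:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative cumulative sum; a List stands in for a Pig bag. */
public class CumulativeSum {
    // Returns the running total at each position of the input. The result has
    // one entry per input value, so it grows exactly as large as the input.
    static List<Long> cumulativeSum(List<Long> values) {
        List<Long> sums = new ArrayList<>(values.size());
        long running = 0;
        for (long v : values) {
            running += v;
            sums.add(running);
        }
        return sums;
    }
}
```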

Pig’s bags handle spilling data to disk automatically when they pass a certain size threshold or when only a certain amount of heap space remains. Spilling to disk is expensive and should be avoided whenever possible. But if you must store large amounts of data in a bag, Pig will manage it.

Bags are the only Pig data type that knows how to spill. Tuples and maps must fit into memory. However, a tuple or map may still contain a bag that is too large to fit in memory; such a bag does not count against the memory needed by the tuple or map that references it.

Algebraic Interface

I have already mentioned in a number of other places that there are significant advantages to using Hadoop’s combiner whenever possible. It lowers skew in your reduce tasks, as well as the amount of data sent over the network between map and reduce tasks. For details on the combiner and when it is run, see “Combiner Phase”.

Use of the combiner is interesting when you are working with sets of data, usually sets you intend to aggregate and then compute a single or small set of values for. There are two classes of functions that fit nicely into the combiner: distributive and algebraic. A function is distributive if the same result is obtained by 1) dividing its input set into subsets, applying the function to those subsets, and then applying the function to those results; or 2) applying the function to the original set. SUM is an example of this. A function is said to be algebraic if it can be divided into initial, intermediate, and final functions (possibly different from the initial function), where the initial function is applied to subsets of the input set, the intermediate function is applied to results of the initial function, and the final function is applied to all of the results of the intermediate function. COUNT is an example of an algebraic function, with count being used as the initial function and sum as the intermediate and final functions. A distributive function is a special case of an algebraic function, where the initial, intermediate, and final functions are all identical to the original function.
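The COUNT decomposition can be checked in plain Java. In this sketch, AlgebraicCountDemo and its method names are hypothetical, and the chunking only imitates map, combine, and reduce. Counting nonnull records directly and counting them through initial, intermediate, and final steps give the same answer regardless of how the input is split:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of COUNT split into initial, intermediate, and final steps. */
public class AlgebraicCountDemo {

    // Initial: count the nonnull records in one subset of the input.
    static long initial(List<?> subset) {
        long n = 0;
        for (Object o : subset) {
            if (o != null) n++;
        }
        return n;
    }

    // Intermediate and final are the same operation for COUNT: sum partial counts.
    static long sum(List<Long> partials) {
        long total = 0;
        for (long p : partials) total += p;
        return total;
    }

    // COUNT applied directly to the whole input.
    static long count(List<?> all) {
        return initial(all);
    }

    // COUNT computed in three stages over chunks of the input (chunkSize >= 1).
    static long countAlgebraically(List<?> all, int chunkSize) {
        // "Map": run initial on each chunk.
        List<Long> initials = new ArrayList<>();
        for (int i = 0; i < all.size(); i += chunkSize) {
            initials.add(initial(all.subList(i, Math.min(i + chunkSize, all.size()))));
        }
        // "Combine": run intermediate on two halves of the initial results.
        int mid = initials.size() / 2;
        List<Long> intermediates = Arrays.asList(
                sum(initials.subList(0, mid)),
                sum(initials.subList(mid, initials.size())));
        // "Reduce": final sums the intermediate results once.
        return sum(intermediates);
    }
}
```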

An EvalFunc can declare itself to be algebraic by implementing the Java interface Algebraic. Algebraic provides three methods that allow your UDF to declare Java classes that implement its initial, intermediate, and final functionality. These classes must extend EvalFunc:

// src/org/apache/pig/Algebraic.java
public interface Algebraic{
    
    /**
     * Get the initial function. 
     * @return A function name of f_init. f_init should be an eval func.
     */
    public String getInitial();

    /**
     * Get the intermediate function. 
     * @return A function name of f_intermed. f_intermed should be an eval func.
     */
    public String getIntermed();

    /**
     * Get the final function. 
     * @return A function name of f_final. f_final should be an eval func 
     * parameterized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}

Each of these methods returns the name of a Java class, which should itself extend EvalFunc. Pig will use these UDFs to rewrite the execution of your script. Consider the following Pig Latin script:

input = load 'data' as (x, y);
grpd  = group input by x;
cnt   = foreach grpd generate group, COUNT(input);
store cnt into 'result';

The execution pipeline for this script would initially look like:

Map:     load
Reduce:  foreach(group, COUNT), store

After being rewritten to use the combiner, it would look like:

Map:     load, foreach(group, COUNT.Initial)
Combine: foreach(group, COUNT.Intermediate)
Reduce:  foreach(group, COUNT.Final), store

As an example, we will walk through the implementation for COUNT. Its algebraic functions look like this:

// src/org/apache/pig/builtin/COUNT.java
public String getInitial() {
    return Initial.class.getName();
}

public String getIntermed() {
    return Intermediate.class.getName();
}

public String getFinal() {
    return Final.class.getName();
}

Each of these referenced classes is a static nested class in COUNT. The implementation of Initial is:

// src/org/apache/pig/builtin/COUNT.java
static public class Initial extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        // Since Initial is guaranteed to be called
        // only in the map, it will be called with an
        // input of a bag with a single tuple - the
        // count should always be 1 if that tuple's
        // first field is nonnull, and 0 otherwise.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        if (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                return mTupleFactory.newTuple(Long.valueOf(1));
        }
        return mTupleFactory.newTuple(Long.valueOf(0));
    }
}

Even though the initial function is guaranteed to receive only one record in its input, that record will match the schema of the original function. So, in the case of COUNT, it will be a bag. Thus, this initial function determines whether there is a nonnull record in that bag. If so, it returns one; otherwise, it returns zero. The return type of the initial function is a tuple. The contents of that tuple are entirely up to you as the UDF implementer. In this case, the initial function returns a tuple with one long field.

COUNT’s Intermediate class sums the counts seen so far:

// src/org/apache/pig/builtin/COUNT.java
static public class Intermediate extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        try {
            return mTupleFactory.newTuple(sum(input));
        } catch (ExecException ee) {
            ...
        }
    }
}

static protected Long sum(Tuple input)
throws ExecException, NumberFormatException {
    DataBag values = (DataBag)input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long)t.get(0);
    }
    return sum;
}

The input to the intermediate function is a bag of tuples that were returned by the initial function. The intermediate function may be called zero, one, or many times, so its input may also include tuples produced by earlier calls to itself. It therefore needs to output tuples that match the input tuples it expects. The framework will handle placing those tuples in bags. COUNT’s intermediate function returns a tuple with a long. As we now want to sum the previous counts, this function implements SUM rather than COUNT.

The final function is called in the reducer and is guaranteed to be called only once. Its input is a bag of tuples in the format that both the initial and intermediate functions return. Its return type needs to be the return type of the original UDF, which in this case is long. In COUNT’s case, this is the same operation as the intermediate because it sums the intermediate sums:

// src/org/apache/pig/builtin/COUNT.java
static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
        try {
            return sum(input);
        } catch (Exception ee) {
            ...
        }
    }
}

Implementing Algebraic does not guarantee that the algebraic implementation will always be used. Pig chooses the algebraic implementation only if all UDFs in the same foreach statement are algebraic. This is because our testing has shown that using the combiner with data that cannot be combined significantly slows down the job, and there is no way in Hadoop to route some data to the combiner (for algebraic functions) and some straight to the reducer (for nonalgebraic ones). This means that your UDF must always implement the exec method, even if you hope it will always be used in algebraic mode. It also gives you an additional reason to implement algebraic mode for your UDFs whenever possible: a single nonalgebraic UDF in a foreach prevents every other UDF in that statement from using the combiner.

Accumulator Interface

Some calculations cannot be done in an algebraic manner. In particular, any function that requires its records to be sorted before beginning is not algebraic. But many of these functions still do not need to see their entire input at once; they can work on successive subsets of the data, as long as they are guaranteed to eventually see all of it. This means Pig does not have to read all of the records into memory at once. Instead, it can read a subset of the records and pass them to the UDF. To handle these cases, Pig provides the Accumulator interface. Rather than calling a UDF once with the entire input set in one bag, Pig will call it multiple times with a subset of the records. When it has passed all the records in, it will then ask for a result. Finally, it will give the UDF a chance to reset its state before passing it records for the next group:

// src/org/apache/pig/Accumulator.java
public interface Accumulator <T> {
    /**
     * Pass tuples to the UDF.
     * @param b A tuple containing a single field, which is a bag.  The bag will
     * contain the set of tuples being passed to the UDF in this iteration.
     */
    public void accumulate(Tuple b) throws IOException;

    /**
     * Called when all tuples from current key have been passed to accumulate.
     * @return the value for the UDF for this key.
     */
    public T getValue();
    
    /** 
     * Called after getValue() to prepare processing for next key. 
     */
    public void cleanup();
}

As an example, let’s look at COUNT’s implementation of the accumulator:

// src/org/apache/pig/builtin/COUNT.java
private long intermediateCount = 0L;

public void accumulate(Tuple b) throws IOException {
    try {
        DataBag bag = (DataBag)b.get(0);
        Iterator it = bag.iterator();
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null) {
                intermediateCount += 1;
            }
        }
    } catch (Exception e) {
      ...
    }
}

public void cleanup() {
    intermediateCount = 0L;
}

public Long getValue() {
    return intermediateCount;
}

By default, Pig passes accumulate 20,000 records at once. You can modify this value by setting the property pig.accumulative.batchsize either on the command line or using set in your script.
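The call sequence Pig follows can be modeled without Pig. SimpleAccumulator below mirrors the method names of Accumulator but is a hypothetical plain-Java interface, and runGroup is an illustrative driver, not Pig’s own; a small batch size makes the batching visible:

```java
import java.util.List;

/**
 * Plain-Java model of the Accumulator call sequence: accumulate() once per
 * batch, then getValue(), then cleanup() before the next group.
 * SimpleAccumulator mirrors Pig's method names but is not Pig's interface.
 */
public class AccumulatorDemo {

    interface SimpleAccumulator<T> {
        void accumulate(List<?> batch);
        T getValue();
        void cleanup();
    }

    // Counts nonnull records, like COUNT's accumulator.
    static class CountAccumulator implements SimpleAccumulator<Long> {
        private long intermediateCount = 0L;

        @Override
        public void accumulate(List<?> batch) {
            for (Object o : batch) {
                if (o != null) intermediateCount++;
            }
        }

        @Override
        public Long getValue() { return intermediateCount; }

        @Override
        public void cleanup() { intermediateCount = 0L; }
    }

    // Feeds one group's records in batches of at most batchSize (Pig's default
    // is 20,000), then asks for the result and resets the UDF's state.
    static <T> T runGroup(SimpleAccumulator<T> udf, List<?> group, int batchSize) {
        for (int start = 0; start < group.size(); start += batchSize) {
            udf.accumulate(group.subList(start, Math.min(start + batchSize, group.size())));
        }
        T result = udf.getValue();
        udf.cleanup();
        return result;
    }
}
```

Because runGroup calls cleanup after getValue, the same CountAccumulator instance can be reused for the next group without carrying over its count.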

As mentioned earlier, one major class of methods that can use the accumulator are those that require sorted input, such as session analysis. Usually such a UDF will want records within the group sorted by timestamp. As an example, let’s say you have log data from your web servers that includes the user ID, timestamp, and the URL the user viewed, and you want to do session analysis on this data:

logs    = load 'serverlogs' as (id:chararray, ts: long, url: chararray);
byuser  = group logs by id;
results = foreach byuser {
            sorted = order logs by ts;
            generate group, SessionAnalysis(sorted);
};

Pig can move the sort done by the order statement to Hadoop, to be done as part of the shuffle phase. Thus, Pig is still able to read a subset of records at a time from Hadoop and pass those directly to SessionAnalysis. This important optimization allows accumulator UDFs to work with sorted data.
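What makes sorted input work with the accumulator is that the UDF can carry state from one batch to the next. The following is a hypothetical sketch of the kind of logic a session-analysis UDF might contain, not the actual SessionAnalysis implementation: it counts sessions, assuming (purely for illustration) that 30 minutes of inactivity ends a session and that ts is in milliseconds.

```java
import java.util.List;

/**
 * Hypothetical accumulator-style session counter. Assumes a session ends
 * after 30 minutes of inactivity (an illustrative choice) and that
 * timestamps arrive sorted, as the nested order by guarantees.
 */
public class SessionCounter {
    static final long SESSION_GAP_MS = 30L * 60 * 1000;

    private long sessions = 0;
    private Long lastTs = null;   // survives across batches of the same group

    public void accumulate(List<Long> sortedBatch) {
        for (long ts : sortedBatch) {
            // A new session starts at the first record or after a long gap.
            if (lastTs == null || ts - lastTs > SESSION_GAP_MS) {
                sessions++;
            }
            lastTs = ts;
        }
    }

    public long getValue() { return sessions; }

    public void cleanup() {
        sessions = 0;
        lastTs = null;
    }
}
```

Because lastTs is kept in a field rather than a local variable, a gap that spans the boundary between two batches is still detected correctly.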

Whenever possible, Pig will choose to use the algebraic implementation of a UDF over the accumulator. This is because the accumulator helps avoid spilling records to disk, but it does not reduce network cost or help balance the reducers. If all UDFs in a foreach implement Accumulator and at least one does not implement Algebraic, Pig will use the accumulator. If at least one does not implement Accumulator, Pig will not use the accumulator at all. This is because Pig already has to read the entire bag into memory to pass to the UDF that does not implement Accumulator, so the accumulator no longer provides any benefit.
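The selection rules from this section and the Algebraic section can be summarized as a small decision function. This is a simplified model with hypothetical names; any other conditions Pig checks before using the combiner or the accumulator are not modeled here.

```java
import java.util.List;

/** Simplified, hypothetical model of how Pig picks an execution mode for a foreach. */
public class ExecutionModeChooser {

    enum Mode { ALGEBRAIC, ACCUMULATOR, REGULAR }

    // Capabilities of one UDF in a foreach statement.
    static final class Udf {
        final boolean algebraic;
        final boolean accumulator;

        Udf(boolean algebraic, boolean accumulator) {
            this.algebraic = algebraic;
            this.accumulator = accumulator;
        }
    }

    static Mode choose(List<Udf> udfs) {
        boolean allAlgebraic = true;
        boolean allAccumulator = true;
        for (Udf u : udfs) {
            allAlgebraic &= u.algebraic;
            allAccumulator &= u.accumulator;
        }
        if (allAlgebraic) return Mode.ALGEBRAIC;      // combiner is preferred
        if (allAccumulator) return Mode.ACCUMULATOR;  // some UDF is not algebraic
        return Mode.REGULAR;                          // whole bag read into memory
    }
}
```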

Python UDFs

Pig and Hadoop are implemented in Java, so Java is a natural choice for UDFs as well. But not being forced into Java would be nice. For simple UDFs of only a few lines, the cycle of write, compile, package into a JAR, and deploy is an especially heavyweight process. To allow users to write UDFs in scripting languages, we added support for UDFs in Python to Pig 0.8. We did it in such a way that supporting any scripting language that compiles down to the JVM requires only a few hundred lines of code. We hope to keep expanding the supported languages in the future.

Python UDFs consist of a single function that is used in place of the exec method of a Java function. They can be annotated to indicate their schema. The more advanced features of evaluation functions—such as overloading, constructor arguments, and algebraic and accumulator interfaces—are not available yet.

Python UDFs are executed using the Jython framework. The benefit is that Python UDFs can be compiled to Java bytecode and run with relatively little performance penalty. The downside is that Jython is compatible with version 2.5 of Python, so Python 3 features are not available to UDF writers.

To register and define your Python UDFs in Pig Latin, see “Registering Python UDFs”. In this section we will focus on writing the UDFs themselves. Let’s take a look at the production UDF we used in that earlier section:

# production.py
@outputSchema("production:float")
def production(slugging_pct, onbase_pct):
    return slugging_pct + onbase_pct

The code is self-explanatory. The annotation of @outputSchema tells Pig that this UDF will return a float and that the name of the field is production. The output schema annotation can specify any Pig type. The syntax for tuples and bags matches the syntax for declaring a field to be a tuple or a bag in load; see “Schemas” for details.

Sometimes schemas are variable and not statically expressible. For these cases you can provide a schema function that will define your schema. Let’s write a Python UDF that squares a number, always returning a number of the same type:

# square.py
@outputSchemaFunction("schema")
def square(num):
    return num * num


@schemaFunction("schema")
def schema(input):
    # Return whatever type we were handed
    return input

The input to the schema function is in the same format as the one specified in @outputSchema: colname:type. Its output is expected to be in the same format.

If neither @outputSchema nor @outputSchemaFunction is provided for a Python function, it will be assumed to return a single bytearray value. Because there will be no load function for the value, Pig will not be able to cast it to any other type, so it will be worthless for anything but store or dump.

In order to pass data between Java and Python, Pig must define a mapping of types. Table 10-1 describes the mapping between Pig and Python types.

Table 10-1. Pig-Python type translations

Pig type     Python type
int          number
long         number
float        number
double       number
chararray    string
bytearray    string
map          dictionary
tuple        tuple
bag          list of tuples

Any value that is null in Pig will be translated to the None object in Python. Similarly, any time the None object is returned by Python, Pig will map it to a null of the expected type.

One issue that Pig does not handle for your Python UDFs is bringing along dependent modules. If your Python file imports other modules, you will need to wrap those in a JAR and register that file as part of your Pig script.[27]

One last issue to consider is performance. What is the cost of using Python instead of Java? Of course it depends on your script, the computation you are doing, and your data. And because Python UDFs do not yet support advanced features such as algebraic mode, it can be harder to optimize them. Given all those caveats, tests have shown that Jython functions have a higher instantiation overhead. Once that is paid, they take about 1.2 times as long as the equivalent Java functions. Because of the instantiation overhead, tests with few input lines (10,000 or so) took twice as long as their Java equivalents. These tests were run on simple functions that did almost no processing, so they measure not Jython versus Java, but rather Pig’s overhead in working with Jython.

Writing Filter Functions

Filter functions are evaluation functions that return a Boolean value. Pig does not support Boolean as a full-fledged type, so filter functions cannot appear in statements such as foreach where the results are output to another operator. However, filter functions can be used in filter statements. Consider a nearness function that, given two zip codes, returns true or false depending on whether those two zip codes are within a certain distance of each other:

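import java.io.IOException;
import java.util.Random;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

/**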
 * A filter UDF that determines whether two zip codes are within a given distance.
 */
public class CloseEnough extends FilterFunc {

    int distance;
    Random r = new Random();

    /**
     * @param miles - Distance in miles that two zip codes can be apart and
     * still be considered close enough.
     */
    public CloseEnough(String miles) {
        // Constructor arguments are always passed as strings; convert to int here.
        distance = Integer.parseInt(miles);
    }

    public Boolean exec(Tuple input) throws IOException {
        // expect two strings
        String zip1 = (String)input.get(0);
        String zip2 = (String)input.get(1);
        // do some lookup on zip code tables
        return r.nextBoolean();
    }
}


[24] In 0.9, eval funcs can also be written in JavaScript, though this is experimental and has not yet been fully tested.

[25] Assuming there is one instance of your UDF in the script. Each reference to a UDF in a script becomes a separate instance on the backend, even if they are placed in the same map or reduce task.

[26] Recall that in Pig null means that the value is unknown, not that it is 0 or unset.

[27] Code has been checked in that allows Pig to determine the dependency tree for your Python code, fetch all the needed modules, and ship them as part of the job. As of this writing, it has not yet been released. See PIG-1824 for details.