Chapter 11. Writing Load and Store Functions

We will now consider some of the more complex and most critical parts of Pig: data input and output. Operating on huge data sets is inherently I/O-intensive. Hadoop’s massive parallelism and movement of processing to the data mitigate but do not remove this. Having efficient methods to load and store data is therefore critical. Pig provides default load and store functions for text data and for HBase, but many users find they need to write their own load and store functions to handle the data formats and storage mechanisms they use.

As with evaluation functions, the design goal for load and store functions was to make easy things easy and hard things possible. Also, we wanted to make load and store functions a thin wrapper over Hadoop’s InputFormat and OutputFormat. The intention is that once you have an input format and output format for your data, the additional work of creating and storing Pig tuples is minimal. In the same way evaluation functions were implemented, more complex features such as schema management and projection push down are done via separate interfaces to avoid cluttering the base interface. Pig’s load and store functions were completely rewritten between versions 0.6 and 0.7. This chapter will cover only the interfaces for 0.7 and later releases.

One other important design goal for load and store functions is to not assume that the input sources and output sinks are HDFS. In the examples throughout this book, A = load 'foo'; has implied that foo is a file, but there is no need for that to be the case. foo is a resource locator that makes sense to your load function. It could be an HDFS file, an HBase table, a database JDBC connection string, or a web service URL. Because reading from HDFS is the most common case, many defaults and helper functions are provided for this case.

In this chapter we will walk through writing a load function and a store function for JSON data on HDFS, JsonLoader and JsonStorage, respectively. These are located in the example code in udfs/java/com/acme/io. They use the Jackson JSON library, which is included in your Pig distribution. However, the Jackson JAR is not shipped to the backend by Pig, so when using these UDFs in your script, you will need to register the Jackson JAR in addition to the acme examples JAR:

register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';

These UDFs will serve as helpful examples, but they will not cover all of the functionality of load and store functions. For those sections not shown in these examples, we will look at other existing load and store functions.

Load Functions

Pig’s load function is built on top of a Hadoop InputFormat, the class that Hadoop uses to read data. InputFormat serves two purposes: it determines how input will be split between map tasks, and it provides a RecordReader that produces key-value pairs as input to those map tasks. The load function takes these key-value pairs and returns a Pig Tuple.

The base class for the load function is LoadFunc. This is an abstract class, which allows it to provide helper functions and default implementations. Many load functions will only need to extend LoadFunc.

Load functions’ operations are split between Pig’s frontend and backend. On the frontend, Pig does job planning and optimization, and load functions participate in this in several ways that we will discuss later. On the backend, load functions get each record from the RecordReader, convert it to a tuple, and pass it on to Pig’s map task. Load functions also need to be able to pass data between the frontend and backend invocations so they can maintain state.

Frontend Planning Functions

For all load functions, Pig must do three things as part of frontend planning: 1) it needs to know the input format it should use to read the data; 2) it needs to be sure that the load function understands where its data is located; and 3) it needs to know how to cast bytearrays returned from the load function.

Determining InputFormat

Pig needs to know which InputFormat to use for reading your input. It calls getInputFormat to get an instance of the input format. It gets an instance rather than the class itself so that your load function can control the instantiation: any generic parameters, constructor arguments, etc. For our example load function, this method is very simple. It uses TextInputFormat, an input format that reads text data from HDFS files:

// JsonLoader.java
public InputFormat getInputFormat() throws IOException {
    return new TextInputFormat();
}

Determining the location

Pig communicates the location string provided by the user to the load function via setLocation. So, if the load operator in Pig Latin is A = load 'input';, input is the location string. This method is called on both the frontend and backend, possibly multiple times. Thus you need to take care that this method does not do anything that will cause problems if done more than one time. Your load function should communicate the location to its input format. For example, JsonLoader passes the filename via a helper method on FileInputFormat (a superclass of TextInputFormat):

// JsonLoader.java
public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
}

The Hadoop Job is passed along with the location because that is where input formats usually store their configuration information.

setLocation is called on both the frontend and backend because input formats store their location in the Job object, as shown in the preceding example. For MapReduce jobs, which always have only one input, this works. For Pig jobs, where the same input format might be used to load multiple different inputs (such as in the join or union case), one instance of the input path will overwrite another in the Job object. To work around this, Pig remembers the location in an input-specific parameter and calls setLocation again on the backend so that the input format can get itself set up properly before reading.
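To make the overwrite problem concrete, here is a minimal sketch in plain Java. It does not use Hadoop or Pig classes: JobConfSketch, LoaderSketch, LocationReplayDemo, and the key name sketch.input.path are hypothetical stand-ins invented for illustration. It only models the idea that two loaders writing to one shared job configuration clobber each other, and that replaying setLocation per input on the backend restores the right value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the configuration carried by a Hadoop Job.
class JobConfSketch {
    final Map<String, String> conf = new HashMap<String, String>();
}

// Hypothetical stand-in for a loader whose input format reads its path
// from a single key in the job configuration.
class LoaderSketch {
    static final String INPUT_KEY = "sketch.input.path"; // illustrative key name

    // Safe to call repeatedly: the same arguments always leave the same state.
    void setLocation(String location, JobConfSketch job) {
        job.conf.put(INPUT_KEY, location);
    }
}

class LocationReplayDemo {
    // Frontend: both loaders write the same key, so only the last value survives.
    static String frontendResult(String first, String second) {
        JobConfSketch job = new JobConfSketch();
        new LoaderSketch().setLocation(first, job);
        new LoaderSketch().setLocation(second, job);
        return job.conf.get(LoaderSketch.INPUT_KEY);
    }

    // Backend: the planner remembered each input's location separately and
    // replays setLocation only for the input this task is reading.
    static String backendResult(Map<Integer, String> remembered, int inputIndex) {
        JobConfSketch job = new JobConfSketch();
        new LoaderSketch().setLocation(remembered.get(inputIndex), job);
        return job.conf.get(LoaderSketch.INPUT_KEY);
    }
}
```

Because setLocation in this sketch only overwrites a value, calling it several times is harmless, which is the property a real setLocation needs as well.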

For files on HDFS, the location provided by the user might be relative rather than absolute. To deal with this, Pig needs to resolve these to absolute locations based on the current working directory at the time of the load. Consider the following Pig Latin:

cd /user/joe;
input1 = load 'input';
cd /user/fred;
input2 = load 'input';

These two load statements should load different files. But Pig cannot assume it understands how to turn a relative path into an absolute path, because it does not know what that input is. It could be an HDFS path, a database table name, etc. So it leaves this to the load function. Before calling setLocation, Pig passes the location string to relativeToAbsolutePath to do any necessary conversion. Because most loaders are reading from HDFS, the default implementation in LoadFunc handles the HDFS case. If your loader will never need to do this conversion, it should override this method and return the location string passed to it.
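The two behaviors described above can be sketched in plain Java as follows. This is an illustration only, not Pig's implementation: LocationResolverSketch and both method names are hypothetical, and the working directory is passed as a string rather than as a Hadoop path object.

```java
// Hypothetical helper illustrating the two strategies a loader can take.
class LocationResolverSketch {

    // File-style locations: resolve relative names against the working
    // directory that was current when the load statement was issued.
    static String fileRelativeToAbsolute(String location, String workingDir) {
        if (location.startsWith("/") || location.contains("://")) {
            // Already absolute, or a full URI such as hdfs://host/path.
            return location;
        }
        String base = workingDir.endsWith("/") ? workingDir : workingDir + "/";
        return base + location;
    }

    // Non-file locations (table names, connection strings): there is nothing
    // to resolve, so the string is returned exactly as the user wrote it.
    static String passThrough(String location, String workingDir) {
        return location;
    }
}
```

With this sketch, the location input resolves to /user/joe/input under /user/joe and to /user/fred/input under /user/fred, while a table name such as web_server_logs passes through unchanged.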

Getting the casting functions

Some Pig functions, such as PigStorage and HBaseStorage, load data by default without understanding its type information, and place the data unchanged in DataByteArray objects. At a later time, when Pig needs to cast that data to another type, it does not know how to because it does not understand how the data is represented in the bytearray. Therefore, it relies on the load function to provide a method to cast from bytearray to the appropriate type.

Pig determines which set of casting functions to use by calling getLoadCaster on the load function. This should return either null, which indicates that your load function does not expect to do any bytearray casts, or an implementation of the LoadCaster interface, which will be used to do the casts. We will look at the methods of LoadCaster in “Casting bytearrays”.

Our example loader returns null because it provides typed data based on the stored schema and, therefore, does not expect to be casting data. Any bytearrays in its data are binary data that should not be cast.

Passing Information from the Frontend to the Backend

As with evaluation functions, load functions can make use of UDFContext to pass information from frontend invocations to backend invocations. For details on UDFContext, see “UDFContext”. One significant difference between using UDFContext in evaluation and load functions is determining the instance-specific signature of the function. In evaluation functions, constructor arguments were suggested as a way to do this. For load functions, the input location usually will be the differentiating factor. However, Pig does not guarantee that it will call setLocation before other methods where you might want to use UDFContext. To work around this, setUDFContextSignature is provided. It provides an instance-unique signature that you can use when calling getUDFProperties. This method is guaranteed to be called before any other methods on LoadFunc in both the frontend and backend. Your UDF can then store this signature and use it when getting its property object:

// JsonLoader.java
private String udfcSignature = null;

public void setUDFContextSignature(String signature) {
    udfcSignature = signature;
}

setLocation is the only method in the load function that is guaranteed to be called on the frontend. It is therefore the best candidate for storing needed information to UDFContext. You might need to check that the data you are writing is available and nonnull to avoid overwriting your values when setLocation is called on the backend.
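The following plain-Java sketch models the two points made in this section: properties are kept per class and signature, so two instances of the same loader do not collide, and a value is written only when it is nonnull, so a later call on the backend cannot erase it. ContextSketch, SignatureLoaderSketch, remember, recall, and the key sketch.loader.schema are hypothetical names used for illustration; they are not part of Pig.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for UDFContext: one Properties object per
// (class, signature) pair.
class ContextSketch {
    private final Map<String, Properties> store = new HashMap<String, Properties>();

    Properties getProperties(Class<?> c, String signature) {
        String key = c.getName() + "#" + signature;
        Properties p = store.get(key);
        if (p == null) {
            p = new Properties();
            store.put(key, p);
        }
        return p;
    }
}

// Hypothetical loader that stores and retrieves a schema string.
class SignatureLoaderSketch {
    static final String SCHEMA_KEY = "sketch.loader.schema"; // illustrative key
    private String signature;

    void setUDFContextSignature(String signature) {
        this.signature = signature;
    }

    // May run on the frontend (schema known) and again on the backend
    // (schema unknown).  Writing only nonnull values keeps the backend call
    // from overwriting what the frontend stored.
    void remember(ContextSketch ctx, String schemaOrNull) {
        if (schemaOrNull != null) {
            ctx.getProperties(getClass(), signature)
               .setProperty(SCHEMA_KEY, schemaOrNull);
        }
    }

    String recall(ContextSketch ctx) {
        return ctx.getProperties(getClass(), signature).getProperty(SCHEMA_KEY);
    }
}
```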

Backend Data Reading

On the backend, your load function takes the key-value pairs produced by its input format and produces Pig Tuples.

Getting ready to read

Before reading any data, Pig gives your load function a chance to set itself up by calling prepareToRead. This is called in each map task and passes a copy of the RecordReader, which your load function will need later to read records from the input. RecordReader is a class that InputFormat uses to read records from an input split. Pig obtains the record reader it passes to prepareToRead by calling createRecordReader on the input format that your load function returned from getInputFormat. Pig also passes an instance of the PigSplit that contains the Hadoop InputSplit corresponding to the partition of input this instance of your load function will read. If you need split-specific information, you can get it from here.

Our example loader, beyond storing the record reader, also reads the schema file that was stored into UDFContext in the frontend so that it knows how to parse the input file. Notice how it uses the signature passed in setUDFContextSignature to access the appropriate properties object. Finally, it creates a JsonFactory object that is used to generate a parser for each line:

// JsonLoader.java
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
    this.reader = reader;
        
    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonloader.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    jsonFactory = new JsonFactory();
}

Reading records

Now we have reached the meat of your load function, reading records from its record reader and returning tuples to Pig. Pig will call getNext and place the resulting tuple into its processing pipeline. It will keep doing this until getNext returns a null, which indicates that the input for this split has been fully read.

Pig does not copy the tuple that results from this method, but instead feeds it directly to its pipeline to avoid the copy overhead. This means this method cannot reuse objects, and instead must create a new tuple and contents for each record it reads. On the other hand, record readers may choose to reuse their key and value objects from record to record; most standard implementations do. So, before writing a loader that tries to be efficient and wraps the keys and values from the record reader directly into the tuple to avoid a copy, you must make sure you understand how the record reader is managing its data.
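The hazard is easy to reproduce without Hadoop. In the plain-Java sketch below, ReusingReaderSketch is a hypothetical reader that, like many record readers, hands back the same mutable object for every record. TupleCopyDemo is likewise hypothetical. Keeping the reference yields records that all show the last value read, while copying yields the expected data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader that reuses one mutable value object across records.
class ReusingReaderSketch {
    private final String[] lines;
    private int next = 0;
    private final StringBuilder current = new StringBuilder();

    ReusingReaderSketch(String... lines) {
        this.lines = lines;
    }

    boolean nextKeyValue() {
        if (next >= lines.length) return false;
        current.setLength(0);          // overwrite the shared buffer in place
        current.append(lines[next++]);
        return true;
    }

    StringBuilder getCurrentValue() {
        return current;                // same object every time
    }
}

class TupleCopyDemo {
    // Wrong: stores references to the reader's buffer, so every element
    // ends up showing whatever record was read last.
    static List<CharSequence> collectByReference(ReusingReaderSketch r) {
        List<CharSequence> out = new ArrayList<CharSequence>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue());
        return out;
    }

    // Right: copies each value into a fresh object that the caller owns.
    static List<CharSequence> collectByCopy(ReusingReaderSketch r) {
        List<CharSequence> out = new ArrayList<CharSequence>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue().toString());
        return out;
    }
}
```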

For information on creating the appropriate Java objects when constructing tuples for Pig, see “Interacting with Pig values”.

Our sample load function’s implementation of getNext reads the value from the Hadoop record (the key is ignored), constructs a JsonParser to parse it, parses the fields, and returns the resulting tuple. If there are parse errors, it does not throw an exception. Instead, it returns a tuple with null fields where the data could not be parsed. This prevents bad lines from causing the whole job to fail. Warnings are issued so that users can see which records were ignored:

// JsonLoader.java
public Tuple getNext() throws IOException {
    Text val = null;
    try {
        // Read the next key-value pair from the record reader.  If it's
        // finished, return null.
        if (!reader.nextKeyValue()) return null;

        // Get the current value.  We don't use the key.
        val = (Text)reader.getCurrentValue();
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }
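    // Create a parser specific for this input line.  This might not be the
    // most efficient approach.  Text.getBytes() returns the backing array,
    // which can be longer than the record, so limit the stream to getLength().
    ByteArrayInputStream bais =
        new ByteArrayInputStream(val.getBytes(), 0, val.getLength());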
    JsonParser p = jsonFactory.createJsonParser(bais);

    // Create the tuple we will be returning.  We create it with the right
    // number of fields, as the Tuple object is optimized for this case.
    Tuple t = tupleFactory.newTuple(fields.length);

    // Read the start object marker.  Throughout this file if the parsing
    // isn't what we expect, we return a tuple with null fields rather than
    // throwing an exception.  That way a few mangled lines don't fail the job.
    if (p.nextToken() != JsonToken.START_OBJECT) {
        log.warn("Bad record, could not find start of record " + val.toString());
        return t;
    }

    // Read each field in the record.
    for (int i = 0; i < fields.length; i++) {
        t.set(i, readField(p, fields[i], i));
    }

    if (p.nextToken() != JsonToken.END_OBJECT) {
        log.warn("Bad record, could not find end of record " +
            val.toString());
        return t;
    }
    p.close();
    return t;
}

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    // Read the next token.
    JsonToken tok = p.nextToken();
    if (tok == null) {
        log.warn("Early termination of record, expected " + fields.length
            + " fields but found " + fieldnum);
        return null;
    }   

    // Check to see if this value was null.
    if (tok == JsonToken.VALUE_NULL) return null;

    // Read based on our expected type.
    switch (field.getType()) {
    case DataType.INTEGER:
        // The token read above was the field name; advance to its value.
        p.nextToken();
        return p.getValueAsInt();

    case DataType.LONG:
        p.nextToken();
        return p.getValueAsLong();

    case DataType.FLOAT: 
        p.nextToken();
        return (float)p.getValueAsDouble();

    case DataType.DOUBLE: 
        p.nextToken();
        return p.getValueAsDouble();

    case DataType.BYTEARRAY:
        p.nextToken();
        byte[] b = p.getBinaryValue();
        // Use the DBA constructor that copies the bytes so that we own
        // the memory.
        return new DataByteArray(b, 0, b.length);

    case DataType.CHARARRAY:
        p.nextToken();
        return p.getText();

    case DataType.MAP:
        // Should be a start of the map object.
        if (p.nextToken() != JsonToken.START_OBJECT) {
            log.warn("Bad map field, could not find start of object, field "
                + fieldnum);
            return null;
        }
        Map<String, String> m = new HashMap<String, String>();
        while (p.nextToken() != JsonToken.END_OBJECT) {
            String k = p.getCurrentName();
            String v = p.getText();
            m.put(k, v);
        }
        return m;

    case DataType.TUPLE:
        if (p.nextToken() != JsonToken.START_OBJECT) {
            log.warn("Bad tuple field, could not find start of object, "
                + "field " + fieldnum);
            return null;
        }

        ResourceSchema s = field.getSchema();
        ResourceFieldSchema[] fs = s.getFields();
        Tuple t = tupleFactory.newTuple(fs.length);

        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }

        if (p.nextToken() != JsonToken.END_OBJECT) {
            log.warn("Bad tuple field, could not find end of object, "
                + "field " + fieldnum);
            return null;
        }
        return t;

    case DataType.BAG:
        if (p.nextToken() != JsonToken.START_ARRAY) {
            log.warn("Bad bag field, could not find start of array, "
                + "field " + fieldnum);
            return null;
        }

        s = field.getSchema();
        fs = s.getFields();
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        fs = s.getFields();

        DataBag bag = bagFactory.newDefaultBag();

        JsonToken innerTok;
        while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
            if (innerTok != JsonToken.START_OBJECT) {
                log.warn("Bad bag tuple field, could not find start of "
                    + "object, field " + fieldnum);
                return null;
            }

            t = tupleFactory.newTuple(fs.length);
            for (int j = 0; j < fs.length; j++) {
                t.set(j, readField(p, fs[j], j));
            }

            if (p.nextToken() != JsonToken.END_OBJECT) {
                log.warn("Bad bag tuple field, could not find end of "
                    + "object, field " + fieldnum);
                return null;
            }
            bag.add(t);
        }
        return bag;

    default:
        throw new IOException("Unknown type in input schema: " +
            field.getType());
    }

}

Additional Load Function Interfaces

Your load function can provide more complex features by implementing additional interfaces. (Implementation of these interfaces is optional.)

Loading metadata

Many data storage mechanisms can record the schema along with the data. Pig does not assume the ability to store schemas, but if your storage can hold the schema, it can be very useful. This frees script writers from needing to specify the field names and types as part of the load operator in Pig Latin. This is user-friendly and less error-prone, and avoids the need to rewrite scripts when the schema of your data changes.

Some types of data storage also partition the data. If Pig understands this partitioning, it can load only those partitions that are needed for a particular script. Both of these functions are enabled by implementing the LoadMetadata interface.

getSchema in the LoadMetadata interface gives your load function a chance to provide a schema. It is passed the location string the user provides as well as the Hadoop Job object, in case it needs information in this object to open the schema. It is expected to return a ResourceSchema, which represents the data that will be returned. ResourceSchema is very similar to the Schema class used by evaluation functions. (See “Input and Output Schemas” for details.) There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple’s ResourceFieldSchema.

Our example load and store functions keep the schema in a side file[28] named _schema in HDFS. Our implementation of getSchema reads this file and also serializes the schema into UDFContext so that it is available on the backend:

// JsonLoader.java
public ResourceSchema getSchema(String location, Job job)
throws IOException {
    // Open the schema file and read the schema.
    // Get an HDFS handle.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataInputStream in = fs.open(new Path(location + "/_schema"));
    String line = in.readLine();
    in.close();
    if (line == null) {
        throw new IOException("Schema file " + location +
            "/_schema is empty");
    }

    // Parse the schema.
    ResourceSchema s = new ResourceSchema(Utils.getSchemaFromString(line));

    // Now that we have determined the schema, store it in our
    // UDFContext properties object so we have it when we need it on the
    // backend.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonloader.schema", line);

    return s;
}

Once your loader implements getSchema, load statements that use your loader do not need to declare their schemas in order for the field names to be used in the script. For example, if we had data with a schema of user:chararray, age:int, gpa:double, the following Pig Latin will compile and run:

register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
      
A = load 'input' using com.acme.io.JsonLoader();
B = foreach A generate user;
dump B;

LoadMetadata also includes a getStatistics method. Pig does not yet make use of statistics in job planning; this method is for future use.

Using partitions

Some types of storage partition their data, allowing you to read only the relevant sections for a given job. The LoadMetadata interface also provides methods for working with partitions in your data. In order for Pig to request the relevant partitions, it must know how the data is partitioned. Pig determines this by calling getPartitionKeys. If this returns a null or the LoadMetadata interface is not implemented by your loader, Pig will assume it needs to read the entire input.

Pig expects getPartitionKeys to return an array of strings, where each string represents one field name. Those fields are the keys used to partition the data. Pig will look for a filter statement immediately following the load statement that includes one or more of these fields. If such a statement is found, it will be passed to setPartitionFilter. If the filter includes both partition and nonpartition keys and it can be split,[29] Pig will split it and pass just the partition-key-related expression to setPartitionFilter. As an example, consider an HCatalog[30] table web_server_logs that is partitioned by two fields, date and colo:

logs    = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by date == '20110614' and NotABot(user_id);
...  

Pig will call getPartitionKeys, and HCatLoader will return two key names, date and colo. Pig will find the date field in the filter statement and rewrite the filter as shown in the following example, pushing down the date == '20110614' predicate to HCatLoader via setPartitionFilter:

logs    = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by NotABot(user_id);
...  

It is now up to the HCatalog loader to ensure that it only returns data from web_server_logs where date is 20110614.

The one exception to this is fields used in eval funcs or filter funcs. Pig assumes that loaders do not understand how to invoke UDFs, so Pig will not push these expressions.
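The splitting rule can be modeled in a few lines of plain Java. This is a simplified sketch, not Pig's optimizer: ConjunctSketch and FilterSplitSketch are hypothetical types, each term of the filter is assumed to refer to a single field, and the terms are assumed to be joined by and.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of one and-ed term of a filter expression.
class ConjunctSketch {
    final String text;      // for example "date == '20110614'"
    final String field;     // the single field the term refers to
    final boolean usesUdf;  // true if the term invokes a UDF

    ConjunctSketch(String text, String field, boolean usesUdf) {
        this.text = text;
        this.field = field;
        this.usesUdf = usesUdf;
    }
}

class FilterSplitSketch {
    final List<String> pushed = new ArrayList<String>(); // handed to the loader
    final List<String> kept = new ArrayList<String>();   // left in the script

    // partitionKeys may be null, meaning the loader reported no partitioning;
    // in that case nothing is pushed and the whole input is read.
    FilterSplitSketch(List<ConjunctSketch> conjuncts, String[] partitionKeys) {
        Set<String> keys = new HashSet<String>();
        if (partitionKeys != null) keys.addAll(Arrays.asList(partitionKeys));
        for (ConjunctSketch c : conjuncts) {
            // Push only terms on partition keys that do not involve a UDF.
            if (keys.contains(c.field) && !c.usesUdf) {
                pushed.add(c.text);
            } else {
                kept.add(c.text);
            }
        }
    }
}
```

Applied to the web_server_logs filter with partition keys date and colo, the date comparison lands in pushed and the NotABot call stays in kept.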

Our example loader works on file data, so it does not implement getPartitionKeys or setPartitionFilter. For an example implementation of these methods, see the HCatalog code at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.

Casting bytearrays

If you need to control how binary data that your loader loads is cast to other data types, you can implement the LoadCaster interface. Because this interface contains a lot of methods, implementers often implement it as a separate class. This also allows load functions to share implementations of LoadCaster, since Java does not support multiple inheritance.

The interface consists of a series of methods: bytesToInteger, bytesToLong, etc. These will be called to convert a bytearray to the appropriate type. Starting in 0.9, there are two bytesToMap methods. You should implement the one that takes a ResourceFieldSchema; the other one is for backward-compatibility. The bytesToBag, bytesToTuple, and bytesToMap methods take a ResourceFieldSchema that describes the field being converted. Calling getSchema on this object will return a schema that describes this bag, tuple, or map, if one exists. If Pig does not know the intended structure of the object, getSchema will return null. Keep in mind that the schema of the bag will be one field, a tuple, which in turn will have a schema describing the contents of that tuple.

A default load caster, Utf8StorageConverter, is provided. It handles converting UTF8-encoded text to Pig types. Scalar conversions are done in a straightforward way. Maps are expected to be surrounded by [] (square brackets), with keys separated from values by # (hash) and key-value pairs separated by , (commas). Tuples are surrounded by () (parentheses) and have fields separated by , (commas). Bags are surrounded by {} (braces) and have tuples separated by , (commas). There is no ability to escape these special characters.
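To illustrate the text layout just described, here is a deliberately simplified plain-Java parser for flat maps and flat tuples. It is not Utf8StorageConverter and does not reproduce its behavior in detail: TextFormatSketch and its methods are hypothetical, nested values are not handled, and every value is left as a string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TextFormatSketch {
    // Strips the expected delimiters, or returns null if they are missing.
    private static String unwrap(String s, char open, char close) {
        if (s == null || s.length() < 2
                || s.charAt(0) != open || s.charAt(s.length() - 1) != close) {
            return null;
        }
        return s.substring(1, s.length() - 1);
    }

    // "[k1#v1,k2#v2]" becomes {k1=v1, k2=v2}; malformed input yields null.
    static Map<String, String> parseFlatMap(String s) {
        String body = unwrap(s, '[', ']');
        if (body == null) return null;
        Map<String, String> m = new LinkedHashMap<String, String>();
        if (body.isEmpty()) return m;
        for (String pair : body.split(",")) {
            int hash = pair.indexOf('#');
            if (hash < 0) return null;
            m.put(pair.substring(0, hash), pair.substring(hash + 1));
        }
        return m;
    }

    // "(a,b,c)" becomes [a, b, c]; malformed input yields null.
    static List<String> parseFlatTuple(String s) {
        String body = unwrap(s, '(', ')');
        if (body == null) return null;
        List<String> fields = new ArrayList<String>();
        if (body.isEmpty()) return fields;
        for (String f : body.split(",", -1)) fields.add(f);
        return fields;
    }
}
```

Because there is no escaping, a value that itself contains a comma is split into two fields, which is the limitation noted above.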

Pushing down projections

Often a Pig Latin script will need to read only a few fields in the input. Some types of storage formats store their data by fields instead of by records (for example, Hive’s RCFile). For these types of formats, there is a significant performance gain to be had by loading only those fields that will be used in the script. Even for record-oriented storage formats, it can be useful to skip deserializing fields that will not be used.

As part of its optimizations, Pig analyzes Pig Latin scripts and determines what fields in an input it needs at each step in the script. It uses this information to aggressively drop fields it no longer needs. If the loader implements the LoadPushDown interface, Pig can go a step further and provide this information to the loader.

Once Pig knows the fields it needs, it assembles them in a RequiredFieldList and passes that to pushProjection. In the load function’s reply, it indicates whether it can meet the request. It responds with a RequiredFieldResponse, which is a fancy wrapper around a Boolean. If the Boolean is true, Pig will assume that only the required fields are being returned from getNext. If it is false, Pig will assume that all fields are being returned by getNext, and it will handle dropping the extra ones itself.

The RequiredField class used to describe which fields are required is slightly complex. Beyond allowing a user to specify whether a given field is required, it provides the ability to specify which subfields of that field are required. For example, for maps, certain keys can be listed as required. For tuples and bags, certain fields can be listed as required.

Load functions that implement LoadPushDown should not modify the schema object returned by getSchema. This should always be the schema of the full input. Pig will manage the translation between the schema having all of the fields and the results of getNext having only some.
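The contract between Pig and the loader can be sketched in plain Java. ProjectingLoaderSketch is a hypothetical class: a boolean stands in for RequiredFieldResponse, an array of field positions stands in for RequiredFieldList, and emit stands in for getNext operating on one already-parsed record.

```java
import java.util.ArrayList;
import java.util.List;

class ProjectingLoaderSketch {
    private final boolean canProject;
    private int[] required; // null means "return every field"

    ProjectingLoaderSketch(boolean canProject) {
        this.canProject = canProject;
    }

    // Remembers the request if it can be honored and reports whether it
    // will be.  A false answer tells the caller to drop fields itself.
    boolean pushProjection(int[] requiredIndexes) {
        if (!canProject) return false;
        required = requiredIndexes.clone();
        return true;
    }

    // Returns either the full record or only the requested fields, matching
    // the answer given by pushProjection.
    List<Object> emit(Object[] fullRecord) {
        List<Object> out = new ArrayList<Object>();
        if (required == null) {
            for (Object o : fullRecord) out.add(o);
        } else {
            for (int idx : required) out.add(fullRecord[idx]);
        }
        return out;
    }
}
```

Note that the sketch never changes its notion of the full record layout; only the output of emit shrinks, which mirrors the rule that the schema returned by getSchema stays the schema of the full input.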

Our example loader does not implement LoadPushDown. For an example of a loader that does, see HCatLoader at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.

Store Functions

Pig’s store function is, in many ways, a mirror image of the load function. It is built on top of Hadoop’s OutputFormat. It takes Pig Tuples and creates key-value pairs that its associated output format writes to storage.

StoreFunc is an abstract class, which allows it to provide default implementations for some methods. However, some functions implement both load and store functionality; PigStorage is one example. Because Java does not support multiple inheritance, the interface StoreFuncInterface is provided. These dual load/store functions can implement this interface rather than extending StoreFunc.
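The shape of this arrangement can be shown with plain Java stand-ins. None of the types below are Pig classes: LoadBaseSketch, StoreInterfaceSketch, StoreBaseSketch, and DualStorageSketch are hypothetical, and records are reduced to strings. The point is only that a class can extend one abstract base and still satisfy the other contract through an interface.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Stand-in for an abstract load base class with an inherited default.
abstract class LoadBaseSketch {
    abstract String getNext();

    String relativeToAbsolutePath(String location, String cwd) {
        return location.startsWith("/") ? location : cwd + "/" + location;
    }
}

// Stand-in for the store contract expressed as an interface.
interface StoreInterfaceSketch {
    void putNext(String record);
    String relToAbsPathForStoreLocation(String location, String cwd);
}

// Store-only functions can extend this and inherit the default.
abstract class StoreBaseSketch implements StoreInterfaceSketch {
    public String relToAbsPathForStoreLocation(String location, String cwd) {
        return location.startsWith("/") ? location : cwd + "/" + location;
    }
}

// A dual load/store function already has a superclass, so it implements
// the store interface and supplies the store-side methods itself.
class DualStorageSketch extends LoadBaseSketch implements StoreInterfaceSketch {
    private final Queue<String> records = new ArrayDeque<String>();

    String getNext() {
        return records.poll(); // null once everything has been read
    }

    public void putNext(String record) {
        records.add(record);
    }

    public String relToAbsPathForStoreLocation(String location, String cwd) {
        return relativeToAbsolutePath(location, cwd);
    }
}
```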

Store function operations are split between the frontend and backend of Pig. Pig does planning and optimization on the frontend. Store functions have an opportunity at this time to check that a valid schema is being used and set up the storage location. On the backend, store functions take a tuple from Pig, convert it to a key-value pair, and pass it to a Hadoop RecordWriter. Store functions can pass information from frontend invocations to backend invocations via UDFContext.

Store Function Frontend Planning

Store functions have three tasks to fulfill on the frontend:

  • Instantiate the OutputFormat they will use to store data.

  • Check the schema of the data being stored.

  • Record the location where the data will be stored.

Determining OutputFormat

Pig calls getOutputFormat to get an instance of the output format that your store function will use to store records. This method returns an instance rather than the classname or the class itself. This allows your store function to control how the class is instantiated. The example store function JsonStorage uses TextOutputFormat. This is an output format that stores text data in HDFS. We have to instantiate this with a key of LongWritable and a value of Text to match the expectations of TextInputFormat:

// JsonStorage.java
public OutputFormat getOutputFormat() throws IOException {
    return new TextOutputFormat<LongWritable, Text>();
}

Setting the output location

Pig calls setStoreLocation to communicate the location string the user provides to your store function. Given the Pig Latin store Z into 'output';, output is the location string. This method, called on both the frontend and the backend, could be called multiple times; consequently, it should not have any side effects that will cause a problem if this happens. Your store function will need to communicate the location to its output format. Our example store function uses the FileOutputFormat utility function setOutputPath to do this:

// JsonStorage.java
public void setStoreLocation(String location, Job job) throws IOException {
    FileOutputFormat.setOutputPath(job, new Path(location));
}

The Hadoop Job is passed to this function as well. Most output formats store the location information in the job.

Pig calls setStoreLocation on both the frontend and backend because output formats usually store their location in the job, as we see in our example store function. This works for MapReduce jobs, where a single output format is guaranteed. But due to the split operator, Pig can have more than one instance of the same store function in a job. If multiple instances of a store function call FileOutputFormat.setOutputPath, whichever instance calls it last will overwrite the others. Pig avoids this by keeping output-specific information and calling setStoreLocation again on the backend so that it can properly configure the output format.

For HDFS files, the user might provide a relative path. Pig needs to resolve these to absolute paths using the current working directory at the time the store is called. To accomplish this, Pig calls relToAbsPathForStoreLocation with the user-provided location string before calling setStoreLocation. This method translates between relative and absolute paths. For store functions writing to HDFS, the default implementation in StoreFunc handles the conversion. If you are writing a store function that does not use file paths (e.g., HBase), you should override this method to return the string it is passed.

Checking the schema

As part of frontend planning, Pig gives your store function a chance to check the schema of the data to be stored. If you are storing data to a system that expects a certain schema for the output (such as an RDBMS) or you cannot store certain data types, this is the place to perform those checks. Oddly enough, this method returns void rather than a Boolean, so if you detect an issue with the schema, you must throw an IOException.
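A store function that cannot hold certain types might perform a check along the following lines. This sketch deliberately avoids the Pig classes so that it stands alone: Field is a hypothetical stand-in for ResourceFieldSchema, type names are plain strings rather than DataType constants, and the list of unsupported types is invented for illustration.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Illustration only: Field is a stand-in for Pig's ResourceFieldSchema,
// and types are plain strings rather than DataType constants.
final class SchemaCheckDemo {

    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Types this hypothetical storage target cannot hold.
    static final List<String> UNSUPPORTED = Arrays.asList("map", "bag");

    // Returns nothing on success and signals a bad schema by throwing,
    // mirroring the void-returning checkSchema contract.
    static void checkSchema(List<Field> fields) throws IOException {
        for (Field f : fields) {
            if (UNSUPPORTED.contains(f.type)) {
                throw new IOException("Cannot store field '" + f.name
                    + "' of type " + f.type);
            }
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean accepts(List<Field> fields) {
        try {
            checkSchema(fields);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Field> ok = Arrays.asList(
            new Field("name", "chararray"), new Field("age", "int"));
        List<Field> bad = Arrays.asList(
            new Field("name", "chararray"), new Field("props", "map"));
        System.out.println(accepts(ok));   // prints "true"
        System.out.println(accepts(bad));  // prints "false"
    }
}
```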

Our example store function does not have limitations on the schemas it can store. However, it uses this function as a place to serialize the schema into UDFContext so that it can be used on the backend when writing data:

// JsonStorage.java
public void checkSchema(ResourceSchema s) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p = 
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonstorage.schema", s.toString());
}

Store Functions and UDFContext

Store functions work with UDFContext exactly as load functions do, but with one exception: the signature for store functions is passed to the store function via setStoreFuncUDFContextSignature. See “Passing Information from the Frontend to the Backend” for a discussion of how load functions work with UDFContext. Our example store function stores the signature in a member variable for later use:

// JsonStorage.java
public void setStoreFuncUDFContextSignature(String signature) {
    udfcSignature = signature;
} 
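To see why the signature is worth keeping, consider a script that uses the same store function class more than once. The sketch below is a simplified model and not Pig's UDFContext: it assumes properties are looked up by the pair of class and signature, which is what the getUDFProperties calls in the example suggest, and every name in it is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Simplified model of signature-keyed properties. This is not Pig's
// UDFContext; the class and method names are hypothetical.
final class SignatureKeyedContext {
    private final Map<String, Properties> store = new HashMap<>();

    // One Properties object per (class, signature) pair, created on demand.
    Properties getProperties(Class<?> c, String signature) {
        String key = c.getName() + "|" + signature;
        return store.computeIfAbsent(key, k -> new Properties());
    }

    public static void main(String[] args) {
        SignatureKeyedContext ctx = new SignatureKeyedContext();

        // Two instances of the same class, distinguished only by signature,
        // each record a schema under the same property name.
        ctx.getProperties(String.class, "store1").setProperty("schema", "a:int");
        ctx.getProperties(String.class, "store2").setProperty("schema", "b:chararray");

        // Each instance reads back its own value; neither overwrote the other.
        System.out.println(ctx.getProperties(String.class, "store1").getProperty("schema"));
        System.out.println(ctx.getProperties(String.class, "store2").getProperty("schema"));
    }
}
```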

Writing Data

During backend processing, the store function is first initialized, and then takes Pig tuples and converts them to key-value pairs to be written to storage.

Preparing to write

Pig calls your store function’s prepareToWrite method in each map or reduce task before writing any data. This call passes a RecordWriter instance to use when writing data. RecordWriter is a class that OutputFormat uses to write individual records. Pig will get the record writer it passes to your store function by calling getRecordWriter on the output format your store function returned from getOutputFormat. Your store function will need to keep this reference so that it can be used in putNext.

The example store function JsonStorage also uses this method to read the schema out of the UDFContext. It will use this schema when storing data. Finally, it creates a JsonFactory for use in putNext:

// JsonStorage.java
public void prepareToWrite(RecordWriter writer) throws IOException {
    // Store the record writer reference so we can use it when it's time
    // to write tuples.
    this.writer = writer;

    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonstorage.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    // Build a Json factory.
    jsonFactory = new JsonFactory();
    jsonFactory.configure(
        JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}

Writing records

putNext is the core method in the store function class. Pig calls this method for every tuple it needs to store. Your store function needs to take these tuples and produce the key-value pairs that its output format expects. For information on the Java objects in which the data will be stored and how to extract them, see “Interacting with Pig values”.

JsonStorage encodes the contents of the tuple in JSON format and writes the resulting string into the value field of TextOutputFormat. The key field is left null:

// JsonStorage.java
public void putNext(Tuple t) throws IOException {
    // Build a ByteArrayOutputStream to write the JSON into.
    ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
    // Build the generator.
    JsonGenerator json =
        jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);

    // Write the beginning of the top-level tuple object.
    json.writeStartObject();
    for (int i = 0; i < fields.length; i++) {
        writeField(json, fields[i], t.get(i));
    }
    json.writeEndObject();
    json.close();

    // Hand a null key and our string to Hadoop.
    try {   
        writer.write(null, new Text(baos.toByteArray()));
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }
}

private void writeField(JsonGenerator json,
                        ResourceFieldSchema field,
                        Object d) throws IOException {

    // If the field is missing or the value is null, write a null.
    if (d == null) {
        json.writeNullField(field.getName());
        return;
    }

    // Based on the field's type, write it out.
    switch (field.getType()) {
    case DataType.INTEGER:
        json.writeNumberField(field.getName(), (Integer)d);
        return;

    case DataType.LONG:
        json.writeNumberField(field.getName(), (Long)d);
        return;

    case DataType.FLOAT:
        json.writeNumberField(field.getName(), (Float)d);
        return;

    case DataType.DOUBLE:
        json.writeNumberField(field.getName(), (Double)d);
        return;

    case DataType.BYTEARRAY:
        json.writeBinaryField(field.getName(), ((DataByteArray)d).get());
        return;

    case DataType.CHARARRAY:
        json.writeStringField(field.getName(), (String)d);
        return;

    case DataType.MAP:
        json.writeFieldName(field.getName());
        json.writeStartObject();
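        for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
            // Map values may be null; write a JSON null rather than
            // calling toString on a null reference.
            if (e.getValue() == null) {
                json.writeNullField(e.getKey());
            } else {
                json.writeStringField(e.getKey(), e.getValue().toString());
            }
        }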
        json.writeEndObject();
        return;

    case DataType.TUPLE:
        json.writeFieldName(field.getName());
        json.writeStartObject();

        ResourceSchema s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        ResourceFieldSchema[] fs = s.getFields();

        for (int j = 0; j < fs.length; j++) {
            writeField(json, fs[j], ((Tuple)d).get(j));
        }
        json.writeEndObject();
        return;

    case DataType.BAG:
        json.writeFieldName(field.getName());
        json.writeStartArray();
        s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
            throw new IOException("Found a bag without a tuple "
                + "inside!");
        }
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        for (Tuple t : (DataBag)d) {
            json.writeStartObject();
            for (int j = 0; j < fs.length; j++) {
                writeField(json, fs[j], t.get(j));
            }
            json.writeEndObject();
        }
        json.writeEndArray();
        return;
    }
}

Failure Cleanup

When jobs fail after execution has started, your store function may need to clean up partially stored results. Pig will call cleanupOnFailure to give your store function an opportunity to do this. It passes the location string and the job object so that your store function knows what it should clean up. In the HDFS case, the default implementation handles removing any output files created by the store function. You need to implement this method only if you are storing data somewhere other than HDFS.
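As a rough picture of what such a cleanup involves, the sketch below removes partially written output from a local directory using java.nio.file. It is a stand-in, not the Pig method: the class and method names are hypothetical, and a store function writing to a database or other service would instead issue the equivalent delete against that system. The cleanup is written to do nothing when the output is already gone, so it is safe to call more than once.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration only: a local directory stands in for the storage system,
// and all names here are hypothetical.
final class FailureCleanupDemo {

    // Remove the output location and everything under it. Does nothing
    // if the location does not exist.
    static void cleanupOnFailure(Path location) throws IOException {
        if (!Files.exists(location)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(location)) {
            // Reverse order puts children before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder())
                           .collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }

    // Creates some partial output, cleans it up twice, and reports
    // whether the location is gone afterward.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("store-output");
            Files.createFile(dir.resolve("part-00000"));
            Files.createDirectory(dir.resolve("_temporary"));
            cleanupOnFailure(dir);
            cleanupOnFailure(dir);  // second call finds nothing to remove
            return !Files.exists(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```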

Storing Metadata

If your storage format can store schemas in addition to data, your store function can implement the interface StoreMetadata. This provides a storeSchema method that is called by Pig as part of its frontend operations. Pig passes storeSchema a ResourceSchema, the location string, and the job object so that it can connect to its storage. The ResourceSchema is very similar to the Schema class described in “Input and Output Schemas”. There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple’s ResourceFieldSchema.

The example store function JsonStorage stores the schema in a side file named _schema in the same directory as the data. The schema is stored as a string, using the toString method provided by the class:

// JsonStorage.java
public void storeSchema(ResourceSchema schema, String location, Job job)
        throws IOException {
    // Store the schema in a side file in the same directory.  MapReduce
    // does not include files starting with "_" when reading data for a job.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataOutputStream out = fs.create(new Path(location + "/_schema"));
    try {
        out.writeBytes(schema.toString());
        out.writeByte('\n');
    } finally {
        // Close the stream even if one of the writes fails.
        out.close();
    }
}
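The underscore convention that the comment in storeSchema relies on amounts to a name-prefix test. The sketch below models only the rule stated in this chapter (names beginning with an underscore are skipped); it is not Hadoop's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: models the "names starting with '_' are not data"
// convention. Not Hadoop code; all names here are hypothetical.
final class SideFileFilterDemo {

    // A file counts as data unless its name starts with an underscore.
    static boolean isDataFile(String name) {
        return !name.startsWith("_");
    }

    // Keep only the names a reader of the directory would treat as data.
    static List<String> dataFiles(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (isDataFile(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> listing =
            Arrays.asList("part-00000", "_schema", "part-00001");
        System.out.println(dataFiles(listing));  // prints "[part-00000, part-00001]"
    }
}
```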

StoreMetadata also has a storeStatistics function, but Pig does not use this yet.



[28] A file in the same directory that is not a part file. Side files start with an underscore character. MapReduce’s FileInputFormat knows to ignore them when reading input for a job.

[29] Meaning that the filter can be broken into two filters—one that contains the partition keys and one that does not—and produce the same end result. This is possible when the expressions are connected by and but not when they are connected by or.

[30] HCatalog is a table-management service for Hadoop. It includes Pig load and store functions. See “Metadata in Hadoop” for more information on HCatalog.