Chapter 9. Threads

We take for granted that modern computer systems can manage many applications and operating system (OS) tasks running concurrently and make it appear that all the software is running simultaneously. While most systems today do have multiple processors and some processors can perform tricks to gain some degree of parallelism, for the most part a processor can only really handle one job at a time and what we are seeing is sleight of hand in the operating system, which juggles applications and turns its attention from one to the next so quickly that they appear to run at once.

In the old days, the unit of concurrency for such systems was the application or process. To the OS, a process was more or less a black box that decided what to do on its own. If an application required greater concurrency, it could get it only by running multiple processes and communicating between them, but this was a heavyweight approach and not very elegant. Later, the concept of threads was introduced. Threads provide fine-grained concurrency within a process under the application’s own control. Threads have existed for a long time, but have historically been tricky to use. In Java, support for threading is built into the language, making it easier to work with threads. The Java concurrency utilities address common patterns and practices in multithreaded applications and raise them to the level of tangible Java APIs. Collectively, this means that Java is a language that supports threading both natively and at a high level. It also means that Java’s APIs take full advantage of threading, so it’s important that you gain some degree of familiarity with these concepts early in your exploration of Java. Not all developers will need to write applications that explicitly use threads or concurrency, but most will use some feature that is impacted by them.

Threads are integral to the design of many Java APIs, especially those involved in client-side applications, graphics, and sound. For example, when we look at GUI programming later in this book, you’ll see that a component’s paint() method isn’t called directly by the application but rather by a separate drawing thread within the Java runtime system. At any given time, many such background threads may be performing activities in parallel with your application. On the server side, writing code that does explicit thread handling is less common and actively discouraged in the context of application servers and web applications. In those scenarios, the server environment should control the allocation of time. However, Java threads are there, servicing every request and running your application components. It’s important to understand how your code fits into that environment.

In this chapter, we’ll talk about writing applications that create and use their own threads explicitly. We’ll talk about the low-level thread support built into the Java language first and then discuss the java.util.concurrent thread utilities package in detail at the end of this chapter.

Introducing Threads

Conceptually, a thread is a flow of control within a program. A thread is similar to the more familiar notion of a process, except that threads within the same application are much more closely related and share much of the same state. It’s kind of like a golf course, which many golfers use at the same time. The threads cooperate to share a working area. They have access to the same objects, including static and instance variables, within their application. However, threads have their own copies of local variables, just as players share the golf course but do not share some personal items like clubs and balls.

Multiple threads in an application have the same problems as the golfers—in a word, synchronization. Just as you can’t have two sets of players blindly playing the same green at the same time, you can’t have several threads trying to access the same variables without some kind of coordination. Someone is bound to get hurt. A thread can reserve the right to use an object until it’s finished with its task, just as a golf party gets exclusive rights to the green until it’s done. And a thread that is more important can raise its priority, asserting its right to play through.

The devil is in the details, of course, and those details have historically made threads difficult to use. Fortunately, Java makes creating, controlling, and coordinating threads simpler by integrating some of these concepts directly into the language.

It is common to stumble over threads when you first work with them because creating a thread exercises many of your new Java skills all at once. You can avoid confusion by remembering that two players are always involved in running a thread: a Java language Thread object that represents the thread itself and an arbitrary target object that contains the method that the thread is to execute. Later, you will see that it is possible to play some sleight of hand and combine these two roles, but that special case just changes the packaging, not the relationship.

The Thread Class and the Runnable Interface

All execution in Java is associated with a Thread object, beginning with a “main” thread that is started by the Java VM to launch your application. A new thread is born when we create an instance of the java.lang.Thread class. The Thread object represents a real thread in the Java interpreter and serves as a handle for controlling and coordinating its execution. With it, we can start the thread, wait for it to complete, cause it to sleep for a time, or interrupt its activity. The constructor for the Thread class accepts information about where the thread should begin its execution. Conceptually, we would like to simply tell it what method to run, but because there are no pointers to methods in Java (not in this sense anyway), we can’t specify one directly. Instead, we have to take a short detour and use the java.lang.Runnable interface to create or mark an object that contains a “runnable” method. Runnable defines a single, general-purpose run() method:

    public interface Runnable {
         abstract public void run();
    }

Every thread begins its life by executing the run() method in a Runnable object, which is the “target object” that was passed to the thread’s constructor. The run() method can contain any code, but it must be public, take no arguments, have no return value, and throw no checked exceptions.

Any class that contains an appropriate run() method can declare that it implements the Runnable interface. An instance of this class is then a runnable object that can serve as the target of a new thread. If you don’t want to put the run() method directly in your object (and very often you don’t), you can always make an adapter class that serves as the Runnable for you. The adapter’s run() method can then call any method it wants after the thread is started. We’ll show examples of these options later.

Creating and starting threads

A newly born thread remains idle until we give it a figurative slap on the bottom by calling its start() method. The thread then wakes up and proceeds to execute the run() method of its target object. start() can be called only once in the lifetime of a thread. Once a thread starts, it continues running until the target object’s run() method returns (or throws an unchecked exception of some kind). The start() method has a sort of evil twin method called stop(), which kills the thread permanently. However, this method is deprecated and should no longer be used. We’ll explain why and give some examples of a better way to stop your threads later in this chapter. We will also look at some other methods you can use to control a thread’s progress while it is running.
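The rule that start() may be called only once is enforced at runtime. The following sketch (the class and method names are illustrative, not part of any API) starts a thread, waits for it to finish, and then tries to start it again. The second call fails with an IllegalThreadStateException, even though the first run has already completed.

```java
public class StartOnce {

    // Returns true if a second call to start() on the same Thread is rejected.
    public static boolean secondStartRejected() {
        Thread t = new Thread(new Runnable() {
            public void run() { /* nothing to do */ }
        });
        t.start();
        try {
            t.join();                      // let the first run() finish completely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start();                     // even a finished thread cannot be restarted
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start() rejected: " + secondStartRejected());
    }
}
```

If you need the same work to run again, create a new Thread with the same target object.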

Let’s look at an example. The following class, Animation, implements a run() method to drive its drawing loop:

    class Animation implements Runnable {
        String name;
        boolean animate = true;

        Animation( String name ) {
            this.name = name;
        }

        public void run() {
            while ( animate ) {
                // draw Frames
                ...
            }
        }
    }

To use it, we create a Thread object, passing it an instance of Animation as its target object, and invoke its start() method. We can perform these steps explicitly:

    Animation happy = new Animation("Mr. Happy");
    Thread myThread = new Thread( happy );
    myThread.start();

We created an instance of our Animation class and passed it as the argument to the constructor for myThread. When we call the start() method, myThread begins to execute Animation’s run() method. Let the show begin!
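Filling in the elided drawing loop, a runnable version of this setup might look like the sketch below. The frame counter, the 10 ms sleep, and the animateFor() helper are illustrative inventions, not part of the original class; the point is the shape: a Runnable target handed to a Thread, a flag that asks the loop to end, and join() to wait for run() to return. The volatile modifier makes sure the animation thread sees the change to animate made by the controlling thread.

```java
public class AnimationDemo {

    // Illustrative stand-in for the chapter's Animation class: it counts
    // frames instead of drawing them.
    static class Animation implements Runnable {
        private final String name;
        volatile boolean animate = true;   // volatile: the animation thread must see the stop request
        private volatile int frames;       // written only by the animation thread

        Animation(String name) {
            this.name = name;
        }

        public void run() {
            while (animate) {
                frames++;                  // "draw" one frame
                try {
                    Thread.sleep(10);      // pretend each frame takes about 10 ms
                } catch (InterruptedException e) {
                    return;                // treat an interrupt as a request to quit
                }
            }
        }

        String getName() { return name; }
        int getFrames() { return frames; }
    }

    // Starts an Animation on its own thread, lets it run for about the given
    // time, asks it to stop, waits for it, and returns the number of frames.
    public static int animateFor(long millis) {
        Animation happy = new Animation("Mr. Happy");
        Thread myThread = new Thread(happy, happy.getName());  // target plus a thread name
        myThread.start();
        try {
            Thread.sleep(millis);          // let the show run for a while
            happy.animate = false;         // then ask the loop to end
            myThread.join();               // and wait for run() to return
        } catch (InterruptedException e) {
            happy.animate = false;         // interrupted ourselves: still stop the show
            Thread.currentThread().interrupt();
        }
        return happy.getFrames();
    }

    public static void main(String[] args) {
        System.out.println("frames drawn: " + animateFor(200));
    }
}
```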

This situation is not terribly object-oriented. More often, we want an object to handle its own threads, as shown in Figure 9-1, which depicts a Runnable object that creates and starts its own thread. We’ll show our Animation class performing these actions in its constructor, although in practice it might be better to place them in a more explicit controller method (e.g., startAnimation()):

    class Animation implements Runnable {
         Thread myThread;
         Animation (String name) {
             myThread = new Thread( this );
             myThread.start();
         }
         ...
    }
Figure 9-1. Interaction between Animation and its thread

In this case, the argument that we pass to the Thread constructor is this, the current object (which is a Runnable). We keep the Thread reference in the instance variable myThread in case we want to interrupt the show or exercise some other kind of control later.

A natural-born thread

The Runnable interface lets us make an arbitrary object the target of a thread, as we did in the previous example. This is the most important general usage of the Thread class. In most situations in which you need to use threads, you’ll create a class (possibly a simple adapter class) that implements the Runnable interface.

However, we’d be remiss not to show you the other technique for creating a thread. Another design option is to make our target class a subclass of a type that is already runnable. As it turns out, the Thread class itself conveniently implements the Runnable interface; it has its own run() method, which we can override directly to do our bidding:

    class Animation extends Thread {
        boolean animate = true;

        Animation( String name ) {
            super( name );  // Thread(String) sets the thread's name
        }

        public void run() {
            while ( animate ) {
                // draw Frames
                ...
            }
        }
    }

The skeleton of our Animation class looks much the same as before, except that our class is now a subclass of Thread. To go along with this scheme, the default constructor of the Thread class makes itself the default target—that is, by default, the Thread executes its own run() method when we call the start() method, as shown in Figure 9-2. Now our subclass can just override the run() method in the Thread class. (Thread itself defines an empty run() method.)

Figure 9-2. Animation as a subclass of Thread

Next, we create an instance of Animation and call its start() method (which it also inherited from Thread):

    Animation bouncy = new Animation("Bouncy");
    bouncy.start();

Alternatively, we can have the Animation object start its thread when it is created, as before:

    class Animation extends Thread {

         Animation (String name) {
             super( name );
             start();
         }
         ...
    }

Here, our Animation object just calls its own start() method when an instance is created. (It’s probably better form to start and stop our objects explicitly after they’re created rather than starting threads as a hidden side effect of object creation, but this serves the example well.)
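One pitfall applies to either style: it is start() that creates the new thread. Calling run() yourself simply executes the method on the current thread, like any other method call. This sketch (class and method names are illustrative) uses the Thread(String) constructor to name the thread and records which thread actually executed run():

```java
public class StartVersusRun {

    // A Thread subclass that records which thread actually executed run().
    static class Animation extends Thread {
        volatile String ranOn;

        Animation(String name) {
            super(name);                   // Thread(String) sets the thread's name
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
        }
    }

    // start(): run() executes on the new thread named "Bouncy".
    public static String viaStart() {
        Animation bouncy = new Animation("Bouncy");
        bouncy.start();
        try {
            bouncy.join();                 // wait for run() to complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return bouncy.ranOn;
    }

    // run() called directly: an ordinary method call on the caller's thread.
    public static String viaDirectRun() {
        Animation bouncy = new Animation("Bouncy");
        bouncy.run();
        return bouncy.ranOn;
    }

    public static void main(String[] args) {
        System.out.println("start(): " + viaStart());      // the new thread, "Bouncy"
        System.out.println("run():   " + viaDirectRun());  // the caller, here "main"
    }
}
```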

Subclassing Thread may seem like a convenient way to bundle a thread and its target run() method. However, this approach often isn’t the best design. If you subclass Thread to implement a thread, you are saying you need a new type of object that is a kind of Thread, which exposes all of the public API of the Thread class. While there is something satisfying about taking an object that’s primarily concerned with performing a task and making it a Thread, the actual situations where you’ll want to create a subclass of Thread should not be very common. In most cases, it is more natural to let the requirements of your program dictate the class structure and use Runnables to connect the execution and logic of your program.

Using an adapter

Finally, as we have suggested, we can build an adapter class to give us more control over how to structure the code. It is particularly convenient to create an anonymous inner class that implements Runnable and invokes an arbitrary method in our object. This almost gives the feel of starting a thread and specifying an arbitrary method to run, as if we had method pointers. For example, suppose that our Animation class provides a method called startAnimating(), which performs setup (loads the images, etc.) and then starts a thread to perform the animation. We’ll say that the actual guts of the animation loop are in a private method called drawFrames(). We could use an adapter to run drawFrames() for us:

    class Animation {

        public void startAnimating() {
            // do setup, load images, etc.
            ...
            // start a drawing thread
            Thread myThread = new Thread ( new Runnable() {
               public void run() { drawFrames(); }
            } );
            myThread.start();
        }

        private void drawFrames() {
            // do animation ...
        }
    }

In this code, the anonymous inner class implementing Runnable is generated for us by the compiler. We create a thread with this anonymous object as its target and have its run() method call our drawFrames() method. We have avoided implementing a generic run() method in our application code at the expense of generating an extra class.

Note that we could be even more terse in the previous example by simply having our anonymous inner class extend Thread rather than implement Runnable. We could also start the thread without saving a reference to it if we won’t be using it later:

    new Thread() {
       public void run() { drawFrames(); }
    }.start();
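On Java 8 and later, a lambda expression or method reference can play the adapter role without writing the class out by hand: the compiler turns this::drawFrames into a Runnable whose run() method calls drawFrames(). A sketch of the startAnimating() example in that style follows; awaitFrames() and the frame counter are hypothetical additions so that there is something to observe.

```java
public class AdapterDemo {

    public static class Animation {
        private Thread myThread;
        private int framesDrawn;           // safe to read after join(), which makes the writes visible

        public void startAnimating() {
            // do setup, load images, etc. (omitted)
            myThread = new Thread(this::drawFrames);   // the method reference is the Runnable
            myThread.start();
        }

        // Hypothetical helper: waits for the drawing thread and reports its work.
        public int awaitFrames() {
            try {
                myThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return framesDrawn;
        }

        private void drawFrames() {
            for (int i = 0; i < 5; i++) {
                framesDrawn++;             // stand-in for drawing one frame
            }
        }
    }

    public static void main(String[] args) {
        Animation a = new Animation();
        a.startAnimating();
        System.out.println("frames: " + a.awaitFrames());   // prints "frames: 5"
    }
}
```

The design is the same as the anonymous inner class; the compiler simply writes the adapter for us.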

Controlling Threads

We have seen the start() method used to begin execution of a new thread. Several other methods let us explicitly control a thread’s execution:

  • The static Thread.sleep() method causes the currently executing thread to wait for a designated period of time, without consuming much (or possibly any) CPU time.

  • The methods wait() and join() coordinate the execution of two or more threads. We’ll discuss them in detail when we talk about thread synchronization later in this chapter.

  • The interrupt() method wakes up a thread that is sleeping in a sleep() or wait() operation or is otherwise blocked on a long I/O operation.[25]

Deprecated methods

We should also mention three deprecated thread control methods: stop(), suspend(), and resume(). The stop() method complements start(); it destroys the thread. start() and the deprecated stop() method can be called only once in the thread’s lifecycle. By contrast, the deprecated suspend() and resume() methods were used to arbitrarily pause and then restart the execution of a thread.

Although these deprecated methods lingered in the Java API for many years, they shouldn’t be used in new code development, and recent Java releases have disabled them outright. The problem with both stop() and suspend() is that they seize control of a thread’s execution in an uncoordinated, harsh way. This makes programming difficult; it’s not always easy for an application to anticipate and properly recover from being interrupted at an arbitrary point in its execution. Moreover, the two methods fail in opposite ways with respect to locks. A thread killed with stop() releases all the locks it holds, possibly leaving the objects those locks protect in an inconsistent state for other threads to see. A thread frozen with suspend(), on the other hand, keeps all of its locks, which can easily lead to deadlock if the thread that is supposed to call resume() first needs one of them.

A better way to affect the execution of a thread—which requires just a bit more work on your part—is by creating some simple logic in your thread’s code to use monitor variables (flags), possibly in conjunction with the interrupt() method, which allows you to wake up a sleeping thread. In other words, you should cause your thread to stop or resume what it is doing by asking it nicely rather than by pulling the rug out from under it unexpectedly. The thread examples in this book use this technique in one way or another.

The sleep() method

We often need to tell a thread to sit idle, or “sleep,” for a fixed period of time. While a thread is asleep, or otherwise blocked waiting for input of some kind, it doesn’t consume CPU time or compete with other threads for processing. For this, we can call the static method Thread.sleep(), which affects the currently executing thread. The call causes the thread to go idle for a specified number of milliseconds:

    try {
        // pause the current thread for 1,000 ms (one second)
        Thread.sleep( 1000 );
    } catch ( InterruptedException e ) {
        // someone woke us up prematurely
    }

The sleep() method may throw an InterruptedException if it is interrupted by another thread via the interrupt() method. As you see in the previous code, the thread can catch this exception and take the opportunity to perform some action—such as checking a variable to determine whether or not it should exit—or perhaps just perform some housekeeping and then go back to sleep.
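Putting these pieces together gives the “ask nicely” style of stopping a thread recommended earlier: a flag that the thread checks on each pass through its loop, plus interrupt() to cut a long sleep short. In this sketch, the Napper class, its ten-second nap, and the timing helper are illustrative. Because the flag is set before the interrupt, a thread woken early either sees the flag and exits or, if it is not being asked to quit, goes back to sleep.

```java
public class PoliteStop {

    // Illustrative worker that takes long naps; a flag plus interrupt() stops it promptly.
    static class Napper implements Runnable {
        private volatile boolean keepRunning = true;

        public void run() {
            while (keepRunning) {
                try {
                    Thread.sleep(10_000);      // a long nap (real work would go here)
                } catch (InterruptedException e) {
                    // Woken early. Loop around and re-check the flag: exit if asked
                    // to, otherwise go back to sleep.
                }
            }
        }

        void requestStop(Thread runner) {
            keepRunning = false;   // set the flag first...
            runner.interrupt();    // ...then wake the thread so it notices soon
        }
    }

    // Returns about how many milliseconds the worker took to exit once asked,
    // or -1 if the calling thread was itself interrupted.
    public static long millisToStop() {
        Napper napper = new Napper();
        Thread runner = new Thread(napper);
        runner.start();
        try {
            Thread.sleep(100);                 // give it time to fall asleep
            long begin = System.nanoTime();
            napper.requestStop(runner);
            runner.join();
            return (System.nanoTime() - begin) / 1_000_000;
        } catch (InterruptedException e) {
            napper.requestStop(runner);        // don't leave the worker behind
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after about " + millisToStop() + " ms");
    }
}
```

Without the interrupt() call, the worker would not notice the flag until its current ten-second nap ended.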

The join() method

If you need to coordinate your activities with another thread by waiting for it to complete its task, you can use the join() method. Calling a thread’s join() method causes the caller to block until the target thread completes. Alternatively, you can poll the thread by calling join() with a maximum number of milliseconds to wait. This is a very coarse form of thread synchronization. Later in this chapter, we’ll look at a much more general and powerful mechanism for coordinating thread activity: wait(), notify(), and even higher-level APIs in the java.util.concurrent package.
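A timed join() returns either when the thread finishes or when the time runs out, and it does not tell you which happened, so the caller checks isAlive() afterward. This sketch contrasts the two forms; the 500 ms workload and 50 ms timeout are arbitrary illustrative values.

```java
public class JoinDemo {

    // Returns {alive after join(50), alive after join()} for a thread that works ~500 ms.
    public static boolean[] aliveAfterJoins() {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(500);     // simulate half a second of work
                } catch (InterruptedException e) {
                    // nothing to clean up; just finish
                }
            }
        });
        worker.start();
        boolean[] alive = new boolean[2];
        try {
            worker.join(50);               // give up after at most about 50 ms
            alive[0] = worker.isAlive();   // almost certainly still working
            worker.join();                 // now wait however long it takes
            alive[1] = worker.isAlive();   // finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return alive;
    }

    public static void main(String[] args) {
        boolean[] alive = aliveAfterJoins();
        System.out.println("after join(50): alive=" + alive[0]);
        System.out.println("after join():   alive=" + alive[1]);
    }
}
```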

The interrupt() method

Earlier, we described the interrupt() method as a way to wake up a thread that is idle in a sleep(), wait(), or lengthy I/O operation. Any thread that is not running continuously (not a “hard loop”) must enter one of these states periodically, so these are natural points at which the thread can be flagged to stop. When a thread is interrupted, its interrupt status flag is set. This can happen at any time, whether the thread is idle or not. The thread can test this status with the isInterrupted() method, which leaves the flag unchanged. The static Thread.interrupted() method, by contrast, tests the interrupt status of the current thread and clears it in the same step. Note also that when sleep() or wait() throws an InterruptedException, the interrupt status is cleared. In this way, a thread can use the interrupt status as both a flag and a signal.

This is indeed the prescribed functionality of the method. However, historically, this has been a weak spot, and Java implementations have had trouble getting it to work correctly in all cases. In early Java VMs (prior to version 1.1), interrupt did not work at all. More recent versions still have problems with interrupting I/O calls. By an I/O call, we mean when an application is blocked in a read() or write() method, moving bytes to or from a source such as a file or the network. In this case, Java is supposed to throw an InterruptedIOException when the interrupt() is performed. However, this has never been reliable across all Java implementations. To address this in Java 1.4, a new I/O framework (java.nio) was introduced with one of its goals being to specifically address these problems. When the thread associated with an NIO operation is interrupted, the thread wakes up and the I/O stream (called a “channel”) is automatically closed. (See Chapter 12 for more about the NIO package.)
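The interrupt status rules can be checked directly on the current thread. This sketch (the class and method names are illustrative) sets the status, reads it with isInterrupted(), clears it with the static Thread.interrupted(), and then shows that sleep() throws immediately when the status is already set, clearing it as it does so.

```java
public class InterruptStatus {

    // Exercises the interrupt status of the current thread and reports what it saw.
    public static boolean[] observe() {
        Thread me = Thread.currentThread();
        boolean[] seen = new boolean[5];

        me.interrupt();                        // set our own interrupt status
        seen[0] = me.isInterrupted();          // true: the status is set
        seen[1] = me.isInterrupted();          // still true: isInterrupted() does not clear it
        seen[2] = Thread.interrupted();        // true, and the static method clears the status
        seen[3] = me.isInterrupted();          // false: it was just cleared

        me.interrupt();                        // set it again, then try to sleep
        try {
            Thread.sleep(10_000);              // throws at once because the status is set
        } catch (InterruptedException e) {
            seen[4] = me.isInterrupted();      // false: throwing the exception cleared it
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [true, true, true, false, false]
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```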

Death of a Thread

A thread continues to execute until one of the following happens:

  • It explicitly returns from its target run() method.

  • It encounters an uncaught runtime exception.

  • The evil and nasty deprecated stop() method is called.

What happens if none of these things occurs, and the run() method for a thread never terminates? The answer is that the thread can live on, even after what is ostensibly the part of the application that created it has finished. This means we have to be aware of how our threads eventually terminate, or an application can end up leaving orphaned threads that unnecessarily consume resources or keep the application alive when it would otherwise quit.

In many cases, we really want to create background threads that do simple, periodic tasks in an application. The setDaemon() method can be used to mark a thread as a daemon thread that should be killed and discarded when no other nondaemon application threads remain. Normally, the Java interpreter continues to run until all threads have completed. But when daemon threads are the only threads still alive, the interpreter will exit.

Here’s a devilish example using daemon threads:

    class Devil extends Thread {
        Devil() {
            setDaemon( true );
            start();
        }
        public void run() {
            // perform evil tasks
        }
    }

In this example, the Devil thread sets its daemon status when it is created. If any Devil threads remain when our application is otherwise complete, the runtime system kills them for us. We don’t have to worry about cleaning them up.
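The daemon flag has to be set before the thread is started; calling setDaemon() on a live thread throws an IllegalThreadStateException. That is why the Devil constructor calls setDaemon() before start(). A quick sketch that checks both points (the ten-second sleep is an illustrative stand-in for the evil tasks):

```java
public class DaemonDemo {

    // Returns {devil.isDaemon() after start, whether setDaemon() on a live thread was rejected}.
    public static boolean[] check() {
        boolean[] result = new boolean[2];

        Thread devil = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(10_000);      // "perform evil tasks" for a while
                } catch (InterruptedException e) {
                    // asked to quit early; just return
                }
            }
        });
        devil.setDaemon(true);                 // must be called before start()
        devil.start();
        result[0] = devil.isDaemon();

        try {
            devil.setDaemon(false);            // too late: the thread is already alive
        } catch (IllegalThreadStateException e) {
            result[1] = true;
        }

        devil.interrupt();                     // tidy up; as a daemon it would not block exit anyway
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = check();
        // prints "daemon: true, late setDaemon rejected: true"
        System.out.println("daemon: " + r[0] + ", late setDaemon rejected: " + r[1]);
    }
}
```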

Daemon threads are primarily useful in standalone Java applications and in the implementation of server frameworks, but not in component applications such as applets. Since an applet runs inside another Java application, any daemon threads it creates can continue to live until the controlling application exits—probably not the desired effect. A browser or any other application can use ThreadGroups to contain all the threads created by subsystems of an application and then clean them up if necessary.

One final note about killing threads gracefully. A very common problem new developers encounter the first time they create an application using an AWT or Swing component is that their application never exits; the Java VM seems to hang indefinitely after everything is finished. When an application works with graphics, Java creates an AWT thread to process input and painting events. The AWT thread is not a daemon thread, so it doesn’t exit automatically when other application threads have completed, and the developer typically has to call System.exit() explicitly. (If you think about it, this makes sense. Because most GUI applications are event-driven and simply wait for user input, they would otherwise simply exit after their startup code completed.)

Threading an Applet

Applets are embeddable Java applications that are expected to start and stop themselves on command, possibly many times in their lifetime. A Java-enabled web browser normally starts an applet when the applet is displayed and stops it when the user moves to another page or (in theory) when the user scrolls the applet out of view. To conform to this API, we would like an applet to cease its nonessential activity when it is stopped and resume it when started again. We’ll talk about applets in Chapter 23, but it’s not really essential to know about them here. We’ll just use this as a more realistic example and as a transition to talk about our next topic, synchronization.

In this section, we will build UpdateApplet, a simple base class for an applet that maintains a thread to automatically update its display at regular intervals. UpdateApplet handles the basic creation and termination of the thread in the Applet’s start() and stop() methods:

    public class UpdateApplet extends java.applet.Applet
        implements Runnable
    {
        Thread thread;
        boolean running;
        int updateInterval = 1000;

        public void run() {
            while ( running )
            {
                repaint();
                try {
                    Thread.sleep( updateInterval );
                } catch ( InterruptedException e ) {
                    System.out.println("interrupted...");
                    return;
                }
            }
        }

        public void start() {
            System.out.println("starting...");
            if ( !running ) // naive approach
            {
                running = true;
                thread = new Thread(this);
                thread.start();
            }
        }

        public void stop() {
            System.out.println("stopping...");
            running = false;      // ask the thread to finish...
            thread.interrupt();   // ...and wake it if it is sleeping
        }
    }

UpdateApplet is a Runnable object that alternately sleeps and calls its repaint() method. (There’s nothing to paint, though, so running this applet is kind of boring. Later we’ll subclass it to implement a digital clock.) It has two other public methods: start() and stop(). These are methods of the Applet class we are overriding; don’t confuse them with the similarly named methods of the Thread class. These start() and stop() methods are called by the web browser or applet viewer to tell the applet when it should and should not be running.

UpdateApplet illustrates an environmentally friendly way to deal with threads in a simple applet. UpdateApplet simply dismisses its thread each time the applet is stopped and recreates it if the applet is restarted. When UpdateApplet’s start() method is called, we first check to make sure there is no currently running thread by checking the running flag. We then create one to begin our execution. When our applet is subsequently asked to stop, we set the flag indicating that it should stop and make sure the thread is awake by invoking its interrupt() method. In this way, we are sure to catch the thread either at the beginning of its next iteration or when it goes to sleep.

With UpdateApplet doing all the work for us, we can create the world’s simplest clock applet with just a few lines of code. Figure 9-3 shows our Clock.

Figure 9-3. The Clock applet

Here’s the code:

    //file: Clock.java
    public class Clock extends UpdateApplet {
        public void paint( java.awt.Graphics g ) {
            g.drawString( new java.util.Date().toString(), 10, 25 );
        }
    }

Calling new java.util.Date() creates a Date object for the current moment, and its toString() method formats that time as a readable string.

Issues Lurking

Our applet seems pretty straightforward and, in fact, works as advertised. But some things in it should concern us when we’re thinking about threads. Let’s look at that quick check of the running flag before we start our new thread:

    if ( !running ) // naive approach
    {
        running = true;
        ... /* start thread */
    }

Now, an Applet’s start() and stop() methods are guaranteed to be called in sequence and probably by the same controlling thread. As a result, this check for the existence of the running thread in start() may not seem necessary here: the stop() method should always be called before the start() method is invoked again. But, in the style of defensive programming, the test seems like a good thing to do, right? That may be so, but in general it’s not enough to prevent bad things from happening. The test may prevent a simple case of misaligned stop() and start() calls, but the bigger question lurking here is, what happens if start() and stop() are called repeatedly or in very quick succession in a multithreaded environment? In the extreme case, two threads could both test running and see false before either one sets it to true, and each would then start its own thread, leaving us with multiple threads out of our control. What is needed is a real way to gain exclusive access to a resource (our flag) for a period of time. That’s what synchronization is all about, and we’ll cover it in detail in the next section and throughout the rest of this chapter.

With synchronization, we might also consider more complex scenarios for our applet, such as keeping our thread alive but dormant while the applet is stopped. This would allow us to preserve expensive setup like network connections and clean them up later if necessary.

Synchronization

Every thread has a mind of its own. Normally, a thread goes about its business without any regard for what other threads in the application are doing. Threads may be time-sliced, which means they can run in arbitrary spurts and bursts as directed by the operating system. On a multiprocessor system, it is even possible for many different threads to be running simultaneously on different CPUs. This section is about coordinating the activities of two or more threads so that they can work together and not collide in their use of the same variables and methods (coordinating their play on the golf course).

Java provides a few simple structures for synchronizing the activities of threads. They are all based on the concept of monitors, a widely used synchronization scheme. You don’t have to know the details about how monitors work to be able to use them, but it may help you to have a picture in mind.

A monitor is essentially a lock. The lock is attached to a resource that many threads may need to access, but that should be accessed by only one thread at a time. It’s very much like a restroom with a lock on the door; if it’s unlocked, you can enter and lock the door while you are using it. If the resource is not being used, the thread can acquire the lock and access the resource. When the thread is done, it relinquishes the lock, just as you unlock the restroom door and leave it open for the next person. However, if another thread already has the lock for the resource, all other threads must wait until the current thread is done and has released the lock. This is just like when the restroom is occupied when you arrive: you have to wait until the current user is done and unlocks the door.

Fortunately, Java makes the process of synchronizing access to resources fairly easy. The language handles setting up and acquiring locks; all you need to do is specify the resources that require synchronization.

Serializing Access to Methods

The most common need for synchronization among threads in Java is to serialize their access to some resource (an object)—in other words, to make sure that only one thread at a time can manipulate an object or variable.[26] In Java, every object has an associated lock. To be more specific, every class and every instance of a class has its own lock. The synchronized keyword marks places where a thread must acquire the lock before proceeding.

For example, suppose we implemented a SpeechSynthesizer class that contains a say() method. We don’t want multiple threads calling say() at the same time because we wouldn’t be able to understand anything being said. So we mark the say() method as synchronized, which means that a thread must acquire the lock on the SpeechSynthesizer object before it can speak:

    class SpeechSynthesizer {
        synchronized void say( String words ) {
            // speak
        }
    }

Because say() is an instance method, a thread must acquire the lock on the SpeechSynthesizer instance it’s using before it can invoke the say() method. When say() has completed, it gives up the lock, which allows the next waiting thread to acquire the lock and run the method. It doesn’t matter whether the thread is owned by the SpeechSynthesizer itself or some other object; every thread must acquire the same lock, that of the SpeechSynthesizer instance. If say() were a class (static) method instead of an instance method, we could still mark it as synchronized. In this case, because no instance object is involved, the lock is on the class object itself.

Often, you want to synchronize multiple methods of the same class so that only one method modifies or examines parts of the class at a time. All static synchronized methods in a class use the same class object lock. By the same token, all synchronized instance methods invoked on a given instance use that instance’s lock. In this way, Java can guarantee that, for a given object, only one of a set of synchronized methods is running at a time. For example, a SpreadSheet class might contain a number of instance variables that represent cell values as well as some methods that manipulate the cells in a row:

    class SpreadSheet {
        int cellA1, cellA2, cellA3;

        synchronized int sumRow() {
            return cellA1 + cellA2 + cellA3;
        }

        synchronized void setRow( int a1, int a2, int a3 ) {
            cellA1 = a1;
            cellA2 = a2;
            cellA3 = a3;
        }
        ...
    }

In this example, methods setRow() and sumRow() both access the cell values. You can see that problems might arise if one thread were changing the values of the variables in setRow() at the same moment another thread was reading the values in sumRow(). To prevent this, we have marked both methods as synchronized. Because both methods lock the same SpreadSheet instance, only one thread at a time can run either of them on that object. If a thread is in the middle of executing setRow() when another thread calls sumRow(), the second thread waits until the first one finishes executing setRow() before it runs sumRow(). This synchronization allows us to preserve the consistency of the SpreadSheet. The best part is that all this locking and waiting is handled by Java; it’s invisible to the programmer.
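To see the guarantee in action, here is a small, self-contained sketch. It repeats the SpreadSheet class without the elided members; the SpreadSheetDemo class and its countTornReads() helper are hypothetical, added only for illustration. One thread keeps rewriting the row with three equal values while the calling thread keeps summing it. Because both methods lock the same SpreadSheet, every sum the reader sees should be a multiple of three.

```java
class SpreadSheet {
    int cellA1, cellA2, cellA3;

    synchronized int sumRow() {
        return cellA1 + cellA2 + cellA3;
    }

    synchronized void setRow(int a1, int a2, int a3) {
        cellA1 = a1;
        cellA2 = a2;
        cellA3 = a3;
    }
}

// Hypothetical driver: a writer thread updates the row while the calling
// thread reads it, counting any sum that reveals a half-finished update.
class SpreadSheetDemo {
    static int countTornReads(int iterations) {
        SpreadSheet sheet = new SpreadSheet();
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= iterations; i++) {
                sheet.setRow(i, i, i);       // a complete row always sums to 3 * i
            }
        });
        writer.start();

        int torn = 0;
        for (int i = 0; i < iterations; i++) {
            if (sheet.sumRow() % 3 != 0) {   // a half-finished update would break this
                torn++;
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return torn;
    }

    public static void main(String[] args) {
        System.out.println("Torn reads: " + countTornReads(1_000_000));
    }
}
```

With both methods synchronized, countTornReads() should return 0. If you remove synchronized from both methods, nonzero counts become possible, although how often they show up depends on the machine.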

In addition to synchronizing entire methods, the synchronized keyword can be used in a special construct to guard arbitrary blocks of code. In this form, it also takes an explicit argument that specifies the object for which it is to acquire a lock:

    synchronized ( myObject ) {
        // Functionality that needs exclusive access to resources
    }

This code block can appear in any method. When it is reached, the thread has to acquire the lock on myObject before proceeding. In this way, methods (or parts of methods) in different classes can be synchronized with one another, just like synchronized methods in the same class, as long as they all lock the same object.

A synchronized instance method is, therefore, equivalent to a method with its statements synchronized on the current object. Thus:

    synchronized void myMethod () {
        ...
    }

is equivalent to:

    void myMethod () {
        synchronized ( this ) {
            ...
        }
    }
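The block form is what lets code in unrelated classes share a lock. In the following sketch (the Appender, Summer, and SharedLockDemo classes are made up for illustration), several Appender threads add to a plain ArrayList, which is not thread-safe on its own, while a separate Summer class reads it. Every access is wrapped in synchronized ( shared ), so all of them serialize on the list object itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer: appends 0..999 to the shared list.
class Appender implements Runnable {
    private final List<Integer> shared;

    Appender(List<Integer> shared) {
        this.shared = shared;
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            synchronized (shared) {      // lock the list object itself
                shared.add(i);
            }
        }
    }
}

// Hypothetical reader in a different class, locking the same object.
class Summer {
    // Safe to call even while Appenders are still running.
    static long sum(List<Integer> shared) {
        synchronized (shared) {
            long total = 0;
            for (int value : shared) {
                total += value;
            }
            return total;
        }
    }
}

class SharedLockDemo {
    static long run(int appenders) {
        List<Integer> shared = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < appenders; i++) {
            Thread t = new Thread(new Appender(shared));
            threads.add(t);
            t.start();
        }
        System.out.println("Sum so far: " + Summer.sum(shared)); // mid-run read
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Summer.sum(shared);       // 0 + 1 + ... + 999 per Appender
    }
}
```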

Accessing Class and Instance Variables from Multiple Threads

In the SpreadSheet example, we guarded access to a set of instance variables with synchronized methods in order to avoid changing one of the variables while someone was reading the others. We wanted to keep them coordinated. But what about individual variables? Do they need to be synchronized? Normally, the answer is no. Almost all reads and writes of primitive and object reference variables in Java happen atomically: that is, they are handled by the VM in one step, with no opportunity for two threads to collide. This prevents a thread from seeing a half-written value while another thread is in the middle of assigning it. Note that this applies only to single reads and writes; a compound operation such as count++ (read, add, write back) is not atomic and still needs synchronization.

But watch out: we did say almost. If you read the Java Language Specification carefully, you will see that the double and long primitive types are not guaranteed to be handled atomically. Both of these types represent 64-bit values, and the specification allows the VM to write a non-volatile long or double as two separate 32-bit halves, so another thread could see half of an old value combined with half of a new one. It is possible that this specification will be beefed up in the future. But for now, to be strict, you should synchronize access to your double and long instance variables through accessor methods, or use the volatile keyword or an atomic wrapper class, which we’ll describe next.

Another issue, independent of the atomicity of the values, is the notion of different threads in the VM caching values for periods of time. That is, even though one thread may have changed the value, the Java VM is not obliged to make that change visible to other threads until a certain point known as a “memory barrier” is reached. While this should not be a problem in most real-world programming cases, you can address it by declaring the variable with the volatile keyword. This keyword indicates to the VM that the value may be changed by other threads: every read of a volatile variable sees the most recent write to it by any thread, and even volatile long and double values are read and written atomically. Note, however, that volatile does not make compound operations such as count++ atomic.
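A common use of volatile is a stop flag that one thread polls while another thread clears it. The sketch below assumes nothing beyond the standard Thread class; the Worker and VolatileDemo names are hypothetical. Without the volatile modifier, the VM would be free to keep using a cached true, and the loop might never end.

```java
// Hypothetical worker that spins until another thread clears its flag.
class Worker implements Runnable {
    // volatile: a write by one thread is visible to later reads by others
    private volatile boolean running = true;

    @Override
    public void run() {
        long spins = 0;
        while (running) {
            spins++;                 // busy work; the flag is re-read every pass
        }
        System.out.println("Worker stopped after " + spins + " spins");
    }

    void shutdown() {
        running = false;
    }
}

class VolatileDemo {
    // Returns true if the worker noticed the cleared flag and exited in time.
    static boolean stopsCleanly() {
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            Thread.sleep(50);        // let the worker spin for a moment
            worker.shutdown();
            thread.join(5000);       // wait at most five seconds for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```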

Finally, the java.util.concurrent.atomic package provides atomic wrapper classes for int, long, and boolean values and for object references (AtomicInteger, AtomicLong, AtomicBoolean, and AtomicReference), along with array versions of some of them. These wrappers provide not only simple set() and get() operations on the values but also specialized “combo” operations, such as compareAndSet(), that work atomically and can be used to build higher-level synchronized application components. The classes in this package were designed specifically to map down to hardware-level functionality in many cases and can be very efficient. We’ll talk more about them later in this chapter.
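Here is a sketch of the difference (the Counters class and its methods are hypothetical). Several threads increment both a plain int and an AtomicInteger. The plain field can lose updates because ++ is a separate read, add, and write, while incrementAndGet() cannot. The updateMax() helper shows the usual compareAndSet() retry loop for building a custom atomic operation, here a running maximum.

```java
import java.util.concurrent.atomic.AtomicInteger;

class Counters {
    int plain;                                     // unsafe under contention
    final AtomicInteger atomic = new AtomicInteger();

    static Counters hammer(int threads, int incrementsPerThread) {
        Counters c = new Counters();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    c.plain++;                     // may interleave and drop increments
                    c.atomic.incrementAndGet();    // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return c;
    }

    // Retry until no other thread changed the value between our get() and
    // our compareAndSet(); gives up early if the candidate is not larger.
    static void updateMax(AtomicInteger max, int candidate) {
        int current;
        do {
            current = max.get();
            if (candidate <= current) {
                return;
            }
        } while (!max.compareAndSet(current, candidate));
    }

    public static void main(String[] args) {
        Counters c = hammer(4, 100_000);
        System.out.println("plain:  " + c.plain);         // often less than 400000
        System.out.println("atomic: " + c.atomic.get());  // always 400000
    }
}
```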

Reentrant locking

The locks acquired by Java upon entering a synchronized method or block of code are reentrant, meaning that the thread holding onto the lock may acquire the same lock again any number of times and never blocks waiting for itself. In most cases, this means that the code behaves as you’d expect; a thread can call a synchronized method recursively and can itself call upon other synchronized methods within the same object.
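For instance, in this sketch (the Factorial class is only an illustration), compute() calls itself recursively and computeAndLog() calls compute(), all while the same thread already holds the object’s lock. Because the lock is reentrant, none of these nested calls blocks.

```java
class Factorial {
    synchronized long compute(int n) {
        if (n <= 1) {
            return 1;
        }
        // Recursive call: this thread already holds the lock on this object,
        // so it simply re-enters instead of blocking on itself.
        return n * compute(n - 1);
    }

    synchronized long computeAndLog(int n) {
        long result = compute(n);    // another synchronized method, same lock
        System.out.println(n + "! = " + result);
        return result;
    }

    public static void main(String[] args) {
        new Factorial().computeAndLog(10);   // prints "10! = 3628800"
    }
}
```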

The wait() and notify() Methods

With the synchronized keyword, we can serialize the execution of methods and blocks of code so that only one thread at a time can execute a synchronized item. The wait() and notify() methods of the Object class extend this capability by allowing us to explicitly coordinate the waiting and running threads. Every class in Java is a subclass of Object, so every object inherits these methods. By using wait() and notify(), a thread can effectively give up its hold on a lock at an arbitrary point and then wait for another thread to give it back before continuing. All of the coordinated activity still happens inside synchronized blocks (calling wait() or notify() without holding the object’s lock throws an IllegalMonitorStateException), and still only one thread at a time holds the lock and runs inside them.

By executing wait() from a synchronized block, a thread gives up its hold on the lock and goes to sleep. A thread might do this if it needs to wait for something to happen in another part of the application, as we’ll see shortly. Later, when the necessary event happens, another thread calls notify() from a block synchronized on the same object. The first thread wakes up and begins trying to acquire the lock again. When the first thread manages to reacquire the lock, it continues from where it left off. However, the thread that was waiting may not get the lock immediately (or perhaps ever). It depends on when the second thread eventually releases the lock and which thread manages to snag it next. Apart from being interrupted or the rare “spurious wakeup” that the platform permits, the first thread won’t wake up from the wait() unless another thread calls notify(). An overloaded version of wait(), however, allows us to specify a timeout period. If another thread doesn’t call notify() in the specified period, the waiting thread automatically wakes up.

Let’s look at a simple scenario to see what’s going on. In the following example, we’ll assume there are three threads—one waiting to execute each of the three synchronized methods of the MyThing class. We’ll call them the waiter, notifier, and related threads. Here’s a code fragment to illustrate:

    class MyThing {
        synchronized void waiterMethod() throws InterruptedException {
            // do some stuff
            wait();   // now wait for notifier to do something
            // continue where we left off
        }
        synchronized void notifierMethod() {
            // do some stuff
            notify();  // notify waiter that we've done it
            // continue doing stuff
        }
        synchronized void relatedMethod() {
            // do some related stuff
        }
        ...
    }

Let’s assume that a thread named waiter gets through the gate first and begins executing waiterMethod(). The two other threads are initially blocked when trying to acquire the lock for the MyThing object. When waiter executes the wait() method, it relinquishes its hold on the lock and goes to sleep. Now two viable threads are waiting for the lock. Which thread gets it depends on several factors, including chance and the priorities of the threads. (We’ll discuss thread scheduling later in this chapter.)

Let’s suppose that notifier is the next thread to acquire the lock, so it begins to run notifierMethod(). waiter continues to sleep, and related languishes, waiting for its turn. When notifier executes the call to notify(), the runtime system prods the waiter thread, effectively telling it something has changed. waiter wakes up and rejoins related in vying for the MyThing lock. It doesn’t receive the lock automatically; it just changes its state from “Leave me alone” to “I want the lock.”

At this point, notifier still owns the lock and continues to hold it until the synchronized notifierMethod() returns, or perhaps executes a wait() itself. At that point, the other two threads get to fight over the lock. waiter would like to continue executing waiterMethod() from the point where it left off, while related, which has been patient, would like to get started. We’ll let you choose your own ending for the story.

For each call to notify(), the runtime system wakes up just one thread that is asleep in a wait() call on that object. The set of threads waiting on an object is called its wait set. If multiple threads are waiting, Java picks one on an arbitrary basis, which may be implementation-dependent. The Object class also provides a notifyAll() call to wake up all waiting threads. In most cases, you’ll probably want to use notifyAll() rather than notify(). Keep in mind that notify() really means, “Hey, something related to this object has changed. The condition you are waiting for may have changed, so check it again.” In general, there is no reason to assume only one thread at a time is interested in the change or able to act upon it. Different threads might look upon whatever has changed in different ways.

Wait conditions

In general, our waiter thread is waiting for a particular condition to change, and we will want it to sit in a loop like the following:

    while ( !condition )
        wait();

This test is called the wait condition. Other synchronized threads call notify() or notifyAll() when they have modified the environment so that the condition can be checked again. It’s important to use a loop on the wait condition to be sure that the thread has been awakened for the right reason. Threads may also use a timed version of wait() to do periodic work while checking the condition in this way. Using wait conditions like this is also an alternative to polling and sleeping, as you’ll see in the following section.
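As a concrete sketch (the Gate class and its method names are hypothetical, not a library API), here is a one-shot gate: threads calling await() block until some thread calls open(). Both versions of await() test the wait condition in a loop, so an unrelated notifyAll() or a spurious wakeup just sends the thread back to wait(). The timed version keeps track of how much of its timeout is left.

```java
import java.util.concurrent.TimeUnit;

class Gate {
    private boolean isOpen = false;   // guarded by this object's lock

    synchronized void await() throws InterruptedException {
        while (!isOpen) {             // the wait condition
            wait();                   // releases the lock while asleep
        }
    }

    // Timed variant: returns true if the gate opened, false on timeout.
    synchronized boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!isOpen) {
            long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remaining <= 0) {
                return false;         // never call wait(0): that would wait forever
            }
            wait(remaining);
        }
        return true;
    }

    synchronized void open() {
        isOpen = true;
        notifyAll();                  // every waiter re-checks the condition
    }
}

class GateDemo {
    // Returns true if a blocked thread passed after open() and a timed wait
    // on a gate that is never opened timed out.
    static boolean check() {
        Gate gate = new Gate();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            Thread.sleep(50);         // give the waiter a chance to block first
            gate.open();
            waiter.join(5000);
            boolean timedOut = !new Gate().await(100);
            return !waiter.isAlive() && timedOut;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```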

Passing Messages

We’ll next illustrate a classic interaction between two threads: a Producer and a Consumer. A producer thread creates messages and places them into a queue while a consumer reads and displays them. To be realistic, we’ll give the queue a maximum depth. And to make things really interesting, we’ll have our consumer thread be lazy and run much more slowly than the producer. This means that Producer occasionally has to stop and wait for Consumer to catch up. The Java concurrency package has a BlockingQueue interface that provides exactly this kind of functionality, but we’ll build it ourselves here using basic synchronization techniques first and then take a look at Queues and all of the collection classes in Chapter 11.

Here are the Producer and Consumer classes:

    import java.util.*;

    public class Consumer implements Runnable {
        Producer producer;

        Consumer( Producer producer ) {
            this.producer = producer;
        }

        public void run() {
            while ( true ) {
                String message = producer.getMessage();
                System.out.println("Got message: " + message);
                try {
                    Thread.sleep( 2000 );
                } catch ( InterruptedException e ) { }
            }
        }

        public static void main(String args[]) {
            Producer producer = new Producer();
            new Thread( producer ).start();
            Consumer consumer = new Consumer( producer );
            new Thread( consumer ).start();
        }
    }

    // Producer.java
    import java.util.*;

    public class Producer implements Runnable {
        static final int MAXQUEUE = 5;
        private List<String> messages = new ArrayList<>();

        public void run() {
            while ( true ) {
                putMessage();
                try {
                    Thread.sleep( 1000 );
                } catch ( InterruptedException e ) { }
            }
        }

        // called by Producer internally
        private synchronized void putMessage()
        {
            while ( messages.size() >= MAXQUEUE )
                try {
                    wait();
                } catch( InterruptedException e ) { }

            messages.add( new java.util.Date().toString() );
            notify();
        }

        // called by Consumer externally
        public synchronized String getMessage()
        {
            while ( messages.size() == 0 )
                try {
                    notify();
                    wait();
                } catch( InterruptedException e ) { }
            String message = messages.remove(0);
            notify();
            return message;
        }
    }

For convenience, we have included a main() method in the Consumer class that runs the complete example. It creates a Consumer that is tied to a Producer and starts a thread for each of them. You can run the example as follows:

    % java Consumer

This produces the timestamp messages created by the Producer:

    Got message: Sun Dec 19 03:35:55 CST 2006
    Got message: Sun Dec 19 03:35:56 CST 2006
    Got message: Sun Dec 19 03:35:57 CST 2006
    ...

The timestamps initially show a spacing of one second even though they appear every two seconds. Our Producer runs faster than our Consumer. Producer would like to generate a new message every second, while Consumer gets around to reading and displaying a message only every two seconds. Can you see how long it will take the message queue to fill up? What happens when it does?

Let’s look at the code. We are using a few new tools here. Producer and Consumer implement the Runnable interface, and each has a thread associated with it. The Producer and Consumer classes pass messages through a java.util.List, in this case an ArrayList. We haven’t discussed the collection classes yet, but an ArrayList is essentially a dynamic array of elements. We use this one as a queue by simply adding elements at the end and removing them from the front, in first-in, first-out order. The ArrayList has no maximum capacity of its own, but we impose one with our own check.

The important activity is in the synchronized methods: putMessage() and getMessage(). Although one of the methods is used by the Producer thread and the other by the Consumer thread, they both live in the Producer class so that we can coordinate them simply by declaring them synchronized. Here, they both implicitly use the Producer object’s lock. If the queue is empty, the Consumer thread blocks inside its call to the Producer’s getMessage() method, waiting for another message.

Another design option would implement the getMessage() method in the Consumer class and use a synchronized code block to synchronize explicitly on the Producer object. In either case, synchronizing on the Producer enables us to have multiple Consumer objects that feed from the same Producer. We’ll do that later in this section.

putMessage()’s job is to add a new message to the queue. It can’t do this if the queue is already full, so it first checks the number of elements in messages. If there is room, it stuffs in another timestamp message. If the queue is at its limit, however, putMessage() has to wait until there’s space. In this situation, putMessage() executes a wait() and relies on the consumer to call notify() to wake it up after a message has been read. Here, we have putMessage() testing the condition in a loop. In this simple example, the test might not seem necessary; we could assume that when putMessage() wakes up, there is a free spot. However, it’s important to always test our wait condition in a loop like this when we synchronize threads because there is no other way to be certain why our thread has been awakened. Before it finishes, putMessage() calls notify() itself to prod any Consumer that might be waiting on an empty queue.

getMessage() retrieves a message for the Consumer. It enters a loop like that of putMessage(), waiting for the queue to have at least one element before proceeding. If the queue is empty, it executes a wait() and expects the Producer to call notify() when more items are available. Notice that getMessage() makes its own calls to notify(). It does this any time the queue is empty, to prod a producer that might be sleeping, and also after it consumes a message, to give the producer the go-ahead to fill the queue again. These scenarios are more plausible if there are more consumers, as we’ll see next.

Let’s add another consumer to the scenario, just to make things more interesting. Most of the necessary changes are in the Consumer class; here’s the code for the modified class, now called NamedConsumer:

    public class NamedConsumer implements Runnable
    {
        Producer producer;
        String name;

        NamedConsumer(String name, Producer producer) {
            this.producer = producer;
            this.name = name;
        }

        public void run() {
            while ( true ) {
                String message = producer.getMessage();
                System.out.println(name + " got message: " + message);
                try {
                    Thread.sleep( 2000 );
                } catch ( InterruptedException e ) { }
            }
        }

        public static void main(String args[]) {
            Producer producer = new Producer();
            new Thread( producer ).start();

            NamedConsumer consumer = new NamedConsumer( "One", producer );
            new Thread( consumer ).start();
            consumer = new NamedConsumer( "Two", producer );
            new Thread( consumer ).start();
        }
    }

The NamedConsumer constructor takes a string name to identify each consumer. The run() method uses this name in the call to println() to identify which consumer received the message.

The only required modification to the Producer code is to change the notify() calls to notifyAll() calls in putMessage() and getMessage(). (We could have used notifyAll() in the first place.) Now, instead of the consumer and producer playing tag with the queue, we can have many players waiting for the condition of the queue to change. We might have a number of consumers waiting for a message, or we might have the producer waiting for a consumer to take a message. Any time the condition of the queue changes, we prod all of the waiting methods to reevaluate the situation by calling notifyAll().

Here is some sample output when two NamedConsumers are running, as in the main() method shown previously:

    One got message: Sat Mar 18 20:00:01 CST 2006
    Two got message: Sat Mar 18 20:00:02 CST 2006
    One got message: Sat Mar 18 20:00:03 CST 2006
    Two got message: Sat Mar 18 20:00:04 CST 2006
    One got message: Sat Mar 18 20:00:05 CST 2006
    Two got message: Sat Mar 18 20:00:06 CST 2006
    One got message: Sat Mar 18 20:00:07 CST 2006
    Two got message: Sat Mar 18 20:00:08 CST 2006
    ...

We see nice, orderly alternation between the two consumers as a result of the calls to sleep() in the various methods. Interesting things would happen, however, if we were to remove all calls to sleep() and let things run at full speed. The threads would compete, and their behavior would depend on whether the system is using time-slicing. On a time-sliced system, there should be a fairly random distribution between the two consumers, while on a non-time-sliced system, a single consumer could monopolize the messages. We’ll talk shortly about how threads compete for time when we discuss thread priority and scheduling.

Food for thought

Many things could be improved in this simple example. What we’ve tried to emphasize is a defensive style of programming with respect to notifications by threads. You need to rely on real-world conditions that you can test when synchronizing threads; it’s not robust to simply assume that you’ll get the right notifications in the right place at the right time. With that said, our example does generate extraneous notifications that wake up threads at times when there may not be work for them. For example, we generate notifications both when the queue is empty and when it’s full. A better design might split these cases and use two different object locks. Fortunately, most programmers won’t have to deal with issues at this level, especially because Java provides real Queues and other high-level synchronization constructs.
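For comparison, here is a sketch of the same bounded exchange built on java.util.concurrent.ArrayBlockingQueue, an implementation of the BlockingQueue interface mentioned earlier. It is not how the chapter’s Producer is written. put() blocks while the queue is full and take() blocks while it is empty, so neither thread touches wait() or notify() directly. The QueueDemo class, its exchange() method, and the simple "message N" strings (used in place of timestamps) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueDemo {
    static final int MAXQUEUE = 5;

    // Passes `count` messages from a producer thread to a consumer thread
    // through a queue that holds at most MAXQUEUE items at once.
    static List<String> exchange(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(MAXQUEUE);
        List<String> received = new ArrayList<>(); // written only by the consumer

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("message " + i);    // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());   // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();   // join() also makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        for (String message : exchange(10)) {
            System.out.println("Got message: " + message);
        }
    }
}
```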

ThreadLocal Objects

A common issue that arises is the need to maintain some information or state on a per-thread basis. For example, we might want to carry some context with the current thread as it executes our application. Or we might simply want to have a value that is different for different threads in the same way that each thread “sees” its own local variables in a method. Java supports this through the ThreadLocal class. A ThreadLocal is an object wrapper that automatically maintains a separate value for any thread calling it. For example:

    ThreadLocal<String> userID = new ThreadLocal<>();
    userID.set("Pat");  // called by thread 1
    userID.set("Bob");  // called by thread 2
    userID.get();       // thread 1 gets "Pat"
    userID.get();       // thread 2 gets "Bob"
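The fragment above compresses two threads into one listing. In this runnable sketch (the ThreadLocalDemo class and its setThenGet() helper are hypothetical), each value gets its own thread, and a CountDownLatch makes sure both threads have called set() before either calls get(), so each one really does read back only its own value. The calling thread, which never calls set(), gets null.

```java
import java.util.concurrent.CountDownLatch;

class ThreadLocalDemo {
    static final ThreadLocal<String> userID = new ThreadLocal<>();

    // Returns { value seen by thread 1, value seen by thread 2, value seen by caller }.
    static String[] run() {
        String[] seen = new String[3];
        CountDownLatch bothSet = new CountDownLatch(2);

        Thread t1 = new Thread(() -> seen[0] = setThenGet("Pat", bothSet));
        Thread t2 = new Thread(() -> seen[1] = setThenGet("Bob", bothSet));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen[2] = userID.get();   // this thread never called set()
        return seen;
    }

    private static String setThenGet(String name, CountDownLatch bothSet) {
        userID.set(name);
        bothSet.countDown();
        try {
            bothSet.await();      // wait until the other thread has set its value too
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return userID.get();      // still this thread's own value
    }
}
```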

You can use an instance of ThreadLocal anywhere you might use a static or instance variable to automatically maintain separate values for each thread. You can also extend ThreadLocal and override its initialValue() method. The ThreadLocal then calls this method to initialize the value once per thread, the first time that thread calls get() (unless the thread has already called set()):

    class MyThreadLocalFactory extends ThreadLocal<Factory> {
        protected Factory initialValue() { return new MyFactory(); }
    }

ThreadLocals are implemented using a Map attached to each Thread instance, so each thread’s values disappear when that thread is no longer in use and is garbage collected.

A useful addition in Java 7 is the ThreadLocalRandom class, which is an extension of the java.util.Random class discussed in Chapter 11. The ThreadLocalRandom class eliminates contention (waiting due to synchronization) on the random-number generator when called from different threads.
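A minimal sketch (DiceRoller is a hypothetical name): each worker calls ThreadLocalRandom.current() from inside its own thread, which is the intended usage, rather than sharing one Random object among threads. nextInt(1, 7) returns a value from 1 up to, but not including, 7.

```java
import java.util.concurrent.ThreadLocalRandom;

class DiceRoller {
    // Each worker sums rollsPerThread die rolls; returns one total per worker.
    static int[] rollMany(int threads, int rollsPerThread) {
        int[] totals = new int[threads];       // each slot written by one worker only
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                int sum = 0;
                for (int r = 0; r < rollsPerThread; r++) {
                    // current() returns the generator for this thread;
                    // nextInt(1, 7) yields 1 through 6
                    sum += ThreadLocalRandom.current().nextInt(1, 7);
                }
                totals[slot] = sum;
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();                      // also publishes totals[slot] to us
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(rollMany(4, 1000)));
    }
}
```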

Scheduling and Priority

Java makes few guarantees about how it schedules threads. Almost all of Java’s thread scheduling is left up to the Java implementation and, to some degree, the application. Although it might have made sense (and would certainly have made many developers happier) if Java’s developers had specified a scheduling algorithm, a single algorithm isn’t necessarily suitable for all the roles that Java can play. Instead, Java’s designers put the burden on you to write robust code that works no matter the scheduling algorithm, and let the implementation tune the algorithm for the best fit.[27]

The priority rules that we describe next are carefully worded in the Java language specification to be a general guideline for thread scheduling. You should be able to rely on this behavior overall (statistically), but it is not a good idea to write code that relies on very specific features of the scheduler to work properly. You should instead use the control and synchronization tools that we have described in this chapter to coordinate your threads.[28]

Every thread has a priority value. In general, any time a thread of a higher priority than the current thread becomes runnable (is started, stops sleeping, or is notified), it preempts the lower-priority thread and begins executing. By default, threads with the same priority are scheduled round-robin, which means once a thread starts to run, it continues until it does one of the following:

  • Sleeps, by calling Thread.sleep() or wait()

  • Waits for a lock, in order to run a synchronized method

  • Blocks on I/O, for example, in a read() or accept() call

  • Explicitly yields control, by calling yield()

  • Terminates, by completing its target method or with a stop() call (deprecated)

This situation looks something like Figure 9-4.

Figure 9-4. Priority preemptive, round-robin scheduling

Thread State

At any given time, a thread is in one of five general states that encompass its lifecycle and activities. These states are defined in the Thread.State enumeration and queried via the getState() method of the Thread class:

NEW

The thread has been created but not yet started.

RUNNABLE

The normal active state of a running thread, including the time when a thread is blocked in an I/O operation, like a read or write or network connection.

BLOCKED

The thread is blocked, waiting to enter a synchronized method or code block. This includes the time when a thread has been awakened by a notify() and is attempting to reacquire its lock after a wait().

WAITING, TIMED_WAITING

The thread is waiting for another thread via a call to wait() or join(). In the case of TIMED_WAITING, the call has a timeout; a thread inside Thread.sleep() is also reported as TIMED_WAITING.

TERMINATED

The thread has completed due to a return, an exception, or being stopped.

We can show the state of all threads in Java (in the current thread group) with the following snippet of code:

    Thread [] threads = new Thread [ 64 ]; // max threads to show
    int num = Thread.enumerate( threads );
    for( int i = 0; i < num; i++ )
       System.out.println( threads[i] +":"+ threads[i].getState() );

You will probably not use this API in general programming, but it is interesting and useful for experimenting and learning about Java threads.
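To watch one thread move through its lifecycle, you can poll getState() as in this sketch (StateDemo is a made-up name). The thread starts out NEW, sits in an untimed wait() on a private lock object (WAITING), and, after we interrupt it and join() it, reports TERMINATED.

```java
class StateDemo {
    static Thread.State[] observe() {
        Object lock = new Object();
        Thread t = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait();          // untimed wait: state is WAITING
                    }
                } catch (InterruptedException e) {
                    // interrupt() is our signal to finish
                }
            }
        });

        Thread.State before = t.getState();   // NEW: created, not started
        t.start();

        Thread.State during;
        do {
            Thread.yield();                   // poll until the thread is inside wait()
            during = t.getState();
        } while (during != Thread.State.WAITING);

        t.interrupt();                        // wakes it with an InterruptedException
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = t.getState();    // TERMINATED
        return new Thread.State[] { before, during, after };
    }

    public static void main(String[] args) {
        for (Thread.State s : observe()) {
            System.out.println(s);
        }
    }
}
```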

Time-Slicing

In addition to prioritization, all modern systems (with the exception of some embedded and “micro” Java environments) implement thread time-slicing. In a time-sliced system, thread processing is chopped up so that each thread runs for a short period of time before the context is switched to the next thread, as shown in Figure 9-5.

Figure 9-5. Priority preemptive, time-sliced scheduling

Higher-priority threads still preempt lower-priority threads in this scheme. The addition of time-slicing mixes up the processing among threads of the same priority; on a multiprocessor machine, threads may even be run simultaneously. This can introduce a difference in behavior for applications that don’t use threads and synchronization properly.

Strictly speaking, because Java doesn’t guarantee time-slicing, you shouldn’t write code that relies on this type of scheduling; any software you write should function under round-robin scheduling. If you’re wondering what your particular flavor of Java does, try the following experiment:

    public class Thready {
        public static void main( String args [] ) {
            new ShowThread("Foo").start();
            new ShowThread("Bar").start();
        }

        static class ShowThread extends Thread {
            String message;

            ShowThread( String message ) {
                this.message = message;
            }
            public void run() {
                while ( true )
                    System.out.println( message );
            }
        }
    }

The Thready class starts up two ShowThread objects. ShowThread is a thread that goes into a hard loop (very bad form) and prints its message. Because we don’t specify a priority for either thread, they both inherit the priority of their creator, so they have the same priority. When you run this example, you will see how your Java implementation does its scheduling. Under a round-robin scheme, only “Foo” should be printed; “Bar” never appears. In a time-slicing implementation, you should occasionally see the “Foo” and “Bar” messages alternate (which is most likely what you will see).

Priorities

As we said before, the priorities of threads exist as a general guideline for how the implementation should allocate time among competing threads. Unfortunately, with the complexity of how Java threads are mapped to native thread implementations, you cannot rely upon the exact meaning of priorities. Instead, you should only consider them a hint to the VM.

Let’s play with the priority of our threads:

    class Thready {
        public static void main( String args [] ) {
            Thread foo = new ShowThread("Foo");
            foo.setPriority( Thread.MIN_PRIORITY );
            Thread bar = new ShowThread("Bar");
            bar.setPriority( Thread.MAX_PRIORITY );

            foo.start();
            bar.start();
        }
        // static class ShowThread as before
    }

We would expect that with this change to our Thready class, the Bar thread would take over completely. If you run this code on the Solaris implementation of Java 5.0, that’s what happens. The same is not true on Windows or with some older versions of Java. Similarly, if you change the priorities to values other than min and max, you may not see any difference at all. These subtleties of priority and performance come down to how Java threads and priorities are mapped to real threads in the OS. For this reason, thread priorities should be reserved for system and framework development.

Yielding

Whenever a thread sleeps, waits, or blocks on I/O, it gives up its time slot and another thread is scheduled. As long as you don’t write methods that use hard loops, all threads should get their due. However, a thread can also signal that it is willing to give up its time voluntarily at any point with the yield() call. We can change our previous example to include a yield() on each iteration:

    ...
    static class ShowThread extends Thread {
        ...
        public void run() {
            while ( true ) {
                System.out.println( message );
                Thread.yield();
            }
        }
    }

On many implementations, you should now see the “Foo” and “Bar” messages alternate much more evenly. If you have threads that perform very intensive calculations or otherwise eat a lot of CPU time, you might want to find an appropriate place for them to yield control occasionally. Alternatively, you might want to drop the priority of your compute-intensive thread so that more important processing can proceed around it.

Unfortunately, the Java language specification is very weak with respect to yield(). It is another one of those things that you should consider an optimization hint rather than a guarantee. In the worst case, the runtime system may simply ignore calls to yield().

Thread Groups

The ThreadGroup class allows us to deal with threads wholesale: we can use it to arrange threads in groups and deal with the groups as a whole. A thread group can contain other thread groups in addition to individual threads, so our arrangements can be hierarchical. Thread groups are particularly useful when we want to start a task that might create many threads of its own. By assigning the task a thread group, we can later identify and control all the task’s threads. Thread groups are also the subject of restrictions that can be imposed by the Java Security Manager, so we can restrict a thread’s behavior according to its thread group. For example, we can forbid threads in a particular group from interacting with threads in other groups. This is one way web browsers can prevent threads started by Java applets from stopping important system threads.

When we create a thread, it normally becomes part of the thread group to which the currently running thread belongs. To create a new thread group of our own, we can call the constructor:

    ThreadGroup myTaskGroup = new ThreadGroup("My Task Group");

The ThreadGroup constructor takes a name, which a debugger can use to help you identify the group. (You can also assign names to the threads themselves.) Once we have a group, we can put threads in the group by supplying the ThreadGroup object as an argument to the Thread constructor:

    Thread myTask = new Thread( myTaskGroup, taskPerformer );

Here, myTaskGroup is the thread group, and taskPerformer is the target object (the Runnable object that performs the task). Any additional threads that myTask creates also belong to the myTaskGroup thread group.

Working with ThreadGroups

The ThreadGroup class exists so that you can control threads in batches. It has methods that parallel the basic Thread control methods—even the deprecated stop(), suspend(), and resume(). These methods operate on all the threads in a thread group. You can also mark a thread group as a “daemon”; a daemon thread group is automatically removed when all of its children are gone. If a thread group isn’t a daemon, you have to call destroy() in order to remove it when it is empty.

We can set the maximum priority for threads in a thread group by calling setMaxPriority(). Thereafter, no thread in the group can be given a priority higher than that maximum: a thread created in the group, or one that later calls setPriority() with a higher value, is capped at the group's maximum.

Finally, you can get a list of all threads in a group. The method activeCount() returns an estimate of how many threads are active in the group; the method enumerate() gives you a list of them. We used the enumerate() method earlier, when we showed the state of all threads in the default thread group using the static Thread.enumerate() method. The argument to enumerate() is an array of Threads that enumerate() fills in with the group’s threads. Both activeCount() and enumerate() operate recursively on all thread groups that are contained in the group.
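To make the grouping behavior concrete, here is a minimal sketch. The class ThreadGroupDemo and its method names are our own hypothetical choices, and it assumes no Security Manager is installed. It starts a few threads in a new group, counts them with enumerate() while they are all still alive, and checks that a thread created by a group member (without naming a group explicitly) lands in the same group.

```java
import java.util.concurrent.CountDownLatch;

public class ThreadGroupDemo {

    // Starts `count` threads in a new group and returns how many threads
    // enumerate() reports while all of them are still alive.
    public static int countThreadsInGroup(int count) throws InterruptedException {
        ThreadGroup myTaskGroup = new ThreadGroup("My Task Group");
        CountDownLatch release = new CountDownLatch(1);
        Runnable taskPerformer = () -> {
            try {
                release.await(); // stay alive until the main thread has counted us
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread[] started = new Thread[count];
        for (int i = 0; i < count; i++) {
            started[i] = new Thread(myTaskGroup, taskPerformer, "task-" + i);
            started[i].start();
        }
        // enumerate() copies the group's live threads into the array
        Thread[] found = new Thread[count + 5];
        int n = myTaskGroup.enumerate(found);
        release.countDown();
        for (Thread t : started) {
            t.join();
        }
        return n;
    }

    // Returns true if a thread created (with no explicit group) by a member
    // of the group is placed in that same group.
    public static boolean childInheritsGroup() throws InterruptedException {
        ThreadGroup group = new ThreadGroup("Parent Group");
        boolean[] sameGroup = new boolean[1];
        Thread parent = new Thread(group, () -> {
            Thread child = new Thread(() -> { });
            sameGroup[0] = (child.getThreadGroup() == group);
        });
        parent.start();
        parent.join(); // join() makes the write to sameGroup[0] visible here
        return sameGroup[0];
    }
}
```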

Uncaught Exceptions

In Java, unchecked exceptions that are not caught by any method eventually bubble up to the run() method of the running thread and are thrown from there. By default, Java deals with these by simply printing them to the system error stream or log and terminating the thread. However, you can specify your own “catchall” behavior for these exceptions by subclassing ThreadGroup and overriding the uncaughtException() method. When an uncaught exception is generated, it is handed to this method, which can take some action or throw it again before the thread terminates.

In Java 5.0, this pattern was extended by defining an interface, Thread.UncaughtExceptionHandler, and adding both per-thread and systemwide uncaught exception handlers in addition to the per-ThreadGroup exception handler. We can handle uncaught exceptions for a single thread like this:

    Thread thread = new Thread();
    thread.setUncaughtExceptionHandler(
        new Thread.UncaughtExceptionHandler() {
            public void uncaughtException( Thread t, Throwable e ) {
                System.err.println( t + " threw exception: " + e );
            }
        } );

This example prints the exception before the thread dies. We could get the same effect for a whole group of threads by overriding uncaughtException() in a ThreadGroup subclass, as described earlier, or assign a handler for all threads using the static Thread.setDefaultUncaughtExceptionHandler() method.
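The sketch below exercises two of those handler levels: a per-thread handler installed with setUncaughtExceptionHandler(), and a group-wide catchall made by subclassing ThreadGroup and overriding uncaughtException(). The class names are hypothetical, and both handlers record the exception instead of printing it so the caller can inspect it after join().

```java
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtDemo {

    // Hypothetical ThreadGroup subclass acting as a group-wide catchall.
    static class RecordingGroup extends ThreadGroup {
        final AtomicReference<Throwable> last = new AtomicReference<>();

        RecordingGroup(String name) {
            super(name);
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            last.set(e); // record the exception instead of printing it
        }
    }

    // Per-thread handler: returns the message of the exception it received.
    public static String perThread() throws InterruptedException {
        AtomicReference<String> caught = new AtomicReference<>();
        Thread thread = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        thread.setUncaughtExceptionHandler((t, e) -> caught.set(e.getMessage()));
        thread.start();
        thread.join(); // the handler runs in the dying thread before join() returns
        return caught.get();
    }

    // Per-group handler: consulted because this thread has no handler of its own.
    public static String perGroup() throws InterruptedException {
        RecordingGroup group = new RecordingGroup("Recorded Group");
        Thread thread = new Thread(group, () -> {
            throw new IllegalArgumentException("bad");
        });
        thread.start();
        thread.join();
        Throwable e = group.last.get();
        return (e == null) ? null : e.getMessage();
    }
}
```

When a thread has its own handler, that handler takes precedence; the ThreadGroup's uncaughtException() is used only for threads that have none.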

Thread Performance

The way that applications use threads and the associated costs and benefits have greatly impacted the design of many Java APIs. We will discuss some of the issues in detail in other chapters. But it is worth briefly mentioning some aspects of thread performance and how the use of threads has dictated the form and functionality of several recent Java packages.

The Cost of Synchronization

The act of acquiring locks to synchronize threads takes time, even when there is no contention. In older implementations of Java, this time could be significant. With newer VMs, it is almost negligible. However, unnecessary low-level synchronization can still slow applications by blocking threads where legitimate concurrent access otherwise could be allowed. Because of this, two important APIs, the Java Collections API and the Swing GUI API, were specifically crafted to avoid unnecessary synchronization by placing it under the developer’s control.

The java.util Collections API replaces earlier, simple Java aggregate types (namely, Vector and Hashtable) with more fully featured and, notably, unsynchronized implementations of the List and Map interfaces, such as ArrayList and HashMap. The Collections API instead defers to application code to synchronize access to collections when necessary and provides special “fail fast” iterators that help detect concurrent modification by throwing an exception. It also provides synchronization “wrappers” that can provide safe access in the old style. Special concurrent-access-friendly implementations of the Map and Queue collections are included as part of the java.util.concurrent package. These implementations go even further in that they are written to allow a high degree of concurrent access without any user synchronization. We’ll talk about these in Chapter 11.
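As a small illustration of both behaviors, here is a sketch with hypothetical class and method names. The fail-fast check is done in a single thread so that it is deterministic (fail-fast detection is best-effort, not a guarantee), and the wrapper check has several threads add to a list obtained from Collections.synchronizedList().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CollectionsSyncDemo {

    // An ArrayList's fail-fast iterator notices a structural change made
    // while iterating and throws ConcurrentModificationException.
    public static boolean failFastDetected() {
        List<String> list = new ArrayList<>(Arrays.asList("a", "b", "c"));
        try {
            for (String s : list) {
                if (s.equals("a")) {
                    list.add("d"); // modifies the list behind the iterator's back
                }
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    // The synchronized wrapper makes individual calls such as add() safe
    // to use from several threads at once.
    public static int synchronizedAdds(int threads, int perThread) throws InterruptedException {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    list.add(j);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        // Iterating over the wrapper still requires synchronized (list) { ... }
        return list.size();
    }
}
```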

The Java Swing GUI, which grew out of AWT, has taken a different approach to providing speed and safety. Swing dictates that modification of its components (with notable exceptions) must all be done by a single thread: the event dispatch thread, which services the main event queue. Swing solves performance problems as well as nasty issues of determinism in event ordering by forcing a single super-thread to control the GUI. The application may access the event dispatch thread indirectly by pushing commands onto the event queue through a simple interface, such as SwingUtilities.invokeLater().

Thread Resource Consumption

A fundamental pattern in Java, which will be illustrated in Chapters 12 and 13, is to start many threads to handle asynchronous external resources, such as socket connections. For maximum efficiency, a web server might be tempted to create a thread for each client connection it is servicing. With each client having its own thread, I/O operations may block and restart as needed. But as efficient as this may be in terms of throughput, it is a very inefficient use of server resources. Threads consume memory; each thread has its own “stack” for local variables, and switching between running threads (context switching) adds overhead to the CPU. While threads are relatively lightweight (in theory, it is possible to have hundreds or thousands running on a large server), at a certain point, the resources consumed by the threads themselves start defeating the purpose of starting more threads. Often, this point is reached with only a few dozen threads. Creating a thread per client is not always a scalable option.

An alternative approach is to create “thread pools” where a fixed number of threads pull tasks from a queue and return for more when they are finished. This recycling of threads makes for solid scalability, but it has historically been difficult to implement efficiently for servers in Java because stream I/O (for things like sockets) has not fully supported nonblocking operations. This changed with Java 1.4 and the introduction of the NIO (new I/O) package, java.nio. The NIO package introduced asynchronous I/O channels: nonblocking reads and writes plus the ability to “select” or test the readiness of streams for moving data. Channels can also be asynchronously closed, allowing threads to work with them gracefully. With the NIO package, it is possible to create servers with much more sophisticated, scalable thread patterns.

With Java 5.0, thread pools and job “executor” services were codified as utilities as part of the new java.util.concurrent package, meaning you don’t have to write these yourself. We’ll talk about them next when we discuss the concurrency utilities in Java.

Concurrency Utilities

So far in this chapter, we’ve demonstrated how to create and synchronize threads at a low level, using Java language primitives. The java.util.concurrent package and subpackages introduced with Java 5.0 build on this functionality, adding important threading utilities and codifying some common design patterns by supplying standard implementations. Roughly in order of generality, these areas include:

Thread-aware Collections implementations

The java.util.concurrent package augments the Java Collections API with several implementations for specific threading models. These include timed wait and blocking implementations of the Queue interface, as well as nonblocking, concurrent-access optimized implementations of the Queue and Map interfaces. The package also adds “copy on write” List and Set implementations for extremely efficient “almost always read” cases. These may sound complex, but actually cover some fairly simple cases very well. We’ll cover the Collections API in Chapter 11.

Executors

Executors run tasks, including Runnables, and abstract the concept of thread creation and pooling from the user. Executors are intended to be a high-level replacement for the idiom of creating new threads to service a series of jobs. Along with Executors, the Callable and Future interfaces are introduced, which expand upon Runnable to allow management, value return, and exception handling.

Low-level synchronization constructs

The java.util.concurrent.locks package holds a set of classes, including Lock and Condition, that parallels the Java language-level synchronization primitives and promotes them to the level of a concrete API. The locks package also adds the concept of nonexclusive reader/writer locks, allowing for greater concurrency in synchronized data access.

High-level synchronization constructs

This includes the classes CyclicBarrier, CountDownLatch, Semaphore, and Exchanger. These classes implement common synchronization patterns drawn from other languages and systems and can serve as the basis for new high-level tools.

Atomic operations (sounds very James Bond, doesn’t it?)

The java.util.concurrent.atomic package provides wrappers and utilities for atomic, “all-or-nothing” operations on primitive types and references. This includes simple combination atomic operations like testing a value before setting it and getting and incrementing a number in one operation.
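As a small taste of the atomic package, this sketch (class and method names are hypothetical) uses AtomicInteger's incrementAndGet() to count safely from several threads without any synchronized block, and compareAndSet() to test a value before setting it.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {

    // Several threads increment a shared counter with no synchronized block.
    public static int parallelIncrements(int threads, int perThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet(); // read, add one, and write as one atomic step
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.get();
    }

    // compareAndSet() stores the new value only if the current value matches.
    public static boolean[] compareAndSetResults() {
        AtomicInteger value = new AtomicInteger(5);
        boolean first = value.compareAndSet(5, 10);  // succeeds: value was 5, now 10
        boolean second = value.compareAndSet(5, 20); // fails: value is 10, left unchanged
        return new boolean[] { first, second };
    }
}
```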

With the possible exception of optimizations done by the Java VM for the atomic operations package, all of these utilities are implemented in pure Java, on top of the standard Java language synchronization constructs. This means that they are in a sense only convenience utilities and don’t truly add new capabilities to the language. Their main role is to offer standard patterns and idioms in Java threading and make them safer and more efficient to use. A good example of this is the Executor utility, which allows a user to manage a set of tasks in a predefined threading model without having to delve into creating threads at all. Higher-level APIs like this both simplify coding and allow for greater optimization of the common cases.

We’ll look at each of these areas in the remainder of this chapter, with the exception of the Collections implementations. We’ll discuss those when we cover the Java Collections APIs in Chapter 11.

Before we dive in, we should give a shout-out to Doug Lea, the author of Concurrent Programming in Java (Addison-Wesley), who led the group that added these packages to Java and is largely responsible for creating them.

Executors

In this chapter, we’ve created a lot of Threads and hopefully shown how to use them effectively. But in the grand scheme of things, threads are a fairly low-level programming tool and, without care, can be error-prone. When we recognize certain common patterns that developers reproduce over and over again using threads, it’s natural to want to elevate a pattern to the level of an API. One such related pair of patterns is the concept of an executor service that manages tasks and that of a thread pool that services tasks in an efficient way.

Thread pools have been implemented and reimplemented by vast numbers of developers in one way or another over the years, and when you add in features like scheduling and different threading models, they can get quite complex. To address these issues, the java.util.concurrent package includes interfaces for the executor pattern along with default implementations for common threading models. This includes sophisticated scheduling as well as asynchronous collection of results from the tasks, if they require it. In general, you can use an Executor as a replacement for creating one-off threads anywhere you need to execute Runnable objects. The advantage is that understanding and modifying the behavior of your code later is a lot easier when you work at this level.

For the simple case of running a number of tasks and watching for their completion, we can consider the base Executor interface, which executes Runnable objects for us. A convenient thing about Executor is that its companion utility class Executors is a factory for creating different kinds of Executor implementations. We’ll talk about the various types it can produce in a bit, but for now let’s use the method called newFixedThreadPool(), which, as its name suggests, returns an Executor that is implemented using a thread pool of a fixed size:

    Executor executor = Executors.newFixedThreadPool( 3 ) ; // 3 threads
     
    List<Runnable> runnables = ... ;
    for( Runnable task : runnables )
        executor.execute( task );

Here, we are submitting a number of Runnable tasks to our Executor, which executes them using a pool with a maximum of three threads. If our list contains more than three tasks, then some of them will have to wait until a thread is free to service them. So, what happens when we submit the fourth item? The Executor interface doesn’t really specify that. It’s up to the particular implementation to decide. Without specifying more about its type, we don’t know if an Executor is going to queue tasks, or if it will use a pool to service them. Some Executor implementations may block or even execute the Runnable right on the execute() call in the caller’s thread. But in this case (and for the fixed-size and single-threaded executors created for us by the Executors factory methods), tasks are effectively put onto an unbounded queue. In the example, our loop submits all of the tasks immediately, and they are queued by the executor until the three threads have serviced them.
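Here is a runnable version of the same idea, sketched with a hypothetical class name and an arbitrary task count. It declares the pool as an ExecutorService (covered shortly) rather than a plain Executor only so that the pool can be shut down afterward, and it records which pool threads actually ran the tasks. However many tasks we submit, no more than three distinct threads should show up.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolDemo {

    // Runs `tasks` Runnables on a 3-thread pool and returns how many
    // distinct pool threads did the work.
    public static int distinctWorkers(int tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                workers.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();        // every task, including the queued ones, has run
        executor.shutdown();
        return workers.size();
    }
}
```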

With just a line or two of code in our example, we’ve been able to throttle the concurrency of our task list and avoid the details of constructing any threads ourselves. Later, if we decide we’d rather execute the tasks one at a time, the change is trivial (allocate just one thread!). Next, we’ll take a step up and look at manageable tasks that produce values and executors that can schedule tasks for us.

Tasks with results: Callable and Future

Because the Runnable interface was created for Threads to consume, its API doesn’t allow for direct feedback to the caller. The new Callable interface, which is effectively a replacement for Runnable, rectifies this situation by providing a call() method that both returns a result and can throw exceptions. Callable is a generic interface that is parameterized by the type it returns. The following examples create a Callable that returns an integer:

    class MyCallable implements Callable<Integer> {
        public Integer call() { return 2+2; }
    }

    // or anonymously
    Callable<Integer> callable = new Callable<Integer>() {
        public Integer call() { return 2+2; }
    };

There is also a convenience method in the Executors class for bridging Runnables to Callables. It takes a Runnable and a fixed value for the resulting Callable to return when it completes:

    Callable<Integer> callable = Executors.callable( runnable, 
        42 /*return value*/ );

The new Future interface is used with Callable and serves as a handle to wait for and retrieve the result of the task or cancel the task before it is executed. A Future is returned by the submit() methods of an ExecutorService, which is essentially a beefed-up Executor. We’ll discuss ExecutorServices in the next section.

    Future<Integer> result = executorService.submit( callable );
    int val = result.get();  // blocks until ready

Future is also a generic interface, which is parameterized by its return type. This explains the somewhat cute name. For example, a Future<Integer> could be read as “a future integer.” Future has both blocking and timed-wait get() methods to retrieve the result when it is ready, as well as an isDone() test method and a cancel() method that stops the task if it hasn’t started yet (passing true to cancel() also attempts to interrupt a task that is already running). If the task has been cancelled, you get a CancellationException if you attempt to retrieve the result.
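The following sketch (class and method names are hypothetical) puts Callable and Future together: it submits a Callable and blocks on get() for its value, then shows that cancelling a task that is still waiting in the queue makes get() throw CancellationException. A single-threaded executor, held busy by a task blocked on a latch, guarantees the second task has not started when we cancel it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {

    // Submit a Callable and block on get() for its result.
    public static int compute() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> callable = () -> 2 + 2;
            Future<Integer> result = executorService.submit(callable);
            return result.get(); // blocks until ready
        } finally {
            executorService.shutdown();
        }
    }

    // Cancel a task that is still queued, then try to retrieve its result.
    public static boolean cancelQueued() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker thread so the next task has to wait.
            executorService.submit(() -> { gate.await(); return null; });
            Future<Integer> queued = executorService.submit(() -> 42);
            boolean cancelled = queued.cancel(false); // not started yet, so this succeeds
            try {
                queued.get();
                return false;
            } catch (CancellationException e) {
                return cancelled && queued.isCancelled() && queued.isDone();
            }
        } finally {
            gate.countDown();
            executorService.shutdown();
        }
    }
}
```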

Enough said about these interfaces. Next, we’ll look at the ExecutorService, which uses them.

ExecutorService

Our first Executor was little more than a sinkhole for Runnables and, as we described, required knowledge of the implementation to know how it would handle tasks. By contrast, an ExecutorService is intended to be an asynchronous task handler. Instead of an execute() method, it has submit() methods that accept a Callable (or Runnable) and return immediately with a Future object that can be used to manage the task and collect the result later. In addition to that, an ExecutorService has a lifecycle defined by its shutdown() method and related methods that can be used to stop the service gracefully after tasks are completed.

ExecutorService extends Executor. In fact, all of the implementations returned by the Executors factory methods are actually ExecutorServices—including the one we used in our first example. We’ll look at these factory methods to see what kind of services are offered.

Executors offers three types of ExecutorService implementations:

newFixedThreadPool(int)

This is the classic thread pool with a specified maximum pool size and an unbounded queue for task submission. If a thread dies for some reason while handling a task, a new one will be created to replace it. Threads are never removed from the pool until the service is shut down.

newCachedThreadPool()

This pool uses an open-ended number of threads that grows and shrinks with demand. The main advantage of this service is that threads are cached for a period of time and reused, eliminating the overhead of creating new threads for short-lived tasks. Threads that are not used for one minute are removed. Tasks are submitted directly to threads; there is no real queuing.

newSingleThreadExecutor()

This ExecutorService uses a single thread to execute tasks from an unbounded queue. In this sense, it is identical to a fixed thread pool with a pool size of 1.

Let’s look at a more realistic usage of an ExecutorService, drawn from the TinyHttpd example in Chapter 13. In that chapter, we create a mini-web server to illustrate features of the networking APIs. Here, we won’t show the networking details, but we’ll implement the main request dispatching loop for the example using a thread pool executor service. (Flip to Chapter 13 to see the implementation of the Runnable client-connection handler class. That class works equally well with both examples.) Here we go:

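    import java.io.IOException;
    import java.net.ServerSocket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ExecutorHttpd
    {
      ExecutorService executor = Executors.newFixedThreadPool(3);

      public void start( int port ) throws IOException
      {
        final ServerSocket ss = new ServerSocket( port );
        // TinyHttpdConnection is the Runnable connection handler from Chapter 13
        while ( !executor.isShutdown() )
          executor.submit( new TinyHttpdConnection( ss.accept() ) );
      }

      public void shutdown() throws InterruptedException {
        executor.shutdown();  // stop accepting new tasks
        // give in-progress requests up to 30 seconds, then force the issue
        if ( !executor.awaitTermination( 30, TimeUnit.SECONDS ) )
          executor.shutdownNow();
      }

      public static void main( String argv[] ) throws Exception
      {
        new ExecutorHttpd().start( Integer.parseInt(argv[0]) );
      }
    }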

The ExecutorHttpd class holds an instance of a fixed thread pool ExecutorService with three threads to service client connections. In the start() method of our class, we create a ServerSocket that accepts incoming network connections. We then enter a loop that runs as long as our service is not flagged to shut down. Inside the loop, we create a new connection handler (a Runnable instance of TinyHttpdConnection) for each connection and submit it to the executor. The shutdown() method of our class illustrates a graceful termination. First, we call shutdown() on the executor, which causes the service to stop accepting new tasks while allowing tasks that were already submitted to complete. Then we use the awaitTermination() method to wait a reasonable period of time (30 seconds) for all web requests to finish before trying a less graceful ending with shutdownNow(). shutdownNow() attempts to stop the running tasks, typically by interrupting their threads, and discards any tasks still waiting in the queue. We leave things there, but the method actually returns a List of the queued tasks that never started executing. Finally, we have a main() method that exercises our example by creating an instance of ExecutorHttpd on a port specified as an argument to the program.
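To see the shutdown sequence without any networking, here is a self-contained sketch; the class name and the stand-in tasks are hypothetical. A single-threaded executor runs one long task (standing in for a hung request) while two more wait in the queue. After shutdown(), a short awaitTermination() times out, and shutdownNow() interrupts the long task and hands back the two queued tasks that never started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {

    public static String shutdownSequence() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);      // a "hung" request
            } catch (InterruptedException e) {
                interrupted.set(true);     // shutdownNow() interrupts us
            }
        });
        started.await();                   // the long task now holds the only thread
        executor.execute(() -> { });       // these two wait in the queue
        executor.execute(() -> { });

        executor.shutdown();               // no new tasks; queued ones may still run
        boolean finished = executor.awaitTermination(100, TimeUnit.MILLISECONDS);
        List<Runnable> neverStarted = finished ? List.of() : executor.shutdownNow();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return "finished=" + finished + " unstarted=" + neverStarted.size()
            + " interrupted=" + interrupted.get();
    }
}
```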

Collective tasks

In addition to its individual task submit() methods, ExecutorService also offers a set of collective invokeAll() and invokeAny() executor methods that submit multiple tasks as a group and return results either when they are all complete or when the first one completes, respectively. With this, we could reproduce our first example using a List of Callables like this:

    List<Callable<Integer>> taskList = ...;
    ExecutorService execService = Executors.newFixedThreadPool(3);
    List<Future<Integer>> resultList = execService.invokeAll( taskList );

By contrast, the invokeAny() method returns just the first successfully completed task’s result (cancelling the remaining tasks that have not completed):

    int result = execService.invokeAny( taskList );

Both methods also offer timed wait versions that time out after a specified period of time.
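Here is a runnable sketch of both collective methods; the class name and the toy tasks are hypothetical. invokeAll() waits for all five squaring tasks and we sum their Futures, while invokeAny() skips a task that throws and returns the value of one that succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CollectiveDemo {

    // invokeAll(): run every task and wait until all of them are done.
    public static int sumOfSquares() throws InterruptedException, ExecutionException {
        ExecutorService execService = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> taskList = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                final int n = i;
                taskList.add(() -> n * n);
            }
            int sum = 0;
            for (Future<Integer> f : execService.invokeAll(taskList)) {
                sum += f.get(); // every Future is already complete here
            }
            return sum;
        } finally {
            execService.shutdown();
        }
    }

    // invokeAny(): return the result of a task that completed successfully.
    public static int firstSuccess() throws InterruptedException, ExecutionException {
        ExecutorService execService = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> taskList = new ArrayList<>();
            taskList.add(() -> { throw new IllegalStateException("this task fails"); });
            taskList.add(() -> 7);
            return execService.invokeAny(taskList); // the failing task is skipped
        } finally {
            execService.shutdownNow();
        }
    }
}
```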

Scheduled tasks

For tasks that you’d like to run at a future time or on a periodic basis, use the ScheduledExecutorService. ScheduledExecutorService is an ExecutorService with additional “schedule” methods that take a delay for a Runnable or Callable or a period specification for a Runnable. Two additional factory methods of Executors produce scheduled executors:

    Executors.newScheduledThreadPool(int);
    Executors.newSingleThreadScheduledExecutor();

These are exactly like the similarly named methods for regular executor services, with the exception of returning a scheduled executor type.

To execute a task in the future, you specify a delay from the current time. For example:

    ScheduledExecutorService exec = Executors.newScheduledThreadPool(3);

    // run one minute in the future
    exec.schedule( runnable, 60, TimeUnit.SECONDS );

    // run at a specified date and time
    Calendar futureDate = ...;          // convert from Calendar
    Date date = futureDate.getTime();   // to Date
    long delay = date.getTime() - System.currentTimeMillis(); // to relative millis
    exec.schedule( runnable, delay, TimeUnit.MILLISECONDS );

For periodic work, there are two kinds of recurring schedules—fixed delay and fixed rate. Fixed delay means that a fixed amount of time elapses between the end of the task’s execution and the beginning of the next execution. Fixed rate means that the task should begin execution at fixed time intervals, regardless of how long the task takes. The difference comes into play when the time to execute the task is long relative to the interval. The following snippet schedules a logfile cleanup to occur in 12 hours and every 12 hours thereafter:

    Runnable cleanup = new Runnable() {
        public void run() { cleanUpLogFiles(); }
    };

    long period = 12*60*60, delay = period; // seconds

    Future<?> logService = exec.scheduleAtFixedRate(
        cleanup, delay, period, TimeUnit.SECONDS );

Because the task for a periodic schedule is a Runnable that keeps running until it is cancelled, its Future never produces a useful value, so we use a wildcard rather than a specific parameter type in its generic type instantiation. The Future is still useful for cancelling the task at a later time if we wish:

    logService.cancel( false );  // false: don't interrupt a run already in progress
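The sketch below (hypothetical class and method names, with short intervals chosen only so it finishes quickly) runs both kinds of schedule: a one-shot Callable delayed by 50 milliseconds, and a fixed-rate task that ticks every 10 milliseconds until it has run a given number of times and is then cancelled. scheduleWithFixedDelay() takes the same arguments as scheduleAtFixedRate() if you want fixed-delay behavior instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {

    // One-shot: run a Callable after a 50 ms delay and wait for its value.
    public static int oneShot() throws InterruptedException, ExecutionException {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<Integer> future = exec.schedule(() -> 42, 50, TimeUnit.MILLISECONDS);
            return future.get();
        } finally {
            exec.shutdown();
        }
    }

    // Periodic: run every 10 ms at a fixed rate until `runs` executions, then cancel.
    public static boolean runPeriodically(int runs) throws InterruptedException {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        CountDownLatch ticks = new CountDownLatch(runs);
        ScheduledFuture<?> service = exec.scheduleAtFixedRate(
            ticks::countDown, 0, 10, TimeUnit.MILLISECONDS);
        boolean reached = ticks.await(5, TimeUnit.SECONDS);
        service.cancel(false); // stop future runs
        exec.shutdown();
        return reached && service.isCancelled();
    }
}
```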

We should mention that the ScheduledExecutorService bears a great deal of similarity to the java.util.Timer class that we’ll discuss in Chapter 11, especially with regard to the periodic schedules. A java.util.Timer is always single-threaded, however.

CompletionService

A CompletionService is a lightweight queue-like frontend to an executor. The CompletionService provides submit() methods, which delegate their tasks to a particular instance of Executor, and then provides take() and poll() methods for retrieving Future results for completed tasks. Think of a CompletionService as a babysitter for the Futures, allowing you to easily gather up only completed results (as opposed to having to check each Future yourself to see which ones have finished and in what order). ExecutorCompletionService is a concrete implementation of CompletionService that takes an Executor in its constructor:

    Executor executor = Executors.newFixedThreadPool(3);
    CompletionService<Integer> completionService =
        new ExecutorCompletionService<Integer>( executor );

    completionService.submit( callable );
    completionService.submit( runnable, resultValue );

    // poll for a result without blocking
    Future<Integer> result = completionService.poll();
    if ( result != null ) {
        // use value...
    }

    // block, waiting for a result
    result = completionService.take();
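Here is a self-contained sketch of a CompletionService gathering results as they finish; the class name and the sleep durations are hypothetical stand-ins for real work. Each task sleeps for a different time and returns that time, and take() hands back whichever Future completes next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionDemo {

    // Returns task results in the order take() delivered their Futures.
    public static List<Integer> resultsInCompletionOrder()
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletionService<Integer> completionService =
                new ExecutorCompletionService<>(executor);
            int[] delays = { 300, 10, 150 }; // milliseconds
            for (int d : delays) {
                completionService.submit(() -> {
                    Thread.sleep(d);
                    return d;
                });
            }
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < delays.length; i++) {
                order.add(completionService.take().get()); // next finished task
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }
}
```

On an otherwise idle machine you would expect the list to come back as 10, 150, 300, although thread scheduling makes that order likely rather than guaranteed.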

The ThreadPoolExecutor implementation

At various times in this chapter, we’ve referred to the different executor services produced by the Executors factory as different implementations of ExecutorService. But these implementations are just different configurations of a single, highly flexible implementation of ExecutorService called ThreadPoolExecutor. You can use this implementation directly if you want; it offers some additional features. The primary constructor for ThreadPoolExecutor allows you to specify both a “core” thread pool size and a maximum size, as well as a thread timeout value for removing idle threads. The core size is a minimum number of threads which, once created, are allowed to live indefinitely. The constructor also allows you to provide the task queue (an implementation of BlockingQueue) on which new tasks are placed. This last feature allows you to govern the queuing policy yourself. You could, for example, specify a queue with a limited capacity:

    ExecutorService executorService = new ThreadPoolExecutor(
        corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, taskQueue );

The ThreadPoolExecutor implementation also has methods that allow you to change the core and maximum pool size while the service is active or to “prestart” the core threads before the service is used.
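As a concrete case of a limited-capacity queue, this sketch (hypothetical class name) builds a pool with a core and maximum size of one thread and an ArrayBlockingQueue that holds a single task. The first task occupies the thread, the second fills the queue, and the third has nowhere to go, so the pool's default rejection policy (ThreadPoolExecutor.AbortPolicy) throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    // Core size 1, maximum size 1, and a task queue that holds one task.
    public static boolean thirdTaskRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await(); // hold the thread until we release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // starts the single worker thread
            pool.execute(blocker); // fills the one-slot queue
            pool.execute(blocker); // no free thread and no queue space
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }
}
```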

Actually, these last features bring up an interesting issue. If we know that our executor service is an implementation of ThreadPoolExecutor, we can cast it at runtime to get access to these extra methods and do things like change the pool size. This may not be what the designers of some services had in mind; in fact, it could be downright dangerous in the wrong hands. For this reason, Executors offers “unconfigurable” wrapper methods (unconfigurableExecutorService() and unconfigurableScheduledExecutorService()) that act something like the “unmodifiable” collection methods we’ll see in the Java Collections API. These methods wrap an executor service in a delegator object that does not expose the implementation to the caller:

    ExecutorService tweakable = Executors.newFixedThreadPool( 3 );
    ExecutorService safe = Executors.unconfigurableExecutorService( tweakable );

An application server might, for example, wrap a service to protect itself from individual applications modifying (intentionally or accidentally) a global service used by many applications.
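A quick way to see what the wrapper hides is to test the two references with instanceof, as in this sketch (hypothetical class and method names). The raw pool can be cast to ThreadPoolExecutor and resized; the wrapped one cannot be cast, yet lifecycle calls such as shutdown() still reach the underlying service.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnconfigurableDemo {

    // Returns { tweakable is a ThreadPoolExecutor, safe is a ThreadPoolExecutor }.
    public static boolean[] exposure() {
        ExecutorService tweakable = Executors.newFixedThreadPool(3);
        ExecutorService safe = Executors.unconfigurableExecutorService(tweakable);
        boolean[] result = {
            tweakable instanceof ThreadPoolExecutor, // the raw pool exposes its class
            safe instanceof ThreadPoolExecutor       // the wrapper does not
        };
        if (tweakable instanceof ThreadPoolExecutor) {
            // The kind of reconfiguration the wrapper is meant to prevent
            ((ThreadPoolExecutor) tweakable).setCorePoolSize(1);
        }
        safe.shutdown(); // delegated to the wrapped service
        return result;
    }
}
```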

Thread production

We said that the Executor pattern is a general replacement for using Threads to run simple tasks. Although Executors shield us from Thread creation, there still may be cases where we want some control over how the threads used in our various thread pool implementations are constructed or set up. For this reason, and to standardize Thread production in general, the concurrency package adds an explicit factory API for thread creation.

The ThreadFactory interface provides a single newThread( Runnable ) method. One of these factories is used by all service implementations that create threads. Each of the Executors factory methods we’ve discussed has an additional form that accepts an explicit ThreadFactory as an argument. You can get the default thread factory used by these with the Executors.defaultThreadFactory() method. You could supply your own ThreadFactory to perform custom setup, such as ThreadLocal values or priorities.
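For example, a factory might name its threads and mark them as daemons. The sketch below is one way to do that; the class name, the naming scheme, and the choice of daemon threads are our own assumptions. It starts from Executors.defaultThreadFactory() and then adjusts each new thread before handing it to the pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final ThreadFactory base = Executors.defaultThreadFactory();
    private final AtomicInteger count = new AtomicInteger();
    private final String prefix;

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = base.newThread(r);   // start from the default setup
        t.setName(prefix + "-" + count.incrementAndGet());
        t.setDaemon(true);              // pool threads won't keep the JVM alive
        return t;
    }

    // Runs one task on a pool built with this factory and reports the
    // name of the thread that executed it.
    public static String workerName() throws Exception {
        ExecutorService executor =
            Executors.newFixedThreadPool(2, new NamedThreadFactory("worker"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return executor.submit(whoAmI).get();
        } finally {
            executor.shutdown();
        }
    }
}
```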

The Fork/Join framework

So far we’ve seen how the Java concurrency utilities can be used to manage simple parallel programming scenarios. We’ve seen that we can submit many tasks to an ExecutorService and collect result values if needed through Futures. We’ve seen that we can schedule tasks to run at specified times and with specified frequencies. We’ve seen that we can delve into the details of the pooling and control the degree of parallelism (how many threads are used) if we wish. Later in this chapter, we’ll look at APIs that help us coordinate threads so that we can do more complex jobs that require cooperation or explicit phases of operation in their data handling. In this section, we’ll look at an API that helps you coordinate tasks in another way: by helping you take “scalable” tasks and divide them up to match the processing power available at any given time.

Let’s imagine that you have a task that performs a complex computation like rendering video or generating a complicated image. A natural place to start in parallelizing it would be to divide the work for one frame or image into a fixed number of parts and feed them to an executor service. The executor service would be tuned to have as many threads as you wish to use (perhaps the same number as the number of CPUs or “cores” on your machine) and would assign each part to its own thread. If each task (each chunk of the image) requires about the same amount of work to complete and nothing else is competing for time on your computer, then this scenario is close to optimal. We’d expect each part of the image to be finished at about the same time, and we’d be able to stitch them all together effectively. But what if some parts of the image are dramatically harder to render than other parts? What if one chunk takes ten or a hundred or a thousand times as much CPU power as another? (Imagine how much faster it may be to render an empty part of an image, for example.) Then we may find ourselves in a situation where many of the threads sit idle while a few threads churn away doing all of the hard work. What can we do to address this?

Well, one approach would be to simply make our tasks more finely grained. We could make our individual jobs so small that no single one could possibly monopolize a thread for long. However, when tasks can vary in degree of difficulty by many orders of magnitude, this could lead to creating a very large number of tiny tasks and would probably be very inefficient, with threads switching jobs and, even worse, moving data around to accommodate the somewhat random order in which they would service the work requests. What is needed is a way for each task to keep itself busy, but allow other tasks to help when they get overloaded. Ideally, we’d also like to minimize discontinuities in the workflow and for data-intensive tasks, to avoid giving threads jobs that require completely new data to be loaded at each turn.

The Fork/Join framework is a new API added in Java 7 that provides just this: a way for you to structure your tasks so that they can be split up as needed to keep all of the available threads busy working on data with as much continuity as possible. Specifically, the Fork/Join framework relies on tasks that can be split up recursively (i.e., divided in two or more parts, with those parts then subdivided if needed, and so on). When a task is deemed too large to be completed quickly, the task is simply split up and the (now smaller) pieces are placed into a queue for the current thread. The framework then implements what is known as a “work stealing” algorithm, allowing threads that are free to grab unstarted tasks from their neighbors’ queues. The combination of these techniques has some powerful advantages. First, it avoids unnecessarily randomizing the workload. Threads will tend to get to work on contiguous parts of their tasks as long as they stay busy. For data-intensive tasks, this may mean less loading of data across threads. However, when necessary, a thread can grab work from a neighbor. Therein lies the second advantage: by the nature of the recursive splitting of tasks, the largest (least broken-up) pieces of tasks will sit at the bottom of each thread’s queue, which is exactly where a neighbor thread will look to steal work if needed. This means that when work is snatched, it will be redistributed in the largest possible chunks, further stabilizing the workload per thread and reducing both stealing operations and context switching. It’s a very clever algorithm that originated in the Cilk programming language.
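Before the Mandelbrot example, here is a much smaller sketch of the recursive split-and-steal pattern just described, applied to summing an array. It uses RecursiveTask, the value-returning counterpart of the RecursiveAction the chapter uses below; the class name and the THRESHOLD cutoff are hypothetical choices, not tuned values. Each task either sums its range directly or forks its left half (making it available for stealing) and computes its right half itself.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    // Hypothetical cutoff below which splitting costs more than it saves.
    static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from, to; // half-open range [from, to)

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;                    // small enough: just do the work
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;         // otherwise split in two
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                         // queue the left half; an idle worker may steal it
        long rightSum = right.compute();     // keep working on the right half ourselves
        return rightSum + left.join();       // wait for (or run) the left half
    }

    public static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }
}
```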

To show off the Fork/Join framework, we will do some image rendering, which we'll use as an excuse to draw some fractals! Fractals are amazing mathematical shapes that arise from relatively simple iterative processes. The one that we'll be drawing is called the Mandelbrot set. Our Mandelbrot example code uses the Fork/Join framework to divide the job of drawing the image among the available processors and keep them all busy (Figure 9-6). Before we start, a few caveats. First, we won't give a very good explanation of the drawing part that actually calculates the fractal. In the interest of keeping the example focused on the framework, we have compacted that code down to just a few rather cryptic lines. Please see the footnotes for a link to a proper explanation of what it is doing. Next, our example will blindly break up the image into chunks until they reach a fixed minimum size. While work stealing will indeed happen between threads in this case, a better algorithm might decide when to split the job based on the actual rendering performance and reduce the overhead of unnecessary splitting. (We won't have a large amount of data driving the rendering, so we're mainly focused on keeping the threads busy rather than minimizing splitting.)

Figure 9-6. Mandelbrot Fork/Join

The Fork/Join framework API centers on a ForkJoinPool and various implementations of a kind of Future called a ForkJoinTask. The framework can be used in many different ways, depending on how you wish to structure the tasks and make decisions about dividing them (forking) and collecting their results (joining); however, we are only going to look at one common case. We will be using a kind of ForkJoinTask called RecursiveAction, which is simply a ForkJoinTask that returns no value. Our MandelbrotTask class extends RecursiveAction and implements its one required abstract method, compute(). Within the compute() method, we simply decide whether to split up the task or do the work immediately. Here is the code:

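import java.awt.Color;
import java.awt.Graphics;
import java.awt.image.BufferedImage;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.swing.JFrame;
import javax.swing.SwingUtilities;

public class Mandelbrot extends JFrame
{
    @Override public void paint( Graphics g ) {
        BufferedImage image = new BufferedImage( getWidth(), getHeight(), 
            BufferedImage.TYPE_INT_RGB );
        ForkJoinPool pool = new ForkJoinPool(); // defaults to one thread per processor
        // invoke() blocks until the whole tree of tasks has finished
        pool.invoke( new MandelbrotTask( image, 0, image.getWidth()-1, 0, 
            image.getHeight()-1 ) );
        pool.shutdown(); // release this pool's worker threads
        g.drawImage( image, 0, 0, null );
    }

    public static void main( String[] args ) {
        // Swing components should be created on the event dispatch thread
        SwingUtilities.invokeLater( new Runnable() {
            public void run() {
                Mandelbrot mandy = new Mandelbrot();
                mandy.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
                mandy.setSize( 768, 768 );
                mandy.setVisible( true );
            }
        } );
    }
}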

class MandelbrotTask extends RecursiveAction
{
    private static final double size = 3.0, offsetX = -0.7, thresholdSq = 100;
    private static final int maxIterations = 30;
    private final BufferedImage image;
    private final int xStart, xEnd, yStart, yEnd;
    private static final int taskSplitThreshold = 1024;

    MandelbrotTask( BufferedImage image, int xStart, int xEnd, int yStart, 
        int yEnd ) {

        this.image = image;

        this.xStart = xStart;
        this.xEnd = xEnd;
        this.yStart = yStart;
        this.yEnd = yEnd;
    }

    public void render()
    {
        for ( int x = xStart; x <= xEnd; x++ ) {
            for ( int y = yStart; y <= yEnd; y++ )  {
                double r = x * size / image.getWidth() -size/2 + offsetX;
                double i = y * size / image.getHeight() -size/2;
                double zr=0, zi=0;
                int iter;
                for ( iter = 0; iter < maxIterations; iter++ ) {
                    double nzr = zr*zr - zi*zi + r;
                    double nzi = 2*zr*zi + i;
                    if ( nzr*nzr + nzi*nzi > thresholdSq ) { break; }
                    zr = nzr; zi=nzi;
                }
                image.setRGB( x, y, Color.HSBtoRGB( 0.5f * iter / maxIterations,
                    1.0f, 1.0f) );
            }
        }
    }

    @Override protected void compute()
    {
        int width = xEnd-xStart,  height = yEnd-yStart;
        if ( width*height < taskSplitThreshold ) {
            render();
        } else {
            invokeAll(
                new MandelbrotTask( image, xStart, xStart+width/2, yStart,
                    yStart+height/2 ),
                new MandelbrotTask( image, xStart+width/2+1, xEnd, yStart,
                    yStart+height/2 ),
                new MandelbrotTask( image, xStart, xStart+width/2, 
                    yStart+height/2+1, yEnd ),
                new MandelbrotTask( image, xStart+width/2+1, xEnd, 
                    yStart+height/2+1, yEnd )
            );
        }
    }
}

Try running the example and then dragging the window out to different sizes, watching how it redraws as the window grows large. The fractal is generated by treating each point in the image as a complex number (a two-dimensional number) and applying a simple formula to it repeatedly: $Z = Z^2 + C$, where $Z$ is initially zero and $C$ is related to the coordinate of the point. We then color-code the point based on how fast that value grows. In some areas of the image, the values grow quickly (their squared magnitude passes thresholdSq) and we stop iterating on them; in other areas, we go on until we reach the maximum number of iterations (maxIterations). This means that some regions take longer than others to generate, and some threads will therefore steal work from others.
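To see why some regions cost more than others, it can help to pull the per-pixel loop out of render() and run it on its own. The following is a minimal sketch of that; the class and method names (EscapeTime, escapeIterations) are ours rather than part of the example, and the constants are assumed to mirror MandelbrotTask's thresholdSq and maxIterations.

```java
public class EscapeTime {
    // Assumed to match MandelbrotTask's thresholdSq and maxIterations
    static final double THRESHOLD_SQ = 100;
    static final int MAX_ITERATIONS = 30;

    // Iterates Z = Z^2 + C from Z = 0 for the point C = (cr, ci). Returns the
    // iteration at which |Z|^2 would exceed the threshold, or MAX_ITERATIONS
    // if it never does within the limit.
    static int escapeIterations(double cr, double ci) {
        double zr = 0, zi = 0;
        int iter;
        for (iter = 0; iter < MAX_ITERATIONS; iter++) {
            double nzr = zr * zr - zi * zi + cr; // real part of Z^2 + C
            double nzi = 2 * zr * zi + ci;       // imaginary part of Z^2 + C
            if (nzr * nzr + nzi * nzi > THRESHOLD_SQ) {
                break;                           // growing without bound: stop early
            }
            zr = nzr;
            zi = nzi;
        }
        return iter;
    }

    public static void main(String[] args) {
        System.out.println(escapeIterations(0, 0));  // 30: the origin never escapes
        System.out.println(escapeIterations(2, 0));  // 2: escapes almost at once
        System.out.println(escapeIterations(-1, 0)); // 30: cycles between -1 and 0
    }
}
```

Points that never escape run the full 30 iterations while nearby escaping points stop after two or three, which is exactly the uneven cost that gives the pool's threads something to steal.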

The main() method of the Mandelbrot class creates the main window, a JFrame, for us. (We saw some simple GUI programming in the introduction to the book and we’ll return to it in Chapter 16 when we talk about Swing.) The main thing that we need to know here is that the JFrame’s paint() method is displaying a buffered image and our various MandelbrotTasks are competing to render small rectangles of that image.

When the paint() method is invoked to redraw the screen, it creates a new ForkJoinPool and constructs a single MandelbrotTask. The MandelbrotTask encapsulates knowledge about a region of the image to draw (initially the entire image) and contains the method to render it. The initial MandelbrotTask is passed to the ForkJoinPool's invoke() method, a blocking counterpart to submit() that waits for the task to complete before returning. The paint() method then draws the fully rendered image. As you can see, from the point of view of the paint() method, it has prescribed one task for the entire image and simply asked the ForkJoinPool to invoke it. All of the recursive division of labor is handled by the task in cooperation with the framework.

Within the MandelbrotTask’s compute() method, we check to see how many pixels the task is being asked to render. If the number of pixels exceeds a specified threshold, we split the region into four quadrants and create a new MandelbrotTask for each of them. The four subtasks are then passed to the inherited invokeAll() method, which executes them and waits for all of them to complete before moving on (it effectively performs a join operation on them). If the number of pixels is under the threshold, the compute() method directly invokes the render() method to generate the fractal for that small portion of the image.

In our case, the division of tasks will proceed until the threshold has been reached and each of the threads in the pool is busy rendering regions of the screen. Then the tree of tasks will collapse back up, with each subdivided MandelbrotTask returning from its invokeAll() method invocation until the initial, top-level task is completed.
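RecursiveAction suits Mandelbrot because each piece writes straight into the shared image. When each piece instead produces a result, RecursiveTask (the value-returning sibling of RecursiveAction) lets compute() return a value, and the explicit fork() and join() methods can be used in place of invokeAll(). The sketch below sums an array this way; SumTask, parallelSum, and the threshold of 1,000 elements are hypothetical choices for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task that sums data[start, end) by recursive splitting
public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 1000; // arbitrary cutoff for doing the work directly
    final long[] data;
    final int start, end;

    SumTask(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override protected Long compute() {
        if (end - start <= THRESHOLD) {          // small enough: just do it
            long sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            return sum;
        }
        int mid = (start + end) >>> 1;
        SumTask left = new SumTask(data, start, mid);
        SumTask right = new SumTask(data, mid, end);
        left.fork();                     // queue the left half; an idle thread may steal it
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half (or run it here if unstolen)
    }

    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data)); // 5000050000
    }
}
```

Forking one half and computing the other directly keeps the current thread busy instead of leaving it idle while both halves wait in the queue.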

One last thing before we move on: an exercise for you if you are really interested in this topic. If you would like to visualize which threads are drawing which regions, you can do the following purely as an experiment: within the render() method, look up the name of the currently executing thread with Thread.currentThread().getName(). While this name will not be meaningful, it will be unique to a thread. Use a HashMap to assign each name a unique number the first time you see it, and look up that number each time you see the name again. Because many pool threads call render() at once, access to the map must be synchronized (or you can use a ConcurrentHashMap instead). Then use that number to determine the color of the rendered pixel instead of the fractal logic (or combine them to add a little tint or shade). This will allow you to see which threads are rendering which patches of the screen. On a fast system, this may not be very interesting, but if you stress the rendering by dragging the image to a very large size, you should see some variations.
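A minimal sketch of the bookkeeping half of the exercise follows. ThreadNumbers and numberFor() are hypothetical names; the method is synchronized because a plain HashMap is not safe for concurrent updates from the pool's threads.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: hands out 0, 1, 2, ... to thread names in first-seen order
public class ThreadNumbers {
    private final Map<String, Integer> numbers = new HashMap<String, Integer>();

    // synchronized: many render() calls may arrive here at the same time
    public synchronized int numberFor(String threadName) {
        Integer n = numbers.get(threadName);
        if (n == null) {
            n = numbers.size();          // next unused number
            numbers.put(threadName, n);
        }
        return n;
    }

    public static void main(String[] args) {
        ThreadNumbers tn = new ThreadNumbers();
        System.out.println(tn.numberFor(Thread.currentThread().getName())); // 0
    }
}
```

Inside render(), you might then replace the fractal color with something like `Color.HSBtoRGB( (threadNumbers.numberFor( Thread.currentThread().getName() ) % 8) / 8.0f, 1.0f, 1.0f )`, where `threadNumbers` is a hypothetical shared instance, giving each thread one of eight hues.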

Locks

The java.util.concurrent.locks package holds classes that mimic and expand upon the built-in Java language synchronization primitives, adding “read/write” locks among other things. As we mentioned, these classes are utilities written in Java and don’t strictly add anything new to the language semantics. However, they do provide more flexible usage at the expense of some of the built-in safety of Java language synchronization.

At the heart of the locks package are the Lock and Condition interfaces. Lock represents the same concept as a Java lock (monitor) that is associated with each object and class for use with synchronized methods and blocks. A Lock provides exclusive access to its owner by allowing only one party to hold it at a time, through the lock() and unlock() methods. In Java language synchronization, this is accomplished implicitly with the synchronized keyword:

    // synchronized method
    synchronized void writeData() { ... }

    // synchronized block
    synchronized ( someObject ) {
      ...
    }

Upon entry to the synchronized method or block, Java acquires the lock and automatically releases it upon exiting. Even if an exception is thrown or the thread dies unexpectedly, Java automatically releases all of the locks it acquired. Using the Lock class instead requires us (or allows us, depending on how you look at it) to explicitly lock when we want the resource and remember to unlock it when we are through. The locking is not tied to any particular scope, such as a single method or code block. To reproduce the effect of the synchronized method in the example, we’d use something like:

    Lock lock = new ReentrantLock();

    // method or block
    lock.lock();
    try {
         // body of method or block ...
    } finally {
        lock.unlock();
    }

The first caller to lock() acquires the lock and proceeds. Subsequent calls by other threads block until the lock is released. We perform the body of our locked operation in a try/finally block so that we always unlock before we leave, even if the body throws an exception. This is the conventional pattern, but you are free to implement arbitrary protocols at your own risk.

The lock implementation in this example is called ReentrantLock. The name implies that this kind of lock acts like Java locks do in that the lock is associated with the caller's thread. The owner of a lock may reacquire (“relock”) the lock as many times as it wishes, as long as each lock() is eventually matched by an unlock(); the lock is actually released only when its hold count drops back to zero. For example, a recursive method that locks a resource upon entry is fine.
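Here is a small sketch of that recursive case. The ReentrantDemo class and its holdsAtDepth() method are hypothetical; getHoldCount() and isLocked() are ReentrantLock methods that let us watch the hold count rise and fall.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Recursive method that locks on every entry. Because the lock is
    // reentrant, the owning thread never blocks on itself; each nested
    // lock() just raises the hold count by one.
    int holdsAtDepth(int n) {
        lock.lock();
        try {
            if (n == 0) {
                return lock.getHoldCount(); // one hold per active call
            }
            return holdsAtDepth(n - 1);
        } finally {
            lock.unlock(); // each lock() is matched by exactly one unlock()
        }
    }

    boolean isLocked() {
        return lock.isLocked(); // false once every hold has been released
    }

    public static void main(String[] args) {
        ReentrantDemo demo = new ReentrantDemo();
        System.out.println(demo.holdsAtDepth(3)); // 4 (depths 3, 2, 1, and 0)
        System.out.println(demo.isLocked());      // false
    }
}
```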

In addition to the standard-looking lock() method, the Lock interface has tryLock() methods that do not block or that block for a specified period of time in order to acquire the lock. These conditional and timed wait locking forms are something that ordinary Java locks do not provide. The ReentrantLock implementation also has a notion of “fairness” that can be turned on or off when it is constructed. When fair is on, the lock attempts to hand out the lock to callers in the order in which they request it. Normal Java locks (and the default, unfair policy of ReentrantLock) do not make this guarantee.
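The sketch below exercises both tryLock() forms against a fair ReentrantLock held by another thread; TryLockDemo and tryWhileHeld() are hypothetical names, and the 50 ms wait is an arbitrary choice. Note that, per the ReentrantLock documentation, the untimed tryLock() does not honor the fairness setting: it grabs the lock at once if it is free.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    // While the calling thread holds a fair lock, a second thread tries both
    // tryLock() forms. Returns {untimed result, timed result}.
    static boolean[] tryWhileHeld() throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock(true); // fair: favors the longest waiter
        final boolean[] results = new boolean[2];
        lock.lock();
        try {
            Thread other = new Thread(new Runnable() {
                public void run() {
                    results[0] = lock.tryLock(); // returns false at once; never blocks
                    try {
                        // waits up to 50 ms for the lock, then gives up
                        results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            other.start();
            other.join(); // join() also makes the results visible to this thread
        } finally {
            lock.unlock();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = tryWhileHeld();
        System.out.println(r[0] + " " + r[1]); // false false
    }
}
```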

Read and write locks

The ReadWriteLock interface is a gateway to two different locks, one for reading and one for writing. The idea behind read/write locks is that for most resources it is OK for many “readers” to be viewing data, as long as it is not changing. Conversely, a writer of the data generally requires exclusive access to it. This is just what read/write locks do. Any number of readers may acquire the read lock as long as no write lock is in place. Only one party may hold the write lock, and no readers may hold read locks while the write lock is out. A writer may have to wait for readers to finish before acquiring the write lock, and readers may have to wait for a writer to finish before they are allowed to acquire read locks:

    ReadWriteLock rwLock = new ReentrantReadWriteLock( fair );

    // reader thread 1
    rwLock.readLock().lock();
    // reader thread 2
    rwLock.readLock().lock();

    // writer thread
    rwLock.writeLock().lock(); // blocks on threads 1 & 2

In this code snippet, two readers hold read locks while a writer blocks waiting on the write lock. When both readers have unlock()ed their read locks, the writer gains exclusive access to the lock and any subsequent readers block until the writer is finished.
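To make the snippet concrete, here is a runnable sketch using ReentrantReadWriteLock. The ReadWriteDemo class and its helper are hypothetical; tryLock() is used instead of lock() so that the blocked cases report false rather than hanging the example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteDemo {
    // Runs a task on a separate thread and waits for it; join() also makes
    // that thread's writes visible to the caller.
    static void runInOtherThread(Runnable task) throws InterruptedException {
        Thread t = new Thread(task);
        t.start();
        t.join();
    }

    // Returns {2nd reader got in, two read holds counted,
    //          writer got in past readers, reader got in past writer}
    static boolean[] run() throws InterruptedException {
        final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        final boolean[] r = new boolean[4];

        rwLock.readLock().lock();                       // this thread is reader #1
        try {
            runInOtherThread(new Runnable() {
                public void run() {
                    r[0] = rwLock.readLock().tryLock();    // reader #2 shares the lock
                    r[1] = rwLock.getReadLockCount() == 2; // both read holds counted
                    r[2] = rwLock.writeLock().tryLock();   // a writer is shut out
                    if (r[2]) rwLock.writeLock().unlock();
                    if (r[0]) rwLock.readLock().unlock();
                }
            });
        } finally {
            rwLock.readLock().unlock();
        }

        rwLock.writeLock().lock();                      // now the sole writer
        try {
            runInOtherThread(new Runnable() {
                public void run() {
                    r[3] = rwLock.readLock().tryLock();    // readers are shut out
                    if (r[3]) rwLock.readLock().unlock();
                }
            });
        } finally {
            rwLock.writeLock().unlock();
        }
        return r;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // [true, true, false, false]
    }
}
```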

The owner of a write lock can acquire a read lock, too, but not vice versa. Acquiring a read lock and then releasing the write lock is called downgrading the lock. Trying to acquire a write lock while holding a read lock (upgrading) is not allowed and causes a deadlock.
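The following sketch shows a downgrade, and an attempted upgrade, with ReentrantReadWriteLock. DowngradeDemo and its `value` field are hypothetical. The upgrade attempt uses tryLock(), which returns false, because writeLock().lock() in the same place would block forever.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DowngradeDemo {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private int value; // hypothetical shared state guarded by rwLock

    // Writes under the write lock, then downgrades: take the read lock before
    // releasing the write lock, so no other writer can slip in between the
    // update and the read that follows.
    int updateThenRead(int newValue) {
        rwLock.writeLock().lock();
        try {
            value = newValue;
            rwLock.readLock().lock();    // a writer may also take the read lock
        } finally {
            rwLock.writeLock().unlock(); // now holding only the read lock
        }
        try {
            return value;                // other readers may join us here
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Upgrading is not allowed: while this thread holds the read lock, the
    // write lock is unavailable to it.
    boolean tryUpgrade() {
        rwLock.readLock().lock();
        try {
            boolean upgraded = rwLock.writeLock().tryLock();
            if (upgraded) rwLock.writeLock().unlock();
            return upgraded;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    boolean isFree() {
        return !rwLock.isWriteLocked() && rwLock.getReadLockCount() == 0;
    }

    public static void main(String[] args) {
        DowngradeDemo demo = new DowngradeDemo();
        System.out.println(demo.updateThenRead(42)); // 42
        System.out.println(demo.tryUpgrade());       // false
        System.out.println(demo.isFree());           // true
    }
}
```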

Conditions

To complete the picture of Lock as a parallel for Java language synchronization, we need an analog to the wait(), notify(), and notifyAll() mechanism. The Condition interface represents this functionality with its await(), signal(), and signalAll() methods. A Condition is associated with a Lock by the lock's newCondition() method. Unlike a normal Java lock, a Lock may have multiple Condition objects that represent multiple wait sets of threads.

The Condition await() method is used just like the wait() method of a Java object within a synchronized block:

    Lock lock = ...
    Condition condition = lock.newCondition();
    lock.lock();
    try {
        condition.await(); // block, waiting for signal()
    } finally {
        lock.unlock();
    }

    // meanwhile, in another thread...
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }

Like wait(), the Condition await() method can be called only when the thread is the owner of the lock associated with the condition, and the signal() method may be called only by a thread that has acquired the lock. Interestingly, though, these restrictions come from the implementation rather than from the Condition interface itself (ReentrantLock's conditions enforce them by throwing IllegalMonitorStateException); some other implementation of these classes could conceivably change those restrictions in some way.
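Multiple wait sets per lock are what make Condition more than a rename of wait() and notify(). The sketch below, modeled on the bounded-buffer example in the Condition documentation, gives producers and consumers separate “not full” and “not empty” conditions on one lock. BoundedBuffer and transfer() are hypothetical names. Note the while loops: a woken thread rechecks its state before proceeding, which is the standard guard against spurious wakeups.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Object[] items;
    private int putIndex, takeIndex, count;

    public BoundedBuffer(int capacity) { items = new Object[capacity]; }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await(); // recheck after every wakeup
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();                             // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();                              // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // A producer thread puts 1..n; the caller takes n items. Returns their
    // sum, or -1 if they arrive out of order (the buffer is FIFO).
    static int transfer(final int n, int capacity) throws InterruptedException {
        final BoundedBuffer<Integer> buffer = new BoundedBuffer<Integer>(capacity);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 1; i <= n; i++) buffer.put(i); // blocks while full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.setDaemon(true);
        producer.start();
        int sum = 0;
        for (int expected = 1; expected <= n; expected++) {
            int item = buffer.take();                     // blocks while empty
            if (item != expected) return -1;
            sum += item;
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100, 2)); // 5050
    }
}
```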

With the exception of the new reader/writer locks and some timed wait lock methods, it may not seem that the Locks package adds a great deal to Java. However, if you delve into it deeper, you'll find that it's also a toolkit for building new kinds of synchronization primitives and higher-level constructs. The locks package opens up a concrete implementation of Java's synchronization mechanism for all to tinker with and extend. A brief look at the implementation classes reveals nifty methods like getOwner() (a protected method available to subclasses of ReentrantLock) to tell you which thread owns a lock or getReadLockCount() to tell you how many readers are working on your data. Lots of things are possible with an open implementation like this, including specialized synchronization packages that do things like automatically detect deadlocks or tune themselves based on external information. There may also be cases where using the explicit lock API provided by this package performs better than language-level synchronization. But that probably doesn't justify the additional burden on developers except in special cases. Next, we'll move up a bit and look at some higher-level synchronization tools.

Synchronization Constructs

The java.util.concurrent package adds several higher-level synchronization utilities borrowed from other languages, including CountDownLatch, Semaphore, CyclicBarrier, and Exchanger.

CountDownLatch

The CountDownLatch is a very simple synchronization utility that allows any number of threads to block, waiting for a countdown value to reach 0 before being “released” to continue their activities. The CountDownLatch is initialized with the count when constructed. Thereafter, threads may block using the await() method or block for a limited period of time using the timed wait version of await(). Any running thread may decrement the counter at any time, whether threads are blocked or not. Once the counter reaches 0, all waiting threads unblock and continue. Thereafter, calls to await() return immediately; the timed version returns true, indicating that the count has reached 0 (it returns false only if it times out before that happens). The count cannot be reset.

    CountDownLatch latch = new CountDownLatch( 2 ); // count from 2

    // thread 1
    latch.await(); // blocks thread 1

    // thread 2
    latch.countDown(); // count is 1
    latch.countDown(); // count is 0, thread 1 proceeds

Countdown latches are used in a wide variety of synchronization schemes to coordinate a number of threads on one result or cause a thread to wait for a number of other threads to produce results. Later we’ll talk about a related utility, CyclicBarrier, that explicitly waits for a number of threads to synchronize in order to coordinate an action.
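A common shape for this is a pair of latches: a “start” latch of 1 that holds a group of workers at the gate, and a “done” latch of N that the coordinator waits on. The sketch below assumes that pattern; LatchDemo and runWorkers() are hypothetical, and the workers' real job is replaced by a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {
    // Starts n hypothetical workers behind a "start" gate, releases them all
    // at once, and waits until every one of them has counted down "done".
    static int runWorkers(int n) throws InterruptedException {
        final CountDownLatch startSignal = new CountDownLatch(1);
        final CountDownLatch doneSignal = new CountDownLatch(n);
        final AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        startSignal.await();        // every worker waits at the gate
                        finished.incrementAndGet(); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        doneSignal.countDown();     // count down even on failure
                    }
                }
            }).start();
        }

        startSignal.countDown();  // open the gate
        doneSignal.await();       // block until the count reaches 0
        // The latch stays open: a timed await now returns true immediately
        boolean open = doneSignal.await(0, TimeUnit.MILLISECONDS);
        return open ? finished.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(5)); // 5
    }
}
```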

Semaphore

Semaphores are a very old synchronization construct that has been used in many other languages. Conceptually, a semaphore is a pool of permits—intangible permission slips to perform some activity. The semaphore is initialized with a specified number of permits. Callers can then use the acquire() and release() methods to take and return these permits. Calling acquire() when no permits are available causes the caller to block until one is released. In this way, for example, a semaphore could be used to limit access to some resource to a specified number of threads:

    int concurrentReaders = 5;
    boolean fair = true;
    Semaphore sem = new Semaphore( concurrentReaders, fair );

    Data readData() throws InterruptedException {
        sem.acquire();
        try {
            // read data ...
            return data;
        } finally {
            sem.release(); // return the permit even if reading fails
        }
    }

In this code snippet, readData() effectively limits itself to five concurrent reading threads at any given time. Additional threads are blocked in the acquire() method until a permit is free. In this sense, a semaphore is vaguely like a lock with multiple owners. This is where the similarity ends, however.
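To check that limit empirically, the sketch below sends 20 threads through a five-permit semaphore and records the most that were ever “inside” at once. SemaphoreLimitDemo, maxConcurrent(), and the 10 ms simulated read are hypothetical. The peak depends on timing, but it can never exceed the number of permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreLimitDemo {
    // Sends `threads` hypothetical readers through a semaphore holding
    // `permits` permits and returns the most that were ever inside at once.
    static int maxConcurrent(int permits, int threads) throws InterruptedException {
        final Semaphore sem = new Semaphore(permits, true); // fair: FIFO permits
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        Thread[] all = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            all[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        sem.acquire();                  // blocks if no permit is free
                        try {
                            int now = inside.incrementAndGet();
                            int seen = peak.get();
                            while (now > seen && !peak.compareAndSet(seen, now)) {
                                seen = peak.get();      // lost a race; re-read the peak
                            }
                            Thread.sleep(10);           // simulate a slow read
                        } finally {
                            inside.decrementAndGet();
                            sem.release();              // always return the permit
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            all[i].start();
        }
        for (Thread t : all) t.join();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxConcurrent(5, 20)); // at most 5
    }
}
```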

In actuality, a semaphore differs from a lock in several ways. First, the “pool of permits” is really just a number. No actual value is returned by acquire(), and no association is made with the acquirer. This means that any actual locking behavior is strictly cooperative (by convention in the application). It also means that “permits” can be acquired and released by different callers without regard to who actually “acquired” them. It's really just incrementing or decrementing the number. Also, because there is no real association with an “owner,” semaphores are not reentrant in the way that real locks are. That is, if a thread calls acquire() multiple times, it simply decrements the counter multiple times. This behavior could be useful in some cases to count levels of recursion for security APIs, for example, but it is not like a lock, in which one caller “owns” multiple permits. Finally, because the permits pool is really just a number, calling release() more often than acquire() can increase the pool beyond its starting point. (acquire() itself never takes the count below zero; it blocks instead.) A semaphore can even be initialized with a negative number if you wish to require releases before anyone acquires a permit.

In addition to acquire(), Semaphore has a tryAcquire() method that parallels the tryLock() method of Lock. It returns immediately, returning true if it acquired a permit and false otherwise. Another form of tryAcquire() accepts a timed wait period. Semaphores also have a notion of “fairness” in the ordering of acquire requests. By default, requests are not guaranteed to be ordered, but if the “fair” flag is set when the Semaphore is constructed, acquire() doles out permits in first-in-first-out (FIFO) order. The tradeoff is that ordering may impact performance a bit, depending on the implementation.
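The “it's just a number” behavior is easy to see in a single thread. This sketch (PermitCountingDemo and demo() are hypothetical names) starts one semaphore at -1, grows another past its initial count with unmatched release() calls, takes two permits in one call, and then uses the timed tryAcquire() form.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PermitCountingDemo {
    static String demo() throws InterruptedException {
        StringBuilder out = new StringBuilder();

        // A negative start: two release() calls are needed before anyone acquires
        Semaphore gate = new Semaphore(-1);
        out.append(gate.tryAcquire()).append(' ');  // false: no permit available
        gate.release();                             // -1 -> 0
        gate.release();                             //  0 -> 1
        out.append(gate.tryAcquire()).append(' ');  // true: 1 -> 0

        // release() needs no matching acquire(), so the count can grow
        Semaphore pool = new Semaphore(1);
        pool.release(2);                            // 1 -> 3
        out.append(pool.availablePermits()).append(' ');

        // No owner and no reentrancy: the same thread just takes two permits
        pool.acquire(2);                            // 3 -> 1
        out.append(pool.availablePermits()).append(' ');

        // Timed form: wait up to 20 ms for two permits, then give up
        out.append(pool.tryAcquire(2, 20, TimeUnit.MILLISECONDS));
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // false true 3 1 false
    }
}
```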

CyclicBarrier

The CyclicBarrier class is a synchronization point where a specified number of related threads meet after finishing their activities. When all of the threads have arrived, an optional, shared barrier action can be executed and then all of the threads are “released” to continue. The class is termed cyclic because it can then be used again in the case where the threads repeat their activities in this manner. CyclicBarrier is an alternative to using the join() method, which collects threads only after they have completed and returned from their run() method.

The following example, SiteTimer, accepts a number of URLs on the command line and times how long it takes to connect to each one, printing the results in sorted order. It performs the connections in parallel using a dedicated thread per site and uses a CyclicBarrier for the threads to rendezvous after each timing cycle. Then it prints the coordinated results before they begin again. This example also illustrates a number of Java features, including generics, collections, formatted printing, autoboxing, and an inner class. Although we haven’t yet discussed collections or the network portion of the example, the usage is fairly simple, and you can return to the example after reading the relevant chapters later in this book.

    import java.util.*;
    import java.util.concurrent.*;
    import java.net.*;
    import java.io.IOException;

    public class SiteTimer
    {
       CyclicBarrier barrier;
       // shared by all of the timing threads, so it must be thread-safe
       List<Result> results = Collections.synchronizedList( new ArrayList<Result>() );

       private class Result implements Comparable<Result>
       {
          Long time;
          String site;
          Result( Long time, String site ) {
             this.time = time;
             this.site = site;
          }
          public int compareTo( Result r ) { return time.compareTo( r.time ); }
       }

       static long timeConnect( String site ) {
          long start = System.currentTimeMillis();
          try {
             new URL( site ).openConnection().connect();
          } catch ( IOException e ) {
             return -1;
          }
          return System.currentTimeMillis() - start;
       }

       void showResults() {
          Collections.sort( results );
          for( Result result : results )
             System.out.printf( "%-30.30s : %d\n", result.site, result.time );
          System.out.println("------------------");
       }

       public void start( String [] args )
       {
          Runnable showResultsAction = new Runnable() {
             public void run() {
                showResults();
                results.clear();
             } };
          barrier = new CyclicBarrier( args.length, showResultsAction );

          for ( final String site : args )
             new Thread() {
                public void run() {
                   while( true ) {
                      long time = timeConnect( site );
                      results.add( new Result( time, site ) );
                      try {
                         barrier.await();
                      } catch ( BrokenBarrierException e ) { return;
                      } catch ( InterruptedException e ) { return; }
                   }
                }
             }.start();
       }

       public static void main( String [] args ) throws IOException {
          new SiteTimer().start( args );
       }
    }

The start() method constructs the barrier, specifying the number of threads that must be present before the group has fully arrived and the action to perform when all of the threads are ready. For each site, a thread is created that loops, timing the connection to the site and adding a result object to the list before blocking on the barrier’s await() method. When all of the threads reach the await() method, the barrier action fires, printing the results. All of the threads are then released to begin the next cycle.

If any of the waiting threads is interrupted or times out (using the timed wait version of the await() method), the barrier is said to be “broken”: that thread receives an InterruptedException or a TimeoutException, and all of the other waiting threads receive a BrokenBarrierException. In theory, the barrier can be “fixed” by calling its reset() method, but this is complicated because only one thread from the group can reset the barrier properly. A reset() while any other thread is waiting causes the barrier to be broken and the waiting threads to receive the exception again, so it is probably best to start over at this point.

One more detail: the await() method returns an integer, the arrival index, that indicates the order in which the threads arrived at the barrier: getParties() - 1 for the first thread to arrive, counting down to 0 for the last. This can be used to divide up work for the next iteration of the threads. For example, if the threads' jobs are not identical, you could use the number to “elect” a leader thread or divide the threads into two or more groups.
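As a sketch of leader election, the example below runs four threads through three barrier cycles and treats whichever thread gets arrival index 0 as that cycle's leader. BarrierLeaderDemo and run() are hypothetical names. Exactly one thread per cycle sees index 0, and the barrier action fires once per cycle, so both counters should equal the number of rounds.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierLeaderDemo {
    // Returns {barrier actions run, leaders elected}
    static int[] run(int parties, final int rounds) throws InterruptedException {
        final AtomicInteger actions = new AtomicInteger();
        final AtomicInteger leaders = new AtomicInteger();
        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() { actions.incrementAndGet(); } // once per cycle
        });
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++) {
                            int index = barrier.await();  // parties-1 = first, 0 = last
                            if (index == 0) {
                                leaders.incrementAndGet(); // this thread leads the next cycle
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party was interrupted or timed out; give up
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return new int[] { actions.get(), leaders.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(4, 3))); // [3, 3]
    }
}
```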

Phaser

No Star Trek jokes here. Java 7 introduced a new concurrency utility called Phaser. Phaser is very similar to the CyclicBarrier except that it provides a bit more flexibility. Phaser draws its name in part from the fact that it assigns a number to each cycle of its threads (a phase number). Participating threads and bystanders can read this number to monitor activity in the barrier. In CyclicBarrier, the number of threads that are tracked by the barrier is fixed; new threads cannot join the party during its lifecycle. This differs from Phaser, where the number of participants can change over the life of the activity.
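Here is a minimal Phaser sketch showing parties joining with register() while the phaser is in use and leaving with arriveAndDeregister(). PhaserDemo and run() are hypothetical names. The main thread registers itself as a party so that phase 0 cannot complete until every worker has been added. When the last party deregisters, the phaser terminates (the default onAdvance() behavior).

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserDemo {
    // Returns {workers that finished phase 0, phase after it, terminated ? 1 : 0}
    static int[] run(int workers) throws InterruptedException {
        final Phaser phaser = new Phaser(1);            // the main thread is party #1
        final AtomicInteger phaseZeroDone = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            phaser.register();                          // a party joins at any time
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    phaseZeroDone.incrementAndGet();    // stand-in for phase 0 work
                    phaser.arriveAndAwaitAdvance();     // rendezvous: end of phase 0
                    phaser.arriveAndDeregister();       // finish phase 1 and leave
                }
            });
            threads[i].start();
        }
        phaser.arriveAndAwaitAdvance();                 // main waits out phase 0 too
        int doneCount = phaseZeroDone.get();            // every worker has arrived
        int phase = phaser.getPhase();                  // 1: phase 1 still needs main
        phaser.arriveAndDeregister();                   // main leaves as well
        for (Thread t : threads) t.join();
        // with no registered parties left, the phaser has terminated
        return new int[] { doneCount, phase, phaser.isTerminated() ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(3))); // [3, 1, 1]
    }
}
```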

Exchanger

The Exchanger is a synchronization point for a pair of threads to exchange data items. An item of the same type is passed in each direction using the exchange() method. The first thread to arrive at the Exchanger blocks, waiting for its mate. When the second thread arrives, each receives the other's argument to the exchange() method. Any number of actual threads may be using the Exchanger; they are simply paired in some order as they arrive. Exchanger is a generic class that is parameterized by the type of object to be passed:

    Exchanger<ByteBuffer> xchange = new Exchanger<ByteBuffer>();

    // thread 1
    ByteBuffer nextBuf = xchange.exchange( buffer1 ); // blocks

    // thread 2
    ByteBuffer nextBuf = xchange.exchange( buffer2 );

    // buffers exchanged, both threads continue...

The Exchanger pattern is primarily useful for reusing data objects or buffers between threads, as indicated in this code snippet. Say that you have a reader thread filling buffers with data and a writer thread writing the contents of the buffers somewhere. Using an Exchanger, the reader and writer can trade a pair of buffers back and forth without creating new ones. This may seem a bit arcane, but it has applications when using the NIO advanced I/O package, which we discuss in Chapters 12 and 13.
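The reader/writer buffer swap might be sketched like this, with a List standing in for an NIO buffer. BufferSwap and run() are hypothetical names, and the “data” is just the numbers 1, 2, 3 per round. Only two buffers ever exist; each exchange hands the full one to the consumer and returns the emptied one to the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class BufferSwap {
    // Returns the total the consumer saw over `rounds` trades
    static long run(final int rounds) throws InterruptedException {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
        final long[] total = new long[1];

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                List<Integer> buffer = new ArrayList<Integer>();   // starts empty
                try {
                    for (int r = 0; r < rounds; r++) {
                        buffer = exchanger.exchange(buffer);       // empty out, full in
                        for (int value : buffer) total[0] += value;
                        buffer.clear();                            // ready to hand back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        List<Integer> buffer = new ArrayList<Integer>();
        for (int r = 0; r < rounds; r++) {
            for (int i = 1; i <= 3; i++) buffer.add(i);            // fill with 1, 2, 3
            buffer = exchanger.exchange(buffer);                   // full out, empty in
        }
        consumer.join();  // join() makes total[0] visible to this thread
        return total[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4)); // 24 (four rounds of 1 + 2 + 3)
    }
}
```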

We should note that the Exchanger is similar to the SynchronousQueue, which we’ll discuss in Chapter 11 when we cover the Collections API. The Exchanger, however, passes data in both directions, whereas SynchronousQueue simply passes elements in one direction.

Atomic Operations

The java.util.concurrent.atomic package holds an interesting set of wrapper classes for atomic, “all-or-nothing” operations on certain primitive types and reference values. An atomic operation is a kind of transaction where some sequence of events either completes or fails as a unit and there is no potential for any intermediate state to be seen. In this case, the transactions we’re talking about are very simple operations that either set or get a value, possibly in combination with a simple test or mathematical operation. There are atomic wrapper classes for the following types: Booleans, integers, and long values as well as arrays of integers and longs and object references:

    AtomicBoolean
    AtomicInteger
    AtomicIntegerArray
    AtomicLong
    AtomicLongArray
    AtomicReference
    AtomicReferenceArray

The AtomicBoolean class (which, by the way, has to compete with java.awt.Robot for coolest class name in Java) serves as a good example. At first glance, it seems like an oxymoron. After all, normal operations on Booleans in Java are atomic already. There is supposed to be no possible “in between” state for a Boolean to be misread by any feisty multithreaded code (as there theoretically could be for long and double values). Instead, the usefulness of the AtomicBoolean wrapper is in its combination operations: compareAndSet() and getAndSet():

    AtomicBoolean bool = new AtomicBoolean( true );
    bool.compareAndSet( expectedValue, newValue );

The compareAndSet() method first performs a comparison to an expected value (true or false in the case of a Boolean) and only if the value matches does it assign the new value. The interesting thing is that both of these operations happen “atomically,” together. This means that there is no possibility of someone changing the value between the time that we checked it and assigned the new value. That may sound like a slim chance anyway, but it’s very important for guaranteeing the semantics of flags. For example, suppose we have a master “shutdown” switch in our application and the thread that sets it wants to perform cleanup on the way out. Using compareAndSet() to test first, we can guarantee that only one thread can possibly set the flag and perform the procedure.
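A minimal sketch of that shutdown switch follows; ShutdownFlagDemo, shutdown(), and race() are hypothetical names, and the cleanup itself is replaced by a counter. However many threads race to call shutdown(), only the one whose compareAndSet(false, true) succeeds runs the cleanup.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownFlagDemo {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final AtomicInteger cleanups = new AtomicInteger(); // times cleanup ran

    // Any thread may call this; only the first to flip the flag does the work
    void shutdown() {
        if (shuttingDown.compareAndSet(false, true)) {
            cleanups.incrementAndGet(); // hypothetical cleanup goes here
        }
    }

    static int race(int threads) throws InterruptedException {
        final ShutdownFlagDemo app = new ShutdownFlagDemo();
        Thread[] all = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            all[i] = new Thread(new Runnable() {
                public void run() { app.shutdown(); }
            });
            all[i].start();
        }
        for (Thread t : all) t.join();
        return app.cleanups.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(8)); // 1
    }
}
```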

The getAndSet() method simply assigns the new value and returns the old value in the same, safe way. It’s a little harder to see how this applies to a Boolean, so let’s move on to AtomicInteger and AtomicLong. These numeric types have additional arithmetic combination operations:

    int getAndIncrement()
    int getAndDecrement()
    int getAndAdd(int delta)
    int incrementAndGet()
    int decrementAndGet()
    int addAndGet(int delta)

getAndIncrement() increments the value and then returns the previous value. incrementAndGet() does the converse, returning the new value. These operations are very useful for generating unique serial numbers. For example:

    AtomicInteger serialNum = new AtomicInteger(0);

    public int nextSerialNumber() {
        return serialNum.getAndIncrement();
    }

We could have accomplished the same thing by synchronizing the method, but this is simpler and may be much faster.
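To see that getAndIncrement() hands out unique numbers under contention, this sketch has several threads draw serial numbers at once and then checks for duplicates. SerialNumbers and distinctCount() are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class SerialNumbers {
    private final AtomicInteger serialNum = new AtomicInteger(0);

    public int nextSerialNumber() {
        return serialNum.getAndIncrement(); // 0, 1, 2, ... with no lock
    }

    // Draws numbers from several threads and counts the distinct results
    static int distinctCount(int threads, final int perThread) throws InterruptedException {
        final SerialNumbers gen = new SerialNumbers();
        final int[][] issued = new int[threads][perThread];
        Thread[] all = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int[] mine = issued[i];        // each thread fills its own row
            all[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) mine[j] = gen.nextSerialNumber();
                }
            });
            all[i].start();
        }
        Set<Integer> distinct = new HashSet<Integer>();
        for (int i = 0; i < threads; i++) {
            all[i].join();                       // join() makes the row visible here
            for (int n : issued[i]) distinct.add(n);
        }
        return distinct.size();                  // threads * perThread if no duplicates
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctCount(4, 10000)); // 40000
    }
}
```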

Object-type references can also be wrapped for atomic operations, including compareAndSet() and getAndSet(). The AtomicReference class is generic and parameterized by the type of reference it wraps:

    AtomicReference<Node> ref = new AtomicReference<Node>( node );
    ref.compareAndSet( null, newNode );

Weak implementations

The compareAndSet() method has a strange twin named weakCompareAndSet(), which has the dubious distinction that it may fail spuriously: it can decline to make the update even when the current value matches the expected one. It is, however, nice enough to tell you when it doesn't work by returning false. What's the point of this? Well, by allowing this fuzziness, Java may be able to make the implementation of the weak method much faster than the “certain” one. You can loop and retry the weak method instead, and doing so may improve performance on some architectures. This is all because the Java VM may be able to map these kinds of atomic operations all the way down to the hardware level for performance, but restrictions may apply that make it difficult to guarantee success on every attempt.
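The “loop and retry” shape looks like this. The sketch (CasLoop and updateMax() are hypothetical names) atomically raises a value to a new maximum. It uses compareAndSet(), but because any failed attempt simply goes around the loop again, a spurious failure from the weak variant would only cost one more pass.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasLoop {
    // Atomically raises target to candidate if candidate is larger, using the
    // usual read / compute / compare-and-set / retry loop.
    static int updateMax(AtomicInteger target, int candidate) {
        while (true) {
            int current = target.get();
            if (candidate <= current) {
                return current;                   // already big enough
            }
            if (target.compareAndSet(current, candidate)) {
                return candidate;                 // our update won
            }
            // another thread changed target in between: re-read and retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] all = new Thread[4];
        for (int i = 0; i < all.length; i++) {
            final int offset = i * 1000;
            all[i] = new Thread(new Runnable() {
                public void run() {
                    for (int v = 0; v < 1000; v++) updateMax(max, offset + v);
                }
            });
            all[i].start();
        }
        for (Thread t : all) t.join();
        System.out.println(max.get()); // 3999
    }
}
```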

Field updaters

The atomic package also supplies a set of “field updater” utilities: AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, and AtomicReferenceFieldUpdater. These utilities use reflection (see Chapter 7) to perform the kinds of atomic operations we described previously on “naked” int, long, and reference fields that are not already wrapped in their atomic wrapper classes. The field updaters work on volatile fields of an object, identified by name and type. The catch is that atomicity is guaranteed only with respect to other callers that use the same field updater. No guarantees are made with respect to other threads that address the variables in arbitrary ways.
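A minimal AtomicIntegerFieldUpdater sketch follows. HitCounter, its `hits` field, and hammer() are hypothetical names. The updater is created once, locates the field by class and name, and requires the field to be a volatile int. All increments go through the same updater, which is what makes them atomic with respect to one another.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class HitCounter {
    // An ordinary volatile int field, not an AtomicInteger
    private volatile int hits;

    // One shared updater serves every HitCounter instance
    private static final AtomicIntegerFieldUpdater<HitCounter> HITS =
        AtomicIntegerFieldUpdater.newUpdater(HitCounter.class, "hits");

    public int recordHit() {
        return HITS.incrementAndGet(this); // atomic relative to other HITS calls
    }

    public int getHits() {
        return hits;
    }

    static int hammer(int threads, final int perThread) throws InterruptedException {
        final HitCounter counter = new HitCounter();
        Thread[] all = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            all[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) counter.recordHit();
                }
            });
            all[i].start();
        }
        for (Thread t : all) t.join();
        return counter.getHits();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(hammer(4, 10000)); // 40000
    }
}
```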

Conclusion

Java was one of the first mainstream languages to provide support for threading at the language level and is now one of the first languages to standardize high-level threading utilities and APIs as well. At this point, we’ve come to the end of our discussion of threads in Java and also, in a way, to the end of the first part of this book. In Chapters 1 through 9, we discussed the Java language: its syntax and “built-in” features. In the remainder of the book, we will focus mainly on the APIs and libraries that make up the rest of the Java platform. We will see that the real appeal of Java is the combination of this simple language married with powerful tools and standards.



[25] interrupt() has not worked consistently in all Java implementations historically.

[26] Don’t confuse the term serialize in this context with Java object serialization, which is a mechanism for making objects persistent. The underlying meaning (to place one thing after another) does apply to both, however. In the case of object serialization, the object’s data is laid out, byte for byte, in a certain order.

[27] A notable alternative to this is the real-time Java specification that defines specialized thread behavior for certain types of applications. It was developed under the Java community process and can be found at https://rtsj.dev.java.net/.

[28] Java Threads by Scott Oaks and Henry Wong (O’Reilly) includes a detailed discussion of synchronization, scheduling, and other thread-related issues.