Добавил:
Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:

Beginning Algorithms (2006)

.pdf
Скачиваний:
252
Добавлен:
17.08.2013
Размер:
9.67 Mб
Скачать

Chapter 4

this(queue, Integer.MAX_VALUE);

}

...

}

The BlockingQueue implements the Queue interface and holds a few instance variables. Two of the variables are pretty straightforward: the first, queue, holds a reference to the underlying queue in which the data will actually be stored; the second, _maxSize, holds the maximum allowable size of the queue. The third variable, _mutex, is the lock object described earlier.

There are also two constructors. The first takes a queue to be used for data storage and a maximum allowable size. This is the constructor that enables us to create a bounded queue. The second constructor only accepts a queue. It then calls the first constructor, passing in the largest possible integer value for the maximum queue size. Although there is still a limit, it is so large that you have effectively created an unbounded queue.

Now it’s time to look at how to go about implementing the desired behavior, starting with enqueue(). It may look a little spooky at first, but it’s really not that complicated:

public void enqueue(Object value) { synchronized (_mutex) {

while (size() == _maxSize) { waitForNotification();

}

_queue.enqueue(value); _mutex.notifyAll();

}

}

private void waitForNotification() { try {

_mutex.wait();

} catch (InterruptedException e) { // Ignore

}

}

How It Works

The first thing that enqueue does (and all other methods, for that matter) is ensure that no other threads can access the queue at the same time. In Java, this is achieved by using synchronized to obtain a lock on an object — in this case, our mutex. If another thread already has a lock, the current thread will be blocked until that thread releases its lock. Once obtained, no other threads will be able to access the queue until the current thread falls out of the synchronized block. This enables you to manipulate the underlying queue without worrying about stepping on the actions of another thread, or another thread unexpectedly manipulating the underlying queue.

Having obtained sole access to the queue, the next thing to do is ensure that the bounds are respected. If the queue is already at the maximum allowable size, you need to allow another thread the opportunity to free up some space. This is achieved in our call to the waitForNotification() method. This

84

Queues

method calls the mutex’s wait() method, effectively putting the thread to sleep. In putting the thread to sleep, you temporarily give up the lock on the queue. The only way this thread can be woken from this sleep is for another thread to call the notifyAll() method on the mutex, at which time enqueue() will regain control and try again.

Eventually, enough space becomes available and the new value is stored in the underlying queue. You then call notifyAll() on the mutex so that any other threads that might have been asleep are woken.

Try It Out

Implementing dequeue()

Implementing dequeue() is similar except that, of course, it retrieves from, rather than stores to, the queue:

public Object dequeue() throws EmptyQueueException { synchronized (_mutex) {

while (isEmpty()) { waitForNotification();

}

Object value = _queue.dequeue(); _mutex.notifyAll();

return value;

}

}

Just as was done for enqueue(), dequeue() obtains an exclusive lock to ensure that it is the only thread accessing the queue. It then waits until at least one item is available before calling dequeue() on the underlying queue.

How It Works

Again, as you did for enqueue(), once you’re done, you call notifyAll(). Because dequeue() retrieves items, you need to notify any threads that may have been blocked while calling enqueue() (such as when the queue reaches its maximum allowable size).

Try It Out

Implementing the clear() Method

The clear() method is even simpler:

public void clear() { synchronized (_mutex) {

_queue.clear();

_mutex.notifyAll();

}

}

How It Works

After first obtaining a lock in the usual manner, the underlying queue is cleared and, just as you did for dequeue(), all threads are notified in case some were blocked waiting to store items in a queue that had reached its size limit.

85

Chapter 4

Try It Out

Implementing the size() and isEmpty() Methods

Finally, here is the code for the last two methods, size() and isEmpty():

public int size() { synchronized (_mutex) {

return _queue.size();

}

}

public boolean isEmpty() { synchronized (_mutex) {

return _queue.isEmpty();

}

}

How It Works

Both of these methods simply wrap the underlying queue’s equivalent method inside some thread-safe synchronization code. In this case, however, no modification has been made to the underlying queue so there is no need to call notifyAll().

Example: A Call Center Simulator

Now it’s time to put our queues to use. This is where you get to take what you’ve learned so far and use it in a practical — if somewhat simplistic — context. You’ve already learned how queues can be used in allocating and prioritizing work, so in this section you’re going to take one of the example scenarios, a call center, and build a simulator that uses a blocking queue.

The main idea is pretty simple: Develop a system whereby calls are randomly made to a call center and thereby queued, ready to be answered by the next available customer service agent. Figure 4-4 gives you an idea of the main concepts involved.

 

Call Center

Customer

 

 

 

 

Service

 

 

Agent

 

Blocking Queue

 

Call

 

Customer

Call 3 Call 2 Call 1

Service

Generator

 

Agent

 

 

 

 

Customer

 

 

Service

 

 

Agent

Figure 4-4: High-level design for a call center simulation.

86

Queues

A call generator creates calls that are sent to a call center. The call center then stores them in a blocking queue where they wait to be answered by the next available customer service agent. As each agent completes a call, it returns to the queue and attempts to retrieve another. If there are more calls to be processed, the queue returns immediately with the next one. If, however, the queue is empty, it will block until a new call appears. In this way, a customer service agent need never worry whether there are more calls to be answered; all that logic is handled by the blocking queue.

Notice that the queue, along with the customer service agents, live within the call center. Also notice that there are multiple customer service agents, all working at the same time — just like in the real world. Because of this concurrent execution, each customer service agent needs to run in its own thread. Thankfully, our blocking queue implementation was designed specifically with multi-threading in mind; and because the queue will be the only point of thread contention, in the context of this example, there’s no need to worry about synchronizing any other parts of the application.

The simulator will be developed as a stand-alone application that will print log messages to the console as it runs so that you can see what is happening. The program enables you to run simulations under different scenarios based on the values of certain variables. These variables will be specified on the command line as follows:

Number of customer service agents

Number of calls

Maximum call duration

Maximum call interval

The number of customer service agents enables you to specify the number of threads consuming calls on the queue. The more agents (threads) you have, the faster the calls will be processed. The flip side to this is that depending on the rate of generated calls, the more threads you have, the more agents will be waiting for new calls to arrive if the queue is empty.

The number of calls determines how many calls in total to generate. This is purely a safety precaution to prevent the application from running forever. If you prefer, you can still set it to a very large number and see what happens.

The maximum call duration defines an upper limit on how long each call will take once answered. This enables you to simulate what happens when calls take longer or shorter amounts of time.

The maximum call interval defines an upper limit on how long to wait between generating each call.

The design itself is relatively straightforward — we’ve tried to keep it as simple as possible — and involves several classes in addition to using the BlockingQueue developed earlier. Each class is described fully in the next section.

Now that you have an idea of what we’re trying to achieve, it’s time to develop the application. Again, for reasons previously explained, we will forgo the usual tests and jump straight into the code. (Remember that tests are available with the downloadable source code, although we felt that an explanation within the text would confuse the issue.)

You’ll start by creating a class for each of the concepts depicted in Figure 4-4 and finish with a simple simulator application that can be run from the command line.

87

Chapter 4

So that you can monitor the behavior of a simulation as it runs, each class prints information to the console. When you run the application, you’ll see a flood of messages showing you just what is happening inside the simulator. At the end of the section, we’ve included some example output to give you an idea of what this diagnostic information looks like.

Try It Out

Creating the Call Class

The call represents a telephone call within the system. Calls are queued by a call center and subsequently answered by a customer service agent (both of which are discussed a little later):

package com.wrox.algorithms.queues;

public class Call { private final int _id;

private final int _duration; private final long _startTime;

public Call(int id, int duration) {

assert duration >= 0 : “callTime can’t be < 0”;

_id = id;

_duration = duration;

_startTime = System.currentTimeMillis();

}

public String toString() { return “Call “ + _id;

}

...

}

How It Works

Each call is assigned a unique id and a call duration. The id enables you to track the progress of a call through the system. The call duration determines how much time will be spent “answering” a call. Lastly, you record the time at which the call started. This will be used to determine how long each call has been waiting in the queue.

The only method in the call class is answer(). This method is used by a customer service agent to, you guessed it, answer the call:

public void answer() {

System.out.println(this + “ answered; waited “

+(System.currentTimeMillis() - _startTime)

+“ milliseconds”);

try { Thread.sleep(_duration);

}catch (InterruptedException e) {

//Ignore

}

}

88

Queues

Start by printing out the fact that the call was answered, along with the total time spent waiting in the queue. The method then goes to sleep for the duration specified when the call was constructed. In this way, the call is responsible for simulating the time taken to complete a call. Think of this as being like a customer who won’t hang up until they’re ready to do so.

Try It Out

Creating the CustomerService Agent Class

The next class is the CustomerServiceAgent — the consumer from Figure 4-1. This class is responsible for pulling calls off a queue and answering them:

package com.wrox.algorithms.queues;

public class CustomerServiceAgent implements Runnable {

// Don’t get hung on this just yet; it’s described in more detail further on public static final Call GO_HOME = new Call(-1, 0);

private final int _id; private final Queue _calls;

public CustomerServiceAgent(int id, Queue calls) { assert calls != null : “calls can’t be null”; _id = id;

_calls = calls;

}

public String toString() { return “Agent “ + _id;

}

...

}

Just like a call, an agent is also assigned a unique id. Again, this helps you identify which agent is doing what. Each agent also holds a reference to the queue from which to retrieve calls.

Notice that CustomerServiceAgent implements the Runnable interface. This enables each instance to be run in a separate thread, thereby enabling multiple agents to be run concurrently. Runnable specifies one method, run, that must be implemented; and this is where you’ll put the code that pulls calls from the queue and answers them:

public void run() {

System.out.println(this + “ clocked on”);

while (true) {

System.out.println(this + “ waiting”);

Call call = (Call) _calls.dequeue();

System.out.println(this + “ answering “ + call);

if (call == GO_HOME) { break;

}

call.answer();

89

Chapter 4

}

System.out.println(this + “ going home”);

}

How It Works

Each time a customer service agent is run, it prints a little message to say that it has started working. It then sits in a loop pulling calls from the queue and answering them. Each time a call is retrieved, a message is printed and the call is answered. Once the call has completed, the agent goes back to the queue for another.

You may have noticed there is no check to determine whether anything actually exists before calling dequeue(). You would be forgiven for thinking that because of this, it won’t be long before you encounter an EmptyQueueException; this is where the blocking queue comes in. Recall that a blocking queue, besides being thread-safe, waits — as opposed to throwing an exception — when the queue is empty.

The other odd thing about this method is the following piece of code:

if (call == GO_HOME) { break;

}

Without this check, an agent would continue looping forever, waiting for more calls to arrive. Imagine what would happen when the call center closes for the day and stops accepting calls. As just discussed, the blocking queue will wait, leaving our poor customer service agent sitting there all night with nothing to do!

This is actually a fairly common problem when dealing with work queues. Fortunately, there is a very common solution as well. The idea is to create a special value that is understood to mean “stop processing.” This example defined a constant, GO_HOME, right at the start of the class definition. Anytime this call appears on the queue, the customer service agent knows it’s time to finish for the day.

Try It Out

Creating the CallCenter Class

Now that you have your calls and customer service agents, you can finally create the call center. This class is responsible for managing — starting and stopping — the agents, and for placing calls on to a queue for the agents to process:

package com.wrox.algorithms.queues;

import com.wrox.algorithms.iteration.Iterator; import com.wrox.algorithms.lists.ArrayList; import com.wrox.algorithms.lists.List;

public class CallCenter {

private final Queue _calls = new BlockingQueue(new ListFifoQueue());

private final List _threads; private final int _numberOfAgents;

public CallCenter(int numberOfAgents) {

90

Queues

_threads = new ArrayList(numberOfAgents); _numberOfAgents = numberOfAgents;

}

...

}

Before you can process calls, you must open the call center — just like in the real world. For this, you have the aptly named method open():

public void open() {

assert _threads.isEmpty() : “Already open”;

System.out.println(“Call center opening”);

for (int i = 0; i < _numberOfAgents; ++i) { Thread thread =

new Thread(new CustomerServiceAgent(i, _calls));

thread.start(); _threads.add(thread);

}

System.out.println(“Call center open”);

}

Once a call center is open, it can begin accepting calls:

public void accept(Call call) {

assert !_threads.isEmpty() : “Not open”;

_calls.enqueue(call);

System.out.println(call + “ queued”);

}

Eventually, you need to close the call center and send all the customer service agents home:

public void close() {

assert !_threads.isEmpty() : “Already closed”;

System.out.println(“Call center closing”);

for (int i = 0; i < _numberOfAgents; ++i) { accept(CustomerServiceAgent.GO_HOME);

}

Iterator i = _threads.iterator();

for (i.first(); !i.isDone(); i.next()) { waitForTermination((Thread) i.current());

91

Chapter 4

}

_threads.clear();

System.out.println(“Call center closed”);

}

private void waitForTermination(Thread thread) { try {

thread.join();

}catch (InterruptedException e) {

//Ignore

}

}

How It Works

The first thing CallCenter does is create a queue — more specifically, an instance of a BlockingQueue. This enables us to happily run multiple customer service agents, each in its own thread, all accessing the same queue. Note that because you are starting multiple threads, it must also stop them all as well. For this reason, you maintain a list of currently running threads. Lastly, you store the number of agents you will be starting.

The open() method is responsible for starting as many agents as were specified at construction. Each CustomerServiceAgent is constructed with an id — here you’ve just used the value of the iteration variable — and the call queue. Once created, it is started in its own thread and added to the list.

Each call, when you get around to placing it on the queue, waits to be answered by the “next available operator,” which is not to say that your call isn’t important to us, just that you won’t be able to answer all the calls straightaway.

To send the agents home, the first thing you do is place a special call on the queue — one to tell all the customer service agents to finish for the day. For each agent you have running, place the special GO_HOME call onto the queue. Simply telling the agents to go home is not enough, however, as there may still be other calls waiting in the queue; you’re a friendly sort of call center and you don’t just hang up on your customers. After placing the GO_HOME call, you still need to wait for them to finish before turning off the lights and locking the doors.

The method waitForTermination() uses Thread.join() to effectively sleep until the thread finishes execution.

You’re almost done now. Only two classes to go.

Try It Out

Creating the CallGenerator Class

A call generator, as the name suggests, is responsible for the actual generation of phone calls:

package com.wrox.algorithms.queues;

public class CallGenerator {

private final CallCenter _callCenter; private final int _numberOfCalls;

92

Queues

private final int _maxCallDuration; private final int _maxCallInterval;

public CallGenerator(CallCenter callCenter, int numberOfCalls, int maxCallDuration, int maxCallInterval) {

assert callCenter != null : “callCenter can’t be null”; assert numberOfCalls > 0 : “numberOfCalls can’t be < 1”; assert maxCallDuration > 0 : “maxCallDuration can’t be < 1”; assert maxCallInterval > 0 : “maxCallInterval can’t be < 1”;

_callCenter = callCenter; _numberOfCalls = numberOfCalls; _maxCallDuration = maxCallDuration; _maxCallInterval = maxCallInterval;

}

...

}

Besides the constructor, there is only one other public method, which, as you might imagine, actually performs the call generation:

public void generateCalls() {

for (int i = 0; i < _numberOfCalls; ++i) { sleep();

_callCenter.accept(

new Call(i, (int) (Math.random() * _maxCallDuration)));

}

}

private void sleep() { try {

Thread.sleep((int) (Math.random() * _maxCallInterval)); } catch (InterruptedException e) {

// Ignore

}

}

How It Works

The method generateCalls() sits in a loop and generates as many calls as configured. Each call is generated with a random duration before being sent to the call center for processing. The method then waits for a random interval between calls — again, all specified at construction time.

Try It Out

Creating the CallCenterSimulator Class

The last class is the call center simulator itself. This is a small application that can be run from the command line. It ties together a call center and a call generator. Most of the real simulation is performed by the classes already discussed. The CallCenterSimulator class is concerned primarily with reading and parsing command-line arguments:

package com.wrox.algorithms.queues;

public final class CallCenterSimulator {

93