Добавил:
Upload Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:
Andrey Adamovich - Groovy 2 Cookbook - 2013.pdf
Скачиваний:
44
Добавлен:
19.03.2016
Размер:
26.28 Mб
Скачать

Chapter 10

Using actors to build message-based concurrency

The concept of actors as a strategy for running concurrent tasks has recently gained new popularity (thanks to Scala, Erlang, the Akka Framework, and other programming languages). Originally proposed by Carl Hewitt in 1973, actors offer a programming model that inherently guarantees concurrent code when compared with the traditional approach based on shared memory. Actors are similar to Object-Oriented objects (they follow the rule of encapsulation), but they can only communicate by sending immutable messages asynchronously to each other. The internal state of an actor is not exposed and can only be accessed from the outside by sending a message to the actor and receiving a reply.

Due to the asynchronous nature of the message passing pattern, an actor must be active in order to receive a message. One way to make an object active, is to allocate a system thread to it. Unfortunately, threads are a finite resource, and this is not good news for the scalable systems; actors often need to scale beyond the number of threads available on the system.

To overcome this scarcity of resources, actor implementation resorts to sharing threads among actors, leading to a system where unused actors do not consume any thread.

Traditionally, programming languages utilize system threads for concurrency. In this model, the execution of an algorithm is divided into concurrently running tasks. It is as if the algorithm is being executed multiple times; the difference being that each of these

copies operates on shared memory.

In an actor-based system, each message is processed by one thread and one thread only. This fundamental assumption makes the code implicitly thread-safe, removing the need for any other extra (synchronization or locking) effort.

In this recipe, we will demonstrate a simple actor-based implementation for concurrent calculation of word frequency in a large text.

355

www.it-ebooks.info

Concurrent Programming in Groovy

How to do it...

As for other recipes in this chapter, we are going to use the GPars framework to implement an actor-based system. This recipe will reuse the code and the example from the Splitting a large task into smaller parallel jobs recipe, but instead of using the "fork/join" framework, this time, we will use actors to spread the tasks.

1.The first step is to create the actual messages that actors will use to communicate between each other. Create a new file named ActorBasedFrequencyAnalyzer. groovy and add the following classes:

final class CalculateFrequencyMessage { List<String> tokens

}

final class StartFrequency {

}

final class FrequencyMapMessage { Map frequencyMap

}

2.Now, we add one of the two actors of this recipe. This actor is responsible for analyzing chunks of words. It's a "worker" actor.

final class WordFrequencyActor extends DynamicDispatchActor {

void onMessage(CalculateFrequencyMessage message) { reply new FrequencyMapMessage(

frequencyMap: calculateFrequency(message.tokens)

)

}

private Map calculateFrequency(List<String> words) {

//The code for this function can be found in the

//recipe Splitting a large task into smaller parallel jobs.

}

}

356

www.it-ebooks.info

Chapter 10

3.The second actor is responsible for coordinating the work of the workers. final class FrequencyMaster extends DynamicDispatchActor {

static final Integer THRESHOLD = 2000 Map totalFreqMap = [:]

List<String> tokensList = [] int numActors = 1

private final CountDownLatch startupLatch = new CountDownLatch(1)

private CountDownLatch doneLatch

private List createWorkers() { (1..numActors).collect {

new WordFrequencyActor().start()

}

}

private void beginFrequency() { def slaves = createWorkers() int cnt = 0

def partitioned = partition(tokensList, THRESHOLD) partitioned.each { sublist ->

slaves[cnt % numActors] <<

new CalculateFrequencyMessage(tokens: sublist) cntnt += 1

}

doneLatch = new CountDownLatch(partitioned.size())

}

Map waitUntilDone() { startupLatch.await() doneLatch.await() totalFreqMap

}

void onMessage(FrequencyMapMessage frequenceMapMessage) { if (frequenceMapMessage.frequencyMap) {

println ':::::: got a frequency map. ' + 'Content is ' + frequenceMapMessage.frequencyMap.size()

}

357

www.it-ebooks.info

Concurrent Programming in Groovy

totalFreqMap.putAll(frequenceMapMessage.frequencyMap)

doneLatch.countDown()

}

void onMessage(StartFrequency startFrequency) { beginFrequency()

startupLatch.countDown()

println ':::::: Start Frequency Operation'

}

}

4.The last bit of code that we require to run our example is a simple class responsible for instantiating the FrequencyMaster actor.

class ActorBasedWordAnalyzer {

Map frequency(List<String> tokens) { def master = new FrequencyMaster(

tokensList: tokens, numActors: 5

).start()

master << new StartFrequency() master.waitUntilDone()

}

}

5.Let's write a simple test case to test the actor infrastructure: package org.groovy.cookbook

import org.junit.*

import edu.stanford.nlp.process.PTBTokenizer

import edu.stanford.nlp.process.CoreLabelTokenFactory import edu.stanford.nlp.ling.CoreLabel

class ActorBasedFrequencyTest {

@Test

void testFrequency() {

def bigText = 'http://norvig.com/big.txt'.toURL() def analyzer = new ActorBasedWordAnalyzer() analyzer.frequency(tokenize(bigText.text)) res.each {

println "[ ${it.key} ${it.value} ]"

}

}

358

www.it-ebooks.info

Chapter 10

def tokenize(String txt) { List<String> words = []

PTBTokenizer ptbt = new PTBTokenizer( new StringReader(txt),

new CoreLabelTokenFactory(),

''

)

ptbt.each { entry -> words << entry.value()

}

words

}

}

How it works...

In step 1, three message types are defined. Messages are simple classes; they can even be

String or Integer types. The most important aspect of a message in an actor based system is immutability. Immutability guarantees that an actor never has to lock to read a state, greatly simplifying the overall architecture of the message-passing semantic.

In step 2, we introduce the first actor. GPars actors can be divided into two subtypes, stateful and stateless.

Stateless actors keep no memory of processed messages; they simply handle messages as they arrive. A stateless actor extends DynamicDispatchActor. Stateful actors, on the contrary, are more sophisticated, because they need to maintain implicit state between subsequent message arrivals. Stateful actors extend DefaultActor. Stateless actors are obviously more efficient and highly performant than stateful ones. In this recipe, we use stateless actors, as the problem we need to solve doesn't require a complex logic based on

continuations. A continuation is an abstract representation of a state; in the actor model, an actor processes the request in chunks, separated by idle periods during which it awaits a new message. Each message may determine a modification of the actor state. Stateful actors allow for encoding such state transitions directly in the structure of the message handling flow.

The WordFrequencyActor only handles one type of message, through the onMessage function. The message is expected to contain a sublist of tokens to analyze. The calculateFrequency function was already described in the Splitting a large task into smaller parallel jobs recipe from this chapter. Once the analysis is done, the actor replies with FrequencyMapMessage containing the frequency results for the batch that was passed to the worker actor.

359

www.it-ebooks.info

Concurrent Programming in Groovy

The FrequencyMaster actor has a slightly more complex logic. For a start, it handles two

types of messages, StartFrequency and FrequencyMapMessage. The StartFrequency message triggers the business logic required to split the list of tokens to analyze into smaller chunks. Each chunk is passed to one of the worker actors and processed asynchronously. The onMessage function that handles the FrequencyMapMessage is actually handling the reply from the worker actor and adding the result to the Map containing the result of the analysis.

Step 4 shows the class that has the responsibility for sending the StartFrequency message and initiating the process.

def master = new FrequencyMaster( tokensList: tokens,

numActors: 5 ).start()

master << new StartFrequency()

Messages can be sent to actors in three ways:

ff Using the send method

ff Using the << operator, as in the example

ff Using the implicit call method, such as master new StartFrequency

The FrequencyMaster actor is initialized with the list of tokens to analyze and the number of active actors, 5 in this example.

The actors collectively share a big thread pool, but any given actor at any given time has at most one active thread. In its thread, an actor takes messages from its incoming queue, processes them, and sends out responses. It never does anything aside from this message-processing loop.

Finally, the class calls the waitUntilDone method to retrieve the Map containing the result of the analysis. The function is based on two CountDownLatch classes that are initialized and used to keep track of the workers' tasks. CountDownLatch is often used to wait for several threads to complete, and this is how it has been used in the example. The waitUntilDone method blocks until all the actors have done processing.

See also

ff http://gpars.codehaus.org/Actor

ff http://en.wikipedia.org/wiki/Actor_model ff http://en.wikipedia.org/wiki/Continuation

ff http://gpars.org/0.12/groovydoc/groovyx/gpars/actor/ DynamicDispatchActor.html

ff http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ CountDownLatch.html

360

www.it-ebooks.info

Соседние файлы в предмете [НЕСОРТИРОВАННОЕ]