Andrey Adamovich - Groovy 2 Cookbook - 2013.pdf

Concurrent Programming in Groovy

The class entry point is the download method, which takes a Map and the number of parallel downloads to run. The actual parallel downloading is carried out by the private method parallelDownload, which accepts a Map whose keys are the URLs to download files from and whose values are the destination files. The method uses eachParallel to execute the download operation concurrently on each entry of the Map.
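GPars' eachParallel is Groovy-only, but the same per-entry concurrency can be sketched in plain Java with an ExecutorService. Everything here (class name, URLs, the simulated download) is illustrative, not the book's FileDownloader:

```java
import java.util.*;
import java.util.concurrent.*;

public class ParallelEach {
    // Submit one task per map entry to a fixed-size pool and collect the
    // results in entry order. The "download" is simulated: each task just
    // reports what it would fetch (URLs and paths are illustrative).
    static List<String> fetchAll(Map<String, String> downloads, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Map.Entry<String, String> e : downloads.entrySet()) {
                futures.add(pool.submit(() -> e.getKey() + " -> " + e.getValue()));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // blocks until that task finishes
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> downloads = new LinkedHashMap<>();
        downloads.put("http://example.org/a.pdf", "/tmp/a.pdf");
        downloads.put("http://example.org/b.pdf", "/tmp/b.pdf");
        downloads.put("http://example.org/c.pdf", "/tmp/c.pdf");
        fetchAll(downloads, 2).forEach(System.out::println);
    }
}
```

The fixed pool size plays the same role as the "number of parallel downloads" parameter in the recipe.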

One interesting feature of this class is the use of the MapPartition category. Categories in Groovy are a very elegant way to add methods to a class that is not under your control. The MapPartition category allows us to "split" a Map into smaller maps in order to enable the "concurrency" feature of the FileDownloader class.
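As a rough sketch of what such a map split might look like, here is a plain-Java helper (MapSplitter and its chunking logic are our own illustration, not the book's MapPartition category):

```java
import java.util.*;

public class MapSplitter {
    // Illustrative helper: splits a map into a list of smaller maps,
    // each holding at most chunkSize entries, preserving iteration order.
    public static <K, V> List<Map<K, V>> partition(Map<K, V> source, int chunkSize) {
        List<Map<K, V>> parts = new ArrayList<>();
        Map<K, V> current = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : source.entrySet()) {
            current.put(e.getKey(), e.getValue());
            if (current.size() == chunkSize) {
                parts.add(current);
                current = new LinkedHashMap<>();
            }
        }
        if (!current.isEmpty()) parts.add(current); // leftover smaller chunk
        return parts;
    }

    public static void main(String[] args) {
        Map<String, String> downloads = new LinkedHashMap<>();
        for (int i = 1; i <= 5; i++) {
            downloads.put("http://example.org/f" + i, "/tmp/f" + i);
        }
        System.out.println(partition(downloads, 2).size()); // prints 3 (chunks of 2 + 2 + 1)
    }
}
```

Each chunk can then be handed to a separate worker, which is what enables the concurrent download.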

See also

- http://gpars.org/guide/guide/single.html#dataParallelism_parallelCollections

- http://gpars.org/1.0.0/groovydoc/groovyx/gpars/GParsPool.html

Splitting a large task into smaller parallel jobs

CPUs are not getting much faster, so manufacturers are adding more cores to their processors. That means single-threaded applications cannot leverage the parallelism offered by a multi-core processor. But how do we put those cores to work?

The concept of parallelization is based on the observation that large problems can often be divided into smaller ones, which are solved in parallel. The execution of the smaller tasks can be spread across several cores to complete the main task faster.

Concurrent programming is not easy, mostly because of synchronization issues and the pitfalls of shared data. Historically, Java has offered excellent support for multi-threaded programming, partially shielding the developer from the complexity of writing code that runs many tasks in parallel.

One of the most useful algorithms for leveraging multiple cores is "fork/join". A "fork/join" algorithm divides a problem into smaller subproblems and applies the same algorithm to each subproblem recursively. Once a subproblem becomes small enough, it is solved directly.

Hierarchical problems, such as sorting algorithms or file system/tree navigation, benefit greatly from "fork/join" (also known as divide and conquer).
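For comparison, the same divide-and-conquer scheme can be sketched with the JDK's own fork/join framework; the class name and threshold below are illustrative, not the recipe's GPars code:

```java
import java.util.*;
import java.util.concurrent.*;

// Illustrative sketch: word-frequency counting with java.util.concurrent's
// fork/join framework. A task either counts its words directly (small
// enough) or forks two halves and merges their partial results.
class FrequencyTask extends RecursiveTask<Map<String, Integer>> {
    static final int THRESHOLD = 1000; // illustrative cut-off
    private final List<String> words;

    FrequencyTask(List<String> words) { this.words = words; }

    @Override
    protected Map<String, Integer> compute() {
        if (words.size() <= THRESHOLD) {
            // Base case: count directly, no further splitting.
            Map<String, Integer> freq = new HashMap<>();
            for (String w : words) freq.merge(w, 1, Integer::sum);
            return freq;
        }
        // Recursive case: fork the left half, compute the right half here.
        int mid = words.size() / 2;
        FrequencyTask left = new FrequencyTask(words.subList(0, mid));
        FrequencyTask right = new FrequencyTask(words.subList(mid, words.size()));
        left.fork();
        Map<String, Integer> result = new HashMap<>(right.compute());
        left.join().forEach((w, n) -> result.merge(w, n, Integer::sum));
        return result;
    }
}

public class ForkJoinFrequency {
    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 5000; i++) words.add(i % 2 == 0 ? "foo" : "bar");
        Map<String, Integer> freq = new ForkJoinPool().invoke(new FrequencyTask(words));
        System.out.println(freq.get("foo") + " " + freq.get("bar")); // prints 2500 2500
    }
}
```

GPars wraps the same underlying machinery in a more Groovy-friendly API, as the recipe below shows.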

This recipe will show how to use the "fork/join" implementation of GPars to calculate the frequency of words in a large text.


How to do it...

Let's add a class that contains the logic to execute the frequency counting algorithm using "fork/join".

1. Create a new class in the src/main/groovy/org/groovy/cookbook directory created in the Processing collections concurrently recipe:

package org.groovy.cookbook

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool
import static com.google.common.collect.Lists.*

class WordAnalyzer {

    static final Integer THRESHOLD = 50000
    static final int MAX_THREAD = 8

    private Map calculateFrequency(List<String> words) {
        def frequencies = [:]
        words.each {
            Integer num = frequencies.get(it)
            frequencies.put(it, num ? num + 1 : 1)
        }
        frequencies
    }

    Map frequency(List<String> tokens) {
        def frequencyMap = [:]
        def maps
        withPool(MAX_THREAD) {
            maps = runForkJoin(tokens) { words ->
                if (words.size() <= THRESHOLD) {
                    // No parallelism.
                    return calculateFrequency(words)
                } else {
                    partition(words, THRESHOLD).each { sublist ->
                        forkOffChild(sublist)
                    }
                    // Collect all results.
                    return childrenResults
                }
            }
        }
        maps.each {
            frequencyMap.putAll(it)
        }
        // Reverse sort.
        frequencyMap.sort { a, b -> b.value <=> a.value }
    }
}

2. In order to test the previous code, we can use the following unit test, placed in the src/main/groovy/org/groovy/cookbook folder:

package org.groovy.cookbook

import org.junit.*

import edu.stanford.nlp.process.PTBTokenizer
import edu.stanford.nlp.process.CoreLabelTokenFactory
import edu.stanford.nlp.ling.CoreLabel

class WordAnalyzerTest {

    @Test
    void testFrequency() {
        def bigText = 'http://norvig.com/big.txt'.toURL()
        def wa = new WordAnalyzer()
        def tokens = tokenize(bigText.text)
        long start = System.currentTimeMillis()
        def m = wa.frequency(tokens)
        def timeSpent = (System.currentTimeMillis() - start)
        println "Execution time: ${timeSpent}ms"
        println 'For calculating frequency over: '
        println "${tokens.size()} tokens"
        m.sort { -it.value }.each {
            if (it.value > 50) {
                println it
            }
        }
    }

    def tokenize(String txt) {
        List<String> words = []
        PTBTokenizer ptbt = new PTBTokenizer(
            new StringReader(txt),
            new CoreLabelTokenFactory(),
            ''
        )
        ptbt.each { entry ->
            words << entry.value()
        }
        words
    }
}

3. The output of the unit test is as follows:

Execution time: 1007ms
For calculating frequency over: 1297801 tokens
the=1799
,=1720
of=1301
.=967
and=907
to=745
that=505
a=456
in=443
is=436
it=266
not=234
as=231
or=204
...

How it works...

The code in step 1 requires a bit of explanation. The core function is calculateFrequency. The function creates a Map containing the frequency of the words found in the List passed as an argument.

Once the Map is constructed from the frequency calculation, it looks as follows:

John    | 10
Michael | 8
Jeff    | 4


The left column contains the words found in the analyzed document, and the right column contains the number of times each word occurs.

The algorithm is not very sophisticated and can probably be improved, but this is not the focus of the recipe.
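For readers more comfortable with Java, the same counting loop boils down to one Map.merge call per word (an equivalent sketch, not the book's Groovy code):

```java
import java.util.*;

public class Frequency {
    // Equivalent of the recipe's calculateFrequency, in plain Java:
    // merge() inserts 1 for a new word, or adds 1 to the existing count.
    static Map<String, Integer> calculateFrequency(List<String> words) {
        Map<String, Integer> frequencies = new HashMap<>();
        for (String w : words) {
            frequencies.merge(w, 1, Integer::sum);
        }
        return frequencies;
    }

    public static void main(String[] args) {
        System.out.println(calculateFrequency(Arrays.asList("John", "John", "Jeff")));
    }
}
```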

The frequency function contains the call to the "fork/join" API. The code follows a simple pattern: if the job is small enough, execute it directly; otherwise, split the job and apply the calculateFrequency algorithm to the smaller chunks (in this case, sublists of words).

The smaller jobs are executed by concurrent threads. The number of these threads can be tuned by modifying the value passed to the withPool method.

Things get interesting in the else branch of the condition that determines if the job has to be divided into chunks.

The partition function is statically imported from the com.google.common.collect.Lists class, which belongs to the amazing Guava library. It slices up a list and returns consecutive sublists, each of the same size (the last sublist may be smaller).
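Guava's Lists.partition behaves like the following hand-rolled equivalent, shown in plain Java so it runs without the Guava dependency (the helper name is ours):

```java
import java.util.*;

public class Partition {
    // Hand-rolled equivalent of Guava's Lists.partition: consecutive
    // sublists of the given size; the last one may be shorter.
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(partition(nums, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```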

For each sublist, the forkOffChild method is invoked. This method returns immediately and schedules a task for execution (the task in this case is the calculateFrequency function).

When all the tasks are eventually executed, we can collect and return them by calling childrenResults, which contains a list of whatever is returned by runForkJoin. The runForkJoin factory method will execute the provided recursive code along with the supplied values and build a hierarchical "fork/join" calculation. The number of values passed to the runForkJoin method has to match the number of the closure's expected parameters, as well as the number of arguments passed into the forkOffChild method.

The last segment of the frequency function iterates on the Maps returned by the parallel computation and adds them to a master Map, which contains the full list of words, ordered by frequency.
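That merge-and-sort step can be sketched in plain Java as follows (illustrative code; merge() is used so that counts are summed when the same word appears in more than one partial map):

```java
import java.util.*;
import java.util.stream.*;

public class MergeMaps {
    // Combine partial frequency maps by summing counts for shared words,
    // then order the result by descending frequency.
    static Map<String, Integer> combine(List<Map<String, Integer>> parts) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> part : parts) {
            part.forEach((w, n) -> merged.merge(w, n, Integer::sum));
        }
        // Reverse-sort by value into an insertion-ordered map.
        return merged.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Two partial frequency maps, as returned by the parallel chunks.
        Map<String, Integer> part1 = Map.of("the", 3, "cat", 1);
        Map<String, Integer> part2 = Map.of("the", 2, "sat", 1);
        System.out.println(combine(List.of(part1, part2)));
    }
}
```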

The test executes the following steps:

1. Download a rather large chunk of text (over 1 million tokens).

2. Split it into tokens using the PTBTokenizer from the NLP library developed by Stanford University (better than using split(' ')).

3. Call the frequency method and compute the time it took to run the frequency operation.

4. Sort the map by frequency and print the frequency values for words appearing more than 50 times.

