Добавил:
Upload Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:
CSharp_Prog_Guide.doc
Скачиваний:
16
Добавлен:
16.11.2019
Размер:
6.22 Mб
Скачать

Синхронизация потока-производителя и потока-потребителя

В следующем примере демонстрируется синхронизация потоков между основным потоком и двумя рабочими потоками при помощи ключевого слова lock и классов AutoResetEvent и ManualResetEvent.

В этом примере создаются два дополнительных (то есть рабочих) потока. Один поток производит элементы и сохраняет их в универсальной очереди, не являющейся потокобезопасной. Другой поток потребляет элементы из этой очереди. Кроме того, главный поток периодически отображает содержимое очереди, то есть к очереди получают доступ три потока. Ключевое слово lock используется для синхронизации доступа к потоку, чтобы избежать повреждения состояния очереди.

Помимо запрета одновременного доступа с помощью ключевого слова lock, два объекта событий обеспечивают дополнительную синхронизацию. Один из них используется для передачи рабочим потокам команды завершения работы, другой используется потоком-производителем для того, чтобы сообщать потоку-потребителю о добавлении в очередь нового элемента. Эти два объекта событий инкапсулированы в класс SyncEvents. Это позволяет событиям легко передавать объекты, представляющие поток-потребитель и поток-производитель. Класс SyncEvents определяется следующим образом.

-------

The AutoResetEvent class is used for the "new item" event because you want this event to reset automatically every time that the consumer thread responds to this event. Alternatively, the ManualResetEvent class is used for the "exit" event because you want multiple threads to respond when this event is signaled. If you used AutoResetEvent instead, the event would revert to a non-signaled state after just one thread responded to the event. The other thread would not respond, and in this case, would fail to terminate.

The SyncEvents class creates the two events, and stores them in two different forms: as EventWaitHandle, which is the base class for both AutoResetEvent and ManualResetEvent, and in an array based on WaitHandle. As you will see in the consumer thread discussion, this array is necessary so that the consumer thread can respond to either event.

The consumer and producer threads are represented by classes named Consumer and Producer. Both of these define a method called ThreadRun. These methods are used as the entry points for the worker threads that the Main method creates.

The ThreadRun method defined by the Producer class resembles this:

// Producer.ThreadRun

public void ThreadRun()

{

int count = 0;

Random r = new Random();

while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))

{

lock (((ICollection)_queue).SyncRoot)

{

while (_queue.Count < 20)

{

_queue.Enqueue(r.Next(0,100));

_syncEvents.NewItemEvent.Set();

count++;

}

}

}

Console.WriteLine("Producer thread: produced {0} items", count);

}

Класс AutoResetEvent используется для события "new item", поскольку нужно автоматические выполнять сброс этого события каждый раз, когда поток-потребитель отвечает на это событие. В другом случае, класс ManualResetEvent используется для события "exit", поскольку при создании этого события нужен ответ нескольких потоков. Если вместо этого использовался класс AutoResetEvent, это событие будет возвращаться в выключенное состояние сразу после ответа одного потока. Другие потоки не ответят, и, в данном случае, не смогут завершить работу.

Класс SyncEvents создает два события и хранит их в двух разных формах: как EventWaitHandle, (базовый класс классов AutoResetEvent и ManualResetEvent) и как массив на базе WaitHandle. Как мы увидим в обсуждении потока-потребителя, этот массив необходим для того, чтобы поток-потребитель мог ответить на оба события.

Поток-потребитель и поток-производитель представлены классами Consumer и Producer. Оба класса определяют метод ThreadRun. Эти методы используются в качестве точек входа для рабочих потоков, создаваемых методом Main.

Метод ThreadRun, определенный классом Producer, выглядит следующим образом:

--------

This method loops until the "exit thread" event becomes signaled. The state of this event is tested with the WaitOne method, by using the ExitThreadEvent property defined by the SyncEvents class. In this case, the state of the event is checked without blocking the current thread because the first argument used with WaitOne is zero that indicates that the method should return immediately. If WaitOne returns true, then the event in question is currently signaled. If so, the ThreadRun method returns, which has the effect of terminating the worker thread executing this method.

Until the "exit thread" event is signaled, the Producer.ThreadStart method tries to keep 20 items in the queue. An item is just an integer between zero and 100. The collection must be locked before you add new items to prevent the consumer and primary threads from accessing the collection at the same time. This is done by using the lock keyword. The argument passed to lock is the SyncRoot field exposed through the ICollection interface. This field is provided specifically for synchronizing thread access. Exclusive access to the collection is granted for any instructions that are contained in the code block following lock. For each new item that the producer adds to the queue, a call to the Set method on the "new item" event is made. This signals the consumer thread to emerge from its suspended state to process the new item.

The Consumer object also defines a method called ThreadRun. Like the producer's version of ThreadRun, this method is executed by a worker thread created by the Main method. However, the consumer version of ThreadStart must respond to two events. The Consumer.ThreadRun method resembles this:

// Consumer.ThreadRun

public void ThreadRun()

{

int count = 0;

while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)

{

lock (((ICollection)_queue).SyncRoot)

{

int item = _queue.Dequeue();

}

count++;

}

Console.WriteLine("Consumer Thread: consumed {0} items", count);

}

Этот метод зацикливается до возникновения события "exit thread". Состояние этого события проверяется с помощью метода WaitOne путем применения свойства ExitThreadEvent, определенного классом SyncEvents. В этом случае состояние события проверяется без блокировки текущего потока, поскольку первый аргумент, используемый с WaitOne, равен нулю, что означает немедленный возврат метода. Если WaitOne возвращает значение true, то событие возникло в настоящий момент. В этом случае метод ThreadRun возвращается, то есть рабочий процесс, в котором выполняется этот метод, завершается.

До возникновения события "exit thread" метод Producer.ThreadStart пытается сохранить 20 элементов в очереди. Элемент — это случайное целое число, лежащее в интервале от 0 до 100. Коллекцию нужно заблокировать перед добавлением новых элементов, чтобы запретить одновременный доступ к коллекции потока-потребителя и главного потока. Для этого служит ключевое слово lock. Аргумент, передаваемый для lock, является полем SyncRoot, открытым посредством интерфейса ICollection. Это поле предоставлено специально для синхронизации доступа потоков. Монопольный доступ к коллекции предоставляется для всех инструкций, содержащихся в блоке кода после ключевого слова lock. Для каждого нового элемента, добавляемого производителем в очередь, осуществляется вызов метода Set для события "new item". Таким образом, поток-потребитель получает команду выхода из приостановленного состояния для обработки нового элемента.

Объект Consumer также определяет метод ThreadRun. Как и версия ThreadRun в потоке-производителе, этот метод выполняется в рабочем потоке, созданном методом Main. Однако в потоке-потребителе ThreadStart должен отвечать на два события. Метод Consumer.ThreadRun выглядит следующим образом:

------

This method uses WaitAny to block the consumer thread until any of the wait handles in the provided array become signaled. In this case, there are two handles in the array, one that terminates the worker threads, and one that indicates that a new item has been added to the collection. WaitAny returns the index of the event that became signaled. The "new item" event is the first in the array, so that an index of zero indicates a new item. In this case, check for an index of 1, which indicates the "exit thread" event, and this is used to determine whether this method continues to consume items. If the "new item" event was signaled, you get exclusive access to the collection with lock and consume the new item. Because this example produces and consumes thousands of items, you do not display each item consumed. Instead use Main to periodically display the contents of the queue, as will be demonstrated.

The Main method starts by creating the queue whose contents will be produced and consumed and an instance of SyncEvents, which you looked at earlier:

Queue<int> queue = new Queue<int>();

SyncEvents syncEvents = new SyncEvents();

Next, Main configures the Producer and Consumer objects for use with worker threads. This step does not, however, create or start the actual worker threads:

Producer producer = new Producer(queue, syncEvents);

Consumer consumer = new Consumer(queue, syncEvents);

Thread producerThread = new Thread(producer.ThreadRun);

Thread consumerThread = new Thread(consumer.ThreadRun);

Notice that the queue and the synchronization event object are passed to both the Consumer and Producer threads as constructor arguments. This provides both objects that have the shared resources that they need to perform their respective tasks. Two new Thread objects are then created, by using the ThreadRun method for each object as an argument. Each worker thread, when it is started, will use this argument as the entry point for the thread.

Данный метод использует WaitAny для блокирования потока-потребителя до тех пор, пока не будет включен любой из дескрипторов ожидания в предоставленном массиве. В этом случае в массиве есть два дескриптора: один завершает рабочие потоки, другой указывает, что в коллекцию добавлен новый элемент. WaitAny возвращает индекс происшедшего события. Событие "new item" является первым в массиве, поэтому нулевой индекс означает новый элемент. В этом случае следует проверить наличие индекса 1, который обозначает событие "exit thread" (это позволяет определить, продолжает ли этот метод потреблять элементы). Если произошло событие "new item", включается монопольный доступ к коллекции с помощью ключевого слова lock и новый элемент потребляется. В этом примере производятся и потребляются тысячи элементов, поэтому каждый потребляемый элемент не отображается. Вместо этого используется метод Main для периодического отображения содержимого очереди, как будет показано дальше.

Метод Main сначала создает очередь, содержимое которой будет создаваться и потребляться, и экземпляр SyncEvents, который был рассмотрен ранее:

Queue<int> queue = new Queue<int>();

SyncEvents syncEvents = new SyncEvents();

Затем Main настраивает объекты Producer и Consumer для использования с рабочими потоками. Однако на этом этапе еще не создаются и не запускаются рабочие потоки:

----

Обратите внимание, что очередь и объект события синхронизации передаются потокам Consumer и Producer как аргументы конструктора. При этом оба объекта получают необходимые общие ресурсы для выполнения своих задач. Затем создаются два новых объекта Thread с помощью метода ThreadRun в качестве аргумента для каждого объекта. Каждый рабочий поток при запуске использует этот аргумент в качестве входной точки для потока.

Next Main launches the two worker threads with a call to the Start method, such as this:

producerThread.Start();

consumerThread.Start();

At this point, the two new worker threads are created and begin asynchronous execution, independent of the primary thread that is currently executing the Main method. In fact, the next thing Main does is suspend the primary thread with a call to the Sleep method. The method suspends the currently executing thread for a given number of milliseconds. Once this interval elapses, Main is reactivated, at which point it displays the contents of the queue. Main repeats this for four iterations, such as this:

for (int i=0; i<4; i++)

{

Thread.Sleep(2500);

ShowQueueContents(queue);

}

Finally, Main signals the worker threads to terminate by invoking the Set method of the "exit thread" event, and then calls the Join method on each worker thread to block the primary thread until each worker thread respond to the event and terminates.

There is one final example of thread synchronization: the ShowQueueContents method. This method, like the consumer and producer threads, uses lock to gain exclusive access to the queue. In this case, however, exclusive access is very important, because ShowQueueContents enumerates over all of the collection. To enumerate over a collection is an operation that is especially prone to data corruption by asynchronous operations because it involves traversing the contents of the entire collection.

Notice that ShowQueueContents, because it is called by Main, is executed by the primary thread. This means that this method, when it achieves exclusive access to the item queue, blocks both the producer and consumer threads from access to the queue. ShowQueueContents locks the queue and enumerates the contents:

private static void ShowQueueContents(Queue<int> q)

{

lock (((ICollection)q).SyncRoot)

{

foreach (int item in q)

{

Console.Write("{0} ", item);

}

}

Console.WriteLine();

}

Затем Main запускает два рабочих потока с вызовом метода Start:

producerThread.Start();

consumerThread.Start();

На этом этапе создаются два новых рабочих потока, и начинается их асинхронное выполнение независимо от основного потока, который в данное время выполняет метод Main. Затем Main приостанавливает основной поток с вызовом метода Sleep. Метод приостанавливает текущий выполняемый поток на заданное количество миллисекунд. После истечения интервала метод Main заново активируется и отображает содержимое очереди. Метод Main повторяет эти действия для четырех итераций:

for (int i=0; i<4; i++)

{

Thread.Sleep(2500);

ShowQueueContents(queue);

}

И, наконец, Main передает рабочим потокам команду на завершения путем вызова метода Set события "exit thread", а затем вызывает для каждого рабочего потока метод Join для блокирования основного потока до тех пор, пока каждый рабочий поток не ответит на это событие и не завершит работу.

Вот еще один пример синхронизации потоков: метод ShowQueueContents. Этот метод, как потоки-производители и потоки-потребители, использует lock для получения монопольного доступ к очереди. В этом случае монопольный доступ весьма важен, поскольку ShowQueueContents перечисляется всю коллекции. Операция перечисления коллекции подвержена повреждению данных асинхронными операциями, поскольку при этой операции осуществляется обход содержимого всей коллекции.

Обратите внимание, что метод ShowQueueContents выполняется главным потоком, поскольку его вызывает метод Main. Это означает, что данный метод при получении монопольного доступа к очереди элементов блокирует доступ потока-производителя и потока-потребителя к очереди. Метод ShowQueueContents блокирует очередь и перечисляет содержимое.

------

The complete example follows.

Example

using System;

using System.Threading;

using System.Collections;

using System.Collections.Generic;

public class SyncEvents

{

public SyncEvents()

{

_newItemEvent = new AutoResetEvent(false);

_exitThreadEvent = new ManualResetEvent(false);

_eventArray = new WaitHandle[2];

_eventArray[0] = _newItemEvent;

_eventArray[1] = _exitThreadEvent;

}

public EventWaitHandle ExitThreadEvent

{

get { return _exitThreadEvent; }

}

public EventWaitHandle NewItemEvent

{

get { return _newItemEvent; }

}

public WaitHandle[] EventArray

{

get { return _eventArray; }

}

private EventWaitHandle _newItemEvent;

private EventWaitHandle _exitThreadEvent;

private WaitHandle[] _eventArray;

}

Полный пример кода выглядит следующим образом.

Пример

-----

public class Producer

{

public Producer(Queue<int> q, SyncEvents e)

{

_queue = q;

_syncEvents = e;

}

// Producer.ThreadRun

public void ThreadRun()

{

int count = 0;

Random r = new Random();

while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))

{

lock (((ICollection)_queue).SyncRoot)

{

while (_queue.Count < 20)

{

_queue.Enqueue(r.Next(0,100));

_syncEvents.NewItemEvent.Set();

count++;

}

}

}

Console.WriteLine("Producer thread: produced {0} items", count);

}

private Queue<int> _queue;

private SyncEvents _syncEvents;

}

-------

public class Consumer

{

public Consumer(Queue<int> q, SyncEvents e)

{

_queue = q;

_syncEvents = e;

}

// Consumer.ThreadRun

public void ThreadRun()

{

int count = 0;

while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)

{

lock (((ICollection)_queue).SyncRoot)

{

int item = _queue.Dequeue();

}

count++;

}

Console.WriteLine("Consumer Thread: consumed {0} items", count);

}

private Queue<int> _queue;

private SyncEvents _syncEvents;

}

public class ThreadSyncSample

{

private static void ShowQueueContents(Queue<int> q)

{

lock (((ICollection)q).SyncRoot)

{

foreach (int item in q)

{

Console.Write("{0} ", item);

}

}

Console.WriteLine();

}

------

static void Main()

{

Queue<int> queue = new Queue<int>();

SyncEvents syncEvents = new SyncEvents();

Console.WriteLine("Configuring worker threads...");

Producer producer = new Producer(queue, syncEvents);

Consumer consumer = new Consumer(queue, syncEvents);

Thread producerThread = new Thread(producer.ThreadRun);

Thread consumerThread = new Thread(consumer.ThreadRun);

Console.WriteLine("Launching producer and consumer threads...");

producerThread.Start();

consumerThread.Start();

for (int i=0; i<4; i++)

{

Thread.Sleep(2500);

ShowQueueContents(queue);

}

Console.WriteLine("Signaling threads to terminate...");

syncEvents.ExitThreadEvent.Set();

producerThread.Join();

consumerThread.Join();

}

}

Configuring worker threads...

Launching producer and consumer threads...

22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87

79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59

0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21

38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92

Signalling threads to terminate...

Consumer Thread: consumed 1053771 items

Producer thread: produced 1053791 items

-----

How to: Use a Thread Pool

A thread pool is a collection of threads that can be used to perform several tasks in the background. (See Using Threading for background information.) This leaves the primary thread free to perform other tasks asynchronously.

Thread pools are often employed in server applications. Each incoming request is assigned to a thread from the thread pool, so that the request can be processed asynchronously, without tying up the primary thread or delaying the processing of subsequent requests.

Once a thread in the pool completes its task, it is returned to a queue of waiting threads, where it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.

Thread pools typically have a maximum number of threads. If all the threads are busy, additional tasks are put in queue until they can be serviced as threads become available.

You can implement your own thread pool, but it is easier to use the thread pool provided by the .NET Framework through the ThreadPool class.

The following example uses the .NET Framework thread pool to calculate the Fibonacci result for ten numbers between 20 and 40. Each Fibonacci result is represented by the Fibonacci class, which provides a method named ThreadPoolCallback that performs the calculation. An object that represents each Fibonacci value is created, and the ThreadPoolCallback method is passed to QueueUserWorkItem, which assigns an available thread in the pool to execute the method.

Because each Fibonacci object is given a semi-random value to compute, and because each thread will be competing for processor time, you cannot know in advance how long it will take for all ten results to be calculated. That is why each Fibonacci object is passed an instance of the ManualResetEvent class during construction. Each object signals the provided event object when its calculation is complete, which allows the primary thread to block execution with WaitAll until all ten Fibonacci objects have calculated a result. The Main method then displays each Fibonacci result.