Добавил:
Upload Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:
Базовые технологии платформы .NET.docx
Скачиваний:
13
Добавлен:
03.11.2018
Размер:
614.46 Кб
Скачать

28. Синхронизация потоков

При использовании многопоточности естественным образом возникает вопрос об управлении совместным доступом к данным и синхронизации потоков.

Если метод запускается в нескольких потоках, только локальные переменные метода будут уникальными для потока. Поля объектов по умолчанию разделяются между всеми потоками. В пространстве имён System определён атрибут [ThreadStatic], применяемый к статическим полям. Если поле помечено таким атрибутом, то каждый поток будет содержать свой экземпляр поля. Для [ThreadStatic]-полей не рекомендуется делать инициализацию при объявлении, так как код инициализации выполнится только в одном потоке.

public class SomeClass

{

public static int SharedField = 25;

[ThreadStatic]

public static int NonSharedField;

}

Для неразделяемых статических полей класса можно использовать тип ThreadLocal<T>. Перегруженный конструктор ThreadLocal<T> принимает функцию инициализации поля. Значение поля хранится в свойстве Value.

using System;

using System.Threading;

public class Slot

{

private static Random rnd = new Random();

private static int Shared = 25;

private static ThreadLocal<int> NonShared =

new ThreadLocal<int>(() => rnd.Next(1, 20));

public static void PrintData()

{

Console.WriteLine("Thread: {0}\t Shared: {1}\t NonShared: {2}",

Thread.CurrentThread.Name,

Shared, NonShared.Value);

}

}

public class MainClass

{

public static void Main()

{

// для тестирования запускаем три потока

new Thread(Slot.PrintData) {Name = "First"}.Start();

new Thread(Slot.PrintData) {Name = "Second"}.Start();

new Thread(Slot.PrintData) {Name = "Third"}.Start();

Console.ReadLine();

}

}

Отметим, что класс Thread имеет статические методы AllocateDataSlot(), AllocateNamedDataSlot(), GetNamedDataSlot(), FreeNamedDataSlot(), GetData(), SetData(), которые предназначены для работы с локальными хранилищами данных потока. Эти локальные хранилища могут рассматриваться как альтернатива неразделяемым статическим полям.

Синхронизация потоков – это координирование действий потоков для получения предсказуемого результата. Средства синхронизации потоков можно разделить на четыре категории:

  • простые методы приостановки выполнения потока (Suspend(), Resume(), Sleep(), Yield(), Join());

  • блокирующие конструкции;

  • конструкции подачи сигналов;

  • незадерживающие средства синхронизации.

Первые три категории используют для синхронизации приостановку потока. Приостановленный поток практически не потребляет времени процессора, однако его выполнение легко возобновляется системой при наступлении условия «пробуждения».

Блокирующие конструкции обеспечивают исключительный доступ к ресурсу (например, к полю или фрагменту кода), гарантируя, что в каждый момент времени с ресурсом работает только один поток. Блокировка позволяет потокам работать с общими данными, не мешая друг другу. Для организации блокировок платформа .NET предоставляет такие классы, как Monitor, Mutex, Semaphor, SemaphorSlim, а язык C# ‑ специальный оператор lock.

Рассмотрим следующий класс:

public class ThreadUnsafe

{

private static int x, y;

public void Go()

{

if (y != 0) Console.WriteLine(x / y);

y = 0;

}

}

Этот класс небезопасен с точки зрения выполнения потоков. Если вызвать метод Go() в разных потоках синхронно, может возникнуть ошибка деления на ноль, если поле y будет установлено в ноль в одном потоке как раз между проверкой условия и вызовом Console.WriteLine() в другом потоке. Чтобы сделать код потокобезопасным, необходимо гарантировать выполнение двух операторов, составляющих тело метода Go(), только одним потоком одновременно.

Подобные блоки кода называются критическими секциями. Оператор lock языка C# позволяет задать критическую секцию. Синтаксис оператора:

lock(<объект синхронизации>) { <операторы критической секции> }

Здесь <объект синхронизации> ‑ это любой объект, который будет идентификатором критической секции. Часто в качестве объекта синхронизации записывают поле или переменную, на которую накладывается блокировка.

Изменим метод Go(), чтобы сделать его потокобезопасным:

public class ThreadSafe

{

private static object locker = new object();

private static int x, y;

public void Go()

{

lock (locker)

{

if (y != 0) Console.WriteLine(x / y);

y = 0;

}

}

}

Рассмотрим ещё один пример, в котором необходимо использовать критическую секцию. Пусть имеется приложение с целочисленным массивом и методами обработки массива:

public class MyApp

{

// в buffer хранятся данные, с которыми работают потоки

private static int[] buffer;

private static Thread writer;

public static void Main()

{

// инициализируем buffer

buffer = Enumerable.Range(1, 100).ToArray();

// запустим поток для перезаписи данных

writer = new Thread(WriterFunc);

writer.Start();

// запустим 10 потоков для чтения данных

for (int i = 0; i < 10; i++)

{

new Thread(ReaderFunc).Start();

}

}

private static void ReaderFunc()

{

while (writer.IsAlive)

{

// считаем сумму элементов из buffer

int sum = buffer.Sum();

// если сумма неправильная, сигнализируем

if (sum != 5050)

{

Console.WriteLine("Error in sum!");

return;

}

}

}

private static void WriterFunc()

{

var rnd = new Random();

// "перетасовываем" данные 10 секунд

DateTime start = DateTime.Now;

while ((DateTime.Now - start).Seconds < 10)

{

int k = rnd.Next(100);

int tmp = buffer[0];

buffer[0] = buffer[k];

buffer[k] = tmp;

}

}

}

При работе приложения периодически возникают сообщения "Error in sum!". Причина в том, что метод WriterFunc() может изменить данные в массиве buffer во время подсчёта суммы в методе ReaderFunc(). Решение проблемы: объявим критическую секцию, содержащую код, работающий с массивом buffer. Так как работа с buffer происходит в двух функция, используем при объявлении секций один и тот же идентификатор.

private static void ReaderFunc()

{

while (writer.IsAlive)

{

int sum;

lock (buffer) // первая часть критической секции

{

sum = buffer.Sum();

}

if (sum != 5050)

{

Console.WriteLine("Error in sum!");

return;

}

}

}

private static void WriterFunc()

{

var rnd = new Random();

DateTime start = DateTime.Now;

while ((DateTime.Now - start).Seconds < 10)

{

int k = rnd.Next(100);

lock (buffer) // вторая часть критической секции

{

int tmp = buffer[0];

buffer[0] = buffer[k];

buffer[k] = tmp;

}

}

}

Оператор lock ‑ всего лишь скрытый способ работы со статическим классом Monitor. Объявление критической секции эквивалентно следующему коду:

Monitor.Enter(buffer);

try

{

// операторы критической секции

}

finally

{

Monitor.Exit(buffer);

}

Метод Monitor.Enter() определяет вход в критическую секцию, метод Monitor.Exit() – выход из секции. Аргументами методов является объект-идентификатор критической секции.

Кроме Enter() и Exit(), класс Monitor обладает ещё несколькими полезными методами. Например, метод Wait() применяется внутри критической секции и снимает с неё блокировку (при этом можно задать период времени, на который снимается блокировка). При вызове Wait() текущий поток останавливается, пока не будет вызван (из другого потока) метод Monitor.Pulse().

Иногда ресурс нужно блокировать так, чтобы читать его могли несколько потоков, а записывать ‑ только один. Для этих целей предназначен класс ReaderWriterLockSlim. Его экземплярные методы EnterReadLock() и ExitReadLock() задают секцию чтения ресурса, а методы EnterWriteLock() и ExitWriteLock() ‑ секцию записи ресурса.

using System.Collections.Generic;

using System.Threading;

public class SynchronizedCache

{

private Dictionary<int, string> cache = new Dictionary<int, string>();

private ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();

public string Read(int key)

{

cacheLock.EnterReadLock(); // секция чтения началась

try

{

return cache[key]; // читать могут несколько потоков

}

finally

{

cacheLock.ExitReadLock(); // секция чтения закончилась

}

}

public void Add(int key, string value)

{

cacheLock.EnterWriteLock(); // секция записи началась

try

{

cache.Add(key, value);

}

finally

{

cacheLock.ExitWriteLock(); // секция запись закончилась

}

}

public bool AddWithTimeout(int key, string value, int timeout)

{

if (cacheLock.TryEnterWriteLock(timeout)) // таймаут входа

{

try

{

cache.Add(key, value);

}

finally

{

cacheLock.ExitWriteLock();

}

return true;

}

return false;

}

}

Потребность в синхронизации на основе подачи сигналов возникает, когда один поток ждёт прихода уведомления от другого потока. Для осуществления данной синхронизации используется базовый класс EventWaitHandle, его наследники AutoResetEvent и ManualResetEvent, а также класс ManualResetEventSlim. Имея доступ к объекту EventWaitHandle, поток может вызвать его метод WaitOne(), чтобы остановиться и ждать сигнала. Для отправки сигнала применяется вызов метода Set(). Если используются ManualResetEvent и ManualResetEventSlim, все ожидающие потоки освобождаются и продолжают выполнение. При использовании AutoResetEvent ожидающие потоки освобождаются и запускаются последовательно, на манер очереди1.

В качестве примера использования сигналов опишем шаблон проектирования «поставщик-потребитель». Данный шаблон представляет собой очередь, в которую независимые потоки (поставщики) помещают объекты, а один поток извлекает объекты и выполняет с ними заданное действие.

using System;

using System.Collections.Generic;

using System.Threading;

public class ActionQueue<T> : IDisposable where T : class

{

private readonly Action<T> _action;

private readonly Queue<T> _queue;

private readonly Thread _thread;

private readonly EventWaitHandle _waitHandle;

public ActionQueue(Action<T> action)

{

if (action == null)

{

throw new ArgumentNullException("action");

}

_action = action;

_queue = new Queue<T>();

_thread = new Thread(MainLoop) {IsBackground = true};

_waitHandle = new AutoResetEvent(false);

_thread.Start();

}

public void Dispose()

{

_waitHandle.Close();

}

public void EnqueueElement(T element)

{

lock (_queue)

{

_queue.Enqueue(element);

}

_waitHandle.Set();

}

public void Stop()

{

EnqueueElement(null);

_thread.Join();

_waitHandle.Reset();

}

private void MainLoop()

{

while (true)

{

T element = null;

lock (_queue)

{

if (_queue.Count > 0)

{

element = _queue.Dequeue();

if (element == null)

{

return;

}

}

}

if (element != null)

{

_action(element);

}

else

{

_waitHandle.WaitOne();

}

}

}

}

Чтобы организовать незадерживающую синхронизацию, используется статический класс System.Threading.Interlocked. Класс Interlocked имеет методы для инкремента, декремента и сложения аргументов типа int или long, а также методы присваивания значений числовым и ссылочным переменным. Каждый метод выполняется как атомарная операция.

int x = 10, y = 20;

Interlocked.Add(ref x, y); // x = x + y

Interlocked.Increment(ref x); // x++

Interlocked.Exchange(ref x, y); // x = y

Interlocked.CompareExchange(ref x, 50, y); // if (x == y) x = 50