Добавил:
Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:

Рихтер Дж., Назар К. - Windows via C C++. Программирование на языке Visual C++ - 2009

.pdf
Скачиваний:
6266
Добавлен:
13.08.2013
Размер:
31.38 Mб
Скачать

268 Часть II. Приступаем к работе

~CQueue();

BOOL IsFull();

BOOL IsEmpty(int nThreadNum); void AddElement(ELEMENT e);

BOOL GetNewElement(int nThreadNum, ELEMENT& e);

}

Открытая структура ELEMENT из этого класса определяет вид элементов очереди, а их содержимое не представляет особой важности. В этом примере клиентские потоки записывают в элементы очереди свой номер, чтобы серверные потоки, обработав их, смогли отобразить эту информацию в своих списках. Реальным приложениям эта информация, как правило, не нужна. Другая структура, INNER_ELEMENT, является оболочкой для ELEMENT. Эта оболочка отслеживает порядок элементов с помощью поля m_nStamp, значение которого увеличивается при вставке каждого элемента. К закрытым членам структуры относится поле m_pElements, ссылающееся на массив структур INNER_ELEMENT фиксированного размера. Эти данные необходимо защитить от одновременного обращения клиентских и серверных потоков. Размер массива определяется значением, которое записывается в поле m_nMaxElements при создании объекта CQueue. Следующее поле, m_nCurrentStamp, содержит целочисленное значение, которое увеличивается каждый раз при добавлении элемента в очередь. Закрытая функция GetFreeSlot возвращает индекс первой структуры INNER_ELEMENT, хранящейся в поле m_pElements с m_nStamp = 0 (это означает, этот элемент уже прочитан либо пуст). Если подходящий элемент не найден, функция возвращает -1.

int CQueue::GetFreeSlot() {

// поиск первого элемента с индексом 0

for (int current = 0; current < m_nMaxElements; current++) { if (n_pElements[current].m_nStamp == 0)

return(current);

}

// очередь заполнена return(-1);

}

Закрытая вспомогательная функция GetNextSlot возвращает индекс ft4NER_ELEMENT с наименьшим номером, отличным от нуля (то есть не пустого и не прочитанного еще элемента, добавленного первым) в массиве m_pElements of the INNER_ELEMENT. Если все элементы очереди были прочитаны (и получили отметку 0), возвращается -1.

Глава 8. Синхронизация потоков в пользовательском режиме.docx 269

int CQueue::GetNextSlot(int nThreadNum) {

//по умолчанию данному потоку не разрешается добавлять элементы

//в очередь

int firstSlot = -1;

//Индекс элемента не может быть больше, чем номер элемента,

//добавленного последним

int firstStamp = m_nCurrentStamp+1;

//Поиск непрочитанных элементов с четными (для обработки потоком 0)

//либо нечетными (для обработки потоком 1) номерами

for (int current = 0; current < m_nMaxElements; current++) {

//Следующий код нужен для отслеживания элементов, добавленными

//первыми

//(элементов с наименьшим номером), для реализации очереди типа

//«первым вошел, первым вышел»

if ((m_pElements[current].m_nStamp l= 0) && // прочитанный

// элемент

((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum) && (m_pElements[current].m_nStamp < firstStamp)) {

firstStamp = m_pElements[current].m_nStamp; firstSlot = current;

}

}

return(firstSlot);

}

Думаю, вы и сами без труда разберетесь в устройстве конструктора, деструктора, а также методов IsFull и IsEmpty объекта CQueue. Лучше поговорим о функции AddElement, которую вызывает клиентские потоки для добавления запросов в очередь:

void CQueue::AddElement(ELEMENT e) {

//если очередь заполнена, ничего не предпринимаем int nFreeSlot = GetFreeSlot();

if (nFreeSlot == -1) return;

//копирование содержимое элемента

n_pElements[nFreeSlot].m_element = e;

270 Часть II. Приступаем к работе

// Обновить номер элемента m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp;

}

Если в m_pElements есть место, в него записывается переданная как параметр структура ELEMENT, а значение в поле номера увеличивается и содержит текущее число элементов в очереди. Пытаясь обработать запрос, серверный поток вызывает функцию GetNewElement, передавая ей свой номер (0 или 1), а также структуру ELEMENT, в которую следует записать данные нового запроса:

B00L CQueue::GetNewElement(int nThreadNum, ELEMENT& e) { int nNewSlot = GetNextSlot(nThreadNum);

if (nNewSlot == -1) return(FALSE);

// Копируем содержимого элемента

e = m_pElements[nNewSlot].m_element;

// Помечаем элемент как прочитанный m_pElements[nNewSlot].m_nStamp = 0;

return(TRUE);

}

Вспомогательная функция GetNextSlot выполняет основную работу по поиску первого элемента, подходящего для обработки данным серверным потоком. Если такой элемент есть в очереди, GetNewElement копирует его данные в предоставленную вызывающим потоком структуру и записывает в поле m_nStamp значение

0.

Здесь нет ничего сложного, поэтому вы можете заподозрить CQueue в неезопасном поведении в многопоточном окружении. Да, ваши подозрения справедливы. В следующей главе я расскажу о том, как с помощью синхронизирующих объектов ядра создать версию объекта CQueue, безопасную в многопоточной среде. В приложении 08-Queue.exe за синхронизацию доступа к глобальному экземпляру очереди отвечают сами клиентские и серверные потоки:

CQueue g_q(10);

// Общая очередь

В приложении 08-Queue.exe используются три глобальные переменные, управляющие совместной работой клиентских (пишущих) и серверных (читающих) потоков во избежание повреждения очереди:

SRWLOCK

g_srwLock;

//

SRWLock-блокировка, защищающая очередь

C0NDITI0N_VARIABLE

g_cvReadyToConsume;

//

Условная переменная,

Глава 8. Синхронизация потоков в пользовательском режиме.docx 271

// устанавливается читающими потоками

CONDITION_VARIABLE g_cvReadyToProduce; // Условная переменная,

// устанавливается читающими потоками

При каждой попытке обратиться к очереди поток вынужден получать SRWLock для совместного (в случае серверных, т.е. читающих потоков) либо монопольного доступа (в случае клиентских, т.е. записывающих потоков).

Реализация клиентских (записывающих) потоков

Рассмотрим реализацию клиентского потока:

DWORD WINAPI WriterThread(PVOID pvParam) {

int nThreadNum = PtrToUlong(pvParam);

HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENTS);

for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {

CQueue::ELEMENT e = { nThreadNum, nRequestNum };

//Запрашиваем доступ для записи

AcquireSRWLockExclusive(&g_srwLock);

//Если очередь заполнена, «усыпляем» поток, пока не изменится

//значение условной переменной.

//Примечание. В ожидании блокировки пользователь

//может щелкнуть кнопку Stop

if (g_q.IsFull() & !g_fShutdown) { // Очередь заполнена

AddText(hWndW, TEXT("[Xd] Queue is full: impossible to add Xd"), nThreadNum, nRequestNum);

//—> Необходимо дождаться, пока читающий поток не очистит

//место в очереди, чтобы получить блокировку.

SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, INFINITE, 0);

}

//Другие записывающие потоки могут ожидать получения блокировки.

//--> Освобождаем блокировку и уведомляем записывающие потоки,

//которые ожидают блокировки.

if (g_fShutdown) {

// Уведомляем о завершении текущего потока

AddText(hWndLB, TEXT("[Xd] bye bye"), nThreadNum);

272Часть II. Приступаем к работе

//Удерживать блокировку больше не требуется

ReleaseSRWLockExclusive(&g_srwLock);

//Уведомить все остальные ожидающие потоки, если пора

//завершения работу

WakeAllConditionVa riable(&g_cvReadyToP roduce);

//Завершение потока и запись строк «bye bye» return(0);

}else {

//Добавляем в очередь новый элемент

g_q.AddElement(e);

// Показываем результат обработки элемента

AddText(hWndLB, TEXT("[Xd] Adding Xd"), nThreadNum, nRequestNum);

//Удерживать блокировку больше не требуется

ReleaseSRWLockExclusive(&g_srwLock);

//Уведомляем читающие потоки о том, что в очереди появился

//новый элемент

WakeAllConditionVariable(&g_cvReadyToConsume);

// Ожидаем добавления нового элемента

Sleep(1500);

}

}

// Уведомляем о завершении текущего потока

AddText(hWndLB, TEXT("[Xd] bye bye"), nThreadNum);

return(0);

}

Цикл for увеличивает значения счетчика запросов, генерируемых этим потоком. Поток завершается, если булева переменная g_fShutdown принимает значение TRUE в результате закрытия главного окна приложения либо щелчка кнопки Stop. Я еще вернусь к этому вопросу при обсуждении проблем, связанных с остановкой фоновых клиентских и серверных потоков командой их активного потока, обслуживающего пользовательский интерфейс.

Перед попыткой добавления нового элемента в очередь поток должен получить SRWLock для монопольного доступа, вызвав функцию AcquireSRWLockExclusive. Если блокировка уже получена другим клиентским или серверным потоком, вызывающий поток блокируется при вызове AcquireSRWLockExchmve в ожидании освобождения блокировки. Когда эта

Глава 8. Синхронизация потоков в пользовательском режиме.docx 273

функция возвращает управление, блокировка устанавливается, но кроме этого для постановки запроса в очередь должно выполняться некоторое условие, а именно: в очереди должно быть место. Если очередь заполнена, необходимо «усыпить» записывающий поток, пока один из читающих потоков не обработает запрос и не освободит в очереди место для размещения нового запроса. Однако блокировку необходимо освободить до того, как поток заснет, иначе случится взаимная блокировка потоков: ни один читающий поток не сможет освободить очередь: им будет отказано в доступе, поскольку блокировка будет еще занята. Именно это и делает функция SleepConditionVariableSRW, которая освобождает переданную ей в качестве параметра блокировку g_srwLock и «усыпляет» поток, пока функция g_cvReadyToProduce не присвоит условной переменной нужное значение вызовом WakeConditionVariable. Последнюю операцию выполняет серверный поток, как только в очереди освобождается место.

Когда функция SleepConditionVariableSRW вернет управление, будут выполняться два условия: будет установлена блокировка, а другой поток, присвоив условной переменной нужное значение, уведомит клиентские потоки о том, что в очереди появилось место. К этому моменту поток уже готов к постановке в очередь нового запроса. Однако перед этим он проверяет, не поступила ли во время его сна команда на завершение работы и, если нет, добавляет в очередь новый запрос. При этом клиентский поток получает уведомление о том, что он должен обновить свой список и происходит освобождение блокировки вызовом ReleaseSRWLockExclusive. Перед следующим оборотом цикла вызывается функция WakeAllConditionVariable с передачей &g_cvReadyToConsume в качестве параметра,

чтобы пробудить все потоки для обработки данных.

Обработка запросов серверными потоками

При запуске приложения создается два серверных потока с идентичными функциями обратного вызова. Каждый из этих потоков обрабатывает запросы с четными либо нечетными номерами, вызывая функцию ConsumeElement в цикле, пока переменной g_fShutdown не будет присвоено значение TRUE. Вспомогательная функция возвращает TRUE после успешной обработки запроса и FALSE, если оказывается, что g_fShutdown = TRUE.

BOOL ConsumeEleroent(lnt nThreadNum, int nRequestNum, HWND hWndLB) {

//Получаем доступ к очереди для обработки нового элемента

AcquireSRWLockShared(&g_srwLock);

//Усыпляем поток, пока не появятся запросы для обработки.

//Проверяем, не отдана ли команда на прекращение работы,

//пока поток спал.

while (g_q.IsEmpty(nThreadNum) && !g_fShutdown) {

274 Часть II. Приступаем к работе

// В очереди нет доступных элементов

AddText(hWndLB, TEXT("[*d] Nothing to process"), nThreadNum);

//Очередь пуста, ждем, пока записывающий поток не добавит

//в очередь новые запросы, после чего функция вернет управление,

//установив блокировку, разрешающую совместный доступ для чтения.

SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);

}

//Перед завершением работы необходимо освободить блокировку, чтобы

//уведомить потоки через условные переменные.

if (g_fShutdown) {

//уведомляем о завершении текущего потока

AddText(hWndLB, TEXT("[Xd] bye bye"), nThreadNum);

//Другой записывающий поток может ожидать блокировки,

//поэтому необходимо освободить ее перед завершением.

ReleaseSRWLockShared(&g_srwLock);

//Уведомляем читающие потоки и

//пробуждаем их.

WakeConditionVariable(&g_cvReadyToConsume);

return(FALSE);

}

//Получаем первый элемент очереди

CQueue::ELEMENT e;

//Примечание. Нет нужды проверять результат, поскольку функция

//IsEmpty вернула FALSE

g_q.GetNewElement(nThreadNum, e);

//Удерживать блокировку больше не требуется

ReleaseSRWLockShared(&g_s rwLock);

//Показываем результат обработки запроса

AddText(hWndLB, TEXT("[%d] Processing %d:%d"), nThreadNum, e.m_nThreadNum, e.m_nRequestNum);

//В очереди освободилось место для нового запроса,

//поэтому пробуждаем записывающий поток.

WakeConditionVariable(&g_cvReadyToProduce);

return(TRUE);

}

Глава 8. Синхронизация потоков в пользовательском режиме.docx 275

DWORD WINAPI ReaderThread(PVOID pvParam) {

int nThreadNum = PtrToUlong(pvParam);

HWND hWndLB i GetDlgItem(g_hWnd, IDC_SERVERS);

for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {

if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB)) return(0);

Sleep(2500); // Ждем перед чтением следующего элемента

}

//Во время сна потока установлена переменная g_fShutdown,

//поэтому сообщаем о завершении текущего потока.

AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);

return(0);

}

Перед обработкой запроса поток вызывает AcquireSRWLockShared, чтобы получить srwLock в режиме совместного доступа. Если блокировка в режиме монопольного доступа уже установлена клиентским потоком, вызов блокируется. Если же блокировку установил в режиме совместного доступа другой серверный поток, функция сразу возвращает управление, позволяя обработать запрос. Даже если блокировка будет успешно получена, в очереди может и не оказаться новых запросов, пригодных для обработки данным потоком. Например, в очереди может быть необработанный запрос с нечетным номером, но поток 0 обрабатывает только запросы с четными номерами. В этом случае списку серверного потока отправляется сообщение, а сам поток блокируется при вызове SleepConditionVariableSRW, пока клиентский поток не поставит в очередь новый запрос и не присвоит условной переменной g_cvReadyТоConsume значение, разрешающее его обработать. Когда функция SteepConditionVariableSRW вернет управление, блокировка g_srwLock будет установлена, а в очереди будет новый запрос. И в этом случае номер запроса может оказаться неподходящим, поэтому SleepConditionVariableSRW вызывается в цикле и проверяет наличие в очереди запроса с подходящим номером. Заметьте, что в этом примере используются две условных переменных, а не просто wReadyToConsume: одна управляет обработкой запросов с четными номерами, а другая — обработкой запросов с нечетными номерами. Это позволяет не будить зря серверные потоки, если в очереди нет запросов с подходящими номерами. Сейчас приложение реализовано так, что серверные потоки получают блокировку в режиме совместного доступа, даже если очередь обновляется во время вызова GetNewElement, поскольку поле m_nStamp в представляющей запрос структуре установлено в 0, показывая, что запрос

276 Часть II. Приступаем к работе

обрабатывается. В данном примере это не проблема, поскольку запросы «поделены» между серверными потоками: поток 0 обрабатывает только запросы с четными номерами, а поток 1 — только с нечетными номерами.

Если в очереди обнаруживается запрос с походящим номером, то он извлекается, вызывается функция ReleaseSRWLockShared и серверному списку направляется сообщение. Теперь пора разбудить клиентские потоки вызовом

WakeConditionVariable с передачей &g_cvReadyToProduce в качестве параметра и уведомить их о наличии свободного места в очереди.

Взаимные блокировки при остановке потоков

Добавляя к окну приложения кнопку Stop, я не думал, что ее щелчок приведет к взаимной блокировке (deadlock). Для остановки клиентских и серверных потоков используется простой код следующего вида:

void StopProcessing() { if (!g_fShutdown) {

//уведомляем потоки о необходимости остановки

InterlockedExchangePointer((PLONG*) &g_fShutdown, (LONG) TRUE);

//пробуждаем все потоки, ожидающие на условных переменных

WakeAllConditionVariable(&g_cvReadyToConsume); WakeAllConditionVariable(&g_cvReadyToProduce);

//дождемся завершения всех потоков и выполним очистку

WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);

//Не забудем освободить ресурсы ядра.

//Примечание. Это не обязательно, поскольку завершается весь

//процесс.

while {g_nNumTh reads-) CloseHandle(g_hThreads[g_nNumThreads]);

// закрываем все списки

 

AddText(GetDlgItem(g_hWnd, IDC_SERVERS), TEXT("

---------------"));

AddText(GetDlgItem(g_hWnd, IDC_CLIENTS), TEXT("---------------

"));

}

}

Флаг g_fShutdown устанавливается в TRUE, а обе условные переменные переводятся в состояние, разрешающее пробуждение потоков (вызовом WakeAllConditionVariable). После этого остается только вызвать WaitForMultipleObjects, ис-

пользуя массив описателей работающих потоков в качестве параметра. Когда WaitForMultipleObjects вернет управление, описатели потоков будут закрыты, а в списки будет добавлена последняя строка.

Глава 8. Синхронизация потоков в пользовательском режиме.docx 277

Предполагается, что когда вызов WakeAllConditionVariabte пробудит потоки ото сна, в который их погрузила функция SleepConditionVariableSRW, они начинают отслеживать значение флага g_fShutdown и после его установки просто завершаются, отправляя в списки строку «bye bye». Вот тут-то (при отправке сообщения списку) и может случиться взаимная блокировка. Если код, исполняющий функцию StopProcessing, работает с описателем сообщения WM_COMMAND, то поток пользовательского интерфейса, ответственный за обработку оконных сообщений, блокируется в функции WaitForMultipleObjects. Если при этом какойнибудь клиентский или серверный поток вызовет ListBox_SetCurSel и ListBox_AddString, чтобы добавить к списку новый элемент, поток пользовательского интерфейса не сможет ответить на вызов, и... вот вам взаимная блокировка! Я решил деактивировать кнопку Stop на время исполнения обработчика сгенерированных ей сообщений и породить другой поток для вызова функции StopProcessing. При этом нет риска взаимной блокировки, поскольку обработчик сообщений сразу же возвращает управление:

DWORD WINAPI StoppingThread(PVOID pvParam) {

StopProcessing();

return(0);

}

void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify)

{

switch (id) { case IDCANCEL:

EndDialog(hWnd, id); break;

case IDC_BTN_STOP:

{

//StopProcessing нельзя вызывать из UI-потока из-за

//риска взаимной блокировки: для заполнения списков

//используется SendMessage(),

//следовательно, нужен отдельный поток.

DWORD dwThreadID;

CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread,

NULL, 0, &dwThreadID));

// эту кнопку не удастся щелкнуть дважды

Button_Enable(hWndCtl, FALSE);

}

break;

}

}

Соседние файлы в предмете Программирование на C++