2 Block()
Листинг. 9 Нарушенное решение с конечным буфером
Но мы не можем это сделать, так как мы не можем узнать текущее значение семафора, есть только операции сигнала и ожидания.
Задача: написать код, который обрабатывает ограничение конечного буфера потребителя – производителя.
Сигнал конечного буфера производителя потребителя
Добавим второй семафор, чтобы отслеживать число доступного места в буфер.
1 mutex = Semaphore(1)
2 items = Semaphore(0)
3 spaces = Semaphore(buffer.size())
Листинг. 10 Инициализация конечного буфера производителя - потребителя
Когда потребитель удаляет элемент, то он должен подать сигнал об освобождении памяти. Когда приходит производитель, он должен декрементировать свободное место, в этой точке возможна блокировка до следующего сигнала потребителя.
Конечный буфер производителя потребителя. Решение.
Ниже приведено конечное решение:
1 Items.Wait()
2 mutex.wait()
3 event = buffer.get()
4 mutex.signal()
5 spaces.signal()
6
7 event.process()
Листинг. 11 Решение потребителя с конечным буфером
Код производителя симметричен:
1 event = waitForEvent()
2
3 spaces.wait()
4 mutex.wait()
5 buffer.add(event)
6 mutex.signal()
7 items.signal()
Листинг. 12 Решение производителя с конечным буфером
Для того, что бы избежать deadlock производителей и потребителей, нужно проверить наличие свободных мест до захвата мьютекса. Для лучшей производительности мьютекс освобождают до подачи сигнала.
ОСРВ RTX
Расширение IntervalZero RTX - программное средство, предназначенное для добавления функциональности «жёсткого» реального времени в системы под управлением операционных систем Microsoft Windows. Программный продукт RTX был с успехом опробован в тысячах различных автоматизированных систем управления, оборонных и аэрокосмических системах, контрольно-измерительной аппаратуре, роботах и т.д. Он позволил добиться повышения их эффективности, возможностей, степени масштабируемости и надёжности функционирования при одновременном сокращении сроков и стоимости разработки новой продукции.
Постановка задачи
Заправочную станцию обслуживает один заправщик (производитель) и заправляется один потребитель. В единицу времени можно либо сливать топливо, либо заправляться, размер бака потребителя и размер бака заправки конечны. Реализовать синхронизацию между потребителем и производителем.
Листинг программы
//////////////////////////////////////////////////////////////////
//
// RtxApp1.c - C file
//
//////////////////////////////////////////////////////////////////
#include "RtxApp1.h"
HANDLE mutex;
HANDLE items;
HANDLE spaces;
INT GasStation = 20;
INT lReleaseCount = 1;
INT Bubble;
INT BubbleM;
INT Plus = 3;
INT Minus = 4;
#define MSGSTR_SEM "Sem"
#define MUTEX_ENABLED
INT RTFCNDCL Thread1Cycle (PVOID unused)
{
while(1)
{
RtWaitForSingleObject(mutex, INFINITE);
{
Bubble = GasStation + Plus;
if (Bubble < 20)
{
GasStation += Plus;
RtPrintf("(PRODUCER)GIVE = %d ", Plus);
RtPrintf("(Fuel level = %d\n", GasStation);
}
if (Bubble > 20)
{
RtPrintf("(PRODUCER)want GIVE = %d ", Plus);
RtPrintf("Fuel level will be FULL, wait = %d\n", GasStation);
}
if (Bubble == 20)
{
GasStation += Plus;
RtPrintf("(PRODUCER)GIVE = %d ", Plus);
RtPrintf("Fuel level is FULL = %d\n", GasStation);
}
RtSleep(3000);
RtReleaseSemaphore(mutex, lReleaseCount, NULL);
}
}
return NO_ERRORS;
}
INT RTFCNDCL Thread2Cycle (PVOID unused)
{
while(1)
{
RtWaitForSingleObject(mutex, INFINITE);
{
srand(10);
Minus = rand()%10;
BubbleM = GasStation - Minus;
if (BubbleM > 0)
{
GasStation -= Minus;
RtPrintf("(CONSUMER)TAKE = %d ", Minus);
RtPrintf("Fuel level = %d\n", GasStation);
}
if (BubbleM < 0)
{
RtPrintf("(CONSUMER)want TAKE = %d ", Minus);
RtPrintf("(CONSUMER)Need more GAS, wait = %d\n", GasStation);
}
if (BubbleM == 0)
{
GasStation -= Minus;
RtPrintf("(CONSUMER)TAKE = %d ", Minus);
RtPrintf("Fuel level is EMPTY = %d\n", GasStation);
}
RtSleep(3000);
RtReleaseSemaphore(mutex, lReleaseCount, NULL);
}
}
return NO_ERRORS;
}
VOID _cdecl wmain(int argc, wchar_t **argv, wchar_t **envp)
{
// for periodic timer code
LARGE_INTEGER liPeriod; // timer period
LARGE_INTEGER time;
HANDLE hTimer = NULL; // timer handle
DWORD threadId;
DWORD dwStackSize = 0;
ULONG stackSize = 0;
ULONG sleepTime = 30000;
DWORD dwExitCode = 0;
RtPrintf("\n");
//
// Create the thread.
//
hThread1 = CreateThread(
NULL,
dwStackSize, //default
Thread1Cycle, //function
NULL, //parameters
CREATE_SUSPENDED,
&threadId
);
if(hThread1 == NULL)
{
RtPrintf("Error: could not create thread. GetLastError = %d\n", GetLastError());
return ERROR_OCCURED;
}
if(!RtSetThreadPriority( hThread1, RT_PRIORITY_MAX))
{
RtPrintf("Error: could not set thread priority. GetLastError = %d\n", GetLastError());
TerminateThread( hThread1, dwExitCode);
return ERROR_OCCURED;
}
//
// Create the CPU HOG thread.
//
hThread2 = CreateThread(
NULL,
dwStackSize, //default
Thread2Cycle, //function
NULL, //parameters
CREATE_SUSPENDED,
&threadId
);
if(hThread2 == NULL)
{
RtPrintf("Error: could not create thread. GetLastError = %d\n", GetLastError());
return ERROR_OCCURED;
}
if(!RtSetThreadPriority(hThread2, RT_PRIORITY_MIN))
{
RtPrintf("Error: could not set thread priority. GetLastError = %d\n", GetLastError());
TerminateThread( hThread2, dwExitCode);
return ERROR_OCCURED;
}
mutex = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);
items = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);
spaces = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);
if (mutex==NULL)
{
RtPrintf("RtCreateSemaphore failed.");
}
if (items==NULL)
{
RtPrintf("RtCreateSemaphore failed.");
}
if (spaces==NULL)
{
RtPrintf("RtCreateSemaphore failed.");
}
RtReleaseSemaphore(mutex, lReleaseCount, NULL);
if(
(ResumeThread(hThread1) == RESUME_ERROR)
||
(ResumeThread(hThread2) == RESUME_ERROR)
)
{
RtPrintf("Error: could not resume thread. GetLastError = %d\n", GetLastError());
TerminateThread(hThread1, dwExitCode);
TerminateThread(hThread2, dwExitCode);
return ERROR_OCCURED;
}
RtSleep(150000);
//
// Stop thread.
if( !TerminateThread(hThread1, dwExitCode) || !TerminateThread(hThread2, dwExitCode))
{
RtPrintf("Error: could not terminate thread. GetLastError = %d\n", GetLastError());
return ERROR_OCCURED;
}
RtCloseHandle(mutex);
RtCloseHandle(items);
RtCloseHandle(spaces);
ExitProcess(0);
}
#include "RtxApp1.h"
//
// RTX periodic timer handler function
//
void RTFCNDCL TimerHandler(PVOID context)
{
PINT a = (PINT)context;
(*a)++;
}
//////////////////////////////////////////////////////////////////
//
// RtxApp1.h - header file
//
//////////////////////////////////////////////////////////////////
#include <windows.h>
#include <wchar.h>
#include <rtapi.h>
#include <time.h>
#include <stdio.h>
//#include <string.h>
//#include <ctype.h>
//#include <conio.h>
#include <stdlib.h>
//#include <math.h>
//#include <errno.h>
// Add DEFINES Here
//**** error codes ****//
#define NO_ERRORS 0
#define ERROR_OCCURED -1
#define RESUME_ERROR 0xFFFFFFFF
#define SUPPEND_ERROR 0xFFFFFFFF
HANDLE hThread1; // handle to the thread
HANDLE hThread2; // handle to the thread
// Add Function prototypes Here
// function prototype for periodic timer function
void
RTFCNDCL
TimerHandler(
void * nContext
);
// Interrupt handler prototype
void
RTFCNDCL
InterruptHandler(
void * nContext
);
Результаты работы
(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 20
(CONSUMER)TAKE = 1 Fuel level = 19
(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19
(CONSUMER)TAKE = 1 Fuel level = 18
(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18
(CONSUMER)TAKE = 1 Fuel level = 17
(PRODUCER)GIVE = 3 Fuel level is FULL = 20
(CONSUMER)TAKE = 1 Fuel level = 19
(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19
(CONSUMER)TAKE = 1 Fuel level = 18
(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18
(CONSUMER)TAKE = 1 Fuel level = 17
Листинг. 13 Результат исполнения
Рис. 1 Результат работы
Список литературы
The Little Book of SEMAPHORES, Allen B. Downey, 2009