Синхронизация процессов с помощью событий. Объекты синхронизации в Windows

В предыдущих частях статьи я рассказал об общих принципах и конкретных методах построения многопоточных приложений. Различным потокам практически всегда периодически требуется взаимодействие между собой, при этом неизбежно возникает необходимость в синхронизации. Сегодня мы рассмотрим важнейший, самый мощный и универсальный инструмент синхронизации Windows: объекты синхронизации ядра.

WaitForMultipleObjects и другие функции ожидания

Как вы помните, для синхронизации потоков, обычно требуется временно приостановить выполнение одного из потоков. При этом он должен быть переведён средствами операционной системы в состояние ожидания, в котором он не занимает процессорное время. Мы уже знаем две функции, которые могут это делать: SuspendThread и ResumeThread . Но как я рассказывал в предыдущей части статьи, в силу некоторых особенностей эти функции непригодны для синхронизации.

Сегодня мы рассмотрим другую функцию, которая также переводит в поток в состояние ожидания, но в отличие от SuspendThread/ResumeThread , специально предназначена именно для организации синхронизации. Это WaitForMultipleObjects . Поскольку эта функция очень важна, я несколько отступлю от своего правила не вдаваться в детали API и расскажу о ней подробнее, даже приведу ее прототип:

DWORD WaitForMultipleObjects(

DWORD nCount, // число объектов в массиве lpHandles

CONST HANDLE * lpHandles, // указатель на массив описателей объектов ядра

BOOL bWaitAll, // флаг, означающей надо ли дожидаться всех объектов или достаточно одного

DWORD dwMilliseconds // таймаут

Главный параметр этой функции - это указатель на массив хэндлов объектов ядра. О том, что это за объекты, мы поговорим ниже. Пока нам важно знать то, что любой из таких объектов может находиться в одном из двух состояний: нейтральном или «сигнализирующем» (signaled state). Если флаг bWaitAll равен FALSE, функция вернет управление, как только хотя бы один из объектов подаст сигнал. А если флаг равен TRUE, это произойдет только тогда, когда сразу все объекты начнут сигнализировать (как мы увидим, это важнейшее свойство этой функции). В первом случае по возвращаемому значению можно узнать, какой именно из объектов подал сигнал. Надо вычесть из него константу WAIT_OBJECT_0 , и получится индекс в массиве lpHandles. Если время ожидания превысило указанный в последнем параметре таймаут, функция прекратит ожидание и вернет значение WAIT_TIMEOUT . В качестве таймаута можно указать константу INFINITE , и тогда функция будет ждать «до упора», а можно наоборот 0, и тогда поток вообще не будет приостановлен. В последнем случае функция вернет управление немедленно, но по ее результату можно будет узнать состояние объектов. Последний прием используется очень часто. Как видите, эта функция обладает богатыми возможностями. Имеется еще несколько WaitForXXX функций, но все они представляют собой вариации на тему главной. В частности, WaitForSingleObject представляет собой всего лишь ее упрощенный вариант. Остальные имеют каждая свою дополнительную функциональность, но применяются, в общем-то, реже. Например, они дают возможность реагировать не только на сигналы объектов ядра, но и на поступление новых оконных сообщений в очередь потока. Их описание, так же как и детальные сведения о WaitForMultipleObjects , вы, как обычно, найдете в MSDN.

Теперь о том, что же это за таинственные «объекты ядра». Начнем с того, что в их число входят сами потоки и процессы. Они переходят в сигнализирующее состояние сразу по завершении. Это очень важная возможность, поскольку очень часто бывает необходимо отслеживать момент завершения потока или процесса. Пусть, например, наше серверное приложение с набором рабочих потоков должно завершиться. Управляющий поток при этом должен каким-либо способом проинформировать рабочие потоки о том, что пора заканчивать работу (например, установив глобальный флаг), после чего дождаться, пока все потоки не завершатся, сделав все необходимые для корректного завершения действия: освободив ресурсы, информировав клиентов о завершении работы, закрыв сетевые соединения и т.п.

То, что потоки включают сигнал по окончанию работы, позволяет решить задачу синхронизации с завершением потока предельно легко:

// Пусть для простоты у нас будет всего один рабочий поток. Запускаем его:

HANDLE hWorkerThread = :: CreateThread(...);

// Перед окончанием работы надо каким-либо образом сообщаем рабочему потоку, что пора закачивать.

// Ждем завершения потока:

DWORD dwWaitResult = :: WaitForSingleObject( hWorkerThread, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* обработка ошибки */ }

// "Хэндл" потока можно закрыть:

VERIFY(:: CloseHandle( hWorkerThread );

/* Если CloseHandle завершилась неудачей и вернула FALSE, я не выбрасываю исключение. Во-первых, даже если бы это произошло из-за системной ошибки, это не имело бы прямых последствий для нашей программы, ведь раз мы закрываем хендл, значит никакой работы с ним в дальнейшем не предполагается. Реально же неудача CloseHandle может означать только ошибку в вашей программе. Поэтому вставим здесь макрос VERIFY, чтобы не пропустить её на этапе отладки приложения. */

Аналогично будет выглядеть код, ожидающий завершения процесса.

Если бы такой встроенной в систему возможности не было, рабочий поток должен был бы сам как-то передать информацию о своем завершении главному. Даже если бы он делал бы это в самую последнюю очередь, главный поток не мог бы быть уверенным, что рабочему не осталось выполнить хотя бы пару ассемблерных инструкций. В отдельных ситуациях (например, если код потока находится в DLL, которая должна быть выгружена по его завершении) это может привести к фатальным последствиям.

Хочу напомнить вам, что даже после того, как поток (или процесс) завершился, его описатели все равно остаются в силе до тех пор, пока явно не будут закрыты функцией CloseHandle . (Кстати, не забывайте делать это!) Это сделано как раз для того, чтобы в любой момент можно было бы проверить состояние потока.

Итак, функция WaitForMultipleObjects (и ее аналоги) позволяет синхронизировать выполнение потока с состоянием объектов синхронизации, в частности других потоков и процессов.

Специальные объекты ядра

Перейдем к рассмотрению объектов ядра, которые предназначены специально для синхронизации. Это события, семафоры и мьютексы. Кратко рассмотрим каждый из них:

Событие (event)

Пожалуй, самый простой и фундаментальный синхронизирующий объект. Это всего лишь флаг, которому функциями SetEvent/ResetEvent можно задать состояние: сигнализирующее или нейтральное. Событие - это самый удобный способ передать сигнал ожидающему потоку, что некое событие свершилось (потому оно так и называется), и можно продолжать работу. С помощью события мы легко решим проблему синхронизации при инициализации рабочего потока:

// Пусть для простоты хэндл события будет храниться в глобальной переменной:

HANDLE g_hEventInitComplete = NULL; // никогда не оставляем переменную неинициализированной!

{ // код в главном потоке

// создаем событие

g_hEventInitComplete = :: CreateEvent( NULL,

FALSE, // об этом параметре мы еще поговорим

FALSE, // начальное состояние - нейтральное

if (! g_hEventInitComplete ) { /* не забываем про обработку ошибок */ }

// создаем рабочий поток

DWORD idWorkerThread = 0 ;

HANDLE hWorkerThread = :: CreateThread( NULL, 0 , & WorkerThreadProc, NULL, 0 , & idWorkerThread );

if (! hWorkerThread ) { /* обработка ошибки */ }

// ждем сигнала от рабочего потока

DWORD dwWaitResult = :: WaitForSingleObject( g_hEventInitComplete, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* ошибка */ }

// вот теперь можно быть уверенным, что рабочий поток завершил инициализацию.

VERIFY(:: CloseHandle( g_hEventInitComplete )); // не забываем закрывать ненужные объекты

g_hEventInitComplete = NULL;

// функция рабочего потока

DWORD WINAPI WorkerThreadProc( LPVOID _parameter )

InitializeWorker(); // инициализация

// сигнализируем, что инициализация завершена

BOOL isOk = :: SetEvent( g_hEventInitComplete );

if (! isOk ) { /* ошибка */ }

Надо заметить, что существуют две заметно отличающиеся разновидности событий. Мы можем выбрать одну из них с помощью второго параметра функции CreateEvent . Если он TRUE, создается событие, состояние которого управляется только вручную, то есть функциями SetEvent/ResetEvent . Если же он FALSE, будет создано событие с автосбросом. Это означает, что как только некий поток, ожидающий данного события, будет освобожден сигналом от этого события, оно автоматически будет сброшено обратно в нейтральное состояние. Наиболее ярко их отличие проявляется в ситуации, когда одного события ожидают сразу несколько потоков. Событие с ручным управлением подобно стартовому пистолету. Как только оно будет установлено в сигнализирующее состояние, будут освобождены сразу все потоки. Событие же с автосбросом похоже на турникет в метро: оно отпустит лишь один поток и вернется в нейтральное состояние.

Мьютекс (mutex)

По сравнению с событием это более специализированный объект. Обычно он используется для решения такой распространенной задачи синхронизации, как доступ к общему для нескольких потоков ресурсу. Во многом он похож на событие с автосбросом. Основное отличие состоит в том, что он имеет специальную привязку к конкретному потоку. Если мьютекс находится в сигнальном состоянии - это означает, что он свободен и не принадлежит ни одному потоку. Как только некий поток дождался этого мьютекса, последний сбрасывается в нейтральное состояние (здесь он как раз похож на событие с автосбросом), а поток становится его хозяином до тех пор, пока явно не освободит мьютекс функцией ReleaseMutex , или же не завершится. Таким образом, чтобы быть уверенным, что с разделяемыми данными одновременно работает только один поток, следует все места, где происходит такая работа, окружить парой: WaitFor - ReleaseMutex :

HANDLE g_hMutex;

// Пусть хэндл мьютекса хранится в глобальной переменной. Его конечно надо создать заранее, до запуска рабочих потоков. Будем считать, что это уже было сделано.

int iWait = :: WaitForSingleObject( g_hMutex, INFINITE );

switch ( iWait ) {

case WAIT_OBJECT_0: // Все нормально

break ;

case WAIT_ABANDONED: /* Какой-то поток завершился, забыв вызвать ReleaseMutex. Скорее всего, это означает ошибку в вашей программе! Поэтому на всякий случай вставим здесь ASSERT, но в окончательно версии (release) будем считать этот код успешным. */

ASSERT( false );

break ;

default :

// Здесь должна быть обработка ошибки.

// Защищенный мьютексом участок кода.

ProcessCommonData();

VERIFY(:: ReleaseMutex( g_hMutex ));

Чем же мьютекс лучше события с автосбросом? В приведенном примере его также можно было бы использовать, только ReleaseMutex надо было бы заменить на SetEvent . Однако может возникнуть следующая сложность. Чаще всего работать с общими данными приходится в нескольких местах. Что будет, если ProcessCommonData в нашем примере вызовет функцию, которая работает с этими же данными и в которой уже есть своя пара WaitFor - ReleaseMutex (на практике это встречается весьма часто)? Если бы мы использовали событие, программа, очевидно, зависла бы, поскольку внутри защищенного блока событие находится в нейтральном состоянии. Мьютекс же устроен более хитро. Для потока-хозяина он всегда остается в сигнализирующем состоянии, несмотря на то, что для всех остальных потоков он при этом находится в нейтральном. Поэтому если поток захватил мьютекс, повторный вызов WaitFor функции не приведет к блокировке. Более того, в мьютекс встроен еще и счетчик, так что ReleaseMutex должна быть вызвана столько же раз, сколько было вызовов WaitFor . Таким образом, мы можем смело защищать каждый участок кода, работающий с общими данными, парой WaitFor - ReleaseMute x, не волнуясь о том, что этот код может быть вызван рекурсивно. Это делает мьютекс очень простым в использовании инструментом.

Семафор (semaphore)

Еще более специфический объект синхронизации. Должен сознаться, что в моей практике еще не было случая, где он пригодился бы. Семафор предназначен для того, чтобы ограничить максимальное число потоков, одновременно работающих с неким ресурсом. По сути, семафор - это событие со счетчиком. Пока этот счетчик больше нуля, семафор находится в сигнализирующем состоянии. Однако каждый вызов WaitFor уменьшает этот счетчик на единицу до тех пор, пока он не станет равным нулю и семафор перейдет в нейтральное состояние. Подобно мьютексу, для семафора есть функция ReleaseSemaphor , увеличивающая счётчик. Однако в отличие от мьютекса семафор не привязан к потоку и повторный вызов WaitFor/ReleaseSemaphor еще раз уменьшит/увеличит счетчик.

Как можно использовать семафор? Например, с его помощью можно искусственно ограничить многопоточность. Как я уже рассказывал, слишком много одновременно активных потоков может заметно ухудшить производительность всей системы из-за частых переключений контекстов. И если уж нам пришлось создать слишком много рабочих потоков, можно ограничить число одновременно активных потоков числом порядка числа процессоров.

Что ещё можно сказать про объекты синхронизации ядра? Очень удобна возможность давать им имена. Соответствующий параметр есть у всех функций, создающих объекты синхронизации: CreateEvent , CreateMutex , CreateSemaphore . Если вы дважды вызовете, к примеру CreateEvent , оба раза указав одно и тоже непустое имя, то второй раз функция вместо того, чтобы создать новый объект, вернет хэндл уже существующего. Это произойдет, даже если второй вызов был сделан из другого процесса. Последнее очень удобно в тех случаях, когда требуется синхронизировать потоки, принадлежащие разным процессам.

Когда объект синхронизации вам больше не требуется, не забывайте вызвать функцию CloseHandle , о которой я уже упоминал, когда рассказывал о потоках. На самом деле, она не обязательно сразу же удалит объект. Дело в том, что хэндлов у объекта может быть создано несколько, тогда он будет удален только когда будет закрыт последний из них.

Хочу напомнить, что лучший способ гарантировать, чтобы CloseHandle или подобная «очищающая» функция была обязательно вызвана, даже в случае нештатной ситуации, - это поместить ее в деструктор. Об этом, кстати, в свое время неплохо и очень подробно рассказывалось в статье Кирилла Плешивцева «Умный деструктор». В приведённых выше примерах я не использовал этот приём исключительно в учебных целях, чтобы работа функций API была более наглядной. В реальном же коде для очистки следует всегда использовать классы-оболочки с умными деструкторами.

Кстати, с функцией ReleaseMutex и подобными постоянно возникает та же проблема, что и с CloseHandle . Она обязана быть вызвана по окончании работы с общими данными, независимо от того, насколько успешно эта работа была завершена (ведь могло быть выброшено исключение). Последствия «забывчивости» здесь более серьёзны. Если не вызванный CloseHandle приведёт лишь у утечке ресурсов (что тоже плохо!), то не освобожденный мьютекс не позволит другим потокам работать с общим ресурсом до самого завершения давшего сбой потока, что скорее всего не позволит приложению нормально функционировать. Чтобы избежать этого, нам опять поможет специально обученный класс с умным деструктором.

Заканчивая обзор объектов синхронизации, хочется упомянуть об объекте, которого нет в Win32 API. Многие мои коллеги высказывают недоумение, почему в Win32 нет специализированного объекта типа «один пишет, многие читают». Этакий «продвинутый мьютекс», следящий за тем, чтобы получить доступ к общим данным на запись мог бы одновременно только один поток, а только на чтение - сразу несколько. Подобный объект можно найти в UNIX"ах. Некоторые библиотеки, например от Borland, предлагают эмулировать его на основе стандартных объектов синхронизации. Впрочем, реальная польза от таких эмуляций весьма сомнительна. Эффективно такой объект может быть реализован только на уровне ядра операционной системы. Но в ядре Windows подобный объект не предусмотрен.

Почему же разработчики ядра Windows NT не позаботились об этом? Чем мы хуже UNIX? На мой взгляд, ответ заключается в том, что реальной потребности в таком объекте для Windows пока просто не возникало. На обычной однопроцессорной машине, где потоки все равно физически не могут работать одновременно, он будет практически эквивалентен мьютексу. На многопроцессорной машине он может дать выигрыш за счёт того, что позволит читающим потокам работать параллельно. Вместе с тем, реально этот выигрыш станет ощутим лишь когда вероятность «столкновения» читающих потоков велика. Несомненно, что к примеру на 1024-процессорной машине подобный объект ядра будет жизненно необходим. Подобные машины существуют, но это - специализированные системы, работающие под специализированными ОС. Зачастую, такие ОС строят на основе UNIX, вероятно оттуда объект типа «один пишет, многие читают» попал и в более общеупотребительные версии этой системы. Но на привычных нам x86-машинах установлен, как правило, всего один и лишь изредка два процессора. И только самые продвинутые модели процессоров типа Intel Xeon поддерживают 4-х и даже более процессорные конфигурации, но такие системы пока остаются экзотикой. Но даже на такой «продвинутой» системе «продвинутый мьютекс» сможет дать заметный выигрыш в производительности лишь в очень специфических ситуациях.

Таким образом, реализация «продвинутого» мьютекса просто не стоит свеч. На «малопроцессорной» машине он может оказаться даже менее эффективным из-за усложнения логики объекта по сравнению со стандартным мьютексом. Учтите, реализация подобного объекта не так проста, как может показаться на первый взгляд. При неудачной реализации, если читающих потоков будет слишком много, пишущему потоку будет просто «не пробиться» к данным. По этим причинам я также не рекомендую вам пытаться эмулировать такой объект. В реальных приложениях на реальных машинах обычный мьютекс или критическая секция (о которой речь пойдет в следующей части статьи) прекрасно справится с задачей синхронизации доступа к общим данным. Хотя, я полагаю, с развитием ОС Windows объект ядра «один пишет многие читают» рано или поздно появится.

Примечание. На самом деле, объект «один пишет - многие читают» в Windows NT всё-таки есть. Просто, когда я писал эту статью, ещё не знал об этом. Этот объект носит название «ресурсы ядра» и не доступен для програм пользовательского режима, вероятно поэтому и не слишком известен. Подобности о нём можно найти в DDK. Спасибо Константину Манурину, что указал мне на это.

Deadlock

А теперь вернемся к функции WaitForMultipleObjects , точнее к ее третьему параметру, bWaitAll. Я обещал рассказать, почему возможность ожидания сразу нескольких объектов так важна.

Почему необходима функция, позволяющая ожидать один из нескольких объектов, понятно. В отсутствие специальной функции это можно было бы сделать, разве что последовательно проверяя состояние объектов в пустом цикле, что, конечно же, недопустимо. А вот необходимость в специальной функции, позволяющей ожидать момента, когда сразу несколько объектов перейдут в сигнальное состояние, не так очевидна. Действительно, представим следующую характерную ситуацию: нашему потоку в определенный момент необходим доступ сразу к двум наборам общих данных, за каждый из которых отвечает свой мьютекс, назовем их А и В. Казалось бы, поток может сначала подождать, пока освободиться мьютекс А, захватить его, затем подождать освобождения мьютекса В... Вроде бы, можно обойтись парой вызовов WaitForSingleObject . Действительно, это будет работать, но только до тех пор, пока все остальные потоки будут захватывать мьютексы в том же порядке: сначала А, потом В. Что произойдет, если некий поток попытается сделать наоборот: захватить сначала В, затем А? Рано или поздно возникнет ситуация, когда один поток захватил мьютекс А, другой В, первый ждет, когда освободится В, второй А. Понятно, что они этого никогда не дождутся и программа зависнет.

Подобная взаимная блокировка (deadlock) является очень характерной ошибкой. Как и все ошибки, связанные с синхронизацией, она проявляется лишь время от времени и способна испортить программисту немало нервов. В то же время взаимоблокировкой чревата практически любая схема, в которую вовлечены несколько объектов синхронизации. Поэтому, этой проблеме следует уделять особое внимание на этапе проектирования такой схемы.

В приведенном простом примере избежать блокировки довольно легко. Надо потребовать, чтобы все потоки захватывали мьютексы в определенном порядке: сначала А, потом В. Однако в сложной программе, где много различным образом связанных друг с другом объектов, добиться этого обычно бывает не так просто. В блокировку могут быть вовлечены не два, а много объектов и потоков. Поэтому, самый надёжный способ избежать взаимной блокировки в ситуации, когда потоку требуется сразу несколько объектов синхронизации - это захватывать их все одним вызовом функции WaitForMultipleObjects с параметром bWaitAll=TRUE. По правде говоря, при этом мы всего лишь перекладываем проблему взаимных блокировок на ядро операционной системы, но главное - это уже будет не наша забота. Впрочем, в сложной программе со множеством объектов, когда не всегда сразу можно сказать, какие именно из них потребуются для выполнения той или иной операции, свести все вызовы WaitFor в одно место и объединить тоже часто оказывается не просто.

Таким образом, есть два пути избежать возникновения deadlock. Надо либо следить, чтобы объекты синхронизации всегда захватывались потоками строго в одном и том же порядке, либо чтобы они захватывались единым вызовом WaitForMultipleObjects . Последний метод более прост и предпочтителен. Однако на практике, с выполнением и того и другого требования постоянно возникают сложности, приходится сочетать оба этих подхода. Проектирование сложных схем синхронизации часто является весьма нетривиальной задачей.

Пример организации синхронизации

В большинстве типичных ситуаций, вроде тех, что я описывал выше, организовать синхронизацию не составляет труда, достаточно события или мьютекса. Но периодически встречаются более сложные случаи, где решение проблемы не так очевидно. Хочу проиллюстрировать это на конкретном примере из своей практики. Как вы увидите, решение оказалось удивительно простым, но прежде чем я нашел его, мне пришлось перепробовать несколько неудачных вариантов.

Итак, задача. Практически во всех современных менеджерах закачек (download managers), или попросту говоря «качалках» есть возможность ограничения трафика, чтобы работающая в фоновом режиме «качалка» не сильно мешала пользователю лазить по Сети. Я разрабатывал похожую программу, и передо мной была поставлена задача реализовать именно такую «фичу». Моя качалка работала по классической схеме многопоточности, когда каждой задачей, в данном случае скачиванием конкретного файла, занимается отдельный поток. Ограничение трафика должно было быть суммарным для всех потоков. То есть, нужно было добиться, чтобы в течение заданного интервала времени все потоки считывали из своих сокетов не более определенного количества байтов. Просто разделить этот лимит поровну между потоками, очевидно, будет неэффективно, поскольку скачивание файлов может идти весьма неравномерно, один будет качаться быстро, другой медленно. Следовательно, нужен общий для всех потоков счетчик, сколько байт считано, и сколько еще можно считать. Здесь-то и не обойтись без синхронизации. Дополнительную сложность задаче придало требование того, чтобы в любой момент любой из рабочих потоков можно было остановить.

Сформулируем задачу более детально. Систему синхронизации я решил заключить в специальный класс. Вот его интерфейс:

class CQuota {

public : // methods

void Set( unsigned int _nQuota );

unsigned int Request( unsigned int _nBytesToRead, HANDLE _hStopEvent );

void Release( unsigned int _nBytesRevert, HANDLE _hStopEvent );

Периодически, скажем раз в секунду, управляющий поток вызывает метод Set, устанавливая квоту на скачивание. Перед тем, как рабочий поток считает данные, полученные из сети, он вызывает метод Request, который проверяет, что текущая квота не равна нулю, и если да, возвращает не превышающее текущую квоту число байтов, которые можно считать. Квота соответственно уменьшается на это число. Если же при вызове Request квота равна нулю, вызывающий поток должен подождать, пока она не появится. Иногда случается, что реально получено меньше байтов, чем было запрошено, в таком случае поток возвращает часть выделенной ему квоты методом Release. И, как я уже сказал, пользователь в любой момент может дать команду прекратить скачивание. В таком случае ожидание надо прервать независимо от наличия квоты. Для этого используется специальное событие: _hStopEvent. Поскольку задачи можно запускать и останавливать независимо друг от друга, для каждого рабочего потока используется свое событие остановки. Его описатель передается методам Request и Release.

В одном из неудачных вариантов я попробовал использовать сочетание мьютекса, синхронизирующего доступ к классу CQuota и события, сигнализирующего наличие квоты. Однако в эту схему никак не вписывается событие остановки. Если поток желает получить квоту, то его состояние ожидания должно управляться сложным логическим выражением: ((мьютекс И событие наличия квоты) ИЛИ событие остановки). Но WaitForMultipleObjects такого не позволяет, можно объединить несколько объектов ядра либо операцией И, либо ИЛИ, но не вперемешку. Попытка разделить ожидание двумя последовательными вызовами WaitForMultipleObjects неизбежно приводит к deadlock. В общем, этот путь оказался тупиковым.

Не буду больше напускать тумана и расскажу решение. Как я уже говорил, мьютекс очень похож на событие с автосбросом. И здесь мы имеем как раз тот редкий случай, когда удобнее использовать именно его, но не одно а сразу два:

class CQuota {

private: // data

unsigned int m_nQuota;

CEvent m_eventHasQuota;

CEvent m_eventNoQuota;

Одновременно может быть установлено только одно из этих событий. Любой поток, произведший манипуляции с квотой должен установить первое событие, если оставшаяся квота не равна нулю, и второе, если квота исчерпана. Поток, желающий получить квоту, должен подождать первого события. Потоку же, увеличивающего квоту, достаточно дождаться любого из этих событий, поскольку если оба они находятся в сброшенном состоянии, это означает, что с квотой в данный момент работает другой поток. Таким образом, два события выполняют сразу две функции: синхронизации доступа к данным и ожидание. Наконец, поскольку поток ожидает одного из двух событий, сюда легко включается и событие, дающее сигнал на остановку.

Приведу для примера реализацию метода Request. Остальные реализуются аналогично. Я слегка упростил код, использовавшийся в реальном проекте:

unsigned int CQuota:: Request( unsigned int _nRequest, HANDLE _hStopEvent )

if (! _nRequest ) return 0 ;

unsigned int nProvide = 0 ;

HANDLE hEvents[ 2 ];

hEvents[ 0 ] = _hStopEvent; // Событие остановки имеет больший приоритет. Ставим его первым.

hEvents[ 1 ] = m_eventHasQuota;

int iWaitResult = :: WaitForMultipleObjects( 2 , hEvents, FALSE, INFINITE );

switch ( iWaitResult ) {

case WAIT_FAILED:

// ОШИБКА

throw new CWin32Exception;

case WAIT_OBJECT_0:

// Событие остановки. Я обрабатывал его с помощью специального исключения, но ничто не мешает реализовать это как-то иначе.

throw new CStopException;

case WAIT_OBJECT_0+ 1 :

// Событие "квота доступна"

ASSERT( m_nQuota ); // Если сигнал подало это событие, но квоты на самом деле нет, значит где-то мы ошиблись. Надо искать баг!

if ( _nRequest >= m_nQuota ) {

nProvide = m_nQuota;

m_nQuota = 0 ;

m_eventNoQuota. Set();

else {

nProvide = _nRequest;

m_nQuota -= _nRequest;

m_eventHasQuota. Set();

break ;

return nProvide;

Маленькое замечание. Библиотека MFC в том проекте не использовалась, но, как вы наверно уже догадались, я сделал собственный класс CEvent, оболочку вокруг объекта ядра «событие», подобную MFC"шной. Как я уже говорил, такие простые классы-оболочки очень полезны, когда есть некий ресурс (в данном случае объект ядра), который необходимо не забыть освободить по окончанию работы. В остальном же нет разницы, писать ли SetEvent(m_hEvent) или m_event.Set().

Надеюсь, этот пример поможет вам спроектировать собственную схему синхронизации, если вам встретится нетривиальная ситуация. Главное - максимально тщательно проанализируйте вашу схему. Не может ли возникнуть ситуации, в которой она сработает неправильно, в частности, не может ли возникнуть блокировка? Отлавливать такие ошибки в отладчике - обычно безнадежное дело, здесь помогает только детальный анализ.

Итак, мы рассмотрели важнейшее средство синхронизации потоков: объекты синхронизации ядра. Это мощный и универсальный инструмент. С его помощью можно строить даже очень сложные схемы синхронизации. К счастью, такие нетривиальные ситуации встречаются нечасто. Кроме того, за универсальность всегда приходится расплачиваться производительностью. Поэтому во многих случаях стоит использовать другие средства синхронизации потоков, имеющиеся в Windows, такие как критические секции и атомарные операции. Они не так универсальны, зато просты и эффективны. О них мы и поговорим в следующей части.

При одновременном доступе нескольких процессов (или нескольких потоков одного процесса) к какому-либо ресурсу возникает проблема синхронизации. Поскольку поток в Win32 может быть остановлен в любой, заранее ему неизвестный момент времени, возможна ситуация, когда один из потоков не успел завершить модификацию ресурса (например, отображенной на файл области памяти), но был остановлен, а другой поток попытался обратиться к тому же ресурсу. В этот момент ресурс находится в несогласованном состоянии, и последствия обращения к нему могут быть самыми неожиданными - от порчи данных до нарушения защиты памяти.

Главной идеей, заложенной в основе синхронизации потоков в Win32, является использование объектов синхронизации и функций ожидания. Объекты могут находиться в одном из двух состояний - Signaled или Not Signaled. Функции ожидания блокируют выполнение потока до тех пор, пока заданный объект находится в состоянии Not Signaled. Таким образом, поток, которому необходим эксклюзивный доступ к ресурсу, должен выставить какой-либо объект синхронизации в несигнальное состояние, а по окончании - сбросить его в сигнальное. Остальные потоки должны перед доступом к этому ресурсу вызвать функцию ожидания, которая позволит им дождаться освобождения ресурса.

Рассмотрим, какие объекты и функции синхронизации предоставляет нам Win32 API.

Функции синхронизации

Функции синхронизации делятся на две основные категории: функции, ожидающие единственный объект, и функции, ожидающие один из нескольких объектов.

Функции, ожидающие единственный объект

Простейшей функцией ожидания является функция WaitForSingleObject:

Function WaitForSingleObject(hHandle: THandle; // идентификатор объекта dwMilliseconds: DWORD // период ожидания): DWORD; stdcall;

Функция ожидает перехода объекта hHandle в сигнальное состояние в течение dwMilliseconds миллисекунд. Если в качестве параметра dwMilliseconds передать значение INFINITE, функция будет ждать в течение неограниченного времени. Если dwMilliseconds равен 0, то функция проверяет состояние объекта и немедленно возвращает управление.

Функция возвращает одно из следующих значений:

Следующий фрагмент кода запрещает доступ к Action1 до перехода объекта ObjectHandle в сигнальное состояние (например, таким образом можно дожидаться завершения процесса, передав в качестве ObjectHandle его идентификатор, полученный функцией CreateProcess):

Var Reason: DWORD; ErrorCode: DWORD; Action1.Enabled:= FALSE; try repeat Application.ProcessMessages; Reason:= WailForSingleObject(ObjectHandle, 10); if Reason = WAIT_FAILED then begin ErrorCode:= GetLastError; raise Exception.CreateFmt(‘Wait for object failed with error: %d’, ); end; until Reason <> WAIT_TIMEOUT; finally Actionl.Enabled:= TRUE; end;

В случае когда одновременно с ожиданием объекта требуется перевести в сигнальное состояние другой объект, может использоваться функция SignalObjectAndWait:

Function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, который ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // завершение операции ввода-вывода): DWORD; stdcall;

Возвращаемые значения аналогичны функции WaitForSingleObject.

Объект hObjectToSignal может быть семафором, событием (event) либо мьютексом. Параметр bAlertable определяет, будет ли прерываться ожидание объекта в случае, если операционная система запросит у потока окончание операции асинхронного ввода-вывода либо асинхронный вызов процедуры. Более подробно это будет рассматриваться ниже.

Функции, ожидающие несколько объектов

Иногда требуется задержать выполнение потока до срабатывания одного или сразу всех из группы объектов. Для решения подобной задачи используются следующие функции:

Type TWOHandleArray = array of THandle; PWOHandleArray = ^TWOHandleArray; function WaitForMultipleObjects(nCount: DWORD; // Задает количество объектов lpHandles: PWOHandleArray; // Адрес массива объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD // Период ожидания): DWORD; stdcall;

Функция возвращает одно из следующих значений:

Число в диапазоне от

WAIT_OBJECT_0 до WAIT_OBJECT_0 + nCount – 1

Если bWaitAll равно TRUE, то это число означает, что все объекты перешли в сигнальное состояние. Если FALSE - то, вычтя из возвращенного значения WAIT_OBJECT_0, мы получим индекс объекта в массиве lpHandles

Число в диапазоне от

WAIT_ABANDONED_0 до WAIT_ABANDONED_0 + nCount – 1

Если bWaitAll равно TRUE, это означает, что все объекты перешли в сигнальное состояние, но хотя бы один из владевших ими потоков завершился, не сделав объект сигнальным Если FALSE - то, вычтя из возвращенного значения WAIT_ABANDONED_0, мы получим в массиве lpHandles индекс объекта, при этом поток, владевший этим объектом, завершился, не сделав его сигнальным
WAIT_TIMEOUT Истек период ожидания
WAIT_FAILED Произошла ошибка

Например, в следующем фрагменте кода программа пытается модифицировать два различных ресурса, разделяемых между потоками:

Var Handles: array of THandle; Reason: DWORD; RestIndex: Integer; ... Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘FirstResource’); Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘SecondResource’); // Ждем первый из объектов Reason:= WaitForMultipleObjects(2, @Handles, FALSE, INFINITE); case Reason of WAIT_FAILED: RaiseLastWin32Error; WAIT_OBJECT_0, WAIT_ABANDONED_0: begin ModifyFirstResource; RestIndex:= 1; end; WAIT_OBJECT_0 + 1, WAIT_ABANDONED_0 + 1: begin ModifySecondResource; RestIndex:= 0; end; // WAIT_TIMEOUT возникнуть не может end; // Теперь ожидаем освобождения следующего объекта if WailForSingleObject(Handles, INFINITE) = WAIT_FAILED then RaiseLastWin32Error; // Дождались, модифицируем оставшийся ресурс if RestIndex = 0 then ModifyFirstResource else ModifySecondResource;

Описанную выше технику можно применять, если вы точно знаете, что задержка ожидания объекта будет незначительной. В противном случае ваша программа окажется «замороженной» и не сможет даже перерисовать свое окно. Если период задержки может оказаться значительным, то необходимо дать программе возможность реагировать на сообщения Windows. Выходом может стать использование функций с ограниченным периодом ожидания (и повторный вызов - в случае возврата WAIT_TIMEOUT) либо функции MsgWaitForMultipleObjects:

Function MsgWaitForMultipleObjects(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание): DWORD; stdcall;

Главное отличие этой функции от предыдущей - параметр dwWakeMask, который является комбинацией битовых флагов QS_XXX и задает типы сообщений, прерывающих ожидание функции независимо от состояния ожидаемых объектов. Например, маска QS_KEY позволяет прервать ожидание при появлении в очереди сообщений WM_KEYUP, WM_KEYDOWN, WM_SYSKEYUP или WM_SYSKEYDOWN, а маска QS_PAINT - сообщения WM_PAINT. Полный список значений, допустимых для dwWakeMask, имеется в документации по Windows SDK. При появлении в очереди потока, вызвавшего функцию, сообщений, соответствующих заданной маске, функция возвращает значение WAIT_OBJECT_0 + nCount. Получив это значение, ваша программа может обработать его и снова вызвать функцию ожидания. Рассмотрим пример с запуском внешнего приложения (необходимо, чтобы на время его работы вызывающая программа не реагировала на ввод пользователя, однако ее окно должно продолжать перерисовываться):

Procedure TForm1.Button1Click(Sender: TObject); var PI: TProcessInformation; SI: TStartupInfo; Reason: DWORD; Msg: TMsg; begin // Инициализируем структуру TStartupInfo FillChar(SI, SizeOf(SI), 0); SI.cb:= SizeOf(SI); // Запускаем внешнюю программу Win32Check(CreateProcess(NIL, "COMMAND.COM", NIL, NIL, FALSE, 0, NIL, NIL, SI, PI)); //************************************************** // Попробуйте заменить нижеприведенный код на строку // WaitForSingleObject(PI.hProcess, INFINITE); // и посмотреть, как будет реагировать программа на // перемещение других окон над ее окном //************************************************** repeat // Ожидаем завершения дочернего процесса или сообщения // перерисовки WM_PAINT Reason:= MsgWaitForMultipleObjects(1, PI.hProcess, FALSE, INFINITE, QS_PAINT); if Reason = WAIT_OBJECT_0 + 1 then begin // В очереди сообщений появился WM_PAINT – Windows // требует обновить окно программы. // Удаляем сообщение из очереди PeekMessage(Msg, 0, WM_PAINT, WM_PAINT, PM_REMOVE); // И перерисовываем наше окно Update; end; // Повторяем цикл, пока не завершится дочерний процесс until Reason = WAIT_OBJECT_0; // Удаляем из очереди накопившиеся там сообщения while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do; CloseHandle(PI.hProcess); CloseHandle(PI.hThread) end;

Если в потоке, вызывающем функции ожидания, явно (функцией CreateWindow) или неявно (используя TForm, DDE, COM) создаются окна Windows - поток должен обрабатывать сообщения. Поскольку широковещательные сообщения посылаются всем окнам в системе, то поток, не обрабатывающий сообщения, может вызвать взаимоблокировку (система ждет, когда поток обработает сообщение, поток - когда система или другие потоки освободят объект) и привести к зависанию Windows. Если в вашей программе имеются подобные фрагменты, необходимо использовать MsgWaitForMultipleObjects или MsgWaitForMultipleObjectsEx и позволять прервать ожидание для обработки сообщений. Алгоритм аналогичен вышеприведенному примеру.

Прерывание ожидания по запросу на завершение операции ввода-вывода или APC

Windows поддерживает асинхронные вызовы процедур. При создании каждого потока (thread) с ним ассоциируется очередь асинхронных вызовов процедур (APC queue). Операционная система (или приложение пользователя - при помощи функции QueueUserAPC) может помещать в нее запросы на выполнение функций в контексте данного потока. Эти функции не могут быть выполнены немедленно, поскольку поток может быть занят. Поэтому операционная система вызывает их, когда поток вызывает одну из следующих функций ожидания:

Function SleepEx(dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForSingleObjectEx(hHandle: THandle; // Идентификатор объекта dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForMultipleObjectsEx(nCount: DWORD; // количество объектов lpHandles: PWOHandleArray;// адрес массива идентификаторов объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, которого ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function MsgWaitForMultipleObjectsEx(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание dwFlags: DWORD // Дополнительные флаги): DWORD; stdcall;

Если параметр bAlertable равен TRUE (либо если dwFlags в функции MsgWaitForMultipleObjectsEx содержит MWMO_ALERTABLE), то при появлении в очереди APC запроса на асинхронный вызов процедуры операционная система выполняет вызовы всех имеющихся в очереди процедур, после чего функция возвращает значение WAIT_IO_COMPLETION.

Такой механизм позволяет реализовать, например, асинхронный ввод-вывод. Поток может инициировать фоновое выполнение одной или нескольких операций ввода-вывода функциями ReadFileEx или WriteFileEx, передав им адреса функций-обработчиков завершения операции. По завершении вызовы этих функций будут поставлены в очередь асинхронного вызова процедур. В свою очередь, инициировавший операции поток, когда он будет готов обработать результаты, может, используя одну из вышеприведенных функций ожидания, позволить операционной системе вызвать функции-обработчики. Поскольку очередь APC реализована на уровне ядра ОС, она более эффективна, чем очередь сообщений, и позволяет реализовать гораздо более эффективный ввод-вывод.

Event (событие)

Event позволяет известить один или несколько ожидающих потоков о наступлении события. Event бывает:

Для создания объекта используется функция CreateEvent:

Function CreateEvent(lpEventAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bManualReset, // Задает, будет Event переключаемым // вручную (TRUE) или автоматически (FALSE) bInitialState: BOOL; // Задает начальное состояние. Если TRUE - // объект в сигнальном состоянии lpName: PChar // Имя или NIL, если имя не требуется): THandle; stdcall; // Возвращает идентификатор созданного // объекта Структура TSecurityAttributes описана, как: TSecurityAttributes = record nLength: DWORD; // Размер структуры, должен // инициализироваться как // SizeOf(TSecurityAttributes) lpSecurityDescriptor: Pointer; // Адрес дескриптора защиты. В // Windows 95 и 98 игнорируется // Обычно можно указывать NIL bInheritHandle: BOOL; // Задает, могут ли дочерние // процессы наследовать объект end;

Если не требуется задание особых прав доступа под Windows NT или возможности наследования объекта дочерними процессами, в качестве параметра lpEventAttributes можно передавать NIL. В этом случае объект не может наследоваться дочерними процессами и ему задается дескриптор защиты «по умолчанию».

Параметр lpName позволяет разделять объекты между процессами. Если lpName совпадает с именем уже существующего объекта типа Event, созданного текущим или любым другим процессом, то функция не создает нового объекта, а возвращает идентификатор уже существующего. При этом игнорируются параметры bManualReset, bInitialState и lpSecurityDescriptor. Проверить, был ли объект создан или используется уже существующий, можно следующим образом:

HEvent:= CreateEvent(NIL, TRUE, FALSE, ‘EventName’); if hEvent = 0 then RaiseLastWin32Error; if GetLastError = ERROR_ALREADY_EXISTS then begin // Используем ранее созданный объект end;

Если объект используется для синхронизации внутри одного процесса, его можно объявить как глобальную переменную и создавать без имени.

Имя объекта не должно совпадать с именем любого из существующих объектов типов Semaphore, Mutex, Job, Waitable Timer или FileMapping. В случае совпадения имен функция возвращает ошибку.

Если известно, что Event уже создан, для получения доступа к нему можно вместо CreateEvent воспользоваться функцией OpenEvent:

Function OpenEvent(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор объекта либо 0 - в случае ошибки. Параметр dwDesiredAccess может принимать одно из следующих значений:

После получения идентификатора можно приступать к его использованию. Для этого имеются следующие функции:

Function SetEvent(hEvent: THandle): BOOL; stdcall;

Устанавливает объект в сигнальное состояние

Function ResetEvent(hEvent: THandle): BOOL; stdcall;

Сбрасывает объект, устанавливая его в несигнальное состояние

Function PulseEvent(hEvent: THandle): BOOL; stdcall

Устанавливает объект в сигнальное состояние, дает отработать всем функциям ожидания, ожидающим этот объект, а затем снова сбрасывает его.

В Windows API события используются для выполнения операций асинхронного ввода-вывода. Следующий пример показывает, как приложение инициирует запись одновременно в два файла, а затем ожидает завершения записи перед продолжением работы; такой подход может обеспечить более высокую производительность при высокой интенсивности ввода-вывода, чем последовательная запись:

Var Events: array of THandle; // Массив объектов синхронизации Overlapped: array of TOverlapped; ... // Создаем объекты синхронизации Events := CreateEvent(NIL, TRUE, FALSE, NIL); Events := CreateEvent(NIL, TRUE, FALSE, NIL); // Инициализируем структуры TOverlapped FillChar(Overlapped, SizeOf(Overlapped), 0); Overlapped.hEvent:= Events; Overlapped.hEvent:= Events; // Начинаем асинхронную запись в файлы WriteFile(hFirstFile, FirstBuffer, SizeOf(FirstBuffer), FirstFileWritten, @Overlapped); WriteFile(hSecondFile, SecondBuffer, SizeOf(SecondBuffer), SecondFileWritten, @Overlapped); // Ожидаем завершения записи в оба файла WaitForMultipleObjects(2, @Events, TRUE, INFINITE); // Уничтожаем объекты синхронизации CloseHandle(Events); CloseHandle(Events)

По завершении работы с объектом он должен быть уничтожен функцией CloseHandle.

Delphi предоставляет класс TEvent, инкапсулирующий функциональность объекта Event. Класс расположен в модуле SyncObjs.pas и объявлен следующим образом:

Type TWaitResult = (wrSignaled, wrTimeout, wrAbandoned, wrError); TEvent = class(THandleObject) public constructor Create(EventAttributes: PSecurityAttributes; ManualReset, InitialState: Boolean; const Name: string); function WaitFor(Timeout: DWORD): TWaitResult; procedure SetEvent; procedure ResetEvent; end;

Назначение методов очевидно следует из их названий. Использование этого класса позволяет не вдаваться в тонкости реализации вызываемых функций Windows API. Для простейших случаев объявлен еще один класс с упрощенным конструктором:

Type TSimpleEvent = class(TEvent) public constructor Create; end; … constructor TSimpleEvent.Create; begin FHandle:= CreateEvent(nil, True, False, nil); end;

Mutex (Mutually Exclusive)

Мьютекс - это объект синхронизации, который находится в сигнальном состоянии только тогда, когда не принадлежит ни одному из процессов. Как только хотя бы один процесс запрашивает владение мьютексом, он переходит в несигнальное состояние и остается таким до тех пор, пока не будет освобожден владельцем. Такое поведение позволяет использовать мьютексы для синхронизации совместного доступа нескольких процессов к разделяемому ресурсу. Для создания мьютекса используется функция:

Function CreateMutex(lpMutexAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bInitialOwner: BOOL; // Задает, будет ли процесс владеть // мьютексом сразу после создания lpName: PChar // Имя мьютекса): THandle; stdcall;

Функция возвращает идентификатор созданного объекта либо 0. Если мьютекс с заданным именем уже был создан, возвращается его идентификатор. В этом случае функция GetLastError вернет код ошибки ERROR_ALREDY_EXISTS. Имя не должно совпадать с именем уже существующего объекта типов Semaphore, Event, Job, Waitable Timer или FileMapping.

Если неизвестно, существует ли уже мьютекс с таким именем, программа не должна запрашивать владение объектом при создании (то есть должна передать в качестве bInitialOwner значение FALSE).

Если мьютекс уже существует, приложение может получить его идентификатор функцией OpenMutex:

Function OpenMutex(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор открытого мьютекса либо 0 - в случае ошибки. Мьютекс переходит в сигнальное состояние после срабатывания функции ожидания, в которую был передан его идентификатор. Для возврата в несигнальное состояние служит функция ReleaseMutex:

Function ReleaseMutex(hMutex: THandle): BOOL; stdcall;

Если несколько процессов обмениваются данными, например через файл, отображенный на память, то каждый из них должен содержать следующий код для обеспечения корректного доступа к общему ресурсу:

Var Mutex: THandle; // При инициализации программы Mutex:= CreateMutex(NIL, FALSE, ‘UniqueMutexName’); if Mutex = 0 then RaiseLastWin32Error; ... // Доступ к ресурсу WaitForSingleObject(Mutex, INFINITE); try // Доступ к ресурсу, захват мьютекса гарантирует, // что остальные процессы, пытающиеся получить доступ, // будут остановлены на функции WaitForSingleObject ... finally // Работа с ресурсом окончена, освобождаем его // для остальных процессов ReleaseMutex(Mutex); end; ... // При завершении программы CloseHandle(Mutex);

Подобный код удобно инкапсулировать в класс, который создает защищенный ресурс. Мьютекс имеет свойства и методы для оперирования ресурсом, защищая их при помощи функций синхронизации.

Разумеется, если работа с ресурсом может потребовать значительного времени, то необходимо либо использовать функцию MsgWaitForSingleObject, либо вызывать WaitForSingleObject в цикле с нулевым периодом ожидания, проверяя код возврата. В противном случае ваше приложение окажется замороженным. Всегда защищайте захват-освобождение объекта синхронизации при помощи блока try ... finally, иначе ошибка во время работы с ресурсом приведет к блокированию работы всех процессов, ожидающих его освобождения.

Semaphore (семафор)

Семафор представляет собой счетчик, содержащий целое число в диапазоне от 0 до максимальной величины, заданной при его создании. Счетчик уменьшается каждый раз, когда поток успешно завершает функцию ожидания, использующую семафор, и увеличивается путем вызова функции ReleaseSemaphore. При достижении семафором значения 0 он переходит в несигнальное состояние, при любых других значениях счетчика его состояние - сигнальное. Такое поведение позволяет использовать семафор в качестве ограничителя доступа к ресурсу, поддерживающему заранее заданное количество подключений.

Для создания семафора служит функция CreateSemaphore:

Function CreateSemaphore(lpSemaphoreAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes lInitialCount, // Начальное значение счетчика lMaximumCount: Longint; // Максимальное значение счетчика lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор созданного семафора либо 0, если создать объект не удалось.

Параметр lMaximumCount задает максимальное значение счетчика семафора, lInitialCount задает начальное значение счетчика и должен быть в диапазоне от 0 до lMaximumCount. lpName задает имя семафора. Если в системе уже есть семафор с таким именем, то новый не создается, а возвращается идентификатор существующего семафора. В случае если семафор используется внутри одного процесса, можно создать его без имени, передав в качестве lpName значение NIL. Имя семафора не должно совпадать с именем уже существующего объекта типов event, mutex, waitable timer, job или file-mapping.

Идентификатор ранее созданного семафора может быть также получен функцией OpenSemaphore:

Function OpenSemaphore(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Параметр dwDesiredAccess может принимать одно из следующих значений:

Для увеличения счетчика семафора используется функция ReleaseSemaphore:

Function ReleaseSemaphore(hSemaphore: THandle; // Идентификатор семафора lReleaseCount: Longint; // Счетчик будет увеличен на эту величину lpPreviousCount: Pointer // Адрес 32-битной переменной, // принимающей предыдущее значение // счетчика): BOOL; stdcall;

Если значение счетчика после выполнения функции превысит заданный для него функцией CreateSemaphore максимум, то ReleaseSemaphore возвращает FALSE и значение семафора не изменяется. В качестве параметра lpPreviousCount можно передать NIL, если это значение нам не нужно.

Рассмотрим пример приложения, запускающего на выполнение несколько заданий в отдельных потоках (например, программа для фоновой загрузки файлов из Internet). Если количество одновременно выполняющихся заданий будет слишком велико, то это приведет к неоправданной загрузке канала. Поэтому реализуем потоки, в которых будет выполняться задание, таким образом, чтобы когда их количество превышает заранее заданную величину, то поток бы останавливался и ожидал завершения работы ранее запущенных заданий:

Unit LimitedThread; interface uses Classes; type TLimitedThread = class(TThread) procedure Execute; override; end; implementation uses Windows; const MAX_THREAD_COUNT = 10; var Semaphore: THandle; procedure TLimitedThread.Execute; begin // Уменьшаем счетчик семафора. Если к этому моменту уже запущено // MAX_THREAD_COUNT потоков - счетчик равен 0 и семафор в // несигнальном состоянии. Поток будет заморожен до завершения // одного из запущенных ранее. WaitForSingleObject(Semaphore, INFINITE); // Здесь располагается код, отвечающий за функциональность потока, // например загрузка файла... // Поток завершил работу, увеличиваем счетчик семафора и позволяем // начать обработку другим потокам. ReleaseSemaphore(Semaphore, 1, NIL); end; initialization // Создаем семафор при старте программы Semaphore:= CreateSemaphore(NIL, MAX_THREAD_COUNT, MAX_THREAD_COUNT, NIL); finalization // Уничтожаем семафор по завершении программы CloseHandle(Semaphore); end;

Привет! Сегодня продолжим рассматривать особенности многопоточного программирования и поговорим о синхронизации потоков.

Что же такое «синхронизация»? Вне области программирования под этим подразумевается некая настройка, позволяющая двум устройствам или программам работать совместно. Например, смартфон и компьютер можно синхронизировать с Google-аккаунтом, личный кабинет на сайте - с аккаунтами в социальных сетях, чтобы логиниться с их помощью. У синхронизации потоков похожий смысл: это настройка взаимодействия потоков между собой. В предыдущих лекциях наши потоки жили и работали обособленно друг от друга. Один что-то считал, второй спал, третий выводил что-то на консоль, но друг с другом они не взаимодействовали. В реальных программах такие ситуации редки. Несколько потоков могут активно работать, например, с одним и тем же набором данных и что-то в нем менять. Это создает проблемы. Представь, что несколько потоков записывают текст в одно и то же место - например, в текстовый файл или консоль. Этот файл или консоль в данном случае становится общим ресурсом. Потоки не знают о существовании друг друга, поэтому просто записывают все, что успеют за то время, которое планировщик потоков им выделит. В недавней лекции курса у нас был пример, к чему это приведет, давай его вспомним: Причина кроется в том, что потоки работали с общим ресурсом, консолью, не согласовывая действия друг с другом. Если планировщик потоков выделил время Потоку-1, тот моментально пишет все в консоль. Что там уже успели или не успели написать другие потоки - неважно. Результат, как видишь, плачевный. Поэтому в многопоточном программировании ввели специальное понятие мьютекс (от англ. «mutex», «mutual exclusion» - «взаимное исключение») . Задача мьютекса - обеспечить такой механизм, чтобы доступ к объекту в определенное время был только у одного потока. Если Поток-1 захватил мьютекс объекта А, остальные потоки не получат к нему доступ, чтобы что-то в нем менять. До тех пор, пока мьютекс объекта А не освободится, остальные потоки будут вынуждены ждать. Пример из жизни: представь, что ты и еще 10 незнакомых людей участвуете в тренинге. Вам нужно поочередно высказывать идеи и что-то обсуждать. Но, поскольку друг друга вы видите впервые, чтобы постоянно не перебивать друг друга и не скатываться в гвалт, вы используете правило c «говорящим мячиком»: говорить может только один человек - тот, у кого в руках мячик. Так дискуссия получается адекватной и плодотворной. Так вот, мьютекс, по сути, и есть такой мячик. Если мьютекс объекта находится в руках одного потока, другие потоки не смогут получить доступ к работе с этим объектом. Не нужно ничего делать, чтобы создать мьютекс: он уже встроен в класс Object , а значит, есть у каждого объекта в Java.

Как работает оператор synchronized

Давай познакомимся с новым ключевым словом - synchronized . Им помечается определенный кусок нашего кода. Если блок кода помечен ключевым словом synchronized , это значит, что блок может выполняться только одним потоком одновременно. Синхронизацию можно реализовать по-разному. Например, создать целый синхронизированный метод: public synchronized void doSomething () { //...логика метода } Или же написать блок кода, где синхронизация осуществляется по какому-то объекту: public class Main { private Object obj = new Object () ; public void doSomething () { synchronized (obj) { } } } Смысл прост. Если один поток зашел внутрь блока кода, который помечен словом synchronized , он моментально захватывает мьютекс объекта, и все другие потоки, которые попытаются зайти в этот же блок или метод вынуждены ждать, пока предыдущий поток не завершит свою работу и не освободит монитор. Кстати! В лекциях курса ты уже видел примеры synchronized , но они выглядели иначе: public void swap () { synchronized (this ) { //...логика метода } } Тема для тебя новая, и путаница с синтаксисом, само собой, первое время будет. Поэтому запомни сразу, чтобы не путаться потом в способах написания. Эти два способа записи означают одно и то же: public void swap () { synchronized (this ) { //...логика метода } } public synchronized void swap () { } } В первом случае создаешь синхронизированный блок кода сразу же при входе в метод. Он синхронизируется по объекту this , то есть по текущему объекту. А во втором примере вешаешь слово synchronized на весь метод. Тут уже нет нужды явно указывать какой-то объект, по которому осуществляется синхронизация. Раз словом помечен целый метод, этот метод автоматически будет синхронизированным для всех объектов класса. Не будем углубляться в рассуждения, какой способ лучше. Пока выбирай то, что больше нравится:) Главное - помни: объявить метод синхронизированным можно только тогда, когда вся логика внутри него выполняется одним потоком одновременно. Например, в этом случае сделать метод doSomething() синхронизированным будет ошибкой: public class Main { private Object obj = new Object () ; public void doSomething () { //...какая-то логика, доступная для всех потоков synchronized (obj) { //логика, которая одновременно доступна только для одного потока } } } Как видишь, кусочек метода содержит логику, для которой синхронизация не обязательна. Код в нем могут выполнять несколько потоков одновременно, а все критически важные места выделены в отдельный блок synchronized . И еще один момент. Давай рассмотрим «под микроскопом» наш пример из лекции с обменом именами: public void swap () { synchronized (this ) { //...логика метода } } Обрати внимание: синхронизация проводится по this . То есть по конкретному объекту MyClass . Представь, что у нас есть 2 потока (Thread-1 и Thread-2) и всего один объект MyClass myClass . В этом случае, если Thread-1 вызовет метод myClass.swap() , мьютекс объекта будет занят, и Thread-2 при попытке вызвать myClass.swap() повиснет в ожидании, когда мьютекс освободится. Если же у нас будет 2 потока и 2 объекта MyClass - myClass1 и myClass2 - на разных объектах наши потоки спокойно смогут одновременно выполнять синхронизированные методы. Первый поток выполняет: myClass1. swap () ; Второй выполняет: myClass2. swap () ; В этом случае ключевое слово synchronized внутри метода swap() не повлияет на работу программы, поскольку синхронизация осуществляется по конкретному объекту. А в последнем случае объектов у нас 2. Поэтому потоки не создают друг другу проблем. Ведь у двух объектов есть 2 разных мьютекса, и их захват не зависит друг от друга .

Особенности синхронизации в статических методах

А что делать, если нужно синхронизировать статический метод ? class MyClass { private static String name1 = "Оля" ; private static String name2 = "Лена" ; public static synchronized void swap () { String s = name1; name1 = name2; name2 = s; } } Непонятно, что будет выполнять роль мьютекса в этом случае. Ведь мы уже определились, что у каждого объекта есть мьютекс. Но проблема в том, что для вызова статического метода MyClass.swap() нам не нужны объекты: метод-то статический! И что дальше? :/ На самом деле, проблемы в этом нет. Создатели Java обо всем позаботились:) Если метод, в котором содержится критически важная «многопоточная» логика, статический, синхронизация будет осуществляться по классу. Для большей ясности, приведенный выше код можно переписать так: class MyClass { private static String name1 = "Оля" ; private static String name2 = "Лена" ; public static void swap () { synchronized (MyClass. class ) { String s = name1; name1 = name2; name2 = s; } } } В принципе, ты мог до этого додуматься самостоятельно: раз объектов нет, значит механизм синхронизации должен быть как-то «зашит» в сами классы. Так оно и есть: по классам тоже можно синхронизироваться.

Последнее обновление: 31.10.2015

Нередко в потоках используются некоторые разделяемые ресурсы, общие для всей программы. Это могут быть общие переменные, файлы, другие ресурсы. Например:

Class Program { static int x=0; static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } }

Здесь у нас запускаются пять потоков, которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 8. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым.

Решение проблемы состоит в том, чтобы синхронизировать потоки и ограничить доступ к разделяемым ресурсам на время их использования каким-нибудь потоком. Для этого используется ключевое слово lock . Оператор lock определяет блок кода, внутри которого весь код блокируется и становится недоступным для других потоков до завершения работы текущего потока. И мы можем переделать предыдущий пример следующим образом:

Class Program { static int x=0; static object locker = new object(); static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { lock (locker) { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } } }

Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker . Когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.

. Синхронизация потоков

Для синхронизации действий, выполняемых разными потоками существует несколько различных способов. Условно их можно разделить на следующие:

· синхронизация с помощью глобальных переменных

· синхронизация с помощью обмена событиями

· синхронизация с помощью специальных объектов (событий, семафоров, критических секций, объектов исключительного владения и других)

Первый метод синхронизации следует признать самым неудачным, так как он требует постоянного опроса состояния глобальной переменной. У этого метода есть и более серьезные недостатки - так, например, возможна полная блокировка потоков, если поток, ожидающий изменения глобальной переменной имеет более высокий приоритет, чем поток, изменяющий эту переменную. Правда, его можно несколько улучшить - вводя дополнительные временные задержки между последовательными опросами.

Второй метод потребует создания объектов, способных получать сообщения или извещения о выполнении некоторых действий. Это могут быть окна, файлы (например, при использовании асинхронных операций или операций с уведомлением), каталоги и т.д. Во многих случаях может быть удобно создать невидимое окно, участвующее в обмене сообщениями или ожидающее получения сообщений для выполнения тех или иных действий.

Третий метод - синхронизация с помощью специальных объектов - потребует рассмотрения разных объектов и разных методов синхронизации с использованием объектов. В Win32 API существует большое число объектов, которые могут быть применены в качестве синхронизирующих, причем во многих случаях вместо одного объекта может применяться объект другого типа. Поэтому будет интересно рассмотреть несколько основных методов синхронизации с использованием объектов.

Общие представления о методах синхронизации

При рассмотрении способов синхронизации удобно выделить несколько основных способов, применяемых разными операционными системами. В числе таких способов можно выделить четыре метода:

· критические секции

· объекты исключительного владения

· события

· синхронизация группой объектов

Последовательно рассмотрим эти основные методы, используя в качестве примера функции по работе с глобальной кучей. Попробуем абстрактно разобраться со свойствами этих функций, что бы понять, какие методы синхронизации и в каких случаях удобно применять.

Во–первых, глобальную кучу в целом можно рассматривать как некий сложный объект, над которым могут выполняться некоторые операции (пока мы рассматриваем только операции над кучей в целом, не вдаваясь в нюансы работы с отдельными блоками - при необходимости доступ к отдельным блокам кучи должен быть синхронизирован, но это уже не функции кучи, а проблема того, кто пользуется кучей).

Часть операций, например получение указателя на какой–либо блок в куче, не нуждается в изменении самой кучи. То есть функции, осуществляющие подобные операции могут выполняться одновременно разными потоками не конфликтуя друг с другом. Такие функции удобно назвать “читателями” - они не изменяют кучу как единый объект.

Другие операции, например выделение нового блока в куче, требуют внесения изменений в кучу - изменения в цепочке выделяемых блоков. Обычно такие изменения осуществляются не в одном месте, а требуется согласованное внесение изменений в нескольких структурах данных. В процессе такой операции структура кучи какое–то время оказывается нарушенной, вследствие чего одновременное выполнение таких действий должно быть исключено. Такие функции удобно назвать “писателями” - они изменяют кучу как единый объект.

Применительно к куче возможен одновременный доступ нескольких читателей и исключительный - писателей. Более того, если к куче получает доступ писатель, то читатели также должны быть лишены доступа (в других случаях это может быть не так - читатели могут работать одновременно с писателем).

Можно сформулировать несколько правил для работы с таким объектом:

· если к объекту имеет доступ писатель, то ни читатели, ни другие писатели доступа не имеют;

· если к объекту имеет доступ читатель, то возможен одновременный доступ других читателей и запрещен доступ писателям;

· если объект свободен, то первый пришедший писатель или читатель имеет право доступа.

Рассмотренный пример очень удобен, так как подобная ситуация (много читателей, единственный писатель) встречается сплошь и рядом - а стандартного объекта для синхронизации доступа к такому объекту нет. Благодаря этому такой пример становится благодатной почвой для рассмотрения разных способов синхронизации.


Даже при простейшем анализе такой таблицы можно заметить, что писатели находятся в неравном положении - при достаточно большом числе читателей и писателей возможна полная блокировка последних, так как читатели могут получать право одновременного доступа к объекту, так что возможна ситуация, когда объект вообще не будет освобожден для писателей - хоть один читатель да найдется. Для выхода из такой ситуации следует слегка модифицировать реакцию объекта во 1м случае - если имеется ожидающий поток–писатель, то надо разрешить доступ ему, а не читателю. Для полной “симметрии” следует в 4м случае проверять наличие читателей и разрешать им доступ, даже если имеется ожидающий поток–писатель. Такой механизм, хотя и достаточно сложный в реализации, обеспечит более–менее равные права для выполнения потоков разного типа.

Критические секции

Этот синхронизирующий объект может использоваться только локально внутри процесса, создавшего его. Остальные объекты могут быть использованы для синхронизации потоков разных процессов. Название объекта “критическая секция” связано с некоторым абстрактным выделением части программного кода (секции), выполняющего некоторые операции, порядок которых не может быть нарушен. То есть попытка двумя разными потоками одновременно выполнять код этой секции приведет к ошибке.

Например, такой секцией может быть удобно защитить функции–писатели, так как одновременный доступ нескольких писателей должен быть исключен.

Для критической секции вводят две операции:

· войти в секцию;
Пока какой–либо поток находится в критической секции, все остальные потоки при попытке войти в нее будут автоматически останавливаться в ожидании. Поток, уже вошедший в эту секцию, может входить в нее многократно, не ожидая ее освобождения.

· покинуть секцию;
При покидании потоком секции уменьшается счетчик числа вхождений этого потока в секцию, так что секция будет освобождена для других потоков только если поток выйдет из секции столько раз, сколько раз в нее входил. При освобождении критической секции будет пробужден только один поток, ожидающий разрешения на вход в эту секцию.

Вообще говоря, в других API, отличных от Win32 (например, OS/2), критическая секция рассматривается не как синхронизирующий объект, а как фрагмент кода программы, который может исполняться только одним потоком приложения. То есть вход в критическую секцию рассматривается как временное выключение механизма переключения потоков до выхода из этой секции. В Win32 API критические секции рассматриваются как объекты, что приводит к определенной путанице - они очень близки по своим свойствам к неименованным объектам исключительного владения (mutex, см. ниже).

При использовании критических секций надо следить, что бы в секции не выделялись чересчур большие фрагменты кода, так как это может привести к существенным задержкам в выполнении других потоков.

Например, применительно к уже рассмотренным кучам - не имеет смысла все функции по работе с кучей защищать критической секцией, так как функции–читатели могут выполняться параллельно. Более того, применение критической секции даже для синхронизации писателей на самом деле представляется малоудобным - так как для синхронизации писателя с читателями последним все–равно придется входить в эту секцию, что практически приводит к защите всех функций единой секцией.

Можно выделить несколько случаев эффективного применения критических секций:

· читатели не конфликтуют с писателями (защищать надо только писателей);

· все потоки имеют примерно равные права доступа (скажем, нельзя выделить чистых писателей и читателей);

· при построении составных синхронизирующих объектов, состоящих из нескольких стандартных, для защиты последовательных операций над составным объектом.

Объекты исключительного владения

Объекты исключительного владения - mutex (mut ualex clusion) - могут принадлежать только одному потоку одновременно. Соответственно определяются операции над этими объектами:

· затребовать владение объектом;
При запросе владения система проверяет, владеет–ли какой–либо другой поток этим объектом или нет. Если имеется другой поток–владелец, то данный поток останавливается до тех пор, пока объект не освободиться. Как только объект становится свободным, система отдает его во владение новому потоку. Поток, уже владеющий объектом, может многократно вступать во владение им.

· освободить объект;
При освобождении объекта система просматривает, имеются–ли другие потоки, ожидающие освобождения этого объекта. Если имеются, то возобновит работу только один поток, а все остальные продолжат ожидание - объект mutex может быть во владении только у одного потока. Освобождать объект может только тот поток, который им в данный момент владеет, другие потоки этого сделать не могут. Для полного освобождения объекта поток должен столько раз освободить его, сколько раз он затребовал владение с того момента, как ему дали этот объект во владение.