Synchronizacja wątków#
Poprawność programów współbieżnych#
Właściwości związane z poprawnością programu współbieżnego:
Właściwość żywotności - program współbieżny jest żywotny, jeśli zapewnia, że każde pożądane zdarzenie w końcu zajdzie.
Żywotność może być naruszona poprzez:
deadlock
zagłodzenie wątków
livelock
Właściwość bezpieczeństwa - program współbieżny jest bezpieczny, jeśli nigdy nie doprowadza do niepożądanego stanu.
… - problem wzajemnego wykluczania - dwa współbieżne procesy nie mogą znaleźć się w tym samym czasie w tej samej sekcji krytycznej … - problem producentów i konsumentów - każda porcja danych zostanie skonsumowana w kolejności ich produkowania.
Niepoprawnie zaprojektowane zależności między wątkami często naruszają integralność i żywotność danych.
Thread-safety#
Bezpieczeństwo wątków w programowaniu współbieżnym (thread-safety) jest pojęciem stosowanym w kontekście programów wielowątkowych.
Segment kodu jest “thread-safe” jeśli manipuluje współdzielonym stanem w taki sposób, że gwarantowane jest bezpieczne (prawidłowe) wykonanie go przez wiele wątków pracujących w tym samym czasie. Oznacza to, że kod “thread-safe” musi w odpowiedni sposób chronić współdzielony między wątkami stan programu.
Kod “unsafe” to kod, który:
nieprawidłowo chroni współdzielone dane oraz zasoby (shared state)
zależy od przechowywanego między wywołaniami stanu (np. funkcja
strtok()
,rand()
)wywołuje kod, który jest “unsafe”
Sposoby osiągnięcia bezpieczeństwa wielowątkowego:
Stosowanie jawnego blokowanie (Explicit Locking) - muteksów, do ochrony danych współdzielonych. Operacje synchronizacji mogą znacznie spowolnić działanie programu.
Niezmienność danych (Immutability) - stan obiektu nie może być zmieniony po jego utworzeniu. Obiekty mogą być bezpiecznie współdzielone, jeśli wątki mogą jedynie odczytywać ich stan.
Atomowość (Atomicity) - stosowanie zmiennych i flag atomowych
Współdzielenie danych#
Shared mutable state is an evil of all multithreading
Zmienne, które są niezmienne (immutable) mogą być bezpiecznie współdzielone bez blokad
Modyfikator
const
oznacza od C++11 thread-safe
Attention
Współbieżny dostęp do zmiennych, w czasie gdy przynajmniej jeden wątek modyfikuje stan współdzielonego obiektu (data race) może skutkować niezdefiniowanym zachowaniem (undefined behaviour).
Współdzielenie obiektów biblioteki standardowej#
Kontenery standardowe#
Dla kontenerów STL oraz adapterów kontenerów:
Dozwolony jest współbieżny odczyt (operacje read-only) np:
wywołanie metod
begin()
,end()
,rbegin()
,rend()
,front()
,back()
,lower_bound()
upper_bound()
,equal_range()
,at()
operator []
- z wyjątkiem kontenerów asocjacyjnychdostęp za pomocą iteratorów, jeśli nie modyfikujemy stanu wskazywanych elementów
Dozwolony jest współbieżny dostęp do różnych elementów kontenera
za wyjątkiem obiektów
bitset
orazvector<bool>
Strumienie#
Dla formatowanych operacji wejścia oraz wyjścia na obiektach strumieni standardowych (cin
, cout
, cerr
, …)
Dozwolony jest dostęp współbieżny
Może wystąpić przemieszanie kolejności znaków (interleaved characters)
Synchronizowane strumienie - C++20#
TODO
Jawne wykluczanie (Explicit Locking)#
Sekcja krytyczna#
Fragment kodu programu, który w danej chwili może być wykonywany tylko przez jeden wątek. Sekcja krytyczna zapewnia właściwość wzajemnego wykluczenia.
Standardową realizacją wzajemnego wykluczenia jest wykorzystanie obiekt blokady (muteksu) zawierającego operacje:
lock()
unlock()
Wątek
pozyskuje blokadę (blokuje), kiedy wywołuję metodę
lock()
zwalnia blokadę (odblokowuje), gdy wywołuje metodę
unlock()
Wątek jest prawidłowo skonstruowany, jeżeli spełnia poniższe warunki:
Każda sekcja krytyczna jest skojarzona z niepowtarzalnym obiektem blokady
Wątek, próbując wejść do sekcji krytycznej, wywołuje metodę
lock()
tego obiektuWątek wychodząc z sekcji krytycznej, wywołuje metodę
unlock()
Podstawowe obiekty blokad#
Muteksy (Mutual Exclusion) - obiekty implementujące właściwość wzajemnego wykluczania
Semafory - binarne i zliczające
Blokady współdzielone (blokady readers-writers)
Zmienne warunkowe - warunkowe wzajemne wykluczenie#
Zmienne warunkowe używane są do powiadamiania wątków o zaistnieniu określonego warunku
Są powiązane z muteksem, który jest ponownie pozyskiwany w momencie wybudzenia z blokady
Warunek wybudzenia z blokady jest zdefiniowany przy pomocy predykatu, który jest parametrem blokady
Muteksy i klasy menadżerów blokad#
Muteksy zaimplementowane w bibliotece standardowej C++ korzystają z następujących konceptów obiektów blokad (lockable objects):
BasicLockable
Lockable
TimedLockable
SharedLockable
Koncepty BasicLockable, Lockable i klasa std::mutex
#
Koncept BasicLockable
modeluje właściwość wzajemnego wykluczenia. Typ L
jest zgodny z wymaganiami
konceptu BasicLockable
gdy w interfejsie posiada metody:
void lock()
- bieżący wątek jest wstrzymany, aż do momentu pozyskania blokadyvoid unlock()
- jeżeli bieżący wątek jest posiadaczem blokady, to następuje jej zwolnienie
Koncept Lockable
rozszerza koncept BasicLockable
o wymóg posiadania metody:
bool try_lock()
- próba pozyskania blokady bez wstrzymywania bieżącego wątku. Zwracatrue
jeśli blokada została pozyskana, w przeciwnym wypadku zwracafalse
.
std::mutex
#
Klasa std::mutex
implementuje koncept Lockable zapewniając podstawowy mechanizm synchronizacji, który może być użyty do implementacji
bezpiecznego dostępu do współdzielonego zasobu.
Wywołujący wątek posiada muteks od momentu udanego wywołania metody
lock()
lubtry_lock()
do wywołanie metodyunlock()
.Jeśli wątek posiada muteks, wszystkie inne wątki wywołujące
lock()
będą blokowane lub zwrócąfalse
po wywołaniutry_lock()
Wywołujący wątek nie może posiadać muteksu przed wywołaniem
lock()
lubtry_lock()
-std::mutex
implementuje nie-rekursywną wersję muteksu.
std::recursive_mutex
#
Klasa std::recursive_mutex
implementuje rekursywną wersję konceptu Lockable
. Ten sam wątek może wielokrotnie pozyskać muteks poprzez wywołanie
metody lock()
lub try_lock()
. Aby zwolnić muteks wątek musi odpowiednią ilość razy wywołać unlock()
.
Maksymalny poziom rekursji dla obiektu std::recursive_mutex
nie jest zdefiniowany, ale po przekroczeniu tej wartości:
z metody
lock()
rzucony zostanie wyjątekstd::system_error
metoda
try_lock()
zwrócifalse
TimedLockable i klasa std::timed_mutex
#
Koncept TimedLockable wzbogaca koncept Lockable
o metody umożliwiające zdefiniowanie maksymalnego czasu oczekiwania na pozyskanie blokady przez wątek
bool try_lock_until(abs_time)
bool try_lock_for(rel_time)
Implementacje w bibliotece standardowej - std::timed_mutex
oraz std::recursive_timed_mutex
.
Blokady współdzielone (C++14)#
Blokady współdzielone wprowadzone są w standardzie C++14.
Muteksy implementujące koncepty Lockable
oraz TimedLockable
szeregują odwołania do zasobów, lecz nie rozróżniają dostępów modyfikujących od niemodyfikujących.
Odczyt danych współdzielonych:
nie może zakłócić innego odczytu współbieżnego
pozyskiwane są blokady współdzielone
Przed podjęciem próby zapisu danych współdzielonych należy pozyskać blokadę na wyłączność.
Menadżery blokad#
Zarządzanie blokadami (obiektami muteksów) odbywa się za pomocą następujących klas:
lock_guard<Mutex>
unique_lock<Mutex>
shared_lock<Mutex>
(C++14)
Wszystkie klasy zarządzające blokadami implementują mechanizm RAII gwarantujący zwolnienie posiadanej przez wątek blokady w przypadku zgłoszenia sytuacji wyjątkowej.
W destruktorze menadżerów blokad wywoływana jest metoda unlock()
.
Klasa std::lock_guard<Mutex>
#
Menadżer blokad implementuje technikę RAII w celu zarządzania zasobem w postaci muteksu.
template<typename _Mutex>
class lock_guard
{
public:
typedef _Mutex mutex_type;
explicit lock_guard(mutex_type& __m) : _M_device(__m)
{ _M_device.lock(); }
lock_guard(mutex_type& __m, adopt_lock_t) : _M_device(__m)
{ } // calling thread owns mutex
~lock_guard()
{ _M_device.unlock(); }
lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;
private:
mutex_type& _M_device;
};
std::lock_guard
jest najprostszym managerem blokad:
konstruktor pozyskuje blokadę wywołując na rzecz muteksu przekazanego przez referencję jako argument metodę
lock()
destruktor zwalnia blokadę wywołując
unlock()
przeciążony konstruktor przyjmujący jako drugi parametr obiekt typu
std::adopt_lock_t
umożliwia adaptowanie, tj. przejęcie prawa własności i tym samym odpowiedzialności za zwolnienie pozyskanej już wcześniej blokady
Przykład:
unsigned int counter=0;
std::mutex mtx_counter;
unsigned int increment()
{
std::lock_guard<std::mutex> lk(mtx_counter);
return ++counter;
}
unsigned int query()
{
std::lock_guard<std::mutex> lk(mtx_counter);
return counter;
}
Klasa std::unique_lock<Mutex>
#
Rozbudowana wersja managera blokad umożliwiająca:
ochronę RAII przed wyciekami blokad
opóźnione pozyskiwanie blokad
adaptowanie pozyskanej przez wątek blokady
transfer prawa własności - instancja
unique_lock
nie jest kopiowalna, ale jest przenaszalna (moveable)podejmowanie nieblokujących prób pozyskania blokady
korzystanie z muteksów czasowych
Jeśli instancja managera std::unique_lock<>
posiada blokadę:
metoda
mutex()
zwraca wskaźnik do muteksu realizującego blokadęmetoda
owns_lock()
zwraca wartośćtrue
w momencie niszczenia obiektu destruktor wywoła metodę
unlock()
na obiekcie muteksu
Opóźnione pozyskiwanie blokady jest możliwe przy pomocy konstruktora std::shared_lock(Lockable& m, std::defer_lock_t)
.
Po jego wywołaniu:
metoda
owns_lock()
zwracafalse
metoda
mutex()
zwraca adres obiektu muteksu
Adaptowanie pozyskanej przez wątek blokady - konstruktor unique_lock(Lockable& m, std::adopt_lock_t)
Nieblokujące próby pozyskania blokady - unique_lock(Lockable & m,std::try_to_lock_t)
w konstruktorze wywoływana jest dla muteksu nieblokująca metoda
try_lock()
jeśli blokada zostanie pozyskana metoda
owns_lock()
zwracatrue
, w przeciwnym wypadku zwracafalse
Jeśli jako parametr szablonu unique_lock<Mutex>
zostanie przekazany typ muteksu implementujący koncept TimedLockable (np. std::timed_mutex
) możliwe jest blokujące pozyskiwanie blokad:
albo przez dany czas
albo do określonego punktu w czasie
using namespace std::literals;
std::timed_mutex mutex;
std::unique_lock<std::timed_mutex> lock(mutex, std::try_to_lock);
//...
if (!lock.owns_lock())
{
int count = 0;
do
{
std::cout << "Thread doesn't own a lock... Tries to acquire a mutex..."
<< std::endl;
} while(!lock.try_lock_for(1s));
}
Transfer własności blokad std::unique_lock<Mutex>
#
Managery blokad typu std::unique_lock<>
potrafią między sobą transferować własności blokady zgodnie z semantyką
przenoszenia.
std::unique_lock<std::mutex> acquire_lock()
{
static std::mutex m;
return std::unique_lock<std::mutex>(m);
}
Zakleszczenie#
Zakleszczenie (deadlock) sytuacja, w której co najmniej dwa różne wątki czekają na siebie nawzajem, więc żadny nie może się zakończyć.
Do zakleszczenia może dojść w sytuacji, gdy istnieją przynajmniej dwa zasoby i dwa wątki ubiegające się o dostęp do tych zasobów.
Do zakleszczenia dochodzi, jeśli spełnione są cztery warunki:
Wzajemne wykluczenie - w danym czasie tylko jeden wątek może korzystać z zasobu
Trzymanie zasobu i oczekiwanie - wątek utrzymuje jeden z zasobów, ale do ukończenia pracy potrzebne jest także zablokowanie innego zasobu
Cykliczne oczekiwanie - wątki w taki sposób żądają dostępu do zasobów, że powstaje cykliczny graf skierowany
Brak wywłaszczania zasobu - wątki dobrowolnie nie rezygnują z przydzielonych im zasobów, zwolnienie zasobów możliwe jest po zakończeniu zadania
Przykład:
Klasa ze stanem wewnętrznym, chroniona muteksem. Chcemy napisać operator porównania.
class X
{
mutable std::mutex mtx_;
int some_data_;
public:
bool operator<(const X& other) const
{
std::lock_guard<std::mutex> lk(mtx_);
std::lock_guard<std::mutex> lk(other.mtx_);
return some_data_ < other.some_data_;
}
};
Mamy dwa obiekty x1
i x2
i dwa wątki próbujące je porównać, ale w odwrotną stronę:
if (x1 < x2) { /**/ }
jest rozwinięte do:
x1.mtx_.lock(); // 1
/*******************************/
x1.mtx_.lock(); // 3 <- DEADLOCK
if (x2 < x1) { /**/ }
jest rozwinięte do:
x2.mtx_.lock(); // 2
/*******************************/
x1.mtx_.lock();
Dwa wątki pozyskają muteksy w odwrotnej kolejności, co może spowodować zakleszczenie.
Zapobieganie zakleszczeniom#
Metoda std::lock()
#
gwarantuje zablokowanie wszystkich muteksów bez zakleszczenia niezależnie od kolejności ich pozyskiwania
wymaga przekazania jako parametrów opóźnionych blokad typu
std::unique_lock
bool X::operator< (const X& other)
{
std::unique_lock<std::mutex> l1(mtx_, std::defer_lock);
std::unique_lock<std::mutex> l2(other.mtx_, std::defer_lock);
std::lock(l1, l2); // avoiding deadlock
return some_data < other.some_data;
}
Istnieje możliwość skonstruowania blokady typu std::unique_lock<>
bez blokowania muteksu za pomocą parametru std::defer_lock
.
Pozwala to uniknąć zakleszczeń dla blokad uzyskiwanych jednocześnie. Działa na każdym obiekcie implementującym koncept Lockable
.
Wciąż może istnieć zagrożenie zakleszczeniem, jeśli blokady są pozyskiwane oddzielnie. Aby zminimalizować ryzyko należy pozyskiwać blokady zawsze w tej samej kolejności.
Klasa std::scoped_lock
- C++20#
TODO
Synchronizacja za pomocą zdarzeń#
W programach wielowątkowych często zachodzi potrzeba zsynchronizowania pracy wielu wykonywanych współbieżnie zadań. Dzieje się tak w sytuacji, gdy jeden z pracujących wątków osiąga punkt, w którym nie może wykonać następnych operacji, dopóki inne wątki nie zakończą swojej częsci pracy, przygotowując dane potrzebne do zakończenia zadania. Fakt, że oczekiwane dane są dostępne jest określany jako zdarzenie (event).
Oczekiwanie na zdarzenie może być zaimplementowane jako:
idle wait - wątek oczekujący na zdarzenie przechodzi do stanu blocked, w którym nie zużywa cyklów CPU
busy wait - oczekiwanie na zdarzenie implementowane jest za pomocą pętli do-while
Busy waits - implementacje#
Implementacja z wykorzystaniem muteksu#
Implementacja komunikacji przy pomocy flagi data_ready
typu bool
oraz muteksu:
volatile bool data_ready;
std::mutex mtx;
void set_data_ready()
{
prepare_data();
std::unique_lock<std::mutex> lk(mtx);
data_ready = true;
}
void task()
{
bool ready_flag;
do
{
{
std::lock_guard<std::mutex> lk{mtx};
ready_flag = data_ready;
}
} while(!ready_flag);
process_data();
}
Implementacja z wykorzystaniem zmiennej atomowej#
Implementacja wykorzystująca atomową zmienną typu std::atomic<bool>
nie wymaga do synchronizacji muteksu.
std::atomic<bool> data_ready;
void set_data_ready()
{
prepare_data();
data_ready = true;
}
void task()
{
while(!data_ready.load())
{
std::this_thread::yield();
}
process_data();
}
Operacje na zmiennej atomowej dają gwarancję odpowiedniego uporządkowania kodu zgodnie z modelem pamięci C++11.
Przygotowanie danych - operacja prepare_data()
, w której ustawiany jest stan zmiennych współdzielonych, odbywa się przed (sequenced before) operacją zapisu do atomowej flagi data_ready
.
Kiedy w innym wątku wartość odczytana z zmiennej atomowej data_ready
jest true
, operacja zapisu do zmiennej atomowej
synchronizuje się (synchronizes with) z tym odczytem tworząc relację happens before. Ponieważ relacja happens before jest przechodnia, przygotowanie danych odbywa się przed zmianą stanu flagi, ta odbywa się przed odczytem wartości true
, który z kolei odbywa się przed przetworzeniem danych.
Idle waits#
Zmienne warunkowe#
Aby uniknąć aktywnego odpytywania się o spełnienie określonego warunku, efektywniej jest zablokować oczekujący wątek do momentu zajścia zdarzenia (spełnienia oczekiwanego warunku). Mechanizm taki jest wykorzystany w implementacji zmiennych warunkowych (condition variables).
Biblioteka standardowa C++11 dostarcza dwie implementacje zmiennych warunkowych:
std::condition_variable
std::condition_variable_any
Obie klasy współpracują z muteksem, aby zapewnić prawidłową synchronizację wątków:
conditional_variable
współpracuje tylko z typemstd::mutex
conditional_variable_any
współpracuje z dowolnym typem muteksu
class CV
{
bool data_ready_ = false;
std::mutex mtx_data_ready_;
std::condition_variable cv_is_data_ready_;
public:
void process_data()
{
std::unique_lock<std::mutex> lk(mtx_data_ready_);
while(!data_ready_)
{
cv_is_data_ready_.wait(lk);
}
// implementation of data processing
// ...
}
void load_data()
{
// implementation of data loading
// ...
{
std::lock_guard<std::mutex> lk(mtx_data_ready_);
data_ready_ = true;
} // release lock on mutex
cv_is_data_ready_.notify_one();
}
};
Można uprościć kod oczekujący na spełnienie warunku korzystając z przeciążonej wersji metody wait(lock_type& lock, predicate_type pred)
przyjmującej jako argument predykat.
Predykat może być zdefiniowany jako:
funkcja
bool pred()
obiekt funkcyjny
obiekt funkcyjny zdefiniowany w momencie wywołania przy pomocy funkcji lambda
void process_data()
{
std::unique_lock<std::mutex> lk(mtx_data_ready_);
cv_is_data_ready_.wait(lk, [this] { return data_ready_; });
process_data();
}
Powiadomienie o zdarzeniu może zostać zrealizowane przy pomocy dwóch metod:
void notify_one()
– odblokowuje jeden z wątków znajdujących się w stanie oczekiwania po uprzednim wywołaniu na obiekcie zmiennej warunkowej metody wait()void notify_all()
– odblokowuje wszystkie wątki znajdujące się w stanie oczekiwania
Zmienne atomowe - C++20#
TODO
Lokalna pamięć wątku#
Thread Local Storage - dane, które należą do konkretnego wątku
Każdy wątek może posiadać własny zestaw zmiennych TLS
Odpowiednik statycznych składowych klasy dla wątków
Zastosowanie:
errno
lokalny generator liczb losowych
strtok
Modyfikator thread_local
#
C++11 zapewnia dostęp do danych TLS za pomocą modyfikatora
thread_local
namespace Unsafe
{
double rand()
{
static int seed = 665; // shared state
seed = (seed * IMUL + IADD) & MASK;
return (seed * SCALE);
}
}
namespace ThreadSafe
{
double rand_double()
{
auto thread_id = std::this_thread::get_id();
std::hash<std::thread::id> hasher;
thread_local int thd_seed = 665 * hasher(thread_id);
int result = (thd_seed * IMUL + IADD) & MASK;
thd_seed = result;
return (result * SCALE);
}
}
int main()
{
std::thread thd1([] {
for (size_t i = 0; i < 100; ++i)
std::cout << ThreadSafe::rand_double() << std::endl;
});
std::thread thd2([] {
for (size_t i = 0; i < 100; ++i)
std::cout << ThreadSafe::rand_double() << std::endl;
});
thd1.join();
thd2.join();
}