В качестве объектов для синхронизации потоков используются классы:
std::mutex - базовая реализация мьютекса. Обеспечивает взаимное исключение: только один поток может владеть мьютексом в каждый момент времениstd::lock_guard - обёртка для мьютекса, реализующая принцип RAII. При создании объекта мьютекс захватывается, при уничтожении (выходе из области видимости) - освобождается автоматическиstd::mutex m;
int shared_counter = 0;
void increment() {
std::lock_guard<std::mutex> lock(m); // lock() вызывается здесь
++shared_counter;
} // unlock() вызывается автоматически при разрушении lock
Семафоры и другие низкоуровневые примитивы, как правило, не используются в обычном прикладном коде - они появляются преимущественно в системном программировании или при реализации собственных примитивов синхронизации
Также существуют модификация мьютекса – разделенный мьютекс. Разделенный мьютекс (или RW-mutex) std::shared_mutex работает аналогично обычному мьютексу, но допускает несколько одновременных блокировок для чтения и только одну для записи. Это полезно, когда операции чтения значительно преобладают над записью:
| Операция | Обёртка | Метод мьютекса |
|---|---|---|
| Запись (единичный доступ) | std::unique_lock |
lock() |
| Чтение (совместный доступ) | std::shared_lock |
lock_shared() |
Пример:
std::shared_mutex rw_mutex;
std::map<int, std::string> data;
// Читать могут несколько потоков одновременно
std::string read(int key) {
std::shared_lock lock(rw_mutex);
return data.at(key);
}
// Писать может только один поток
void write(int key, std::string value) {
std::unique_lock lock(rw_mutex);
data[key] = std::move(value);
}
std::atomic<T>Рассмотрим простой счётчик, к которому обращаются два потока:
int counter = 0;
void increment() {
++counter; // НЕ атомарная операция!
}
На уровне машинных инструкций ++counter — это три отдельных шага:
lw a5,-20(s0) # прочитать значение counter из памяти в регистр
addi a5,a5,1 # прибавить 1 к регистру
sw a5,-20(s0) # записать результат обратно в память
Если два потока выполняют эти шаги одновременно, возникает гонка обновлений:
lw a5,-20(s0) - читает 0lw a5,-20(s0) - читает 0addi a5,a5,1, sw a5,-20(s0) - пишет 1addi a5,a5,1, sw a5,-20(s0) - пишет 1, но должен быть 2Тип std::atomic<T> позволяет выполнять такие операции над переменной атомарно без явного использования мьютекса:
std::atomic<int> counter{0};
// Безопасно из нескольких потоков без мьютекса
void increment() {
counter.fetch_add(1); // или просто ++counter
}
std::atomic<T> гарантирует, что все операции над переменной выполняются неделимо, без возможности вмешательства другого потока. Под капотом процессор использует специальные инструкции (например, LOCK XADD, CMPXCHG на x86-архитектуре) или шины памяти, которые делают операцию неделимой на аппаратном уровне - без блокировок и переключения контекста
Основные методы:
Чтение и запись
std::atomic<int> x{0};
x.store(42); // записать значение
int val = x.load(); // прочитать значение
int val2 = x; // неявный вызов load()
x = 42; // неявный вызов store()
storeиloadпредпочтительнее неявных операторов — они явно сигнализируют, что работа идёт с атомарной переменной.
Обмен
std::atomic<int> x{10};
int old = x.exchange(99); // атомарно: записать 99, вернуть старое значение (10)
Полезно, например, для атомарного сброса флага:
std::atomic<bool> flag{true};
if (flag.exchange(false)) {
// Только один поток войдёт сюда, даже при гонке
}
Сравнение и замена (Compare-and-swap, CAS) - ключевой механизм для алгоритмов без замков
bool compare_exchange_strong(T& expected, T desired);
bool compare_exchange_weak(T& expected, T desired);
Атомарно выполняется следующее:
if (x == expected) {
x = desired;
return true;
} else {
expected = x; // обновляет expected текущим значением
return false;
}
Разница между сильным и слабым сравнениями:
compare_exchange_strong - гарантирует успех, если x == expectedcompare_exchange_weak - может ложно вернуть false, зато быстрее на некоторых архитектурах (RISC, ARM), используется в циклеАрифметические операции (только для целых чисел и указателей)
std::atomic<int> x{10};
x.fetch_add(5); // x = 15, возвращает старое значение (10)
x.fetch_sub(3); // x = 12, возвращает старое значение (15)
x.fetch_and(0xF); // побитовое AND
x.fetch_or(0x1); // побитовое OR
x.fetch_xor(0x3); // побитовое XOR
// Операторы-сокращения (не возвращают старое значение):
++x; x++; --x; x--;
x += 5; x -= 3;
Каждый метод atomic принимает опциональный параметр std::memory_order, который управляет тем, как компилятор и процессор могут переупорядочивать инструкции вокруг атомарной операции, например, x.store(1, std::memory_order_relaxed);
| Memory Order | Смысл |
|---|---|
relaxed |
Никаких гарантий порядка - только атомарность самой операции, поэтому максимальная производительность |
acquire |
Все последующие операции с памятью в этом потоке выполнятся после этой загрузки, используется при load |
release |
Все предшествующие операции с памятью в этом потоке выполнятся до этой записи, используется при store |
acq_rel |
Комбинация acquire и release, используется для exchange, fetch_add |
seq_cst |
Полная последовательная согласованность. Самый безопасный, поэтому значение по умолчанию, но самый медленный |
Типичный паттерн acquire/release — передача данных между потоками без мьютекса:
std::atomic<bool> ready{false};
int data = 0;
// Поток A (производитель)
void producer() {
data = 42; // (1) запись данных
ready.store(true, std::memory_order_release); // (2) публикуем флаг
// release гарантирует, что (1) виден до (2)
}
// Поток B (потребитель)
void consumer() {
while (!ready.load(std::memory_order_acquire)); // (3) ждём флага
// acquire гарантирует, что после (3) мы видим (1)
assert(data == 42); // всегда верно
}
Без acquire и release компилятор или процессор мог бы переставить инструкции так, что data читался бы до того, как производитель его записал
Если нет уверенности, какой порядок использовать, то лучше оставить значение по умолчанию seq_cst, так как оптимизировать такое стоит только тогда, когда есть реальная проблема производительности
std::recursive_mutexОбычный std::mutex вызовет взаимную блокировку, если один и тот же поток попытается заблокировать его дважды. std::recursive_mutex решает эту проблему - он позволяет одному потоку захватывать мьютекс несколько раз подряд (и должен освободить его столько же раз)
Типичный случай - это методы класса, которые вызывают друг друга, при этом каждый берёт блокировку:
class SafeCollection {
std::recursive_mutex m;
std::vector<int> data;
public:
void add(int x) {
std::lock_guard lock(m);
data.push_back(x);
}
void add_twice(int x) {
std::lock_guard lock(m); // первый захват
add(x); // второй захват того же мьютекса
add(x);
}
};
С обычным std::mutex вызов add() внутри add_twice() привёл бы к взаимной блокировке, так как поток попытался бы заблокировать уже захваченный им мьютекс
std::scoped_lockstd::scoped_lock - аналог std::lock_guard, но позволяет захватить несколько мьютексов одновременно, избегая взаимной блокировки:
std::mutex m1, m2;
void transfer(Account& from, Account& to, int amount) {
// Захватываем оба мьютекса атомарно
std::scoped_lock lock(m1, m2);
from.balance -= amount;
to.balance += amount;
}
Если бы два потока захватывали m1 и m2 в разном порядке по отдельности - была бы классическая взаимная блокировка, а тип scoped_lock этого не допускает
Spinlock - это мьютекс, который при ожидании не усыпляет поток, а крутится в цикле. Он быстрее обычного мьютекса при очень коротких критических секциях, но сжигает процессорное время впустую при долгом ожидании
#include <atomic>
class Spinlock {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() { while (flag.test_and_set(std::memory_order_acquire)); }
void unlock() { flag.clear(std::memory_order_release); }
};
Поток, как правило, - тоже ресурс, выделяемый операционной системой. Его создание и освобождение занимают время, поэтому стараются переиспользовать потоки. Из-за этого есть один поток, который принимает задачи в очереди
Создаём поток; если есть разделяемый ресурс (очередь задач) — создаём мьютекс для доступа к нему. Вот как выглядит типичный event loop:
std::queue<std::function<void()>> tasks;
std::mutex m;
Task t;
while (true) {
{
std::lock_guard l(m);
if (!tasks.empty()) {
t = std::move(tasks.front());
tasks.pop();
}
} // мьютекс освобождается здесь
if (t) {
t();
t = nullptr;
}
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
Без паузы поток будет непрерывно захватывать и освобождать мьютекс в цикле, не давая другим потокам возможности в него войти, создавая процессорное голодание. Пауза в 100 нс — это компромисс: поток уступает процессор, но очень ненадолго.
std::condition_variablestd::condition_variable позволяет потоку заснуть до наступления условия и быть разбуженным другим потоком
std::mutex m;
std::queue<std::function<void()>> tasks;
std::condition_variable cv;
bool done = false;
// Поток-потребитель
void worker() {
while (true) {
std::unique_lock lock(m); // condition_variable требует unique_lock
// Засыпаем, пока очередь пуста (и не завершаем работу)
cv.wait(lock, [] { return !tasks.empty() || done; });
if (done && tasks.empty()) break;
auto task = std::move(tasks.front());
tasks.pop();
lock.unlock(); // освобождаем мьютекс перед выполнением задачи
task();
}
}
// Поток-производитель
void enqueue(std::function<void()> task) {
{
std::lock_guard lock(m);
tasks.push(std::move(task));
}
cv.notify_one(); // будим одного спящего потребителя
}
Ключевые методы:
| Метод | Описание |
|---|---|
cv.wait(lock, predicate) |
Засыпает, пока предикат не вернёт true. Атомарно освобождает мьютекс при засыпании и снова захватывает при пробуждении. |
cv.notify_one() |
Будит один ожидающий поток. |
cv.notify_all() |
Будит все ожидающие потоки. |
condition_variable::wait должен временно освободить мьютекс, пока поток спит - lock_guard этого не умеет, а unique_lock поддерживает ручное lock() и unlock()
Также важно заметить, что поток может проснуться без вызова notify. Именно поэтому wait принимает предикат - без него нужно писать цикл while (!predicate()) cv.wait(lock); вручную