itmo_conspects

Лекция 2. Примитивы синхронизации в C++

В качестве объектов для синхронизации потоков используются классы:

std::mutex m;
int shared_counter = 0;

void increment() {
    std::lock_guard<std::mutex> lock(m); // lock() вызывается здесь
    ++shared_counter;
} // unlock() вызывается автоматически при разрушении lock

Семафоры и другие низкоуровневые примитивы, как правило, не используются в обычном прикладном коде - они появляются преимущественно в системном программировании или при реализации собственных примитивов синхронизации


Также существуют модификация мьютекса – разделенный мьютекс. Разделенный мьютекс (или RW-mutex) std::shared_mutex работает аналогично обычному мьютексу, но допускает несколько одновременных блокировок для чтения и только одну для записи. Это полезно, когда операции чтения значительно преобладают над записью:

Операция Обёртка Метод мьютекса
Запись (единичный доступ) std::unique_lock lock()
Чтение (совместный доступ) std::shared_lock lock_shared()

Пример:

std::shared_mutex rw_mutex;
std::map<int, std::string> data;

// Читать могут несколько потоков одновременно
std::string read(int key) {
    std::shared_lock lock(rw_mutex);
    return data.at(key);
}

// Писать может только один поток
void write(int key, std::string value) {
    std::unique_lock lock(rw_mutex);
    data[key] = std::move(value);
}

Тип std::atomic<T>

Рассмотрим простой счётчик, к которому обращаются два потока:

int counter = 0;

void increment() {
    ++counter; // НЕ атомарная операция!
}

На уровне машинных инструкций ++counter — это три отдельных шага:

lw      a5,-20(s0)    # прочитать значение counter из памяти в регистр
addi    a5,a5,1       # прибавить 1 к регистру  
sw      a5,-20(s0)    # записать результат обратно в память

Если два потока выполняют эти шаги одновременно, возникает гонка обновлений:

  1. Поток A: lw a5,-20(s0) - читает 0
  2. Поток B: lw a5,-20(s0) - читает 0
  3. Поток A: addi a5,a5,1, sw a5,-20(s0) - пишет 1
  4. Поток B: addi a5,a5,1, sw a5,-20(s0) - пишет 1, но должен быть 2

Тип std::atomic<T> позволяет выполнять такие операции над переменной атомарно без явного использования мьютекса:

std::atomic<int> counter{0};

// Безопасно из нескольких потоков без мьютекса
void increment() {
    counter.fetch_add(1); // или просто ++counter
}

std::atomic<T> гарантирует, что все операции над переменной выполняются неделимо, без возможности вмешательства другого потока. Под капотом процессор использует специальные инструкции (например, LOCK XADD, CMPXCHG на x86-архитектуре) или шины памяти, которые делают операцию неделимой на аппаратном уровне - без блокировок и переключения контекста

Основные методы:


Каждый метод atomic принимает опциональный параметр std::memory_order, который управляет тем, как компилятор и процессор могут переупорядочивать инструкции вокруг атомарной операции, например, x.store(1, std::memory_order_relaxed);

Memory Order Смысл
relaxed Никаких гарантий порядка - только атомарность самой операции, поэтому максимальная производительность
acquire Все последующие операции с памятью в этом потоке выполнятся после этой загрузки, используется при load
release Все предшествующие операции с памятью в этом потоке выполнятся до этой записи, используется при store
acq_rel Комбинация acquire и release, используется для exchange, fetch_add
seq_cst Полная последовательная согласованность. Самый безопасный, поэтому значение по умолчанию, но самый медленный

Типичный паттерн acquire/release — передача данных между потоками без мьютекса:

std::atomic<bool> ready{false};
int data = 0;

// Поток A (производитель)
void producer() {
    data = 42;                               // (1) запись данных
    ready.store(true, std::memory_order_release); // (2) публикуем флаг
    // release гарантирует, что (1) виден до (2)
}

// Поток B (потребитель)
void consumer() {
    while (!ready.load(std::memory_order_acquire)); // (3) ждём флага
    // acquire гарантирует, что после (3) мы видим (1)
    assert(data == 42); // всегда верно
}

Без acquire и release компилятор или процессор мог бы переставить инструкции так, что data читался бы до того, как производитель его записал

Если нет уверенности, какой порядок использовать, то лучше оставить значение по умолчанию seq_cst, так как оптимизировать такое стоит только тогда, когда есть реальная проблема производительности

Тип std::recursive_mutex

Обычный std::mutex вызовет взаимную блокировку, если один и тот же поток попытается заблокировать его дважды. std::recursive_mutex решает эту проблему - он позволяет одному потоку захватывать мьютекс несколько раз подряд (и должен освободить его столько же раз)

Типичный случай - это методы класса, которые вызывают друг друга, при этом каждый берёт блокировку:

class SafeCollection {
    std::recursive_mutex m;
    std::vector<int> data;

public:
    void add(int x) {
        std::lock_guard lock(m);
        data.push_back(x);
    }

    void add_twice(int x) {
        std::lock_guard lock(m); // первый захват
        add(x);                  // второй захват того же мьютекса
        add(x);
    }
};

С обычным std::mutex вызов add() внутри add_twice() привёл бы к взаимной блокировке, так как поток попытался бы заблокировать уже захваченный им мьютекс

Тип std::scoped_lock

std::scoped_lock - аналог std::lock_guard, но позволяет захватить несколько мьютексов одновременно, избегая взаимной блокировки:

std::mutex m1, m2;

void transfer(Account& from, Account& to, int amount) {
    // Захватываем оба мьютекса атомарно
    std::scoped_lock lock(m1, m2);
    from.balance -= amount;
    to.balance   += amount;
}

Если бы два потока захватывали m1 и m2 в разном порядке по отдельности - была бы классическая взаимная блокировка, а тип scoped_lock этого не допускает

Spinlock

Spinlock - это мьютекс, который при ожидании не усыпляет поток, а крутится в цикле. Он быстрее обычного мьютекса при очень коротких критических секциях, но сжигает процессорное время впустую при долгом ожидании

#include <atomic>

class Spinlock {
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    void lock()   { while (flag.test_and_set(std::memory_order_acquire)); }
    void unlock() { flag.clear(std::memory_order_release); }
};

Петля событий

Поток, как правило, - тоже ресурс, выделяемый операционной системой. Его создание и освобождение занимают время, поэтому стараются переиспользовать потоки. Из-за этого есть один поток, который принимает задачи в очереди

Создаём поток; если есть разделяемый ресурс (очередь задач) — создаём мьютекс для доступа к нему. Вот как выглядит типичный event loop:

std::queue<std::function<void()>> tasks;
std::mutex m;

Task t;
while (true) {
    {
        std::lock_guard l(m);
        if (!tasks.empty()) {
            t = std::move(tasks.front());
            tasks.pop();
        }
    } // мьютекс освобождается здесь

    if (t) {
        t();
        t = nullptr;
    }

    std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}

Без паузы поток будет непрерывно захватывать и освобождать мьютекс в цикле, не давая другим потокам возможности в него войти, создавая процессорное голодание. Пауза в 100 нс — это компромисс: поток уступает процессор, но очень ненадолго.

std::condition_variable

std::condition_variable позволяет потоку заснуть до наступления условия и быть разбуженным другим потоком

std::mutex m;
std::queue<std::function<void()>> tasks;
std::condition_variable cv;
bool done = false;

// Поток-потребитель
void worker() {
    while (true) {
        std::unique_lock lock(m); // condition_variable требует unique_lock

        // Засыпаем, пока очередь пуста (и не завершаем работу)
        cv.wait(lock, [] { return !tasks.empty() || done; });

        if (done && tasks.empty()) break;

        auto task = std::move(tasks.front());
        tasks.pop();
        lock.unlock(); // освобождаем мьютекс перед выполнением задачи

        task();
    }
}

// Поток-производитель
void enqueue(std::function<void()> task) {
    {
        std::lock_guard lock(m);
        tasks.push(std::move(task));
    }
    cv.notify_one(); // будим одного спящего потребителя
}

Ключевые методы:

Метод Описание
cv.wait(lock, predicate) Засыпает, пока предикат не вернёт true. Атомарно освобождает мьютекс при засыпании и снова захватывает при пробуждении.
cv.notify_one() Будит один ожидающий поток.
cv.notify_all() Будит все ожидающие потоки.

condition_variable::wait должен временно освободить мьютекс, пока поток спит - lock_guard этого не умеет, а unique_lock поддерживает ручное lock() и unlock()

Также важно заметить, что поток может проснуться без вызова notify. Именно поэтому wait принимает предикат - без него нужно писать цикл while (!predicate()) cv.wait(lock); вручную