#20.6 golang mutex Мьютексы - Логика работы, примеры, sync.RWMutex, использование в структуре

Golang поддерживает механизм мьютексов.

Для чего мьютексы используются на практике

Обычно мьютекс - это возможность гарантировать, что только одна горутина будет иметь доступ к какому-то ресурсу (на чтение/запись или на то и на то), что, как и в других языках, помогает уйти от состояния гонки между горутинами.

Как мьютексы работают "под капотом"

Прежде чем переходить к коду заметим:
Мьютексы не связаны с работой с переменными/памятью, при их использовании ничего про работу с переменными не отслеживается.

Что происходит вместо отслеживания работы с памятью:

  • Мьютексы сами в себе хранят информацию достаточную, чтобы понять - нужно ли блокировать дальнейшее выполнение горутины, когда она пытается заблокировать мьютекс
  • Выполнение горутины останавливается, если она старается выполнить блокировку мьютекса, который был заблокирован кем-то ещё на запись

Пример использования мьютекса

Рассмотрим пример:

package main

import (
	"fmt"
	"sync"
	"time"
)

/*
Будет сбрасывать общую переменную в ноль
и затем инкрементировать ее 5 раз
*/
func doIt(number int, commonValue *int, wg *sync.WaitGroup) {
	*commonValue = 0 // сбрасываем счетчик

	fmt.Println(">>> Старт Горутины", number)
	for k := 1; k <= 5; k++ {
		*commonValue += 1
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина", number, "-", *commonValue)
	}
	wg.Done() // сигнализируем, что горутина завершила работу
}

func main() {
	var wg sync.WaitGroup
	commonValue := 1
	list := []int{1, 2, 3}
	wg.Add(len(list))

	// запускаем горутины
	for _, i := range list {
		go doIt(i, &commonValue, &wg)
	}
	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

- здесь доступ горутин к общей переменной при обновлении её значения никак не синхронизирован, и поэтому распечатка программы будет иметь вид:

>>> Старт Горутины 3
>>> Старт Горутины 1
>>> Старт Горутины 2
Горутина 2 - 2
Горутина 3 - 3
Горутина 1 - 2
Горутина 1 - 5
Горутина 2 - 5
Горутина 3 - 6
Горутина 3 - 8
Горутина 1 - 8
Горутина 2 - 8
Горутина 2 - 11
Горутина 1 - 11
Горутина 3 - 11
Горутина 3 - 14
Горутина 1 - 14
Горутина 2 - 14
Конец!

Теперь давайте добавим мьютекс, чтобы очередная добравшаяся до commonValue горутина, не давала доступ к эту переменной другим горутинам, пока эта очередная горутина не завершит свою работу:

package main

import (
	"fmt"
	"sync"
	"time"
)

/*
Будет сбрасывать общую переменную в ноль
и затем инкрементировать ее 5 раз
*/
func doIt(number int, commonValue *int,
	wg *sync.WaitGroup, mutex *sync.Mutex) {
	mutex.Lock()     // блокируем доступ
	*commonValue = 0 // сбрасываем счетчик

	fmt.Println(">>> Старт Горутины", number)
	for k := 1; k <= 5; k++ {
		*commonValue += 1
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина", number, "-", *commonValue)
	}
	mutex.Unlock() // разблокируем доступ
	wg.Done()      // сигнализируем, что горутина завершила работу
}

func main() {
	var mutex sync.Mutex // определяем мьютекс
	var wg sync.WaitGroup
	commonValue := 1
	list := []int{1, 2, 3}
	wg.Add(len(list))

	// запускаем горутины
	for _, i := range list {
		go doIt(i, &commonValue, &wg, &mutex)
	}
	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

-- теперь распечатка нашей программы будет выглядеть более упорядоченно:

>>> Старт Горутины 3
Горутина 3 - 1
Горутина 3 - 2
Горутина 3 - 3
Горутина 3 - 4
Горутина 3 - 5
>>> Старт Горутины 1
Горутина 1 - 1
Горутина 1 - 2
Горутина 1 - 3
Горутина 1 - 4
Горутина 1 - 5
>>> Старт Горутины 2
Горутина 2 - 1
Горутина 2 - 2
Горутина 2 - 3
Горутина 2 - 4
Горутина 2 - 5
Конец!

-- как это работает?
В целом тут всё просто мы явно блокируем выполнение горутины вызовом:

mutex.Lock()  // блокируем доступ

-- при этом, в примере выше в функции мы передавали указатель на переменную мьютекса (чтобы все горутины работали с одним мьютексом, а не с копией),
далее всё, можно сказать, просто:

  1. если мьютекс уже был заблокирован какой то горутиной (то есть mutex.Lock() был выполен), то тот же мьютекс уже не получится заблокировать в паррально выполняем коде
  2. Таким образом, все кто не успел его заблокировать, - ждут, остановившись, опть же на строчке блокировки, пока заблокировшая его горутина его не разблокирует, и только после этого кто-то первый из ожидающих сможет заблокировать его снова.

Логика работы sync.Mutex не связана с чтением/записью "общих" данных

Стоит подчеркнуть, что мьютекс в примере выше это именно явная блокировка выполнения, а не какая-то инструкция особого доступа к переменным, которая вызывает блокировку именно при их чтении/записи.

Чтобы проиллюстрировать этот момент рассмотрим пример, где в горутинах вообще не используются общие данные (кроме самого мьютекса и группы синхронизации), т.е. мы не используем в функциях глобальные значения или не передаем указатель на них, чтобы изменять их как в примерах выше, а просто блокируем параллельное выполнение для части функции горутины:

package main

import (
	"fmt"
	"sync"
	"time"
)

func doIt(number int,
	wg *sync.WaitGroup, mutex *sync.Mutex) {

	// вынесем сообщение о старте за зону синхронизации
	fmt.Println(">>> Старт Горутины", number)
	mutex.Lock() // блокируем доступ (синхиронизация)

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина", number, k)
	}
	mutex.Unlock() // разблокируем доступ
	wg.Done()      // сигнализируем, что горутина завершила работу

}

func main() {
	var mutex sync.Mutex // определяем мьютекс
	var wg sync.WaitGroup
	list := []int{1, 2, 3}
	wg.Add(len(list))

	// запускаем горутины
	for _, i := range list {
		go doIt(i, &wg, &mutex)
	}
	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

-- после запуска мы получим распечатку вида:

>>> Старт Горутины 3
>>> Старт Горутины 1
>>> Старт Горутины 2
Горутина 3 1
Горутина 3 2
Горутина 3 3
Горутина 1 1
Горутина 1 2
Горутина 1 3
Горутина 2 1
Горутина 2 2
Горутина 2 3
Конец!

-- где видно, что сообщения о старте выводят все в кучу, без блокировки, а вот зона окруженная mutex.Lock()/Unlock() не перемешивается с параллельными вызовам (т.е. циклы каждой горутины доработавают до конца и только потом свой цикл может запустить следующая горутина).

sync.RWMutex - разрешаем "чтение", блокируем "запись"

В гошке кроме sync.Mutex есть еще один тип мьютекса sync.RWMutex, он позволяет более тонко настраивать блокировку.
Сначала в целом покажем чем они отличаются:

  • sync.Mutex - Работает "стул" на котором может сидеть одна горутина, пока она не встанет (не разблокирует мьютекс) - другие сесть не могут
  • sync.RWMutex - обладает 4 методами:
    1. RLock() - под капотом накручивает счетчик "блокировок", но при этом реально не блокирует выполение.
      Т.е. бесконечное число горутин могут вызвать этот метод и продолжить выполнение, но пока этот счетчик не станет нулю не получится вызывать метод реальной/эксклюзивной блокировки .Lock() (о нём будет ниже)

      Фактически RLock() уведомляет все потоки, использующие этот мьютекс, что существуют N (по числу раз вызова этого метода) потоков, которые вызывали это метод, а с чтением это связано только тем, именно эту блокировку можно использовать, чтобы дать и другим потокам читать данные параллельно с текущим.

    2. RUnlock(): освобождает полученную одну блокировку, то есть скручивает счетчик блокировок на 1
    3. Lock(): блокирует мьютекс, можно использовать для защиты от состояния гонки при записи
      - эта шутка работает как и метод в Lock() в sync.Mutex, но если на момент вызова, есть более 0 нуля неразблокированных вызовов RLock(), то горутина не сможет заблокировать мьютекс, а будет заблокирована сама в ожидании пока счетчик будет скручен через RUnlock()
    4. Unlock(): снимает блокировку, которую ранее добавили через Lock(). При выполнении Unlock без получения блокировки мы получим ошибку во время выполнения.

RWMutex.RLock() сам по себе ничего не блокирует

Проиллюстрируем это на примере:

package main

import (
	"fmt"
	"sync"
	"time"
)

func runGorutine1(
	wg *sync.WaitGroup, mutex *sync.RWMutex) {

	mutex.RLock() // "блокируем", с разрешением "чтения"

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Горутина _1", k)
	}
	mutex.RUnlock() // "разблокируем" доступ
	wg.Done()       // сигнализируем, что горутина завершила работу

}

func runGorutine2(
	wg *sync.WaitGroup, mutex *sync.RWMutex) {

	mutex.RLock() // "блокируем", с разрешением "чтения"

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Горутина _2", k)
	}
	mutex.RUnlock() // "разблокируем" доступ
	wg.Done()       // сигнализируем, что горутина завершила работу

}

func main() {
	var mutex sync.RWMutex // определяем мьютекс
	var wg sync.WaitGroup

	wg.Add(2)

	go runGorutine1(&wg, &mutex)
	go runGorutine2(&wg, &mutex)

	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

-- если запустим, то увидим, что витки цикла смешиваются в распечатке, эксклюзивной блокировки нет:

Горутина _2 1
Горутина _1 1
Горутина _1 2
Горутина _2 2
Горутина _1 3
Горутина _2 3
Конец!

-- но если мы хотя бы в одной горутине перейдем на "эксклюзивную" блокировку мьютекса, то синхронизация начнет работать:

package main

import (
	"fmt"
	"sync"
	"time"
)

func runGorutine1(
	wg *sync.WaitGroup, mutex *sync.RWMutex) {

	mutex.RLock() // "блокируем", с разрешением "чтения"

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Горутина _1", k)
	}
	mutex.RUnlock() // "разблокируем" доступ
	wg.Done()       // сигнализируем, что горутина завершила работу

}

func runGorutine2(
	wg *sync.WaitGroup, mutex *sync.RWMutex) {

	// перейдем на обычную полноценную блокировку
	mutex.Lock() // "блокируем" полностью

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Горутина _2", k)
	}
	mutex.Unlock() // "разблокируем" доступ
	wg.Done()      // сигнализируем, что горутина завершила работу

}

func main() {
	var mutex sync.RWMutex // определяем мьютекс
	var wg sync.WaitGroup

	wg.Add(2)

	go runGorutine1(&wg, &mutex)
	go runGorutine2(&wg, &mutex)

	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

-- распечатка станет упорядоченной в "критической зоне", витки цикла перестанут смешиваться:

Горутина _1 1
Горутина _1 2
Горутина _1 3
Горутина _2 1
Горутина _2 2
Горутина _2 3
Конец!

- почему так происходит?

Дело в том, что, как мы написали выше, одна горутина вызывает .RLock() - это разрешает вызывать другие .RLock()-ки, но блокирует всех кто вызовет .Lock(), до того как все вызыванные .RLock()-ки будут компенсированы вызванными RUnlock()-ами. Вот так-то ;)

Мьютексы как поля структуры

Внутри структур можно определить свой мьютекс (RW или обычный) как поле и использовать его, рассмотрим минималистичный пример:

package main

import (
	"fmt"
	"sync"
)

// структура с полем-мьютексом
type Stats struct {
	sync.RWMutex
	counters map[string]int
}

func (s *Stats) Read(key string) int {
	// метод доступен в структуре
	// из-за композии "встраиванием"
	s.RLock()         // неэксклюзивная блокировка
	defer s.RUnlock() // отложенный вызов разблокировки
	return s.counters[key]
}

func (s *Stats) Write(key string, val int, wg *sync.WaitGroup) {
	s.Lock() // эксклюзивная блокировка
	defer s.Unlock()
	s.counters[key] = val
	fmt.Println("Сохраняем", val)
	wg.Done()
}

func main() {
	var wg sync.WaitGroup
	s := &Stats{counters: make(map[string]int)}

	wg.Add(1)
	go s.Write("users", 10, &wg) // Пишем
	wg.Wait()

	fmt.Println("Читаем:", s.Read("users")) // Читаем
}

-- здесь метод мьютекса доступен на уровне переменной структуры за счет "наследования".

Последний пример не показывает решения состояния гонки, т.к. мы не делаем тут параллельных вызовов методов,
но в целом идея должен быть понятна - если переменная структуры будет доступна нескольким потокам, то мы можем с помощью встроенного мьютекса прямо на уровне реализации методов инкапсулированно выполнять какие-то участки кода (критические зоны) в особом режиме, используя методы мьютексов.

Источники/дополнительные материалы

vedro-compota's picture

Пример где RWMutex с блокировкой на запись приводит к хаотичному выводу в консоль (блокировка фактически не работает, т.к. в коде нет записи):

package main

import (
	"fmt"
	"sync"
	"time"
)

func doIt(number int,
	wg *sync.WaitGroup, mutex *sync.RWMutex) {

	// вынесем сообщение о старте за зону синхронизации
	fmt.Println(">>> Старт Горутины", number)
	mutex.RLock() // блокируем доступ (синхиронизация)

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина", number, k)
	}
	mutex.RUnlock() // разблокируем доступ
	wg.Done()       // сигнализируем, что горутина завершила работу

}

func main() {
	var mutex sync.RWMutex // определяем мьютекс
	var wg sync.WaitGroup
	list := []int{1, 2, 3}
	wg.Add(len(list))

	// запускаем горутины
	for _, i := range list {
		go doIt(i, &wg, &mutex)
	}
	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

_____________
матфак вгу и остальная классика =)

vedro-compota's picture

еще пример работы обычно мьютекса:

package main

import (
	"fmt"
	"sync"
	"time"
)

func runGorutine1(
	wg *sync.WaitGroup, mutex *sync.Mutex) {

	mutex.Lock() // блокируем доступ (синхиронизация)

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина 1", k)
	}
	mutex.Unlock() // разблокируем доступ
	wg.Done()      // сигнализируем, что горутина завершила работу

}

func runGorutine2(
	wg *sync.WaitGroup, mutex *sync.Mutex) {

	mutex.Lock() // блокируем доступ (синхиронизация)

	for k := 1; k <= 3; k++ {
		time.Sleep(100 * time.Millisecond) // задержка для наглядности
		fmt.Println("Горутина 2", k)
	}
	mutex.Unlock() // разблокируем доступ
	wg.Done()      // сигнализируем, что горутина завершила работу

}

func main() {
	var mutex sync.Mutex // определяем мьютекс
	var wg sync.WaitGroup

	wg.Add(2)

	go runGorutine1(&wg, &mutex)
	go runGorutine2(&wg, &mutex)

	// ожидаем завершения всех горутин
	wg.Wait()
	fmt.Println("Конец!")
}

_____________
матфак вгу и остальная классика =)

vedro-compota's picture

Мьютекс — это не «надсмотрщик» над памятью, а просто сигнальный флаг.

Как это работает на самом деле

Когда вы пишете mu.Lock(), программа не начинает магическим образом следить за вашими переменными. Она просто проверяет состояние самого мьютекса:

  • Если флаг «занят», горутина засыпает.
  • Если флаг «свободен», горутина помечает его как занятый и идет выполнять код дальше.

Память никак не защищена автоматически. Вы можете забыть вызвать Lock() в одной из горутин, и она спокойно прочитает или перезапишет те же самые переменные, пока другая горутина удерживает мьютекс. Это и называется Race Condition (состояние гонки).

В чем тогда разница в «отслеживании»?

Разница только в том, какой именно «сигнал» проверяет мьютекс:

Обычный Mutex (sync.Mutex):

  • У него внутри всего одно состояние: «кто-то внутри» или «свободно». Ему всё равно, что вы там делаете — читаете или пишете.

RWMutex (sync.RWMutex):

  • Он чуть умнее, но он отслеживает только количество читателей (через внутренний счетчик), а не сами переменные.
  • Когда вы вызываете RLock(), он просто увеличивает счетчик: «теперь здесь +1 читатель».
  • Когда вы вызываете Lock() (для записи), он смотрит: «ага, счетчик читателей больше нуля, значит, я жду, пока он станет 0».

_____________
матфак вгу и остальная классика =)

vedro-compota's picture

  • переписать опредления R-методов для RW
  • выстариваются ли попытки заблокировать мьютекс в очередь

_____________
матфак вгу и остальная классика =)