CPP 058 – Working With Shared Data

Critical Section

Data race’in gerçekleşebileceği bölgeye critical section diyoruz. Yani kritik bölge, bir kaza/bir yanlışın çıkabileceği alan.

Aşağıdaki kodda brute force bir çarpma işlemi yapıyoruz. 20 farklı thread aynı değişkeni 10’000’er defa ++(increment) ediyor. sonuç 200’000 olmalı. Fakat kritik bölgeye dikkat etmediğimizde yanlış sonuç elde ediyoruz:

#include <iostream>
#include <vector>
#include <thread>

using namespace std;

void add(int& num, int y) {
	for (int i{}; i < y; i++)
		num++;
}

int main() {
	vector<thread> threads;

	int final_num{};
	int mult_x = 20;
	int mult_y = 10'000;

	for (int i{}; i < mult_x; i++) 
		threads.push_back(thread(add, ref(final_num), mult_y));
	
	for(auto& t : threads) 
		t.join();

	cout <<	"Final number: " << final_num << endl;
	// Final number: 194700
}

Bu koddaki kritik bölge:

	for (int i{}; i < y; i++)
		num++; // bu satır kritik bölgemiz 

Mutex

Mutual exclusive’in kısaltması. 4 kişi tuvalette bekliyor olsun. Tuvalet boşsa kişilerden biri girip kapıyı kitler. İşi bittiğinde kilidi açar ve dışarı çıkar. Daha sonra başka bir kişi aynı işlemi tekrarlar. Buradaki kilit mekanizmasına Mutex diyoruz.

Mutex’in 3 fonksiyonu vardır.

  • lock: Kapıyı kilitler. Eğer kapı zaten kilitliyse o zaman bekler.
  • try_lock: Kapıyı kilitlemeyi dener. Eğer kapıyı kilitlemişse true, kilitleyememişse false döner AMA bu fonksiyon lock fonksiyonunun aksine bekleme yapmaz.
  • unlock: Kilidi açar.
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

mutex m; 

void add(int& num, int y) {
	m.lock();
	for (int i{}; i < y; i++) 
		num++;
	m.unlock();
}

int main() {
	vector<thread> threads;

	int final_num{};
	int mult_x = 20;
	int mult_y = 10'000;

	for (int i{}; i < mult_x; i++) 
		threads.push_back(thread(add, ref(final_num), mult_y));
	
	for(auto& t : threads) 
		t.join();

	cout <<	"Final number: " << final_num << endl;
	// Final number: 200000
}

Burada kritik bölgemizdeki data race problemini mutex vasıtasıyla hallettik ama şuanda başka bir problemimiz var. Kod parallel bir şekilde çalışmıyor. Seri bir şekilde çalışıyor. 1 thread kitleyip for döngüsünü tamamlayıp kilidi açıyor, sonra öteki thread kitleyip for döngüsünü tamamlayıp kritik bölgeyi açıyor.

Bu yaklaşım tek threadli bir programdan bile daha yavaş çalışacaktır.

O zaman bizim daha iyi bir metoda ihtiyacımız var:

void add(int& num, int y) {
	int my_num{};
	for (int i{}; i < y; i++) 
		my_num++;

	m.lock();
	num += my_num;
	m.unlock();
}

Burada her thread kendine ait oluşturmuş olduğu my_num değişkeni üzerinde işlemleri halledip en sonunda num değişkenine bunu aktarıyor. Böylece kritik bölgede geçen süre ciddi boyutta azalmış oluyor.

Peki hiç mutex kullanmadan yapabilir miyiz? Tabi:

#include <iostream>
#include <vector>
#include <thread>

using namespace std;

void add(int& num, int y) {
	for (int i{}; i < y; i++) 
		num++;
}

int main() {
	vector<thread> threads;
	vector <int> numbers;

	int final_num{};
	int mult_x = 20;
	int mult_y = 10'000;

	numbers.resize(mult_x);
	fill(numbers.begin(), numbers.end(), 0);

	for (int i{}; i < mult_x; i++) 
		threads.push_back(thread(add, ref(numbers[i]), mult_y));

	for(auto& t : threads) 
		t.join();

	for(int& i: numbers) 
		final_num += i;

	cout <<	"Final number: " << final_num << endl;
	// Final number: 200000
}

20 elemanlık bir vektör oluşturup her bir üyesii farklı threadlere verdik. threadler hesaplamalarını bitirdikten sonra ana thread’de vektördeki tüm elemanları topladık.

lock_guard

mutex’in exception vb durumlarda unlock olmama durumunu önlemek için yapılmış bir wrapper’dır. Mutex objesini alıp RAII ile harmanlıyor:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

mutex my_mutex;

void add(int& num, int y) {
	lock_guard<mutex> lg(my_mutex); // Lock the mutex for the duration of this function
	for (int i{}; i < y; i++) 
		num++;
}

int main() {
	vector<thread> threads;
	
	int final_num{};
	int mult_x = 20;
	int mult_y = 10'000;


	for (int i{}; i < mult_x; i++) 
		threads.push_back(thread(add, ref(final_num), mult_y));

	for(auto& t : threads) 
		t.join();

	cout <<	"Final number: " << final_num << endl;
	// Final number: 200000
}

unique_lock

Lock_guard’a benzer yapıdadır. Farkları:

  • Lock_guard, scope’un sonuna kadar çalışır eğer kritik bölgeden sonra aynı scope’un içerisinde yapılacak iş varsa performans kaybına neden olur:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

mutex my_mutex;

void print(string str) {	
	lock_guard<mutex> lg(my_mutex); // Lock the mutex for the duration of this function
	for (int i{}; i < 3; i++) 
		cout << str << endl; 

	this_thread::sleep_for(chrono::milliseconds(400)); // Simulate some work
}

int main() {
	vector<thread> threads;

	for (string str : {"abc", "def", "kgh", "xyz"}) 
		threads.push_back(thread(print, str));

	for(auto& t : threads) 
		t.join();
}

Yukarıdaki örnekte gerekmediği halde mutex’in kitli kalması sonucu diğer tüm threadler her seferinde 400ms lik bir geçikme yaşıyor. Bunu unique_lock ile önleyebiliriz:

void print(string str) {	
	unique_lock<mutex> ul(my_mutex); // Lock the mutex for the duration of this function
	for (int i{}; i < 3; i++) 
		cout << str << endl; 
	ul.unlock(); // Unlock the mutex
	this_thread::sleep_for(chrono::milliseconds(400)); // Simulate some work
}

unique_lock, lock_guard’ın aksine manual olarak unlock edilebilir. Eğer manual olarak unlcok edilmezse scope bitiminde otomatik olarak unlock edilir.

https://en.cppreference.com/w/cpp/thread/unique_lock

Lock Type

uniqe lock’u farklı taglarle initialize edebiliyoruz:

TypeEffect(s)
defer_lock_tdo not acquire ownership of the mutex
try_to_lock_ttry to acquire ownership of the mutex without blocking
adopt_lock_tassume the calling thread already has ownership of the mutex

timed_mutex

mutex’e ek olarak iki fonksiyon içerir:

  • try_lock_for(duration): Mutex kilitliyse belli bir süre boyunca kitlemek için bekler eğer olmazsa false döner. Kitleyebilirse true döner.
  • try_lock_until(deadline): Aynı durum söz konusu tek farkı “belli bir zaman noktasına kadar” bekleme yapar.
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

timed_mutex mut;

void t1() {
	mut.lock();
	cout << "Thread 1 locked mut" << endl;
	this_thread::sleep_for(chrono::seconds(5));
	cout << "Thread 1 unlocked mut" << endl;
	mut.unlock();
}
void t2() {
	this_thread::sleep_for(500ms);

	while (!mut.try_lock_for(1s))	
		cout << "Thread 2 cannot locked mut" << endl;

	cout << "Thread 2 locked mut" << endl;
	this_thread::sleep_for(1s);
	cout << "Thread 2 unlocked mut" << endl;
	mut.unlock();
}

int main() {
	thread thread1(t1);
	thread thread2(t2);

	thread1.join();
	thread2.join();
}

// Thread 1 locked mut
// Thread 2 cannot locked mut
// Thread 2 cannot locked mut
// Thread 2 cannot locked mut
// Thread 2 cannot locked mut
// Thread 1 unlocked mut
// Thread 2 locked mut
// Thread 2 unlocked mut

Yukarıdaki örnekte try_lock_for fonksiyonunun kullanımı gördük. try_lock_until fonksiyonunu da aşağıdaki şekilde kullanabiliriz:

void t2() {
	//...
	auto death_time = chrono::system_clock::now() + 1s;
	while (!mut.try_lock_until(death_time))
		cout << "Thread 2 cannot locked mut" << endl;
	//...
}

recursive_mutex

Normalde bir mutex 1 kere lock edilir sonra unlock edilir. Recursive de n kere lock ve n kere unlock edebiliyoruz:

void foo() {
   ... mutex_acquire();
   ... foo();
   ... mutex_release();
}

https://stackoverflow.com/questions/2415082/when-to-use-recursive-mutex

Chrono Clocks…

https://stackoverflow.com/questions/64080301/what-are-the-pros-cons-of-the-different-c-clocks-for-logging-time-stamps

system_clock is a clock that keeps time with UTC (excluding leap seconds). Every once in a while (maybe several times a day), it gets adjusted by small amounts, to keep it aligned with the correct time. This is often done with a network service such as NTP. These adjustments are typically on the order of microseconds, but can be either forward or backwards in time. It is actually possible (though not likely nor common) for timestamps from this clock to go backwards by tiny amounts. Unless abused by an administrator, system_clock does not jump by gross amounts, say due to daylight saving, or changing the computer’s local time zone, since it always tracks UTC.

steady_clock is like a stopwatch. It has no relationship to any time standard. It just keeps ticking. It may not keep perfect time (no clock does really). But it will never be adjusted, especially not backwards. It is great for timing short bits of code. But since it never gets adjusted, it may drift over time with respect to system_clock which is adjusted to keep in sync with UTC.

This boils down to the fact that steady_clock is best for timing short durations. It also typically has nanosecond resolution, though that is not required. And system_clock is best for timing “long” times where “long” is very fuzzy. But certainly hours or days qualifies as “long”, and durations under a second don’t. And if you need to relate a timestamp to a human readable time such as a date/time on the civil calendar, system_clock is the only choice.

high_resolution_clock is allowed to be a type alias for either steady_clock or system_clock, and in practice always is. But some platforms alias to steady_clock and some to system_clock. So imho, it is best to just directly choose steady_clock or system_clock so that you know what you’re getting.

Though not specified, std::time is typically restricted to a resolution of a second. So it is completely unusable for situations that require subsecond precision. Otherwise std::time tracks UTC (excluding leap seconds), just like system_clock.

std::clock tracks processor time, as opposed to physical time. That is, when your thread is not busy doing something, and the OS has parked it, measurements of std::clock will not reflect time increasing during that down time. This can be really useful if that is what you need to measure. And it can be very surprising if you use it without realizing that processor time is what you’re measuring.

And new for C++20

C++20 adds four more clocks to the <chrono> library:

utc_clock is just like system_clock, except that it counts leap seconds. This is mainly useful when you need to subtract two time_points across a leap second insertion point, and you absolutely need to count that inserted leap second (or a fraction thereof).

tai_clock measures seconds since 1958-01-01 00:00:00 and is offset 10s ahead of UTC at this date. It doesn’t have leap seconds, but every time a leap second is inserted into UTC, the calendrical representation of TAI and UTC diverge by another second.

gps_clock models the GPS time system. It measures seconds since the first Sunday of January, 1980 00:00:00 UTC. Like TAI, every time a leap second is inserted into UTC, the calendrical representation of GPS and UTC diverge by another second. Because of the similarity in the way that GPS and TAI handle UTC leap seconds, the calendrical representation of GPS is always behind that of TAI by 19 seconds.

file_clock is the clock used by the filesystem library, and is what produces the chrono::time_point aliased by std::filesystem::file_time_type.

One can use a new named cast in C++20 called clock_cast to convert among the time_points of system_clock, utc_clock, tai_clock, gps_clock and file_clock. For example:

auto tp = clock_cast<system_clock>(last_write_time("some_path/some_file.xxx"));

The type of tp is a system_clock-based time_point with the same duration type (precision) as file_time_type.

shared_mutex

  • 1 writer, n reader olma durumunda kullanırız.
  • 1 değişkenden farklı threadlerin okumasında bir sıkıntı yok.
  • 1 değişkene farklı threadlerin yazması sıkıntılı. Çünkü hangi thread’in verinin hangi bölümünü yazacağı belli değil.
  • 1 değişkene bir thread’in yazıp öteki thread’in okuması yine sıkıntılı bir durum. Çünkü verinin bir kısmı yeni, diğer kısmı eski olabilir.
  • Bu sıkıntıları gidermek için shared mutex kullanıyoruz:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>

using namespace std;

shared_mutex mut;

int counter = 0;

void read() {
	while (true){
		shared_lock<shared_mutex> lock(mut); 
		cout << counter << ", ";
		lock.unlock();
		this_thread::sleep_for(100ms);
	}
}
void write() {
	while (true){
		unique_lock<shared_mutex> lock(mut); 
		counter++;
		cout << endl;
		lock.unlock();
		this_thread::sleep_for(1s);
	}
}

int main() {
	vector<thread> threads;

	threads.push_back(thread(write));
	threads.push_back(thread(read));

	for (auto & t : threads) t.join();
}

// 1. shared_mutex: Okuma ve yazma işlemlerini senkronize etmek için kullanılır.

// 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// 2, 2, 2, 2, 2, 2, 2, 2, 2,
// 3, 3, 3, 3, 3, 3, 3, 3, 3,
// 4, 4, 4, 4, 4, 4, 4, 4, 4,
// 5, 5, 5, 5, 5, 5, 5, 5, 5,
// 6, 6, 6, 6, 6, 6, 6, 6, 6,
// 7, 7, 7, 7, 7, 7, 7, 7, 7,
// 8, 8, 8, 8, 8, 8, 8, 8, 8,
// 9, 9, 9, 9, 9, 9, 9, 9, 9, 9,
// 10, 10, 10, 10, 10, 10, 10, 10, 10,
// 11, 11, 11, 11, 11, 11, 11, 11, 11,
// 12, 12, 12, 12, 12, 12, 12, 12, 12,
// 13, 13, 13, 13, 13, 13, 13, 13, 13,
// 14, 14, 14, 14, 14, 14, 14, 14, 14,
// 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
// 16, 16, 16, 16, 16, 16, 16, 16, 16,
// 17, 17, 17, 17, 17, 17, 17, 17, 17,
// 18, 18, 18, 18, 18, 18, 18, 18, 18,
// 19, 19, 19, 19, 19, 19, 19, 19, 19,
// 20, 20, 20, 20, 20, 20, 20, 20, 20, 20,
// 21, 21, 21, 21, 21, 21, 21, 21, 21,
// 22, 22, 22, 22, 22, 22, 22, 22, 22,
// 23, 23, 23, 23, 23, 23, 23, 23, 23,
// 24, 24, 24, 24, 24, 24, 24, 24, 24,
// 25, 25, 25, 25, 25, 25, 25, 25, 25,
// 26, 26, 26, 26, 26, 26, 26, 26, 26, 26,
// 27, 27, 27, 27, 27, 27, 27, 27, 27,
// 28, 28, 28, 28, 28, 28, 28, 28, 28,
// 29, 29, 29, 29, 29, 29, 29, 29, 29,
// 30, 30, 30, 30, 30, 30, 30, 30, 30,
// 31, 31, 31, 31, 31, 31, 31, 31, 31,
// 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,
// 33, 33, 33, 33, 33, 33, 33, 33, 33,
// 34, 34, 34, 34, 34, 34, 34, 34, 34,
// 35, 35, 35, 35, 35, 35, 35, 35, 35,
// 36, 36, 36, 36, 36, 36, 36, 36, 36,
// 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
// 38, 38, 38, 38, 38, 38, 38, 38, 38,
// 39, 39, 39, 39, 39,
// 40, 40, 40, 40, 40, 40, 40, 40, 40,
// 41, 41, 41, 41, 41, 41, 41, 41, 41,
// 42, 42, 42, 42, 42, 42, 42, 42, 42,

Yapısı:

shared_mutex mut;

void write() {
  // Exclusive lock
	unique_lock<shared_mutex> lock(mut); 
	...
}

void read() {
  // Shared lock
  shared_lock<shared_mutex> lock(mut); 
  ...
}

  • Write thread, diğer tüm threadler unlock olmadıkça exclusive lock yapamaz.
  • Write thread exclusive lock yaptığında; ne reader threadler shared lock ne de writer threadler exclusive locak yapabilir.
  • Reader thread, writer thread exclusive lock yapmadığı müddetçe shared lock yapabilir.
  • Birden fazla reader thread aynı anda exclusive lock yapabilir.

member functions:

Exclusive locking
locklocks the mutex, blocks if the mutex is not available
(public member function)
try_locktries to lock the mutex, returns if the mutex is not available
(public member function)
unlockunlocks the mutex
(public member function)
Shared locking
lock_sharedlocks the mutex for shared ownership, blocks if the mutex is not available
(public member function)
try_lock_sharedtries to lock the mutex for shared ownership, returns if the mutex is not available
(public member function)
unlock_sharedunlocks the mutex (shared ownership)
(public member function)

https://en.cppreference.com/w/cpp/thread/shared_mutex

Shared Data Initialization

local static değişkenler c++ 11 den itibaren thread safe olarak initialize edilebilir:

class Pikachu {
	int pika{};
	int chu{};
};

Pikachu& get_pikachu() {
	static Pikachu pika;
	return pika;
}

https://learn.microsoft.com/en-us/cpp/build/reference/zc-threadsafeinit-thread-safe-local-static-initialization?view=msvc-170

thread_local

Her thread, kendine özgü değişkene sahip olur:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

thread_local int counter{};
mutex mut;

void increment() {
	for (int i = 0; i < 10; ++i) 
		++counter;

	unique_lock ul(mut);
	cout << "Thread ID: " << this_thread::get_id() << " Counter: " << counter << endl;
	
}


int main() {
	vector<thread> threads;
	cout << "Main thread counter: " << counter << endl;
	for (int i = 0; i < 10; i++)
		threads.push_back(thread(increment));

	for (auto& t : threads)
		t.join();


	cout << "Main thread counter: " << counter << endl;

}

// Main thread counter : 0
// Thread ID : 14532 Counter : 10
// Thread ID : 16912 Counter : 10
// Thread ID : 15836 Counter : 10
// Thread ID : 19800 Counter : 10
// Thread ID : 3832 Counter : 10
// Thread ID : 23720 Counter : 10
// Thread ID : 5192 Counter : 10
// Thread ID : 22604 Counter : 10
// Thread ID : 23748 Counter : 10
// Thread ID : 22732 Counter : 10
// Main thread counter : 0

call_once

sadece bir kere çağırır:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

using namespace std;

std::once_flag oflag;

void print() {
	cout << "for once in my life..." << endl;
}
void func() {
	call_once(oflag, print);
}
int main() {
	vector<thread> threads;
	for (int i = 0; i < 10; ++i) 
		threads.push_back(thread(func));
	
	for (auto& t : threads) 
		t.join();
}

// Output:
// for once in my life...


Comments

Leave a Reply

Your email address will not be published. Required fields are marked *