Introduction

일반적으로 학교에서 배우는 모든 단순한 project level 은 단일 Core CPU 라고 생각하고 하나의 프로세스에서 순차적으로 작업을 해나가는것 으로 배웠다. 하지만 실무 또는 OS, Computer System 을 배우다 보면 MultiThreading 또는 MultiProcessing 이라는 이야기를 많이 한다. 그러면 일단 Process 와 Thread 의 개념을 알아야한다.

Process vs Thread

Process 라는건 OS 에서 작성한 프로그램을 실행시키는 단위라고 보면 되고, 하나의 Process 에서 여러개의 Thread 를 관리할수 있다라는것이다. 즉, 간단하게 말해서, 우리가 운동하기, 저녁먹기, 청소하기 이런것들이 Process 개념이며, 하나 운동이라는 Task 를 주어졌을때, Gym 을 가기 또는 저녁 반찬 준비하기가 Thread 라고 할수 있다.

MultiProcessing vs MultiThreading

Multiprocessing 같은 경우, 하나의 Program 을 만들고, 그 프로그램이 여러개의 Thread 를 만들어서, 여러개의 코어를 동시에 활용해서 효율성을 높이는 작업이며, Multithreading 은 하나의 CPU 에 여러개의 코어가 들어있는 경우, 그 코어들을 활용해서 동시에 여러가지 작업을 수행하는것이라고 생각하면 편하다.

단편적으로 multithreading 은 보편적으로 대세라고 보면되고, multiprocessing 같은 경우에는 여러개의 PC 들을 네트워크로 연결시켜서, 그 여러개의 PC 에 들어있는 코어를 전부 활용하는 Distributed Computing 이라고 본다. Multiprocessing 의 단점이라고 말을 할수 있는건 네트워크로 연결된 Phsycially 하게 멀어져있는 Computer 가 흩어져 있기때문에 하나의 PC 가 다른 PC 에 어떤 데이터를 가지고 있는지가 알수가 없다. 하지만 Multithread 는 여러개의 Thread 가 Memory 를 공유한다는 점에서는 장점이다. 이게 양날의 검일수도 있고 정말 잘사용하면 효율이 잘나온다.

Multithreading

![Thread](image.png)

위의 그림을 보면 Thread 1 이 시작이되고, 그 다음으로 Thread 2, 3 이렇게 시작된다 어떻게 보면 순차적으로 일어나는것과 같아보이지만, 개념상으로 어떤 Thread 가 먼저 끝낼지는 모르고, thread 3 개를 동시에 띄운다라고 생각하면 되고, 하나하나 Thread 를 실행시킨다고 하면된다. 즉 Main Thread 로 부터 시작해서, Thread 1, 2, 3 이라는 자식 Thread 를 만들어서 어떤 3000 작업을 한다고 하면 1000 개씩 각 3 개의 Thread 에게 일을 시키면된다. 아마 Ctrl + Alt + Delete 를 누르다보면, Task Manager 에서 Performance 를 가다보면 Core 개수를 확인 할수 있을거다.

C++ Thread Basics

바로 코드로 넘어가보자, 아마 이런 코드를 한번 실행시키면 굉장히 좋은 질문일것 같다. Thread 를 생성하고, while 문으로 돌린다. 근데 끝나 버린다. 이 이유 같은 경우 Main Thread 에서 Child Thread 를 만들었는데 (ID 는 모름) 근데 Main Thread 가 끝나 버린 케이스이다. 그걸 위해서는 t1.join() 이게 필요하다.

#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include <mutex>

using namespace std;
int main()
{
    const int num_process = std::thread::hardware_concurrency();
    std::thread t1 = std::thread([]() {while(true) {}});
}

만약의 위의 코드를 Debugging 용도로 사용하려면, 아래와 방식의 코드로 ID 를 Checking 할수 있다.

using namespace std;
int main()
{
    const int num_process = std::thread::hardware_concurrency();
    cout << std::this_thread::get_id() << endl;
    std::thread t1 = std::thread([]() { std::this_thread::get_id() << endl; while(true) {}});
    t1.join();
}

그 이후 간단하게 여러개의 Thread 를 만들어서, join() 을 시켜보자.

using namespace std;

int main()
{
    const int num_process = std::thread::hardware_concurrency(); // number of core
    cout << std::this_thread::get_id() << endl; // main thread

    vector<std::thread> my_threads;
    my_threads.resize(num_process);
    for (auto& e : my_threads)
    {
        e = std::thread([]() {
        cout << std::this_thread::get_id() << endl;
        while (true) {}});
    }
    for (auto& e : my_threads)
    {
        e.join();
    }
    return 0;
}

이런식으로 하였을때 보면, threadID 들이 고르지 않게 나온다. 그 이유는 여러개의 Thread 가 동시에 Spawn 이 되고, 그리고 CPU 가 열심히 일하는것을 볼수 있다. 그렇다면 아래와 같이 Lambda 함수로 간단하게 만들어보자

int main()
{
    auto work_func = [](const string& name)
	{
		for (int i = 0; i < 5; ++i)
		{
			std::this_thread::sleep_for(std::chrono::milliseconds(100));
			cout << name << " " << std::this_thread::get_id() << " is working " << i << endl;
		}
	};

    std::thread t1 = std::thread(work_func, "Jack");
	std::thread t2 = std::thread(work_func, "Nick");

    t1.join();
	t2.join();
}

이걸 Output 으로 보자면, 이것도 마찬가지로 ID 가 고르지 않을수 있다. 하지만 여기에서는, 실제 work_func 라는 lambda 함수를 사용해서 thread 를 binding 시켜서 실행하는 예제라고 보면 굉장히 쉽게 와다을수 있다. 그렇다면 어떻게 고르게, 우리가 PrintOut 할까가 문제이다. 사실 std::cout 은 t1 과 t2 가 공용으로 접근을 하려고 하기때문에, Race Condition 이 일어나서 서로 std::cout 을 하려고 난리를 칠것이다. 이것을 방지 할수 있는것이 바로 std::mutex => mutex 즉 mutual exclusive, 상호 배제라는 뜻이다. 즉 서로 못 건드리게 한다 바꿔 말하면 이건 내꺼 나만 쓸수 있어 하는 이렇게 선언을 할수 있는 존재이다. mutex 를 걸어주고, cout 그 부분에만 mtx.lock() 과 데이터의 사용을 마무리 짖는 mtx.unlock() 을 아래와 같이해주면 하나의 thread 가 일이 cout 이 끝나고 다른 Thread 가 잡아서 작업을 할수 있게 한다.

using namespace std;

mutex mtx; // mutual exclusion
int main()
{
	std::vector<float> myArr;
	myArr.resize(10);
	const int num_process = std::thread::hardware_concurrency(); // number of core
	auto work_func = [](const string& name, std::vector<float>& myArr)
		{
			for (int i = 0; i < 5; ++i)
			{
				std::this_thread::sleep_for(std::chrono::milliseconds(100));

				// Don't touch it
				mtx.lock();
				myArr.push_back(i);
				cout << name << " " << std::this_thread::get_id() << " is working " << i << endl;
				mtx.unlock();
			}
		};

    std::thread t1 = std::thread(work_func, "JackJack", std::ref(myArr));
	std::thread t2 = std::thread(work_func, "DashDash", std::ref(myArr));
	t1.join();
	t2.join();

	for (int i = 0; i < myArr.size(); i++)
	{
		cout << myArr[i] << endl;
	}
    return 0;
}

이런식으로 하면, 기본적인 std::thread 에 관련된 내용을 커버했다. 뭐든지 직접해보고 손대보고 알아나가야 진정한 공부고, 기술이다.

Race Condition

잠깐 아래와 같은 예제를 보자. 예제로 shared_memory 라는 int 타입이라는 메모리를 공유한다고 하자. 일단 실행시킨다고 가정을 한다면, shared_memory 는 1000 이 되어있을거다.

int main()
{

	int shared_memory(0);
	auto count_func = [&]()
		{
			for (int i = 0; i < 1000; ++i)
			{
				this_thread::sleep_for(chrono::microseconds(1));
				shared_memory++;
			}
		};

	std::thread t1 = thread(count_func);

	t1.join();

	cout << "After" << endl;
	cout << shared_memory << endl;

	return 0;
}

그렇다면, 여러개의 Thread 를 동시에 시킨다고 하자. 그렇다면, shared_memory 가 2000 이 되어야하지만, 그렇지 않다. 그 원인은 일단 CPU 에서 shared_memory 값을 읽어서, CPU 안에서 값을 하나 더하고 그 더해진 결과 값을 shared_memory 변수로 다시 보내는데 t1 이 10 값을 읽어드렸을 사이에 t2 가 재빨리 11 로 값을 바꾼다거나 그러다가 t1 이 다시 12 를 덮어 씌어버리기때문에 덧셈 하나가 사라진거나 마찬가지이다. 즉 동시에 카운트 역활을 주어졌을때, 일은 수행하지만, 같을 덮어씌운다는 점에서 덧셈 몇개까 빠져버린것처럼 보이기 때문이다.

int main()
{

	int shared_memory(0);
	auto count_func = [&]()
		{
			for (int i = 0; i < 1000; ++i)
			{
				this_thread::sleep_for(chrono::microseconds(1));
				shared_memory++;
			}
		};

	std::thread t1 = thread(count_func);
	std::thread t2 = thread(count_func);

	t1.join();
	t2.join();

	cout << "After" << endl;
	cout << shared_memory << endl;

	return 0;
}

이걸 해결하기 위해서는 방법 하나중에 std::atomic 이 있다. 즉 atomic 이라는건 아까 말했던 그 CPU 에서 값을 더하고, 불러오고 내보내는 이 Operation 자체를 하나의 원자로 감싸준다. 라는 의미이다.

int main()
{
	std::atomic<int> shared_memory(0);
	auto count_func = [&]()
		{
			for (int i = 0; i < 1000; ++i)
			{
				this_thread::sleep_for(chrono::microseconds(1));
				shared_memory++; // shared_memory.fetch_add(1);
			}
		};

	std::thread t1 = thread(count_func);
	std::thread t2 = thread(count_func);

	t1.join();
	t2.join();

	cout << "After" << endl;
	cout << shared_memory << endl;

	return 0;
}

그래서 위처럼 할때, 정확히 shared_memory 가 2000 이나온걸 확인 할 수 있다. 하지만, Atomic Operation 이 느려질수도 있으니 그거에 주의를 두고 사용할때만 사용하자. 우리가 위에서 봤던 예제처럼 std::mutex 를 선언한 이후에, Operation(덧셈) 에 lock() 과 unlock() 을 걸어준다면, 사실 문제는 없다. 근데 프로그래머도 실수는 할수 있으므로 그걸 사용한 std::lock_guard 가 있다. 또 std17 에서는 std::scoped_lock() 을 사용하면 된다.

mutex mtx;
int main()
{
	std::atomic<int> shared_memory(0);
	auto count_func = [&]()
		{
			for (int i = 0; i < 1000; ++i)
			{
				std::lock_guard lock(mtx);
				shared_memory++; // shared_memory.fetch_add(1);
			}
		};

	std::thread t1 = thread(count_func);
	std::thread t2 = thread(count_func);

	t1.join();
	t2.join();

	cout << "After" << endl;
	cout << shared_memory << endl;

	return 0;
}

주의할 점은 this_thread::sleep_for(chrono::milliseconds(1)) 이 부분을 지운다고 하면, 동작을 제대로 할수 있다. 그 이유로는 일단 t1 이 다 더해버리고, t2 가 더할시점에서, t1 이 그냥 다 더하기 때문에 실제로는 두개가 처리가 안됬을수도 있기때문에 병렬처럼 처리한것 처럼 보일수 있다.

Task (작업) 기반 Asynchronous Programming

위에서 봤던 내용 처럼, Thread 로 어떠한 작업을 functor 또는 Lambda 함수로 지정해서 Task 를 parallel 하게 수행할수 있었다. 다른 방법으로는 어떤 Task 기반으로 되는 future, std::async() 를 사용하는 방법이다. Thread 와 비슷하게 사용할수 있지만, 조금 다르게 작동한다는걸 확인 해야하고, 공식 문서에도 이 방법이 선호되고 있는 추세이다. 일단 한번 비교를 하기위해서 두개의 코드를 봐보자.

#include <iostream>
#include <future>
#include <thread>
int main()
{
	// Thread
	int result;
	std::thread t1([&] {result = 1 + 2; });
	t1.join(); 
	cout << result << endl;

	// Future [Async]
	auto fut = std::async([] { return 1 + 2; });
	cout << fut.get() << endl;
}

위의 코드를 확인 하면, Thread 같은 경우 join() 이라는 함수를 통해서, Thread 에서 행해지는 작업을 기다리는거고, async 는 어떤 Task 를 미리 지정해주고, std::future<int> 라는 것을 통해서 값을 받아서, 작업이 끝나면, fut.get() 을 통해서 작업이 끝났다는걸 알수 있다. (즉 .get() 은 어떤 Task 가 있을때까지 기다렸다가 받을수 있는 형태로 되어있다는 소리). 아주 미묘한 차이이지만, Thread 에 t1.join() 이 없더라면 또는 어떤 작업에따라서, main thread 가 죽을수도 있는 현상이 발생할수도 있다. 하지만 그와 반대로 async 는 어떤 특정 작업이 진행해야한다라고 지정한 이후 (미래)에 끝날때까지를 기다리는거다라고 확인해서, 조금 편할수도 있다도 되겠다.

그렇다면 둘중 하나만 써야하냐? 그건 아니다. thread 와 future 를 같이 사용할수 있지만, promise (약속) 을 해야한다. 아래의 코드를 보자면, promise 로 부터 future 를 return 값으로 받는다. 이거 같은 경우는 이 Promise 가 처리가 되서 잘나오는지를 Return 값으로 받기위해서 넣어주고, r-value reference 로 thread 의 lambda 함수에 인자로 넘겨준다. 그리고 이 처리가 잘끝나면, std::move 를 통해서 promise 값이 나와서, future.get() 으로 받을수 있다. 여기서 중요한거는 debugging 을 해보면 처음에는 prom 과 fut 값이 pending 이라는걸 확인 할 수 있다. 즉 아직 값을 받을 준비 또는 처리단계를 거치지 못했다 라고 볼수 있다.

또 여기서 의문점이라고 할수 있는건 std::thread 대신에, 그냥 std::async() 쓰면 되지 않느냐라고 물어 볼수 있는데, 또 다르게 생각해보면, 그렇게 된다면, promise 를 애초에 쓸필요가 없어진다.

#include <iostream>
#include <future>
#include <thread>
int main()
{
	std::promise<int> prom;
	auto fut = prom.get_future();

	auto t = std::thread([](std::promise<int>&& prom)
			{
				prom.set_value(1 + 2);
			}, std::move(prom));

	cout << fut.get() << endl;
	t.join();
	return 0;
}

여러개를 Thread 를 이용했던것처럼, std::async() 도 여러개를 사용할수 있다. 아래의 코드를 한번 봐보자. 아래의 코드를 수행시에는 main 이 시작되고, async1 이 먼저 시작이되고 그이후에 async2 가 시작된다. 근데 sleep 조건으로 인해서 async2 가 끝나고 async1 이 끝나게 되어있다. 이러면 일단 Parallel 하게 작업한다는걸 짐작할수 있다.

만약에 return 값을 지정을 안한다고 하면 어떻게 될까? 그렇다면, async1 start 가 print 가 되고 async1 end 그다음에 async2 start, async2 end 그 이후에 main 이 작동된다.

그리고 만약에 이 예제를 thread 로 바꾸고 thread 로 바꾸면, 우리의 기댓값과는 다르게 작동이 된다. 이걸 통해서, async 와 thread 의 방식이 다르다는 점과 조심해야할 점을 생각할수 있다.

#include <iostream>
#include <future>
#include <thread>
int main()
{
	auto f1 = std::async([] {
			cout << "async1 start" << endl;
			this_thread::sleep_for(chrono::seconds(2));
			cout << "async1 end" << endl;
			});

		auto f2 = std::async([] {
			cout << "async2 start" << endl;
			this_thread::sleep_for(chrono::seconds(1));
			cout << "async2 end" << endl;
			});

		cout << "Main Function" << endl;
}

Example: Calculating Inner Product

#include <iostream>
#include <chrono>
#include <mutex>
#include <random>
#include <utility>
#include <vector>
#include <atomic>
#include <numeric>   // std::inner_product
#include <execution> // Parallel Execution
#include <future>
#include <thread>

using namespace std;
mutex mtx;

void dotProductNaive(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, unsigned long long& sumOut)
{
	for (unsigned i = i_start; i < i_end; ++i)
	{
		sumOut += v0[i] * v1[i];
	}
}

void dotProductLock(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, unsigned long long& sumOut)
{
	for (unsigned i = i_start; i < i_end; ++i)
	{
		std::scoped_lock lock(mtx);
		sumOut += v0[i] * v1[i];
	}
}

void dotProductAtomic(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, atomic<unsigned long long>& sumOut)
{
	for (unsigned i = i_start; i < i_end; ++i)
	{
		sumOut += v0[i] * v1[i];
	}
}

auto dotProductFuture(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end)
{
	int sum = 0;
	for (unsigned i = i_start; i < i_end; ++i)
	{
		sum += v0[i] * v1[i];
	}
	return sum;
}

int main()
{
	const long long n_data = 100'000'000;
	const unsigned n_threads = 4;

	std::vector<int> v0, v1;
	v0.reserve(n_data);
	v1.reserve(n_data);

	random_device seed;
	mt19937 engine(seed());

	uniform_int_distribution<> uniformDist(1, 10);
	for (long long i = 0; i < n_data; ++i)
	{
		v0.push_back(uniformDist(engine));
		v1.push_back(uniformDist(engine));
	}

	// inner product from c++ library
	{
		const auto sta = chrono::steady_clock::now();
		const auto sum = std::inner_product(v0.begin(), v0.end(), v1.begin(), 0ull);
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	// Naive (Race Condition)
	{
		const auto sta = chrono::steady_clock::now();
		unsigned long long sum = 0;
		vector<thread> threads;
		threads.resize(n_threads);

		const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t] = std::thread(dotProductNaive, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
		}

		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t].join();
		}
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	// Lock Guard
	{
		const auto sta = chrono::steady_clock::now();
		unsigned long long sum = 0;
		vector<thread> threads;
		threads.resize(n_threads);

		const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t] = std::thread(dotProductLock, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
		}

		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t].join();
		}
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	// atomic
	{
		const auto sta = chrono::steady_clock::now();
		std::atomic<unsigned long long> sum = 0;
		vector<thread> threads;
		threads.resize(n_threads);

		const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t] = std::thread(dotProductAtomic, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
		}

		for (unsigned t = 0; t < n_threads; ++t)
		{
			threads[t].join();
		}
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	// future
	{
		const auto sta = chrono::steady_clock::now();
		unsigned long long sum = 0;
		vector<std::future<int>> futures;
		futures.resize(n_threads);
		const unsigned n_per_thread = n_data / n_threads;
		for (unsigned t = 0; t < n_threads; ++t)
		{
			futures[t] = std::async(dotProductFuture, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread);
		}

		for (unsigned t = 0; t < n_threads; ++t)
		{
			futures[t].get();
		}
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	// transform reduce
	{
		const auto sta = chrono::steady_clock::now();
		const auto sum = std::transform_reduce(std::execution::par, v0.begin(), v0.end(), v1.begin(), 0ull);
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << dur.count() << endl;
		cout << sum << endl;
		cout << endl;
	}

	return 0;
}

© 2021. All rights reserved.