Introduction
일반적으로 학교에서 배우는 모든 단순한 project level 은 단일 Core CPU 라고 생각하고 하나의 프로세스에서 순차적으로 작업을 해나가는것 으로 배웠다. 하지만 실무 또는 OS, Computer System 을 배우다 보면 MultiThreading 또는 MultiProcessing 이라는 이야기를 많이 한다. 그러면 일단 Process 와 Thread 의 개념을 알아야한다.
Process vs Thread
Process 라는건 OS 에서 작성한 프로그램을 실행시키는 단위라고 보면 되고, 하나의 Process 에서 여러개의 Thread 를 관리할수 있다라는것이다. 즉, 간단하게 말해서, 우리가 운동하기, 저녁먹기, 청소하기 이런것들이 Process 개념이며, 하나 운동이라는 Task 를 주어졌을때, Gym 을 가기 또는 저녁 반찬 준비하기가 Thread 라고 할수 있다.
MultiProcessing vs MultiThreading
Multiprocessing 같은 경우, 하나의 Program 을 만들고, 그 프로그램이 여러개의 Thread 를 만들어서, 여러개의 코어를 동시에 활용해서 효율성을 높이는 작업이며, Multithreading 은 하나의 CPU 에 여러개의 코어가 들어있는 경우, 그 코어들을 활용해서 동시에 여러가지 작업을 수행하는것이라고 생각하면 편하다.
단편적으로 multithreading 은 보편적으로 대세라고 보면되고, multiprocessing 같은 경우에는 여러개의 PC 들을 네트워크로 연결시켜서, 그 여러개의 PC 에 들어있는 코어를 전부 활용하는 Distributed Computing 이라고 본다. Multiprocessing 의 단점이라고 말을 할수 있는건 네트워크로 연결된 Phsycially 하게 멀어져있는 Computer 가 흩어져 있기때문에 하나의 PC 가 다른 PC 에 어떤 데이터를 가지고 있는지가 알수가 없다. 하지만 Multithread 는 여러개의 Thread 가 Memory 를 공유한다는 점에서는 장점이다. 이게 양날의 검일수도 있고 정말 잘사용하면 효율이 잘나온다.
Multithreading
위의 그림을 보면 Thread 1 이 시작이되고, 그 다음으로 Thread 2, 3 이렇게 시작된다 어떻게 보면 순차적으로 일어나는것과 같아보이지만, 개념상으로 어떤 Thread 가 먼저 끝낼지는 모르고, thread 3 개를 동시에 띄운다라고 생각하면 되고, 하나하나 Thread 를 실행시킨다고 하면된다. 즉 Main Thread 로 부터 시작해서, Thread 1, 2, 3 이라는 자식 Thread 를 만들어서 어떤 3000 작업을 한다고 하면 1000 개씩 각 3 개의 Thread 에게 일을 시키면된다. 아마 Ctrl + Alt + Delete
를 누르다보면, Task Manager 에서 Performance 를 가다보면 Core 개수를 확인 할수 있을거다.
C++ Thread Basics
바로 코드로 넘어가보자, 아마 이런 코드를 한번 실행시키면 굉장히 좋은 질문일것 같다. Thread 를 생성하고, while 문으로 돌린다. 근데 끝나 버린다. 이 이유 같은 경우 Main Thread 에서 Child Thread 를 만들었는데 (ID 는 모름) 근데 Main Thread 가 끝나 버린 케이스이다. 그걸 위해서는 t1.join()
이게 필요하다.
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include <mutex>
using namespace std;
int main()
{
const int num_process = std::thread::hardware_concurrency();
std::thread t1 = std::thread([]() {while(true) {}});
}
만약의 위의 코드를 Debugging 용도로 사용하려면, 아래와 방식의 코드로 ID 를 Checking 할수 있다.
using namespace std;
int main()
{
const int num_process = std::thread::hardware_concurrency();
cout << std::this_thread::get_id() << endl;
std::thread t1 = std::thread([]() { std::this_thread::get_id() << endl; while(true) {}});
t1.join();
}
그 이후 간단하게 여러개의 Thread 를 만들어서, join() 을 시켜보자.
using namespace std;
int main()
{
const int num_process = std::thread::hardware_concurrency(); // number of core
cout << std::this_thread::get_id() << endl; // main thread
vector<std::thread> my_threads;
my_threads.resize(num_process);
for (auto& e : my_threads)
{
e = std::thread([]() {
cout << std::this_thread::get_id() << endl;
while (true) {}});
}
for (auto& e : my_threads)
{
e.join();
}
return 0;
}
이런식으로 하였을때 보면, threadID 들이 고르지 않게 나온다. 그 이유는 여러개의 Thread 가 동시에 Spawn 이 되고, 그리고 CPU 가 열심히 일하는것을 볼수 있다. 그렇다면 아래와 같이 Lambda 함수로 간단하게 만들어보자
int main()
{
auto work_func = [](const string& name)
{
for (int i = 0; i < 5; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
cout << name << " " << std::this_thread::get_id() << " is working " << i << endl;
}
};
std::thread t1 = std::thread(work_func, "Jack");
std::thread t2 = std::thread(work_func, "Nick");
t1.join();
t2.join();
}
이걸 Output 으로 보자면, 이것도 마찬가지로 ID 가 고르지 않을수 있다. 하지만 여기에서는, 실제 work_func 라는 lambda 함수를 사용해서 thread 를 binding 시켜서 실행하는 예제라고 보면 굉장히 쉽게 와다을수 있다. 그렇다면 어떻게 고르게, 우리가 PrintOut 할까가 문제이다. 사실 std::cout
은 t1 과 t2 가 공용으로 접근을 하려고 하기때문에, Race Condition 이 일어나서 서로 std::cout 을 하려고 난리를 칠것이다. 이것을 방지 할수 있는것이 바로 std::mutex
=> mutex 즉 mutual exclusive, 상호 배제라는 뜻이다. 즉 서로 못 건드리게 한다 바꿔 말하면 이건 내꺼 나만 쓸수 있어 하는 이렇게 선언을 할수 있는 존재이다. mutex 를 걸어주고, cout 그 부분에만 mtx.lock() 과 데이터의 사용을 마무리 짖는 mtx.unlock() 을 아래와 같이해주면 하나의 thread 가 일이 cout 이 끝나고 다른 Thread 가 잡아서 작업을 할수 있게 한다.
using namespace std;
mutex mtx; // mutual exclusion
int main()
{
std::vector<float> myArr;
myArr.resize(10);
const int num_process = std::thread::hardware_concurrency(); // number of core
auto work_func = [](const string& name, std::vector<float>& myArr)
{
for (int i = 0; i < 5; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Don't touch it
mtx.lock();
myArr.push_back(i);
cout << name << " " << std::this_thread::get_id() << " is working " << i << endl;
mtx.unlock();
}
};
std::thread t1 = std::thread(work_func, "JackJack", std::ref(myArr));
std::thread t2 = std::thread(work_func, "DashDash", std::ref(myArr));
t1.join();
t2.join();
for (int i = 0; i < myArr.size(); i++)
{
cout << myArr[i] << endl;
}
return 0;
}
이런식으로 하면, 기본적인 std::thread
에 관련된 내용을 커버했다. 뭐든지 직접해보고 손대보고 알아나가야 진정한 공부고, 기술이다.
Race Condition
잠깐 아래와 같은 예제를 보자. 예제로 shared_memory 라는 int 타입이라는 메모리를 공유한다고 하자. 일단 실행시킨다고 가정을 한다면, shared_memory
는 1000 이 되어있을거다.
int main()
{
int shared_memory(0);
auto count_func = [&]()
{
for (int i = 0; i < 1000; ++i)
{
this_thread::sleep_for(chrono::microseconds(1));
shared_memory++;
}
};
std::thread t1 = thread(count_func);
t1.join();
cout << "After" << endl;
cout << shared_memory << endl;
return 0;
}
그렇다면, 여러개의 Thread 를 동시에 시킨다고 하자. 그렇다면, shared_memory
가 2000 이 되어야하지만, 그렇지 않다. 그 원인은 일단 CPU 에서 shared_memory
값을 읽어서, CPU 안에서 값을 하나 더하고 그 더해진 결과 값을 shared_memory
변수로 다시 보내는데 t1 이 10 값을 읽어드렸을 사이에 t2 가 재빨리 11 로 값을 바꾼다거나 그러다가 t1 이 다시 12 를 덮어 씌어버리기때문에 덧셈 하나가 사라진거나 마찬가지이다. 즉 동시에 카운트 역활을 주어졌을때, 일은 수행하지만, 같을 덮어씌운다는 점에서 덧셈 몇개까 빠져버린것처럼 보이기 때문이다.
int main()
{
int shared_memory(0);
auto count_func = [&]()
{
for (int i = 0; i < 1000; ++i)
{
this_thread::sleep_for(chrono::microseconds(1));
shared_memory++;
}
};
std::thread t1 = thread(count_func);
std::thread t2 = thread(count_func);
t1.join();
t2.join();
cout << "After" << endl;
cout << shared_memory << endl;
return 0;
}
이걸 해결하기 위해서는 방법 하나중에 std::atomic
이 있다. 즉 atomic 이라는건 아까 말했던 그 CPU 에서 값을 더하고, 불러오고 내보내는 이 Operation 자체를 하나의 원자로 감싸준다. 라는 의미이다.
int main()
{
std::atomic<int> shared_memory(0);
auto count_func = [&]()
{
for (int i = 0; i < 1000; ++i)
{
this_thread::sleep_for(chrono::microseconds(1));
shared_memory++; // shared_memory.fetch_add(1);
}
};
std::thread t1 = thread(count_func);
std::thread t2 = thread(count_func);
t1.join();
t2.join();
cout << "After" << endl;
cout << shared_memory << endl;
return 0;
}
그래서 위처럼 할때, 정확히 shared_memory
가 2000 이나온걸 확인 할 수 있다. 하지만, Atomic Operation 이 느려질수도 있으니 그거에 주의를 두고 사용할때만 사용하자. 우리가 위에서 봤던 예제처럼 std::mutex 를 선언한 이후에, Operation(덧셈) 에 lock() 과 unlock() 을 걸어준다면, 사실 문제는 없다. 근데 프로그래머도 실수는 할수 있으므로 그걸 사용한 std::lock_guard
가 있다. 또 std17 에서는 std::scoped_lock()
을 사용하면 된다.
mutex mtx;
int main()
{
std::atomic<int> shared_memory(0);
auto count_func = [&]()
{
for (int i = 0; i < 1000; ++i)
{
std::lock_guard lock(mtx);
shared_memory++; // shared_memory.fetch_add(1);
}
};
std::thread t1 = thread(count_func);
std::thread t2 = thread(count_func);
t1.join();
t2.join();
cout << "After" << endl;
cout << shared_memory << endl;
return 0;
}
주의할 점은 this_thread::sleep_for(chrono::milliseconds(1))
이 부분을 지운다고 하면, 동작을 제대로 할수 있다. 그 이유로는 일단 t1 이 다 더해버리고, t2 가 더할시점에서, t1 이 그냥 다 더하기 때문에 실제로는 두개가 처리가 안됬을수도 있기때문에 병렬처럼 처리한것 처럼 보일수 있다.
Task (작업) 기반 Asynchronous Programming
위에서 봤던 내용 처럼, Thread 로 어떠한 작업을 functor 또는 Lambda 함수로 지정해서 Task 를 parallel 하게 수행할수 있었다. 다른 방법으로는 어떤 Task 기반으로 되는 future
, std::async()
를 사용하는 방법이다. Thread 와 비슷하게 사용할수 있지만, 조금 다르게 작동한다는걸 확인 해야하고, 공식 문서에도 이 방법이 선호되고 있는 추세이다. 일단 한번 비교를 하기위해서 두개의 코드를 봐보자.
#include <iostream>
#include <future>
#include <thread>
int main()
{
// Thread
int result;
std::thread t1([&] {result = 1 + 2; });
t1.join();
cout << result << endl;
// Future [Async]
auto fut = std::async([] { return 1 + 2; });
cout << fut.get() << endl;
}
위의 코드를 확인 하면, Thread 같은 경우 join() 이라는 함수를 통해서, Thread 에서 행해지는 작업을 기다리는거고, async 는 어떤 Task 를 미리 지정해주고, std::future<int>
라는 것을 통해서 값을 받아서, 작업이 끝나면, fut.get()
을 통해서 작업이 끝났다는걸 알수 있다. (즉 .get()
은 어떤 Task 가 있을때까지 기다렸다가 받을수 있는 형태로 되어있다는 소리). 아주 미묘한 차이이지만, Thread 에 t1.join() 이 없더라면 또는 어떤 작업에따라서, main thread 가 죽을수도 있는 현상이 발생할수도 있다. 하지만 그와 반대로 async 는 어떤 특정 작업이 진행해야한다라고 지정한 이후 (미래)에 끝날때까지를 기다리는거다라고 확인해서, 조금 편할수도 있다도 되겠다.
그렇다면 둘중 하나만 써야하냐? 그건 아니다. thread 와 future 를 같이 사용할수 있지만, promise
(약속) 을 해야한다. 아래의 코드를 보자면, promise 로 부터 future 를 return 값으로 받는다. 이거 같은 경우는 이 Promise 가 처리가 되서 잘나오는지를 Return 값으로 받기위해서 넣어주고, r-value reference 로 thread 의 lambda 함수에 인자로 넘겨준다. 그리고 이 처리가 잘끝나면, std::move 를 통해서 promise 값이 나와서, future.get() 으로 받을수 있다. 여기서 중요한거는 debugging 을 해보면 처음에는 prom 과 fut 값이 pending
이라는걸 확인 할 수 있다. 즉 아직 값을 받을 준비 또는 처리단계를 거치지 못했다 라고 볼수 있다.
또 여기서 의문점이라고 할수 있는건 std::thread 대신에, 그냥 std::async() 쓰면 되지 않느냐라고 물어 볼수 있는데, 또 다르게 생각해보면, 그렇게 된다면, promise 를 애초에 쓸필요가 없어진다.
#include <iostream>
#include <future>
#include <thread>
int main()
{
std::promise<int> prom;
auto fut = prom.get_future();
auto t = std::thread([](std::promise<int>&& prom)
{
prom.set_value(1 + 2);
}, std::move(prom));
cout << fut.get() << endl;
t.join();
return 0;
}
여러개를 Thread 를 이용했던것처럼, std::async()
도 여러개를 사용할수 있다. 아래의 코드를 한번 봐보자. 아래의 코드를 수행시에는 main
이 시작되고, async1
이 먼저 시작이되고 그이후에 async2
가 시작된다. 근데 sleep 조건으로 인해서 async2
가 끝나고 async1
이 끝나게 되어있다. 이러면 일단 Parallel 하게 작업한다는걸 짐작할수 있다.
만약에 return 값을 지정을 안한다고 하면 어떻게 될까? 그렇다면, async1 start 가 print 가 되고 async1 end 그다음에 async2 start, async2 end 그 이후에 main 이 작동된다.
그리고 만약에 이 예제를 thread 로 바꾸고 thread 로 바꾸면, 우리의 기댓값과는 다르게 작동이 된다. 이걸 통해서, async 와 thread 의 방식이 다르다는 점과 조심해야할 점을 생각할수 있다.
#include <iostream>
#include <future>
#include <thread>
int main()
{
auto f1 = std::async([] {
cout << "async1 start" << endl;
this_thread::sleep_for(chrono::seconds(2));
cout << "async1 end" << endl;
});
auto f2 = std::async([] {
cout << "async2 start" << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "async2 end" << endl;
});
cout << "Main Function" << endl;
}
Example: Calculating Inner Product
#include <iostream>
#include <chrono>
#include <mutex>
#include <random>
#include <utility>
#include <vector>
#include <atomic>
#include <numeric> // std::inner_product
#include <execution> // Parallel Execution
#include <future>
#include <thread>
using namespace std;
mutex mtx;
void dotProductNaive(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, unsigned long long& sumOut)
{
for (unsigned i = i_start; i < i_end; ++i)
{
sumOut += v0[i] * v1[i];
}
}
void dotProductLock(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, unsigned long long& sumOut)
{
for (unsigned i = i_start; i < i_end; ++i)
{
std::scoped_lock lock(mtx);
sumOut += v0[i] * v1[i];
}
}
void dotProductAtomic(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end, atomic<unsigned long long>& sumOut)
{
for (unsigned i = i_start; i < i_end; ++i)
{
sumOut += v0[i] * v1[i];
}
}
auto dotProductFuture(const vector<int>& v0, const vector<int>& v1, const unsigned i_start, const unsigned i_end)
{
int sum = 0;
for (unsigned i = i_start; i < i_end; ++i)
{
sum += v0[i] * v1[i];
}
return sum;
}
int main()
{
const long long n_data = 100'000'000;
const unsigned n_threads = 4;
std::vector<int> v0, v1;
v0.reserve(n_data);
v1.reserve(n_data);
random_device seed;
mt19937 engine(seed());
uniform_int_distribution<> uniformDist(1, 10);
for (long long i = 0; i < n_data; ++i)
{
v0.push_back(uniformDist(engine));
v1.push_back(uniformDist(engine));
}
// inner product from c++ library
{
const auto sta = chrono::steady_clock::now();
const auto sum = std::inner_product(v0.begin(), v0.end(), v1.begin(), 0ull);
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
// Naive (Race Condition)
{
const auto sta = chrono::steady_clock::now();
unsigned long long sum = 0;
vector<thread> threads;
threads.resize(n_threads);
const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t] = std::thread(dotProductNaive, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
}
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t].join();
}
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
// Lock Guard
{
const auto sta = chrono::steady_clock::now();
unsigned long long sum = 0;
vector<thread> threads;
threads.resize(n_threads);
const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t] = std::thread(dotProductLock, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
}
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t].join();
}
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
// atomic
{
const auto sta = chrono::steady_clock::now();
std::atomic<unsigned long long> sum = 0;
vector<thread> threads;
threads.resize(n_threads);
const unsigned n_per_thread = n_data / n_threads; // assumes the remainder = 0
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t] = std::thread(dotProductAtomic, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread, std::ref(sum));
}
for (unsigned t = 0; t < n_threads; ++t)
{
threads[t].join();
}
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
// future
{
const auto sta = chrono::steady_clock::now();
unsigned long long sum = 0;
vector<std::future<int>> futures;
futures.resize(n_threads);
const unsigned n_per_thread = n_data / n_threads;
for (unsigned t = 0; t < n_threads; ++t)
{
futures[t] = std::async(dotProductFuture, std::ref(v0), std::ref(v1), t * n_per_thread, (t + 1) * n_per_thread);
}
for (unsigned t = 0; t < n_threads; ++t)
{
futures[t].get();
}
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
// transform reduce
{
const auto sta = chrono::steady_clock::now();
const auto sum = std::transform_reduce(std::execution::par, v0.begin(), v0.end(), v1.begin(), 0ull);
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << dur.count() << endl;
cout << sum << endl;
cout << endl;
}
return 0;
}