[C++] Thread & Parallel

Thread 와 Parallel 알아보기
threadparallel
avatar
2025.02.11
·
19 min read

3367

Thread

씹어먹는 C ++ - <15 - 1. 동시에 실행을 시킨다고? - C++ 쓰레드(thread)>
https://modoocode.com/269

C++11 부터 표준에 쓰레드가 추가되면서, 쓰레드 사용이 편리해졌다.

#include <thread>
thread t1(func1);
t1.join();

<thread> 헤더를 포함하고, thread 객체를 생성하면 쓰레드가 만들어진다. 이 쓰레드들이 CPU 코어에 어떻게 할당되고, 언제 컨텍스트 스위치를 할 지는 전적으로 운영체제에게 달려있다. 한 코어에 쓰레드 3개가 컨텍스트 스위칭 하면서 돌아갈 수도 있다. 마지막으로 join 은, 해당하는 쓰레드들이 실행을 종료하면 리턴하는 함수이다. 만약 join 을 호출하지 않으면 쓰레드들의 내용이 채 실행되기 전에 main 함수가 종료되어 쓰레드 객체의 소멸자가 호출된다. C++ 표준에 따르면, join 되거나 detach 되지 않는 쓰레드들의 소멸자가 호출된다면 예외를 발생시키도록 명시되어 있다. detach 는 해당 쓰레드를 실행 시킨 후, 잊어버리는 것이라 생각하면 된다. 알아서 백그라운드에서 돌아가게 되고, 메인 함수는 detach 된 쓰레드가 종료되길 기다리지 않기 때문에 프로세스를 종료하게 된다.

쓰레드에 인자 전달하기

void worker(vector<int>::iterator start, vector<int>::iterator end,
            int* result);

vector<thread> workers;
for(int i=0; i<4; i++)
{
    workers.push_back(thread(worker, data.begin() + i * 2500,
                  data.begin + (i+1) * 2500, &partial_sums[i]));
}

thread 생성자의 첫 번째 인자로 함수를 전달하고, 이어서 해당 함수에 전달할 인자들을 쭈르륵 써주면 된다. 각 쓰레드에는 고유 아이디 번호가 할당되므로, 어떤 쓰레드에서 작업중인지 보고싶다면 this_thread::get_id 함수를 통해 돌아가고 있는 쓰레드의 아이디를 알 수 있다.

std::cout 과 printf

std::cout 의 경우 std::cout << A; 를 하게 된다면 A의 내용이 출력되는 동안 중간에 달느 쓰레드가 내용을 출력할 수 없게 보장해준다. 하지만 std::cout << A << B; 를 하게 되면 A 를 출력한 이후에 B 를 출력하기 전에 다른 쓰레드가 내용을 출력할 수 있다.

반면 printf"..." 안에 있는 문자열을 출력할 때, 컨텍스트 스위치가 되더라도 다른 쓰레드들이 그 사이에 메세지를 집어넣지 못하게 막는다. 따라서, 방해받지 않고 전체 메세지를 제대로 출력할 수 있게 해준다.

mutex

여러 쓰레드들이 동시에 어떠한 코드에 접근하는 것을 배제하도록 하는 객체

std::mutex m;

m.lock();
result += 1;
m.unlock();

한 번에 한 쓰레드에서만 m 의 사용 권한을 갖는다. 다른 쓰레드에서 m.lock() 을 했다면, m 을 소유한 쓰레드가 m.unlock() 을 통해 m 을 반환할 때까지 무한정 기다리게 된다. 이렇게 m.lock()m.unlock() 사이에 한 쓰레드만이 유일하게 실행할 수 있는 코드 부분을 임계 영역(critical section)이라고 부른다.

만약 unlock() 을 하지 않는다면 프로그램이 끝나지 않아 강제로 종료되게 되는데, 뮤텍스를 취득한 쓰레드가 unlock 을 하지 않으므로 다른 모든 쓰레드들이 기다리게 된다. 심지어 본인도 마찬가지로 m.lock() 을 다시 호출하게 되고, unlock 을 하지 않아 본인 역시 기다리게 된다. 결국 아무 쓰레드도 연산을 진행하지 못하게 되며, 이러한 상황을 데드락(deadlock) 이라고 한다.

std::lock_guard

void  worker(int& result, std::mutex& m)
{
    for(int i=0; i<10000; i++)
    {
        // lock 생성 시에 m.lock() 실행하는 것과 같음
        std::lock_guard<std::mutex> lock(m);
        result += 1;
        // scope를 빠져 나가면 lock이 소멸되면서 알아서 unlock
    }
}

lock_guard 객체는 뮤텍스를 인자로 받아서 생성하게 되는데, 이 때 생성자에서 뮤텍스를 lock 하게 된다. 그리고 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock 하게 된다. 따라서, 사용자가 unlock 을 따로 신경쓰지 않아도 되서 편리하다.

생산자와 소비자 패턴

생산자는 무언가 처리할 일을 받아오는 쓰레드를 의미하고, 소비자는 받은 일을 처리하는 쓰레드를 의미한다. 페이지를 긁어서 분석하는 프로그램을 만들었다고 생각해보자.

// 페이지를 긁어 오는 쓰레드 (생산자)
void producer(
	std::queue<std::string>* downloaded_pages,
	std::mutex* m, int index)
{
	for (int i = 0; i < 5; i++)
	{
		// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
		// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
			std::to_string(index) + ")\n";

		// data는 쓰레드 사이에서 공유되므로 critical section에 넣어야 한다
		m->lock();
		downloaded_pages->push(content);
		m->unlock();
	}
}

producer 쓰레드에서는 웹사이트에 페이지를 계속 다운로드 하는 역할을 하게 된다. 이 때, 다운로드한 페이지들을 downloaded_pages 라는 큐에 저장하게 된다.

다운받은 웹사이트 내용이 content 라고 했을 때, 다운 받은 페이지를 작업 큐에 집어 넣어야 한다. 주의할 점은, producer 쓰레드가 1개가 아니라 5개나 있다는 점이다. 따라서 downloaded_pages 에 접근하는 쓰레드들 사이에 race condition이 발생할 수 있다. 이를 방지하기 위해 뮤텍스 m 으로 해당 코드를 감싸서 문제가 발생하지 않게 해준다.

// 받은 일을 처리하는 쓰레드 (소비자)
void consumer(
	std::queue<std::string>* downloaded_pages,
	std::mutex* m, int* num_processed)
{
	// 전체 처리하는 페이지 개수가 5 * 5 = 25개
	while (*num_processed < 25)
	{
		m->lock();
		if (downloaded_pages->empty())
		{
			m->unlock(); // 여기서 unlock을 안한다면 데드락이 발생한다

			// 10ms 뒤에 다시 확인한다.
			std::this_thread::sleep_for(std::chrono::milliseconds(10));
			continue;
		}

		// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
		std::string content = downloaded_pages->front();
		downloaded_pages->pop();

		(*num_processed)++;
		m->unlock();

		// content 를 처리한다
		std::cout << content;
		std::this_thread::sleep_for(std::chrono::milliseconds(80));
	}
}

consumer 쓰레드의 입장에서는, 언제 일이 올 지 알 수 없다. 따라서 downloaded_pages 가 비어있지 않을 때까지 계속 while 루프를 돌아야 한다. 한 가지 문제는, 컴퓨터 CPU의 속도에 비해 웹사이트 정보가 큐에 추가되는 속도는 매우 느리다는 점이다. producer 의 경우 대충 100ms 마다 웹사이트 정보를 큐에 추가하게 되는데, 이 시간 동안 downloaded_empty() 문장을 수십 만번 호출할 수 있을 것이다. 이는 상당한 CPU 자원의 낭비를 유발한다. 따라서 강제로 쓰레드를 sleep 시켜서 10ms 뒤에 다시 확인하는 식으로 구현한다. 마지막으로 content 를 처리하는 과정은 간단하다. front 를 통해서 맨 앞의 원소를 얻은 뒤에, pop 을 호출하면 맨 앞의 원소를 큐에서 제거하게 된다.

condition_variable

근데 consumer 쓰레드가 10ms마다 할일이 있는지 확인하고 없으면 다시 기다리는 형태를 취하고 있는 것은 매우 비효율적이다. 언제 올 지 모르는 데이터를 확인하기 위해 지속적으로 mutexlock 하고, 큐를 확인해야 하기 때문이다. 차라리 consumer 를 재워놓고, producer 가 데이터가 생기면 consumer 를 깨우는 방식이 더 낫다. 쓰레드를 재워놓게 되면, 그 사이에 다른 쓰레드들이 일을 할 수 있기 때문에 CPU를 더 효율적으로 쓸 수 있을 것이다.

위와 같은 상황에서 쓰레드들을 10ms마다 재웠다 깨웠다 할 수 밖에 없었던 이유는 어떠한 조건을 만족할때까지 자라! 는 명령을 내릴 수 없었기 때문이다. 이는 조건 변수(condition_variable)를 통해 해결할 수 있다.

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// 페이지를 긁어서 분석하는 프로그램

// 페이지를 긁어 오는 쓰레드
void producer(
	std::queue<std::string>* downloaded_pages,
	std::mutex* m, int index,
	std::condition_variable* cv)
{
	for (int i = 0; i < 5; i++)
	{
		// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
		// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
			std::to_string(index) + ")\n";

		// data는 쓰레드 사이에서 공유되므로 critical section에 넣어야 한다
		m->lock();
		downloaded_pages->push(content);
		m->unlock();

		// consumer에게 content가 준비되었음을 알린다.
		cv->notify_one();
	}
}

// 받은 일을 처리하는 쓰레드
void consumer(
	std::queue<std::string>* downloaded_pages,
	std::mutex* m, int* num_processed,
	std::condition_variable* cv)
{
	// 전체 처리하는 페이지 개수가 5 * 5 = 25개
	while (*num_processed < 25)
	{
		std::unique_lock<std::mutex> lk(*m);

		cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
        // 모든 페이지 처리가 끝나서 탈출한 것이라면 쓰레드를 종료해야 함
		if (*num_processed == 25)
		{
			lk.unlock();
			return;
		}

		// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
		std::string content = downloaded_pages->front();
		downloaded_pages->pop();

		(*num_processed)++;
		lk.unlock();

		// content 를 처리한다
		std::cout << content;
		std::this_thread::sleep_for(std::chrono::milliseconds(80));
	}
}

int main()
{
	// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
	std::queue<std::string> downloaded_pages;
	std::mutex m;
	std::condition_variable cv;

	std::vector<std::thread> producers;
	for (int i = 0; i < 5; i++)
	{
		producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
	}

	int num_processed = 0;
	std::vector<std::thread> consumers;
	for (int i = 0; i < 3; i++)
	{
		consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
	}

	for (int i = 0; i < 5; i++)
	{
		producers[i].join();
	}

	// 나머지 자고 있는 쓰레드들을 모두 깨운다
	cv.notify_all();

	for (int i = 0; i < 3; i++)
	{
		consumers[i].join();
	}
}
std::unique_lock<std::mutex> lk(*m);
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

condition_variablewait 함수에 어떤 조건이 참이 될 때까지 기다릴지 해당 조건을 인자로 전달해야 한다. downloaded_pages 에 원소들이 있거나, 전체 처리된 페이지의 개수가 25 개 일 때 wait 을 중지하도록 하였다.

조건 변수는 만일 해당 조건이 거짓이라면, lkunlock 한 뒤에, 영원히 sleep 하게 된다. 반면에 해당 조건이 참이라면 cv.wait 은 그대로 리턴하여 consumercontent 를 처리하는 부분이 그대로 실행되게 된다.

lock_guard 의 경우 생성자 말고는 따로 lock 을 할 수 없는데, unique_lockunlock 후에 다시 lock 할 수 있다. 덧붙여 unique_lock 을 사용한 이유는 cv->waitunique_lock 을 인자로 받기 때문이다.

// consumer에게 content가 준비되었음을 알린다.
cv->notify_one();

notify_one 함수는 말 그대로, 자고 있는 쓰레드 중 하나를 깨워서 조건을 다시 검사하게 해준다. 조건이 참이 된다면 그 쓰레드가 다시 일을 시작하는 것이다.

for (int i = 0; i < 5; i++)
{
   producers[i].join();
}

// 나머지 자고 있는 쓰레드들을 모두 깨운다.
cv.notify_all();

만약 cv.notify_all() 을 하지 않는다면, 자고 있는 consumer 쓰레드들의 경우 join 되지 않는 문제가 발생한다. 따라서 마지막으로 해당 함수를 호출하여 모든 쓰레드를 깨워 조건을 검사하도록 한다. 해당 시점에선 이미 num_processed 가 25가 되어있을 것이므로, 모든 쓰레드들이 잠에서 깨어나 종료하게 된다.

Parallel

public static : 네이버 블로그
https://blog.naver.com/kmc7468/221337487249

C++17 부터 STL은 병렬 알고리즘을 지원한다. 기존의 STL 알고리즘에 Execution Policy를 추가하여, 동일한 알고리즘 코드를 사용하면서도 내부적으로 병렬 처리를 적용할 수 있도록 한다.

Execution Policy

  1. std::execution::seq : 순차 실행

  2. std::execution::par : 병렬 실행

  3. std::execution::par_unseq : 병렬 및 벡터화 실행

주의할 점은, 병렬 실행을 요청했다고 해서 반드시 병렬으로 실행되는 것은 아님을 유의해야 한다.

std::for_each

int x = 0;
std::mutex m;
int a[] = {1,2};
std::for_each(std::execution::par, std::begin(a), std::end(a), [&](int)
{
    std::lock_guard<std::mutex> guard(m);
    ++x;
});

병렬 실행이 가능한 STL 함수 중 대표적인 것은 algorithm 헤더에 선언된 std::for_each 함수다. Execution Policy 는 가장 첫번째 매개 변수로 넘겨주면 된다.

par vs par_unseq

std::vector<int> v = { 1, 2, 3 };
int sum = 0;

// v에 있는 요소들의 제곱 값의 합을 구하는 코드
std::for_each(std::execution::par, std::begin(v), std::end(v), [&](int i)
{
    sum += i * i;
});

병렬로 실행되지만, 잘못된 코드이다. 경쟁 상황이 발생하기 때문이다.

Cpp
 복사
std::vector<int> v = { 1, 2, 3 };
int sum = 0;

std::mutex m;
std::for_each(std::execution::par, std::begin(v), std::end(v), [&](int i)
{
    std::lock_guard<std::mutex> lock{m};
    sum += i * i;
});

이렇게 뮤텍스를 추가해 주어야 한다. 그러나 이 상태에서 parpar_unseq 로 수정하면 데드락이 발생할 가능성이 높다. 왜냐하면, par 은 람다 함수가 호출되어 실행되는 동안 다른 쓰레드에 의해 중단되지 않음이 보장되지만, par_unseq 의 경우 람다 함수가 호출되어 실행되는 중간에 다른 쓰레드에 의해 실행이 중단될 수 있다.

즉, par 의 경우

m.lock();
sum += i * i;
m.unlock();

// 다른 쓰레드에 의해 중단됨
// 다른 쓰레드에 중단 되었지만, 이미 한 상태가 종료되어 데드락 발생 X

par_unseq 의 경우

m.lock();
// 다른 쓰레드에 의해 중단됨 
// 데드락 발생!! 코드가 더 진행되지 않음
sum += i * i;
m.unlock();

위와 같이 3개의 명령이 실행되는 중간에, 다른 쓰레드에 의해 중지될 수 있다. 그래서 데드락이 발생할 가능성이 있으며, 이러한 현상이 발생할 가능성이 있는 함수를 vectoriztion-unsafe 한 함수라고 한다. 그래서 par_unseq 를 사용하려면 vectoriztion-safe 한 함수로 바꾸어 줄 필요가 있으며, 그러려면 std::atomic 을 사용해 원자적으로 실행해야 한다.







- 컬렉션 아티클