MPI Programming with c/c++ (part2)
MPI programming example in c++
Jan 21, 2024
서론
안녕하세요! 이번 시간에는 지난번에 마무리 짓지 못한 MPI programming 관련 글을 소개할 예정입니다!
내용으로는 MPI Probe, MPI_Scatter, MPI_Gather, MPI_Allgather에 대해 다룰 예정입니다~!
Dynamic Receiving with MPI probe
- 이전 시간에 MPI_send와 MPI_Recv 를 활용해 P2P 방식을 살펴보았습니다.
- 여기에 더해서 특정 message의 보내는 총 길이를 인지한채 메세지를 보내는 방법에 대해서만 살펴봤네요
- 그런데 MPI는 원래 Dynamic message를 주고받는 것도 지원해준다고 합니다.
- 해당 기능을 실행하려면 몇 가지의 function들을 사용해야 합니다.
- 이번 1에서는 이와 관련된 내용을 살펴보겠습니다.
1. MPI_status 구조체
- 이전 시간에 살펴봤듯이 MPI_Recv는 MPI_status 구조체의 주소값을 인자로 받습니다.
- MPI_status_Ignore를 통해 이를 받지 않을 수도 있죠..!
- 만약 MPI_Recv에 MPI_status 구조체를 넘겨준다면 세 가지 중요한 정보를 담고 있어요
- sender의 rank
- MPI_source로 접근할 수 있습니다.
MPI_status stat stat.MPI_SOURCE
- MPI_TAG로 접근할 수 있습니다.
MPI_status stat stat.MPI_TAG
- 이 값은 미리 결정된 값이 존재하지 않아서 MPI_Get_count를 사용해서 접근합니다.
MPI_Get_count( MPI_Status* status, MPI_Datatype datatype, int* count)
- 결과적으로 MPI_status 포인터와 Datatype를 넘겨주고 count에 원하는 정보가 저장됩니다.
Q. 그래서 근데 이 MPI_status 구조체를 왜 알아야하는걸까요?
A. MPI_Recv는 MPI_ANY_SOURCE, MPI_ANY_TAG 를 받는다면 위에서 원하는 Dynamic message 가 안된다.
➕ 또한 실제 sender의 정보 그리고 tag도 알고 싶다는 더더욱 이 구조체를 사용해야합니다.더욱이, MPI_Recv 는 function 호출 시에 인자로 들어온 message 전체를 받는 것을 장담하지 못합니다. ㅎㅎ이 대신, 실제로 Recv한테 전송된 message만 체크하고 보낼 수 있는 크기보다 더 보내면 error를 반환합니다.
→ 따라서 MPI_Status를 사용해야 한다는 것입니다!
MPI_probe로 message size 알아내기
- 그냥 비효율적인 큰 buffer를 제공하는 것보다 MPI_probe를 이용해 message의 크기를 파악해서 buffer를 사용하는 것이 좋다고 합니다.
MPI_Probe( int source, int tag, MPI_Comm comm, MPI_Status* status)
- MPI_Recv와 비슷한 역할을 수행합니다.
- MPI_probe는 특정 sender와 tag인지 필터링하는 기능을 갖고 있습니다.
MPI_Probe(0, 0, MPI_COMM_WORLD, &status); // When probe returns, the status object has the size and other // attributes of the incoming message. Get the message size MPI_Get_count(&status, MPI_INT, &number_amount); // Allocate a buffer to hold the incoming numbers int* number_buf = (int*)malloc(sizeof(int) * number_amount); // Now receive the message with the allocated buffer MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); printf("1 dynamically received %d numbers from 0.\n", number_amount); free(number_buf);
- 먼저 특정 sender가 보낸 경우에서만 MPI_status 정보를 채워넣습니다.
- MPI_Get_count를 통해 message 총 길이를 구합니다.
- 그 길이에 맞는 buffer를 만들어서 Recv를 통해 message를 받습니다.
Random walk
- 앞서 살펴본 MPI Probe를 활용한 간단한 예제를 살펴보겠습니다.
문제 정의
→ Min, Max 값과 walker W가 주어졌을 때, W는 S번의 random walk을 오른쪽으로 실행합니다.
→ 이때 범위를 벗어나면 다시 순환됩니다.
어떻게 병렬 처리로 구현할 수 있을까?
- 전체 MIN, MAX 거리를 process의 개수를 최대한 균등하게 잘라서 할당합니다.
- 0~20 까지 range와 사용 가능한 프로세스가 총 4개라면,
- process 0, 1, 2, 3 으로 나눠서 배분할 수 있는거죠.
- 그리고 walker를 초기화해줘야 합니다.
- 이 walker의 규칙은 아래처럼 정리할 수 있어요!
- process 0에서 4보다 큰 크기로 walk를 진행하면 process 0는 process 1과 통신해서 walker를 이동시켜야 합니다.
- process 1은 walker를 전달받아 움직임이 총 6을 넘기전까지 계속 움직이고 6을 넘기면 또 process 2로 walker를 움직여야 합니다.
그럼 어떻게 코드로 구현할 수 있을까?
이 코드는 MPI_Send와 MPI_Recv를 활용해서 구현할 수 있다. 이 코드를 구현하려면 한 가지 규칙을 다시 되짚어야 합니다.
walker는 현재 남은 걸음의 수와 현재 위치를 저장하고 있어야 한다.
해당 블로그에서 이 코드의 주제를 “domain decomposition”이라고 명시했습니다.
- 아무래도 병렬처리를 위해 쪼개다보니 그렇게 이름을 지은 것이 아닐까 싶네요.
구현한 함수에서 domain의 size와 walker에게 맞는 subdomain을 할당해줄 것 입니다.
decompose_domain 함수
void decompose_domain(int domain_size, int world_rank, int world_size, int* subdomain_start, int* subdomain_size) { if (world_size > domain_size) { // Don't worry about this special case. Assume the domain // size is greater than the world size. MPI_Abort(MPI_COMM_WORLD, 1); } *subdomain_start = domain_size / world_size * world_rank; *subdomain_size = domain_size / world_size; if (world_rank == world_size - 1) { // Give remainder to last process *subdomain_size += domain_size % world_size; } }
- 해당 함수는 domain를 decomposition 하는 과정을 담당합니다.
- MPI_Abort함수를 통해서 에러를 처리합니다.
- 만약 world_size가 domain_size보다 크다면 예외를 처리합니다.
- 이 경우는 process 마다 할당할 수 있는 총 크기가 다루려는 domain size보다 크다면 굳이 process별로 나눌 필요가 없기 때문입니다.
- subdomain_start와 subdomain_size는 프로세스당 할당하는 프로세스의 시작 position과 길이를 할당해줍니다.
- 그리고 마지막 rank 라면 remainder도 더해줍니다.
- 예시로 20의 domain_size가 주어졌다면, 마지막 프로세스가 4라면 5+1인 6만큼을 할당해줘야 합니다.
walker 초기화
typedef struct { int location; int num_steps_left_in_walk; } Walker;
- 해당 구조체는 location과 해당 step에서 남은 walk의 수를 저장합니다.
initialize_walkers 함수
void initialize_walkers(int num_walkers_per_proc, int max_walk_size, int subdomain_start, int subdomain_size, vector<Walker>* incoming_walkers) { Walker walker; for (int i = 0; i < num_walkers_per_proc; i++) { // Initialize walkers in the middle of the subdomain walker.location = subdomain_start; walker.num_steps_left_in_walk = (rand() / (float)RAND_MAX) * max_walk_size; incoming_walkers->push_back(walker); } }
- walker 변수로 구조체 선언을 진행합니다.
- 해당 함수에서는 walker들을 정보를 받아서 incoming_walkers에 vector 형태로 틍록해줍니다.
Walk 함수
void walk(Walker* walker, int subdomain_start, int subdomain_size, int domain_size, vector<Walker>* outgoing_walkers) { while (walker->num_steps_left_in_walk > 0) { if (walker->location == subdomain_start + subdomain_size) { // Take care of the case when the walker is at the end // of the domain by wrapping it around to the beginning if (walker->location == domain_size) { walker->location = 0; } outgoing_walkers->push_back(*walker); break; } else { walker->num_steps_left_in_walk--; walker->location++; } } }
- 초기화 후 실제 walk 함수의 동작을 살펴보겠습니다.
- walker가 walk를 마무리짓기 전까지 계속 실행합니다.
- 만약 walker의 Location이 subdomain의 끝을 가게되었다면,
- outgoing_walkers 벡터에 해당 walker를 추가합니다.
- 만약 location이 총 domain_size에 ( 제일 끝인 20에 도달했다) 도달했다면 location을 0 으로 초기화합니다.
- 그게 아니라면! 남은 walk할 수 있는 크기와 location을 증감시킵니다.
이제 domain 초기화, walkers 초기화 그리고 walk 함수를 구현했는데, 이제 outgoing 된 walker들을 다음 process로 넘길 때 필요한 send, recv 기능을 구현하면 됩니다.
Send_outgoing_walkers
void send_outgoing_walkers(vector<Walker>* outgoing_walkers, int world_rank, int world_size) { // Send the data as an array of MPI_BYTEs to the next process. // The last process sends to process zero. MPI_Send((void*)outgoing_walkers->data(), outgoing_walkers->size() * sizeof(Walker), MPI_BYTE, (world_rank + 1) % world_size, 0, MPI_COMM_WORLD); // Clear the outgoing walkers outgoing_walkers->clear(); }
- MPI_Send를 통해서 outgoing_walker의 size와 data를 다음 process로 전송합니다.
- 이때 여러 개의 walker들이 전송될 수 있다는 점을 인지해야 합니다.
Receive_incoming_walkers
void receive_incoming_walkers(vector<Walker>* incoming_walkers, int world_rank, int world_size) { MPI_Status status; // Receive from the process before you. If you are process zero, // receive from the last process int incoming_rank = (world_rank == 0) ? world_size - 1 : world_rank - 1; MPI_Probe(incoming_rank, 0, MPI_COMM_WORLD, &status); // Resize your incoming walker buffer based on how much data is // being received int incoming_walkers_size; MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size); incoming_walkers->resize( incoming_walkers_size / sizeof(Walker)); MPI_Recv((void*)incoming_walkers->data(), incoming_walkers_size, MPI_BYTE, incoming_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); }
- 이 함수에서는 전달받으려는 데이터의 정보를 모르기에 MPI_Status를 사용해서 이전 process로부터 몇 개의 walker를 받을지 파악합니다.
MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size);
- 이후 incoming_walkers에 data와 incoming_walker_size를 받아오는 것을 볼 수 있습니다.
실제 코드
// Find your part of the domain decompose_domain(domain_size, world_rank, world_size, &subdomain_start, &subdomain_size); // Initialize walkers in your subdomain initialize_walkers(num_walkers_per_proc, max_walk_size, subdomain_start, subdomain_size, &incoming_walkers); while (!all_walkers_finished) { // Determine walker completion later // Process all incoming walkers for (int i = 0; i < incoming_walkers.size(); i++) { walk(&incoming_walkers[i], subdomain_start, subdomain_size, domain_size, &outgoing_walkers); } // Send all outgoing walkers to the next process. send_outgoing_walkers(&outgoing_walkers, world_rank, world_size); // Receive all the new incoming walkers receive_incoming_walkers(&incoming_walkers, world_rank, world_size); }
- decompose_main으로 process마다 domain 분할을 진행
- initialize_walkers로 process마다 주어질 walker의 개수만큼 walker 초기화
- walker들이 모두 종료하기 전까지 walker들의 walk, outgoing walker 처리, incoming walker들 업데이트
이제 p2p 통신 방법론 이외에 집합 통신에서의 지식을 더 정리해보겠다. 지난번에 다룬 BroadCast외에 scatter와 gather를 주로 다룰 예정입니다.
MPI-Scatter란
이전 첫 번쨰 글에서 소개된 BroadCast와 비교해서 다른 프로세스에 데이터를 전송하는 특징이 다른 것을 볼 수 있습니다.
MPI_BCAST는 하나의 data element 를 root process에서 다른 프로세스로 똑같이 보내는 것을 알 수 있습니다.
그런데 process의 rank마다 서로 다른 data element를 전송하는 것을 볼 수 있습니다.
→함수의 형태로 좀 더 살펴보겠습니다.
MPI_Scatter( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, int root, MPI_Comm communicator)
- send_data : root process에서 실제로 보내야할 데이터를 가리키는 포인터 변수
- send_count : 실제로 받는 process가 받아야하는 data element의 개수
- send_datatype : 보내는 data의 type
- recv_data : recv하는 쪽에서 저장할 수 있는 특정 buffer
- recv_count : recv할 수 있는 data element의 개수
- recv_datatype : receive하는 datatype의 종류
- root: 실제로 root 프로세스의 number를 저장하는 변수
- Communicator: 프로세스들의 공간
MPI_Gather
그렇다면 Gather는 어떻게 작동하는걸까요?
- Gather는 Scatter의 정반대의 역할을 담당합니다.
- 위 사진자료에서 볼 수 있듯이 여러 개의 프로세스로부터 root 프로세스에 data를 모으는 것을 볼 수 있습니다.
- 함수의 인자는 아래와 같습니다.
MPI_Gather( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, int root, MPI_Comm communicator)
그래서 MPI_Scatter와 MPI_Gather를 가지고 무엇을 만들어볼 수 있을까?
위 2개의 함수를 활용해서 MPI Tutorial에서는 숫자의 평균을 구현하는 방법을 구현했다고 합니다.
코드의 흐름은 세 가지로 나눌 수 있습니다.
- MPI를 이용해 data를 process들에게 나눕니다.
- 각 process마다 주어진 data에 대한 계산을 진행합니다.
- 프로세스들마다 계산된 값을 합쳐서 최종 결과를 얻습니다.
그렇다면 어떤 점들을 고려해서 코드로 구현해야 할까요?
- root process인 0번에서 난수로 구성된 array를 생성합니다.
- 1번에서 생성한 array를 process들에게
scatter
시킵니다. - 이때 최대한 균등한 개수만큼 분배해야 합니다.
- 각 process안에서 2번에서 넘겨받은 data들을 가지고 평균을 구합니다.
- 3번에서 만든 각 평균은 root 프로세스에게
gather
시킵니다.
if (world_rank == 0) { rand_nums = create_rand_nums(elements_per_proc * world_size); } // Create a buffer that will hold a subset of the random numbers float *sub_rand_nums = malloc(sizeof(float) * elements_per_proc); // Scatter the random numbers to all processes MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums, elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD); // Compute the average of your subset float sub_avg = compute_avg(sub_rand_nums, elements_per_proc); // Gather all partial averages down to the root process float *sub_avgs = NULL; if (world_rank == 0) { sub_avgs = malloc(sizeof(float) * world_size); } MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0, MPI_COMM_WORLD); // Compute the total average of all numbers. if (world_rank == 0) { float avg = compute_avg(sub_avgs, world_size); }
위 코드 부분 중 특정 코드만 살펴보겠습니다.
- 우선 MPI_Scatter 코드 부분을 살펴보자면 아래와 같습니다.
- root 프로세스(0번)에게 process마다 갖고 있는 element (elements_per_proc) 개수 만큼 현재 선언된 sub_rand_nums에게 분배해주는 코드입니다.
- 이 코드에서 rand_nums로 저장된 난수 array를 process마다 sub_rand_nums안에 균등한 개수만큼 갖고 있게 되는 것 입니다.
MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums, elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD);
- sub_avg 변수에서는 process 당 할당된 sub_rand_nums의 평균값을 갖습니다.
- 마지막으로 MPI_Gather에서의 코드를 살펴보겠습니다.
- sub_avgs 포인터는 각 process마다 계산된 평균 값을 여러 개 갖고 있기 위해 사용됩니다.
- world_size(프로세스 개수)만큼 할당된 sub_avgs는 MPI_Gather에서 호출됩니다.
- MPI_Gather함수에서 호출된 sub_avgs는 각 process마다 존재하는 sub_avg의 값을 적재합니다.
float *sub_avgs = NULL; if (world_rank == 0) { sub_avgs = malloc(sizeof(float) * world_size); } MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
>>>> 코드 결과
/bin/mpirun -n 4 ./avg 100 Avg of all elements is 0.436742 Avg computed across original data is 0.436742
MPI_Allgather를 활용한 average computation 코드 수정
>> 앞서 우리는 MPI_Scatter와 MPI_Gather를 사용해서 average 코드를 구현했습니다.
>> 하지만, 이 패턴을 우리는 하나의 MPI 함수를 사용해서 똑같이 만들 수 있습니다.
원래 MPI_Scatter와 MPI_Gather를 사용한 패턴은 흔히 Many-to-one나 one-to-many 형태의 통신이 필요할 때 쓰입니다. 하지만 Many-to-many가 필요한 경우, 우리는 MPI_Allgather를 사용합니다.
- MPI_Allgather는 모든 process에게 모든 element(data)들을 합친 것을 보낼 것 입니다.
이 뜻은 MPI_Gather를 적용한 후, MPI_Bcast를 사용한 것과 같다고 볼 수 있어요 ❗
- 함수의 원형을 살펴보면 아래와 같습니다.
MPI_Allgather( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, MPI_Comm communicator)
- 수정된 코드를 살펴보겠습니다.
float *sub_avgs = (float *)malloc(sizeof(float) * world_size); MPI_Allgather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, MPI_COMM_WORLD); // Compute the total average of all numbers. float avg = compute_avg(sub_avgs, world_size);
Avg of all elements from proc 3 is 0.436742 Avg of all elements from proc 2 is 0.436742 Avg of all elements from proc 0 is 0.436742 Avg of all elements from proc 1 is 0.436742
결론
이번 시간에는 MPI 관련 기초적인 내용들을 다뤄봤습니다. 사실 AllReduce쪽을 더 깊게 다뤄야하는데, 관련 지식을 더 세분하게 준비하기 어렵다고 판단이 들어서 여기까지 준비했습니다,,ㅎㅎ
다음 시간에는 Model inference 관련 최적화 방법론에 대해 다룰 수 있도록 준비해오겠습니다 🙂
다루는 주제들이 서로 안 맞아보일 수 있는데, 저는 L 서빙 시스템 구현 쪽에 관심이 있어서 조금 다양한 분야들을 다룰 예정인 점 미리 양해 부탁드립니다.
다음에는 더 매끄러운 글로 돌아오도록 해보겠습니다!
Share article