MPI Programming with c/c++ (part2)
MPI programming example in c++
Jan 21, 2024

서론
안녕하세요! 이번 시간에는 지난번에 마무리 짓지 못한 MPI programming 관련 글을 소개할 예정입니다!
내용으로는 MPI Probe, MPI_Scatter, MPI_Gather, MPI_Allgather에 대해 다룰 예정입니다~!
Dynamic Receiving with MPI probe
- 이전 시간에 MPI_send와 MPI_Recv 를 활용해 P2P 방식을 살펴보았습니다.
- 여기에 더해서 특정 message의 보내는 총 길이를 인지한채 메세지를 보내는 방법에 대해서만 살펴봤네요
- 그런데 MPI는 원래 Dynamic message를 주고받는 것도 지원해준다고 합니다.
- 해당 기능을 실행하려면 몇 가지의 function들을 사용해야 합니다.
- 이번 1에서는 이와 관련된 내용을 살펴보겠습니다.
1. MPI_status 구조체
- 이전 시간에 살펴봤듯이 MPI_Recv는 MPI_status 구조체의 주소값을 인자로 받습니다.
- MPI_status_Ignore를 통해 이를 받지 않을 수도 있죠..!
- 만약 MPI_Recv에 MPI_status 구조체를 넘겨준다면 세 가지 중요한 정보를 담고 있어요
- sender의 rank
- MPI_source로 접근할 수 있습니다.
MPI_status stat stat.MPI_SOURCE
- MPI_TAG로 접근할 수 있습니다.
MPI_status stat stat.MPI_TAG
- 이 값은 미리 결정된 값이 존재하지 않아서 MPI_Get_count를 사용해서 접근합니다.
MPI_Get_count(
MPI_Status* status,
MPI_Datatype datatype,
int* count)
- 결과적으로 MPI_status 포인터와 Datatype를 넘겨주고 count에 원하는 정보가 저장됩니다.
Q. 그래서 근데 이 MPI_status 구조체를 왜 알아야하는걸까요?
A. MPI_Recv는 MPI_ANY_SOURCE, MPI_ANY_TAG 를 받는다면 위에서 원하는 Dynamic message 가 안된다.
➕ 또한 실제 sender의 정보 그리고 tag도 알고 싶다는 더더욱 이 구조체를 사용해야합니다.더욱이, MPI_Recv 는 function 호출 시에 인자로 들어온 message 전체를 받는 것을 장담하지 못합니다. ㅎㅎ이 대신, 실제로 Recv한테 전송된 message만 체크하고 보낼 수 있는 크기보다 더 보내면 error를 반환합니다.
→ 따라서 MPI_Status를 사용해야 한다는 것입니다!
MPI_probe로 message size 알아내기
- 그냥 비효율적인 큰 buffer를 제공하는 것보다 MPI_probe를 이용해 message의 크기를 파악해서 buffer를 사용하는 것이 좋다고 합니다.
MPI_Probe(
int source,
int tag,
MPI_Comm comm,
MPI_Status* status)
- MPI_Recv와 비슷한 역할을 수행합니다.
- MPI_probe는 특정 sender와 tag인지 필터링하는 기능을 갖고 있습니다.
MPI_Probe(0, 0, MPI_COMM_WORLD, &status);
// When probe returns, the status object has the size and other
// attributes of the incoming message. Get the message size
MPI_Get_count(&status, MPI_INT, &number_amount);
// Allocate a buffer to hold the incoming numbers
int* number_buf = (int*)malloc(sizeof(int) * number_amount);
// Now receive the message with the allocated buffer
MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("1 dynamically received %d numbers from 0.\n",
number_amount);
free(number_buf);
- 먼저 특정 sender가 보낸 경우에서만 MPI_status 정보를 채워넣습니다.
- MPI_Get_count를 통해 message 총 길이를 구합니다.
- 그 길이에 맞는 buffer를 만들어서 Recv를 통해 message를 받습니다.
Random walk
- 앞서 살펴본 MPI Probe를 활용한 간단한 예제를 살펴보겠습니다.
문제 정의
→ Min, Max 값과 walker W가 주어졌을 때, W는 S번의 random walk을 오른쪽으로 실행합니다.
→ 이때 범위를 벗어나면 다시 순환됩니다.
어떻게 병렬 처리로 구현할 수 있을까?
- 전체 MIN, MAX 거리를 process의 개수를 최대한 균등하게 잘라서 할당합니다.
- 0~20 까지 range와 사용 가능한 프로세스가 총 4개라면,
- process 0, 1, 2, 3 으로 나눠서 배분할 수 있는거죠.

- 그리고 walker를 초기화해줘야 합니다.
- 이 walker의 규칙은 아래처럼 정리할 수 있어요!
- process 0에서 4보다 큰 크기로 walk를 진행하면 process 0는 process 1과 통신해서 walker를 이동시켜야 합니다.
- process 1은 walker를 전달받아 움직임이 총 6을 넘기전까지 계속 움직이고 6을 넘기면 또 process 2로 walker를 움직여야 합니다.
그럼 어떻게 코드로 구현할 수 있을까?
이 코드는 MPI_Send와 MPI_Recv를 활용해서 구현할 수 있다. 이 코드를 구현하려면 한 가지 규칙을 다시 되짚어야 합니다.
walker는 현재 남은 걸음의 수와 현재 위치를 저장하고 있어야 한다.
해당 블로그에서 이 코드의 주제를 “domain decomposition”이라고 명시했습니다.
- 아무래도 병렬처리를 위해 쪼개다보니 그렇게 이름을 지은 것이 아닐까 싶네요.
구현한 함수에서 domain의 size와 walker에게 맞는 subdomain을 할당해줄 것 입니다.
decompose_domain 함수
void decompose_domain(int domain_size, int world_rank,
int world_size, int* subdomain_start,
int* subdomain_size) {
if (world_size > domain_size) {
// Don't worry about this special case. Assume the domain
// size is greater than the world size.
MPI_Abort(MPI_COMM_WORLD, 1);
}
*subdomain_start = domain_size / world_size * world_rank;
*subdomain_size = domain_size / world_size;
if (world_rank == world_size - 1) {
// Give remainder to last process
*subdomain_size += domain_size % world_size;
}
}
- 해당 함수는 domain를 decomposition 하는 과정을 담당합니다.
- MPI_Abort함수를 통해서 에러를 처리합니다.
- 만약 world_size가 domain_size보다 크다면 예외를 처리합니다.
- 이 경우는 process 마다 할당할 수 있는 총 크기가 다루려는 domain size보다 크다면 굳이 process별로 나눌 필요가 없기 때문입니다.
- subdomain_start와 subdomain_size는 프로세스당 할당하는 프로세스의 시작 position과 길이를 할당해줍니다.
- 그리고 마지막 rank 라면 remainder도 더해줍니다.
- 예시로 20의 domain_size가 주어졌다면, 마지막 프로세스가 4라면 5+1인 6만큼을 할당해줘야 합니다.
walker 초기화
typedef struct {
int location;
int num_steps_left_in_walk;
} Walker;
- 해당 구조체는 location과 해당 step에서 남은 walk의 수를 저장합니다.
initialize_walkers 함수
void initialize_walkers(int num_walkers_per_proc, int max_walk_size,
int subdomain_start, int subdomain_size,
vector<Walker>* incoming_walkers) {
Walker walker;
for (int i = 0; i < num_walkers_per_proc; i++) {
// Initialize walkers in the middle of the subdomain
walker.location = subdomain_start;
walker.num_steps_left_in_walk =
(rand() / (float)RAND_MAX) * max_walk_size;
incoming_walkers->push_back(walker);
}
}
- walker 변수로 구조체 선언을 진행합니다.
- 해당 함수에서는 walker들을 정보를 받아서 incoming_walkers에 vector 형태로 틍록해줍니다.
Walk 함수
void walk(Walker* walker, int subdomain_start, int subdomain_size,
int domain_size, vector<Walker>* outgoing_walkers) {
while (walker->num_steps_left_in_walk > 0) {
if (walker->location == subdomain_start + subdomain_size) {
// Take care of the case when the walker is at the end
// of the domain by wrapping it around to the beginning
if (walker->location == domain_size) {
walker->location = 0;
}
outgoing_walkers->push_back(*walker);
break;
} else {
walker->num_steps_left_in_walk--;
walker->location++;
}
}
}
- 초기화 후 실제 walk 함수의 동작을 살펴보겠습니다.
- walker가 walk를 마무리짓기 전까지 계속 실행합니다.
- 만약 walker의 Location이 subdomain의 끝을 가게되었다면,
- outgoing_walkers 벡터에 해당 walker를 추가합니다.
- 만약 location이 총 domain_size에 ( 제일 끝인 20에 도달했다) 도달했다면 location을 0 으로 초기화합니다.
- 그게 아니라면! 남은 walk할 수 있는 크기와 location을 증감시킵니다.
이제 domain 초기화, walkers 초기화 그리고 walk 함수를 구현했는데, 이제 outgoing 된 walker들을 다음 process로 넘길 때 필요한 send, recv 기능을 구현하면 됩니다.
Send_outgoing_walkers
void send_outgoing_walkers(vector<Walker>* outgoing_walkers,
int world_rank, int world_size) {
// Send the data as an array of MPI_BYTEs to the next process.
// The last process sends to process zero.
MPI_Send((void*)outgoing_walkers->data(),
outgoing_walkers->size() * sizeof(Walker), MPI_BYTE,
(world_rank + 1) % world_size, 0, MPI_COMM_WORLD);
// Clear the outgoing walkers
outgoing_walkers->clear();
}
- MPI_Send를 통해서 outgoing_walker의 size와 data를 다음 process로 전송합니다.
- 이때 여러 개의 walker들이 전송될 수 있다는 점을 인지해야 합니다.
Receive_incoming_walkers
void receive_incoming_walkers(vector<Walker>* incoming_walkers,
int world_rank, int world_size) {
MPI_Status status;
// Receive from the process before you. If you are process zero,
// receive from the last process
int incoming_rank =
(world_rank == 0) ? world_size - 1 : world_rank - 1;
MPI_Probe(incoming_rank, 0, MPI_COMM_WORLD, &status);
// Resize your incoming walker buffer based on how much data is
// being received
int incoming_walkers_size;
MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size);
incoming_walkers->resize(
incoming_walkers_size / sizeof(Walker));
MPI_Recv((void*)incoming_walkers->data(), incoming_walkers_size,
MPI_BYTE, incoming_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
- 이 함수에서는 전달받으려는 데이터의 정보를 모르기에 MPI_Status를 사용해서 이전 process로부터 몇 개의 walker를 받을지 파악합니다.
MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size);
- 이후 incoming_walkers에 data와 incoming_walker_size를 받아오는 것을 볼 수 있습니다.
실제 코드
// Find your part of the domain
decompose_domain(domain_size, world_rank, world_size,
&subdomain_start, &subdomain_size);
// Initialize walkers in your subdomain
initialize_walkers(num_walkers_per_proc, max_walk_size,
subdomain_start, subdomain_size,
&incoming_walkers);
while (!all_walkers_finished) { // Determine walker completion later
// Process all incoming walkers
for (int i = 0; i < incoming_walkers.size(); i++) {
walk(&incoming_walkers[i], subdomain_start, subdomain_size,
domain_size, &outgoing_walkers);
}
// Send all outgoing walkers to the next process.
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
// Receive all the new incoming walkers
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
}
- decompose_main으로 process마다 domain 분할을 진행
- initialize_walkers로 process마다 주어질 walker의 개수만큼 walker 초기화
- walker들이 모두 종료하기 전까지 walker들의 walk, outgoing walker 처리, incoming walker들 업데이트
이제 p2p 통신 방법론 이외에 집합 통신에서의 지식을 더 정리해보겠다. 지난번에 다룬 BroadCast외에 scatter와 gather를 주로 다룰 예정입니다.
MPI-Scatter란

이전 첫 번쨰 글에서 소개된 BroadCast와 비교해서 다른 프로세스에 데이터를 전송하는 특징이 다른 것을 볼 수 있습니다.
MPI_BCAST는 하나의 data element 를 root process에서 다른 프로세스로 똑같이 보내는 것을 알 수 있습니다.
그런데 process의 rank마다 서로 다른 data element를 전송하는 것을 볼 수 있습니다.
→함수의 형태로 좀 더 살펴보겠습니다.
MPI_Scatter(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator)
- send_data : root process에서 실제로 보내야할 데이터를 가리키는 포인터 변수
- send_count : 실제로 받는 process가 받아야하는 data element의 개수
- send_datatype : 보내는 data의 type
- recv_data : recv하는 쪽에서 저장할 수 있는 특정 buffer
- recv_count : recv할 수 있는 data element의 개수
- recv_datatype : receive하는 datatype의 종류
- root: 실제로 root 프로세스의 number를 저장하는 변수
- Communicator: 프로세스들의 공간
MPI_Gather
그렇다면 Gather는 어떻게 작동하는걸까요?
- Gather는 Scatter의 정반대의 역할을 담당합니다.
- 위 사진자료에서 볼 수 있듯이 여러 개의 프로세스로부터 root 프로세스에 data를 모으는 것을 볼 수 있습니다.

- 함수의 인자는 아래와 같습니다.
MPI_Gather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator)
그래서 MPI_Scatter와 MPI_Gather를 가지고 무엇을 만들어볼 수 있을까?
위 2개의 함수를 활용해서 MPI Tutorial에서는 숫자의 평균을 구현하는 방법을 구현했다고 합니다.
코드의 흐름은 세 가지로 나눌 수 있습니다.
- MPI를 이용해 data를 process들에게 나눕니다.
- 각 process마다 주어진 data에 대한 계산을 진행합니다.
- 프로세스들마다 계산된 값을 합쳐서 최종 결과를 얻습니다.
그렇다면 어떤 점들을 고려해서 코드로 구현해야 할까요?
- root process인 0번에서 난수로 구성된 array를 생성합니다.
- 1번에서 생성한 array를 process들에게
scatter
시킵니다. - 이때 최대한 균등한 개수만큼 분배해야 합니다.
- 각 process안에서 2번에서 넘겨받은 data들을 가지고 평균을 구합니다.
- 3번에서 만든 각 평균은 root 프로세스에게
gather
시킵니다.
if (world_rank == 0) {
rand_nums = create_rand_nums(elements_per_proc * world_size);
}
// Create a buffer that will hold a subset of the random numbers
float *sub_rand_nums = malloc(sizeof(float) * elements_per_proc);
// Scatter the random numbers to all processes
MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums,
elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD);
// Compute the average of your subset
float sub_avg = compute_avg(sub_rand_nums, elements_per_proc);
// Gather all partial averages down to the root process
float *sub_avgs = NULL;
if (world_rank == 0) {
sub_avgs = malloc(sizeof(float) * world_size);
}
MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0,
MPI_COMM_WORLD);
// Compute the total average of all numbers.
if (world_rank == 0) {
float avg = compute_avg(sub_avgs, world_size);
}
위 코드 부분 중 특정 코드만 살펴보겠습니다.
- 우선 MPI_Scatter 코드 부분을 살펴보자면 아래와 같습니다.
- root 프로세스(0번)에게 process마다 갖고 있는 element (elements_per_proc) 개수 만큼 현재 선언된 sub_rand_nums에게 분배해주는 코드입니다.
- 이 코드에서 rand_nums로 저장된 난수 array를 process마다 sub_rand_nums안에 균등한 개수만큼 갖고 있게 되는 것 입니다.
MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums,
elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD);
- sub_avg 변수에서는 process 당 할당된 sub_rand_nums의 평균값을 갖습니다.
- 마지막으로 MPI_Gather에서의 코드를 살펴보겠습니다.
- sub_avgs 포인터는 각 process마다 계산된 평균 값을 여러 개 갖고 있기 위해 사용됩니다.
- world_size(프로세스 개수)만큼 할당된 sub_avgs는 MPI_Gather에서 호출됩니다.
- MPI_Gather함수에서 호출된 sub_avgs는 각 process마다 존재하는 sub_avg의 값을 적재합니다.
float *sub_avgs = NULL;
if (world_rank == 0) {
sub_avgs = malloc(sizeof(float) * world_size);
}
MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0,
MPI_COMM_WORLD);
>>>> 코드 결과
/bin/mpirun -n 4 ./avg 100
Avg of all elements is 0.436742
Avg computed across original data is 0.436742
MPI_Allgather를 활용한 average computation 코드 수정
>> 앞서 우리는 MPI_Scatter와 MPI_Gather를 사용해서 average 코드를 구현했습니다.
>> 하지만, 이 패턴을 우리는 하나의 MPI 함수를 사용해서 똑같이 만들 수 있습니다.
원래 MPI_Scatter와 MPI_Gather를 사용한 패턴은 흔히 Many-to-one나 one-to-many 형태의 통신이 필요할 때 쓰입니다. 하지만 Many-to-many가 필요한 경우, 우리는 MPI_Allgather를 사용합니다.

- MPI_Allgather는 모든 process에게 모든 element(data)들을 합친 것을 보낼 것 입니다.
이 뜻은 MPI_Gather를 적용한 후, MPI_Bcast를 사용한 것과 같다고 볼 수 있어요 ❗
- 함수의 원형을 살펴보면 아래와 같습니다.
MPI_Allgather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
MPI_Comm communicator)
- 수정된 코드를 살펴보겠습니다.
float *sub_avgs = (float *)malloc(sizeof(float) * world_size);
MPI_Allgather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT,
MPI_COMM_WORLD);
// Compute the total average of all numbers.
float avg = compute_avg(sub_avgs, world_size);
Avg of all elements from proc 3 is 0.436742
Avg of all elements from proc 2 is 0.436742
Avg of all elements from proc 0 is 0.436742
Avg of all elements from proc 1 is 0.436742
결론
이번 시간에는 MPI 관련 기초적인 내용들을 다뤄봤습니다. 사실 AllReduce쪽을 더 깊게 다뤄야하는데, 관련 지식을 더 세분하게 준비하기 어렵다고 판단이 들어서 여기까지 준비했습니다,,ㅎㅎ
다음 시간에는 Model inference 관련 최적화 방법론에 대해 다룰 수 있도록 준비해오겠습니다 🙂
다루는 주제들이 서로 안 맞아보일 수 있는데, 저는 L 서빙 시스템 구현 쪽에 관심이 있어서 조금 다양한 분야들을 다룰 예정인 점 미리 양해 부탁드립니다.
다음에는 더 매끄러운 글로 돌아오도록 해보겠습니다!
Share article