MPI Programming with c/c++ (part2)

MPI programming example in c++
Jan 21, 2024
MPI Programming with c/c++ (part2)

서론

안녕하세요! 이번 시간에는 지난번에 마무리 짓지 못한 MPI programming 관련 글을 소개할 예정입니다!
내용으로는 MPI Probe, MPI_Scatter, MPI_Gather, MPI_Allgather에 대해 다룰 예정입니다~!

Dynamic Receiving with MPI probe

  • 이전 시간에 MPI_send와 MPI_Recv 를 활용해 P2P 방식을 살펴보았습니다.
  • 여기에 더해서 특정 message의 보내는 총 길이를 인지한채 메세지를 보내는 방법에 대해서만 살펴봤네요
  • 그런데 MPI는 원래 Dynamic message를 주고받는 것도 지원해준다고 합니다.
    • 해당 기능을 실행하려면 몇 가지의 function들을 사용해야 합니다.
    • 이번 1에서는 이와 관련된 내용을 살펴보겠습니다.

1. MPI_status 구조체

  • 이전 시간에 살펴봤듯이 MPI_Recv는 MPI_status 구조체의 주소값을 인자로 받습니다.
    • MPI_status_Ignore를 통해 이를 받지 않을 수도 있죠..!
    • 만약 MPI_Recv에 MPI_status 구조체를 넘겨준다면 세 가지 중요한 정보를 담고 있어요
      • sender의 rank
        • MPI_source로 접근할 수 있습니다.
        • MPI_status stat stat.MPI_SOURCE
      • message의 tag
        • MPI_TAG로 접근할 수 있습니다.
          • MPI_status stat stat.MPI_TAG
      • message의 길이
        • 이 값은 미리 결정된 값이 존재하지 않아서 MPI_Get_count를 사용해서 접근합니다.
          • MPI_Get_count( MPI_Status* status, MPI_Datatype datatype, int* count)
          • 위 함수를 좀 더 살펴보자면,,,
            • 결과적으로 MPI_status 포인터와 Datatype를 넘겨주고 count에 원하는 정보가 저장됩니다.
      💡
      Q. 그래서 근데 이 MPI_status 구조체를 왜 알아야하는걸까요?
      A. MPI_Recv는 MPI_ANY_SOURCE, MPI_ANY_TAG 를 받는다면 위에서 원하는 Dynamic message 가 안된다.
      ➕ 또한 실제 sender의 정보 그리고 tag도 알고 싶다는 더더욱 이 구조체를 사용해야합니다.
      더욱이, MPI_Recv 는 function 호출 시에 인자로 들어온 message 전체를 받는 것을 장담하지 못합니다. ㅎㅎ
      이 대신, 실제로 Recv한테 전송된 message만 체크하고 보낼 수 있는 크기보다 더 보내면 error를 반환합니다.
      → 따라서 MPI_Status를 사용해야 한다는 것입니다!
       
 

MPI_probe로 message size 알아내기

  • 그냥 비효율적인 큰 buffer를 제공하는 것보다 MPI_probe를 이용해 message의 크기를 파악해서 buffer를 사용하는 것이 좋다고 합니다.
MPI_Probe( int source, int tag, MPI_Comm comm, MPI_Status* status)
  • MPI_Recv와 비슷한 역할을 수행합니다.
  • MPI_probe는 특정 sender와 tag인지 필터링하는 기능을 갖고 있습니다.
MPI_Probe(0, 0, MPI_COMM_WORLD, &status); // When probe returns, the status object has the size and other // attributes of the incoming message. Get the message size MPI_Get_count(&status, MPI_INT, &number_amount); // Allocate a buffer to hold the incoming numbers int* number_buf = (int*)malloc(sizeof(int) * number_amount); // Now receive the message with the allocated buffer MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); printf("1 dynamically received %d numbers from 0.\n", number_amount); free(number_buf);
  1. 먼저 특정 sender가 보낸 경우에서만 MPI_status 정보를 채워넣습니다.
  1. MPI_Get_count를 통해 message 총 길이를 구합니다.
  1. 그 길이에 맞는 buffer를 만들어서 Recv를 통해 message를 받습니다.
 

Random walk

  • 앞서 살펴본 MPI Probe를 활용한 간단한 예제를 살펴보겠습니다.

문제 정의

→ Min, Max 값과 walker W가 주어졌을 때, W는 S번의 random walk을 오른쪽으로 실행합니다.
→ 이때 범위를 벗어나면 다시 순환됩니다.
 

어떻게 병렬 처리로 구현할 수 있을까?

  • 전체 MIN, MAX 거리를 process의 개수를 최대한 균등하게 잘라서 할당합니다.
  • 0~20 까지 range와 사용 가능한 프로세스가 총 4개라면,
    • process 0, 1, 2, 3 으로 나눠서 배분할 수 있는거죠.
      • notion image
  • 그리고 walker를 초기화해줘야 합니다.
    • 이 walker의 규칙은 아래처럼 정리할 수 있어요!
        1. process 0에서 4보다 큰 크기로 walk를 진행하면 process 0는 process 1과 통신해서 walker를 이동시켜야 합니다.
        1. process 1은 walker를 전달받아 움직임이 총 6을 넘기전까지 계속 움직이고 6을 넘기면 또 process 2로 walker를 움직여야 합니다.

그럼 어떻게 코드로 구현할 수 있을까?

이 코드는 MPI_Send와 MPI_Recv를 활용해서 구현할 수 있다. 이 코드를 구현하려면 한 가지 규칙을 다시 되짚어야 합니다.
walker는 현재 남은 걸음의 수와 현재 위치를 저장하고 있어야 한다.
해당 블로그에서 이 코드의 주제를 “domain decomposition”이라고 명시했습니다.
  • 아무래도 병렬처리를 위해 쪼개다보니 그렇게 이름을 지은 것이 아닐까 싶네요.
구현한 함수에서 domain의 size와 walker에게 맞는 subdomain을 할당해줄 것 입니다.
 

decompose_domain 함수

void decompose_domain(int domain_size, int world_rank, int world_size, int* subdomain_start, int* subdomain_size) { if (world_size > domain_size) { // Don't worry about this special case. Assume the domain // size is greater than the world size. MPI_Abort(MPI_COMM_WORLD, 1); } *subdomain_start = domain_size / world_size * world_rank; *subdomain_size = domain_size / world_size; if (world_rank == world_size - 1) { // Give remainder to last process *subdomain_size += domain_size % world_size; } }
  • 해당 함수는 domain를 decomposition 하는 과정을 담당합니다.
      1. MPI_Abort함수를 통해서 에러를 처리합니다.
          • 만약 world_size가 domain_size보다 크다면 예외를 처리합니다.
            • 이 경우는 process 마다 할당할 수 있는 총 크기가 다루려는 domain size보다 크다면 굳이 process별로 나눌 필요가 없기 때문입니다.
      1. subdomain_start와 subdomain_size는 프로세스당 할당하는 프로세스의 시작 position과 길이를 할당해줍니다.
      1. 그리고 마지막 rank 라면 remainder도 더해줍니다.
          • 예시로 20의 domain_size가 주어졌다면, 마지막 프로세스가 4라면 5+1인 6만큼을 할당해줘야 합니다.

walker 초기화

typedef struct { int location; int num_steps_left_in_walk; } Walker;
  • 해당 구조체는 location과 해당 step에서 남은 walk의 수를 저장합니다.
 

initialize_walkers 함수

void initialize_walkers(int num_walkers_per_proc, int max_walk_size, int subdomain_start, int subdomain_size, vector<Walker>* incoming_walkers) { Walker walker; for (int i = 0; i < num_walkers_per_proc; i++) { // Initialize walkers in the middle of the subdomain walker.location = subdomain_start; walker.num_steps_left_in_walk = (rand() / (float)RAND_MAX) * max_walk_size; incoming_walkers->push_back(walker); } }
  1. walker 변수로 구조체 선언을 진행합니다.
  1. 해당 함수에서는 walker들을 정보를 받아서 incoming_walkers에 vector 형태로 틍록해줍니다.
 

Walk 함수

void walk(Walker* walker, int subdomain_start, int subdomain_size, int domain_size, vector<Walker>* outgoing_walkers) { while (walker->num_steps_left_in_walk > 0) { if (walker->location == subdomain_start + subdomain_size) { // Take care of the case when the walker is at the end // of the domain by wrapping it around to the beginning if (walker->location == domain_size) { walker->location = 0; } outgoing_walkers->push_back(*walker); break; } else { walker->num_steps_left_in_walk--; walker->location++; } } }
  • 초기화 후 실제 walk 함수의 동작을 살펴보겠습니다.
  • walker가 walk를 마무리짓기 전까지 계속 실행합니다.
  • 만약 walker의 Location이 subdomain의 끝을 가게되었다면,
    • outgoing_walkers 벡터에 해당 walker를 추가합니다.
    • 만약 location이 총 domain_size에 ( 제일 끝인 20에 도달했다) 도달했다면 location을 0 으로 초기화합니다.
  • 그게 아니라면! 남은 walk할 수 있는 크기와 location을 증감시킵니다.
 
이제 domain 초기화, walkers 초기화 그리고 walk 함수를 구현했는데, 이제 outgoing 된 walker들을 다음 process로 넘길 때 필요한 send, recv 기능을 구현하면 됩니다.

Send_outgoing_walkers

void send_outgoing_walkers(vector<Walker>* outgoing_walkers, int world_rank, int world_size) { // Send the data as an array of MPI_BYTEs to the next process. // The last process sends to process zero. MPI_Send((void*)outgoing_walkers->data(), outgoing_walkers->size() * sizeof(Walker), MPI_BYTE, (world_rank + 1) % world_size, 0, MPI_COMM_WORLD); // Clear the outgoing walkers outgoing_walkers->clear(); }
  • MPI_Send를 통해서 outgoing_walker의 size와 data를 다음 process로 전송합니다.
  • 이때 여러 개의 walker들이 전송될 수 있다는 점을 인지해야 합니다.

Receive_incoming_walkers

void receive_incoming_walkers(vector<Walker>* incoming_walkers, int world_rank, int world_size) { MPI_Status status; // Receive from the process before you. If you are process zero, // receive from the last process int incoming_rank = (world_rank == 0) ? world_size - 1 : world_rank - 1; MPI_Probe(incoming_rank, 0, MPI_COMM_WORLD, &status); // Resize your incoming walker buffer based on how much data is // being received int incoming_walkers_size; MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size); incoming_walkers->resize( incoming_walkers_size / sizeof(Walker)); MPI_Recv((void*)incoming_walkers->data(), incoming_walkers_size, MPI_BYTE, incoming_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); }
  • 이 함수에서는 전달받으려는 데이터의 정보를 모르기에 MPI_Status를 사용해서 이전 process로부터 몇 개의 walker를 받을지 파악합니다.
    • MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size);
  • 이후 incoming_walkers에 data와 incoming_walker_size를 받아오는 것을 볼 수 있습니다.
 

실제 코드

// Find your part of the domain decompose_domain(domain_size, world_rank, world_size, &subdomain_start, &subdomain_size); // Initialize walkers in your subdomain initialize_walkers(num_walkers_per_proc, max_walk_size, subdomain_start, subdomain_size, &incoming_walkers); while (!all_walkers_finished) { // Determine walker completion later // Process all incoming walkers for (int i = 0; i < incoming_walkers.size(); i++) { walk(&incoming_walkers[i], subdomain_start, subdomain_size, domain_size, &outgoing_walkers); } // Send all outgoing walkers to the next process. send_outgoing_walkers(&outgoing_walkers, world_rank, world_size); // Receive all the new incoming walkers receive_incoming_walkers(&incoming_walkers, world_rank, world_size); }
  1. decompose_main으로 process마다 domain 분할을 진행
  1. initialize_walkers로 process마다 주어질 walker의 개수만큼 walker 초기화
  1. walker들이 모두 종료하기 전까지 walker들의 walk, outgoing walker 처리, incoming walker들 업데이트
 
이제 p2p 통신 방법론 이외에 집합 통신에서의 지식을 더 정리해보겠다. 지난번에 다룬 BroadCast외에 scatter와 gather를 주로 다룰 예정입니다.

MPI-Scatter란

notion image
이전 첫 번쨰 글에서 소개된 BroadCast와 비교해서 다른 프로세스에 데이터를 전송하는 특징이 다른 것을 볼 수 있습니다.
MPI_BCAST는 하나의 data element 를 root process에서 다른 프로세스로 똑같이 보내는 것을 알 수 있습니다.
그런데 process의 rank마다 서로 다른 data element를 전송하는 것을 볼 수 있습니다.
→함수의 형태로 좀 더 살펴보겠습니다.
MPI_Scatter( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, int root, MPI_Comm communicator)
  • send_data : root process에서 실제로 보내야할 데이터를 가리키는 포인터 변수
  • send_count : 실제로 받는 process가 받아야하는 data element의 개수
  • send_datatype : 보내는 data의 type
  • recv_data : recv하는 쪽에서 저장할 수 있는 특정 buffer
  • recv_count : recv할 수 있는 data element의 개수
  • recv_datatype : receive하는 datatype의 종류
  • root: 실제로 root 프로세스의 number를 저장하는 변수
  • Communicator: 프로세스들의 공간
 

MPI_Gather

그렇다면 Gather는 어떻게 작동하는걸까요?
  • Gather는 Scatter의 정반대의 역할을 담당합니다.
    • notion image
    • 위 사진자료에서 볼 수 있듯이 여러 개의 프로세스로부터 root 프로세스에 data를 모으는 것을 볼 수 있습니다.
  • 함수의 인자는 아래와 같습니다.
    • MPI_Gather( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, int root, MPI_Comm communicator)
    • 사실 위에서 다룬 MPI_Scatter의 함수와 같은 구성인 것을 알 수 있기에 인자들마다 역할에 대한 소개는 생략하도록 하겠습니다.
 

그래서 MPI_Scatter와 MPI_Gather를 가지고 무엇을 만들어볼 수 있을까?

위 2개의 함수를 활용해서 MPI Tutorial에서는 숫자의 평균을 구현하는 방법을 구현했다고 합니다.
코드의 흐름은 세 가지로 나눌 수 있습니다.
  1. MPI를 이용해 data를 process들에게 나눕니다.
  1. 각 process마다 주어진 data에 대한 계산을 진행합니다.
  1. 프로세스들마다 계산된 값을 합쳐서 최종 결과를 얻습니다.
그렇다면 어떤 점들을 고려해서 코드로 구현해야 할까요?
  1. root process인 0번에서 난수로 구성된 array를 생성합니다.
  1. 1번에서 생성한 array를 process들에게 scatter 시킵니다.
      • 이때 최대한 균등한 개수만큼 분배해야 합니다.
  1. 각 process안에서 2번에서 넘겨받은 data들을 가지고 평균을 구합니다.
  1. 3번에서 만든 각 평균은 root 프로세스에게 gather 시킵니다.
if (world_rank == 0) { rand_nums = create_rand_nums(elements_per_proc * world_size); } // Create a buffer that will hold a subset of the random numbers float *sub_rand_nums = malloc(sizeof(float) * elements_per_proc); // Scatter the random numbers to all processes MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums, elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD); // Compute the average of your subset float sub_avg = compute_avg(sub_rand_nums, elements_per_proc); // Gather all partial averages down to the root process float *sub_avgs = NULL; if (world_rank == 0) { sub_avgs = malloc(sizeof(float) * world_size); } MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0, MPI_COMM_WORLD); // Compute the total average of all numbers. if (world_rank == 0) { float avg = compute_avg(sub_avgs, world_size); }
위 코드 부분 중 특정 코드만 살펴보겠습니다.
 
  1. 우선 MPI_Scatter 코드 부분을 살펴보자면 아래와 같습니다.
    1. MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums, elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD);
      • root 프로세스(0번)에게 process마다 갖고 있는 element (elements_per_proc) 개수 만큼 현재 선언된 sub_rand_nums에게 분배해주는 코드입니다.
      • 이 코드에서 rand_nums로 저장된 난수 array를 process마다 sub_rand_nums안에 균등한 개수만큼 갖고 있게 되는 것 입니다.
  1. sub_avg 변수에서는 process 당 할당된 sub_rand_nums의 평균값을 갖습니다.
  1. 마지막으로 MPI_Gather에서의 코드를 살펴보겠습니다.
    1. float *sub_avgs = NULL; if (world_rank == 0) { sub_avgs = malloc(sizeof(float) * world_size); } MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
      • sub_avgs 포인터는 각 process마다 계산된 평균 값을 여러 개 갖고 있기 위해 사용됩니다.
      • world_size(프로세스 개수)만큼 할당된 sub_avgs는 MPI_Gather에서 호출됩니다.
      • MPI_Gather함수에서 호출된 sub_avgs는 각 process마다 존재하는 sub_avg의 값을 적재합니다.
>>>> 코드 결과
/bin/mpirun -n 4 ./avg 100 Avg of all elements is 0.436742 Avg computed across original data is 0.436742
 

MPI_Allgather를 활용한 average computation 코드 수정

>> 앞서 우리는 MPI_Scatter와 MPI_Gather를 사용해서 average 코드를 구현했습니다.
>> 하지만, 이 패턴을 우리는 하나의 MPI 함수를 사용해서 똑같이 만들 수 있습니다.
원래 MPI_Scatter와 MPI_Gather를 사용한 패턴은 흔히 Many-to-one나 one-to-many 형태의 통신이 필요할 때 쓰입니다. 하지만 Many-to-many가 필요한 경우, 우리는 MPI_Allgather를 사용합니다.
notion image
  • MPI_Allgather는 모든 process에게 모든 element(data)들을 합친 것을 보낼 것 입니다.
💡
이 뜻은 MPI_Gather를 적용한 후, MPI_Bcast를 사용한 것과 같다고 볼 수 있어요 ❗
  • 함수의 원형을 살펴보면 아래와 같습니다.
    • MPI_Allgather( void* send_data, int send_count, MPI_Datatype send_datatype, void* recv_data, int recv_count, MPI_Datatype recv_datatype, MPI_Comm communicator)
    • 여기서 기존에 본 MPI_Scatter나 MPI_Gather와의 차이점은 root process가 따로 명시되지 않았다는 점 입니다.
  • 수정된 코드를 살펴보겠습니다.
    • float *sub_avgs = (float *)malloc(sizeof(float) * world_size); MPI_Allgather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, MPI_COMM_WORLD); // Compute the total average of all numbers. float avg = compute_avg(sub_avgs, world_size);
    • 확실히 기존 코드와 차이점은 MPI_Allgather에서는 모든 process마다 sub_avgs안에 값이 다 똑같이 구성되어 있기에 아래와 같은 출력 결과가 나옵니다.
      • Avg of all elements from proc 3 is 0.436742 Avg of all elements from proc 2 is 0.436742 Avg of all elements from proc 0 is 0.436742 Avg of all elements from proc 1 is 0.436742
 

결론

이번 시간에는 MPI 관련 기초적인 내용들을 다뤄봤습니다. 사실 AllReduce쪽을 더 깊게 다뤄야하는데, 관련 지식을 더 세분하게 준비하기 어렵다고 판단이 들어서 여기까지 준비했습니다,,ㅎㅎ 다음 시간에는 Model inference 관련 최적화 방법론에 대해 다룰 수 있도록 준비해오겠습니다 🙂
다루는 주제들이 서로 안 맞아보일 수 있는데, 저는 L 서빙 시스템 구현 쪽에 관심이 있어서 조금 다양한 분야들을 다룰 예정인 점 미리 양해 부탁드립니다.
다음에는 더 매끄러운 글로 돌아오도록 해보겠습니다!
Share article

allaboutml