Python 비동기 스케줄링
ML 엔지니어의 입장에서, 비동기 프로그래밍과 스케줄링은 데이터 처리, 모델 훈련 및 예측 작업을 자동화하는 데 필수적인 기술입니다. 이 글에서는 비동기 프로그래밍과 스케줄링에 대한 개념, Python의 asyncio와 APScheduler를 사용한 구현 방법에 대해 알아봅니다.
Mar 16, 2024
서론
지속적으로 데이터를 수집하고 학습되는 모델을 관리해야하는 ML 엔지니어라면, 데이터 수집, 전처리, 모델 학습, 추론 등 실행 시간이 몇 초부터 몇 시간에 이르는 작업들을 관리하기 위해 비동기 스키줄링을 구현해야 할 필요가 있습니다. 이 글에서는 Python을 통한 비동기 프로그래밍과 비동기 스케줄링에 대한 개념과 간단한 구현을 진행해봅니다.
I/O 작업 요약
I/O 작업(Input/Output 작업)은 데이터를 입력하고 결과를 출력하는 모든 프로세스를 통칭하는 개념이며, 파일 시스템 작업, 네트워크 요청, DB 쿼리 등의 작업을 포함합니다. User space의 프로세스가 I/O 작업을 요청했을 경우, 시스템 호출이 어떤 방식으로 이루어지는지에 대해 blocking/non-blocking, Synchronous/Asynchronous 로 구분해 4가지의 케이스로 구분할 수 있습니다.
- blocking: I/O 작업이 진행되는 동안 다른 작업을 수행할 수 없음
- non-blocking: I/O 작업이 완료되지 않더라도 다음 작업으로 넘어갈 수 있음
- Synchronous: 한 작업이 완료된 후 순차적으로 다음 작업 진행
- Asynchronous: 한 작업이 완료되지 않더라도 다음 작업 진행 가능
비동기 프로그래밍
동기 프로그래밍은 작업을 순차적으로 실행하며, 한 작업이 완료될 때까지 다음 작업이 대기하는 방식입니다. 반면, 비동기 프로그래밍은 여러 작업이 동시에 실행될 수 있으며, 각 작업은 서로의 완료를 기다리지 않습니다. 즉, non-blocking & Asynchronous 영역의 프로그래밍 방식이며, 주로 싱글 스레드 환경에서 사용됩니다(멀티 스레딩과는 다른 개념).
Python에서는
asyncio
모듈을 통해 비동기 프로그래밍을 구현할 수 있습니다. asyncio
는 비동기 I/O, 이벤트 루프, 코루틴 및 태스크를 지원하며, async
와 await
를 사용해 비동기 함수를 쉽게 정의하고 관리할 수 있습니다.- 이벤트 루프(Event Loop)
- 프로그램 실행 동안 발생하는 이벤트를 관리하고, 프로그램의 메인 루프를 제어
asyncio.run()
함수를 사용해 자동으로 이벤트 루프를 생성하고 시작할 수 있음
- 코루틴(Coroutine)
- 비동기 프로그래밍에서 사용되는 동시 실행 가능한 코드의 단위
- 비동기 작업의 시작점이며, 이벤트 루프에 의해 실행되고 관리됨
async def
로 정의되며,await
표현식을 사용하여 실행을 일시 중단하고, 이벤트 루프가 다른 작업을 수행할 수 있도록 허용
- 태스크(Task)
- 코루틴을 이벤트 루프에서 실행할 수 있도록 래핑하는 클래스
- 코루틴을 태스크로 스케줄링함으로써, 코루틴이 비동기적으로 실행될 수 있음
asyncio.create_task()
함수를 사용하여 코루틴을 태스크로 만듦
비동기 스케줄링
스케줄링은 작업을 특정 시간에 실행되도록 예약하는 것이고, Python에서는
APScheduler
라이브러리를 통해 스케줄링을 구현할 수 있습니다. APScheduler
에서 제공하는 AsyncIOScheduler
와 asyncio
를 함께 사용하면 비동기 스케줄링을 구현할 수 있습니다. AsyncIOScheduler
는 APScheduler의 비동기 스케줄러로, asyncio
이벤트 루프와 함께 작동하도록 설계되어 있습니다. from apscheduler.schedulers.asyncio import AsyncIOScheduler import asyncio import time async def async_task(): print("메인 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S")) await asyncio.sleep(2) print("메인 비동기 작업 완료: ", time.strftime("%Y-%m-%d %H:%M:%S")) async def additional_async_task(): while True: print("추가 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S")) await asyncio.sleep(1) async def main(): scheduler = AsyncIOScheduler() scheduler.add_job(async_task, 'interval', seconds=5) scheduler.start() task = asyncio.create_task(additional_async_task()) try: await task except asyncio.CancelledError: pass if __name__ == '__main__': asyncio.run(main()) #### 결과 예시 #### # 추가 비동기 작업: 2024-03-17 01:37:17 # 추가 비동기 작업: 2024-03-17 01:37:18 # 추가 비동기 작업: 2024-03-17 01:37:19 # 추가 비동기 작업: 2024-03-17 01:37:20 # 추가 비동기 작업: 2024-03-17 01:37:21 # 메인 비동기 작업: 2024-03-17 01:37:22 # 추가 비동기 작업: 2024-03-17 01:37:22 # 추가 비동기 작업: 2024-03-17 01:37:23 # 메인 비동기 작업 완료: 2024-03-17 01:37:24 # 추가 비동기 작업: 2024-03-17 01:37:24 # 추가 비동기 작업: 2024-03-17 01:37:25 # ...
async_task
함수는 2초 동안 비동기적으로 대기하도록 하며, AsyncIOScheduler를 통해 5초 간격으로 실행되도록 했습니다.additional_async_task
함수는 1초 간격으로 작업이 이루어지는 태스크를 모방하도록 했습니다.asyncio
의 create_task
메서드를 사용하여 additional_async_task
를 반복적으로 실행하는 새로운 비동기 작업을 생성했고, 코드를 실행하면 두 작업이 비동기로 실행되는 것을 확인할 수 있습니다.References
- ChatGPT
Share article