AWS Kinesis - Firehose Introduce (1)
Introduce & Tutorial : Stream Create & Transform source data
Sep 23, 2024
AWS Kinesis FirehoseData Firehose 데이터 흐름Stream 생성1. 소스 설정 구성2. 레코드 변환 및 형식 변환 구성 (Optional)3. 목적지 설정 구성4. 백업 설정 구성5. 고급 설정 구성소스 데이터 변환데이터 변환에 필요한 매개변수데이터 변환 실패 처리
Kinesis Firehose는 양이 많아서 2 챕터로 나눠서 설명됩니다. 1 Chapter는 데이터 흐름과 생성 그리고 소스 데이터를 다루고, 2 Chapter는 Streaming Data Prititioning과 입력 데이터 형식 변환 등을 다룹니다.
AWS Kinesis Firehose
Amazon Data Firehose는 실시간 스트리밍 데이터를 제공하기 위한 완전 관리형 서비스입니다. S3, Redshift, Datadog, MongoDB 등 다양한 공급자와 통합되며 사용자가 지정한 HTTP 엔드포인트 또는 HTTP 엔드포인트와 같은 목적지로 전송합니다. 데이터 생산자가 Amazon Data Firehose로 데이터를 보내도록 구성하면 지정된 목적지로 데이터가 자동으로 전송됩니다. 또한 Amazon Data Firehose를 구성하여 데이터를 전송하기 전에 데이터를 변환할 수도 있습니다.
- Firehose Stream
Amazon Data Firehose의 기본 엔터티입니다. Firehose 스트림을 생성한 다음 데이터를 전송하여 Amazon Data Firehose를 사용합니다.
- Record
Producer가 Firehose 스트림으로 보내는 데이터입니다. 최대 1,000KB까지 가능합니다.
- Producer
Firehose Stream으로 데이터를 보내는 생산자(웹 서버)
Data Firehose 데이터 흐름
Firehos의 데이터는 다양하게 흐를 수 있습니다.
Kinesis Firehose로 전달된 데이터 스트림은 변환(transformed) 과정을 거쳐 S3나 Redshift 등 AWS 내의 리소스에 저장될 수 있습니다.
중요한 것은 Data Firehose가 데이터 스트림을 AWS 데이터 저장소에 적재하기에 최적화되어 있다는 점 입니다.
Stream 생성
1. 소스 설정 구성
가장 먼저 소스와 대상을 선택해야 합니다.
Firehose 스트림으로 정보를 보내기 위해 선택한 소스에 따라 소스 설정을 구성할 수 있습니다.
- Amazon MSK (Kafka)
- Amazon Kinesis Data Stream
- Direct PUT (소스 설정 없음)
Firehose가 지원하는 Stream 대상자
- Amazon OpenSearch 서비스
- Amazon OpenSearch 서버리스
- 아마존 레드시프트
- 아마존 S3
- 아파치 아이스버그 테이블
- 코랄로직스
- 데이터독
- 다이나트레이스
- 탄력 있는
- HTTP 엔드포인트
- 벌집
- 로직 모니터
- 로그즈.아이오
- 몽고DB 클라우드
- 새로운 유물
- 스플렁크
- Splunk 관찰성 클라우드
- 수모 로직
- 눈송이
2. 레코드 변환 및 형식 변환 구성 (Optional)
- Firehose 스트림의 소스로 Amazon MSK를 선택하는 경우
- AWS Lambda를 통한 소스 레코드 변형 섹션을 제공합니다.
- AWS Lambda에는 Payload 할당량이 6MB이 있습니다. 이는 Lambda 함수로 전달되는 데이터의 크기가 최대 6MB까지 허용된다는 의미입니다. 버퍼 크기가 작을수록 데이터가 확장될 경우 더 많은 공간을 확보할 수 있습니다.
- 버퍼 간격은 최소 0초 ~ 최대 900초 입니다.
- 레코드 형식 변환을 제공합니다.
- 일반적으로 JSON 보다 Apache Parquet 또는 Apache ORC 형식의 데이터가 JSON 같은 행 기준의 데이터보다 쿼리하기 쉽습니다. Firehose는 MSK를 통해 들어온 데이터 레코드를 AWS Glue에서 정의한 데이터 테이블 스키마를 사용하여 JSON ⇒ Parquert / ORC 형태로 데이터를 변환하여 S3에 저장하는 것을 지원합니다.
- MSK의 경우 대상이 S3만 지정할 수 있습니다.
- Firehose 스트림의 소스로 Apache Flink 또는 Direct PUT에 대한 관리 서비스를 선택하는 경우
- AWS Lambda를 통한 소스 레코드 변형 섹션을 제공합니다.
- 대상이 S3나 Splunk인 경우 압축 해제 소스 레코드 Amazon CloudWatch Logs 섹션 에서 압축 해제 켜기를 선택합니다.
- 그 외에는 MSK의 경우와 같습니다.
- 레코드 형식 변환을 제공합니다.
- MSK의 경우 대상이 S3 하나 뿐이라 형식 변환이 제공이 디폴트로 선택옵션으로 존재하지만, MSK가 아닌 소스를 선택하는 경우 대상이 S3가 아니라면 레코드 형식 변환을 제공하지 않습니다.
- 그 외에는 MSK의 경우와 같습니다.
3. 목적지 설정 구성
다양한 대상(목적지)가 있겠지만, 여기서는 S3와 Redshift만 살펴보겠습니다. 이 게시물에 대한 성격이 Introduce보단 Practice를 위한 사전 CaseStudy에 가깝기 때문에 사용하려는 주 리소스만 다루겠습니다.
- S3
- S3 버킷을 선택합니다.
- 새 줄 구분 기호
- 새 줄 구분 기호는 일반적으로 텍스트 파일이나 프로그래밍 언어에서 줄을 구분하는 데 사용되는 특수 문자입니다.
- Windows: 캐리지 리턴(CR)과 라인 피드(LF)의 조합인
\r\n
을 사용합니다. - Unix/Linux 및 macOS: 라인 피드(LF)인
\n
을 사용합니다. - Old Mac OS: 캐리지 리턴(CR)인
\r
을 사용했습니다. - Athena를 사용하여 집계된 레코드가 있는 S3 객체를 쿼리하려는 경우 이 옵션을 활성화합니다.
- Dynamic Partitioning
- 동적 파티셔닝을 활성화하면 파티셔닝 키를 기반으로 스트리밍 S3 데이터를 파티셔닝하여 데이터 셋을 생성할 수 있습니다.
- 대신 기존 Firehose 스트림이 제공하는 동적 파티셔닝 기능을 사용할 수 없습니다.
- 동적 파티셔닝을 활성화하면 파티셔닝한 데이터의 GiB당 추가 비용이 발생합니다.
- Kinesis Firehose 스트림이 데이터를 받을 때, Firehose는 그 데이터를 구문 분석하여 유효한 JSON 형식이나 줄 바꿈 같은 구분 기호를 기준으로 여러 개의 레코드로 분리할 수 있습니다.
- 여러 이벤트나 로그 데이터를 한 번의 API 호출(PutRecord 또는 PutRecordBatch)을 통해 모아 처리하는 방식에도 Dynamic Partitioning을 설정하고 구성할 수 있습니다.
- 집계된 데이터를 Firehose가 받을 때, Dynamic Partitioning이 활성화되면 Firehose는 해당 데이터를 분석하여 API 호출 내에서 유효한 JSON 객체를 찾아냅니다.
- Firehose가 Kinesis Data Stream을 소스로 사용할 경우, Kinesis Producer Library(KPL)를 이용해 기본적인 데이터 집계 기능을 사용할 수 있습니다.
- Dynamic Partitioning은 데이터가 집계 해제된 후에 작동합니다. 즉, 데이터를 한 번에 모아 놓은 상태에서 분리된 개별 레코드들이 먼저 처리된 다음 파티셔닝이 이루어집니다.
- 각 API 호출에서 처리된 레코드를 각각의 Amazon S3 접두사(prefix)에 맞춰 다른 위치로 보낼 수 있습니다.
- Lambda 함수를 Firehose와 통합하여 데이터를 파티셔닝하기 전에 다른 형태의 집계 해제나 데이터 변환 작업을 수행할 수 있습니다.
- 데이터가 집계된 경우, Dynamic Partitioning은 집계가 해제된 이후에만 적용이 가능합니다. 즉, 데이터를 모아둔 상태에서는 Dynamic Partitioning이 불가능하고, 데이터를 개별 레코드로 분리한 후에만 적용할 수 있습니다.
- 집계된 데이터에서 Dynamic Partitioning을 사용하려면 다중 레코드 집계 해제를 사용해야합니다.
- 다중 레코드 분해 집계를 활성화한 경우 Firehose가 데이터를 분해하는 방법을 지정해야 합니다.
- 이 함수를 사용하여 S3에 바인딩된 데이터를 동적으로 분할할 수 있으며
- Lambda 함수는 Amazon S3에 저장될 데이터를 동적 파티셔닝할 수 있습니다. 동적 파티셔닝은 데이터를 특정 키(날짜, 지역 등)에 따라 분리하여 S3의 여러 폴더로 나누는 과정입니다. 즉, Lambda 함수가 데이터를 변환할 뿐만 아니라 S3에 저장될 때 어떻게 데이터를 분할할지도 결정할 수 있습니다.
- 인라인 파싱으로 여전히 분할 키를 만들 수 있습니다.
- 인라인 파싱을 통해서도 데이터를 분석하고 분할 키를 생성할 수 있습니다. 인라인 파싱은 Lambda 함수를 사용하지 않고도 Firehose 자체적으로 데이터를 분석하여 분할 키(예: 날짜, 카테고리)를 만들 수 있는 기능입니다.
- 동적 분할을 사용하면 인라인 파싱 또는 AWS Lambda 함수를 사용하여 분할 키를 만들 수 있습니다.
- 동적 파티셔닝을 활성화할 경우, 데이터를 나누는 방식은 두 가지로 설정할 수 있습니다:
- 인라인 파싱을 통해 Firehose가 자동으로 데이터를 분석하고 분할 키를 생성하는 방식
- Lambda 함수를 통해 데이터를 변환하고 분할 키를 생성하는 방식 이 두 가지 중 하나를 선택해서 사용할 수 있습니다.
- 또는 인라인 파싱과 AWS Lambda 함수를 동시에 사용하여 분할 키를 만들 수 있습니다.
- 인라인 파싱과 Lambda 함수를 동시에 사용하여 데이터를 처리할 수도 있습니다. 즉, 일부 데이터를 인라인 파싱을 통해 처리하고, 다른 부분은 Lambda 함수를 사용해 처리할 수 있습니다.
- (JSON) 인라인 구문 분석이 활성화된 경우
- 위에서 언급된 JSON 인라인 구문 분석을 의미합니다. Kinesis Firehose가 들어오는 데이터를 미리 지정한 기준(예: JSON 구조)으로 자동 분석합니다.
- 유효한 JQ 표현식을 지정하고
- JQ 표현식은 JSON 데이터를 처리하고 변환하는 데 사용되는 일종의 필터링 언어입니다. 데이터를 분석하고 파싱할 때 사용할 수 있는 규칙을 정의한 것입니다.
- Amazon S3 버킷 접두사에 지정된 동적 파티셔닝 키를 사용해야 합니다.
- 동적 파티셔닝이란, 데이터를 특정 조건에 따라 여러 폴더(접두사)로 나눠 저장하는 기능입니다. Amazon S3에 데이터를 저장할 때 **접두사(prefix)**는 해당 데이터가 저장되는 폴더 경로의 일부를 의미합니다.
- 동적 파티셔닝을 활성화하면 데이터를 구문 분석한 결과에 따라 데이터를 다양한 S3 폴더에 나눠서 저장할 수 있습니다. 이때 파티셔닝 키는 데이터를 어떻게 나눌지를 결정하는 기준이 됩니다. 예를 들어, 날짜, 지역, 또는 사용자 ID 같은 키를 기반으로 데이터를 나눌 수 있습니다.
- 동적 분할을 올바르게 구성하려면 S3 버킷 접두사의 수가 지정된 분할 키의 수와 동일해야 합니다.
- 인라인 파싱 또는 지정된 AWS Lambda 함수를 사용하여 소스 데이터를 분할할 수 있습니다.
- 소스 데이터에 대한 분할 키를 생성하기 위해 AWS Lambda 함수를 지정한 경우 다음 형식을 사용하여 S3 버킷 접두사 값을 수동으로 입력해야 합니다.
- "partitionKeyFromLambda:keyID"
- 인라인 파싱을 사용하여 소스 데이터에 대한 분할 키를 지정하는 경우 다음 형식을 사용하여 S3 버킷 미리 보기 값을 수동으로 입력할 수 있습니다.
- "partitionKeyFromQuery:keyID”
- 인라인 파싱 또는 AWS Lambda를 사용하여 데이터를 분할하는 동안 S3 버킷 접두사에서 다음 표현식 형식을 사용할 수도 있습니다.
- { namespace:value}, 여기서 namespace는 partitionKeyFromQuery 또는 partitionKeyFromLambda일 수 있습니다.
- S3 압축
- GZIP, Snappy, Zip 또는 Hadoop 호환 Snappy 데이터 압축을 선택하거나 데이터 압축을 사용하지 않습니다.
- Snappy, Zip 및 Hadoop 호환 Snappy 압축은 Amazon Redshift를 대상으로 하는 Firehose 스트림에는 사용할 수 없습니다.
Dynamic Partitioning의 다중 레코드 집계 해제(Multi record deaggregation)
JSON을 위한 인라인 구문 분석
공식 문서에는 다음과 같이 설명되있습니다.
위의 단계에서 소스 레코드를 변환하기 위해 AWS Lambda 함수를 지정한 경우 이 함수를 사용하여 S3에 바인딩된 데이터를 동적으로 분할할 수 있으며 인라인 파싱으로 여전히 분할 키를 만들 수 있습니다. 동적 분할을 사용하면 인라인 파싱 또는 AWS Lambda 함수를 사용하여 분할 키를 만들 수 있습니다. 또는 인라인 파싱과 AWS Lambda 함수를 동시에 사용하여 분할 키를 만들 수 있습니다. - Firehose Docs
하나씩 풀이하면 이런 의미입니다.
Dynamic Partitioning Key
인라인 구문 분석이 활성화된 경우 성공적인 Firehose 스트림을 생성하려면 유효한 JQ 표현식을 지정하고 Amazon S3 버킷 접두사에 지정된 동적 파티셔닝 키를 사용해야 합니다. - AWS Console Firehose Target S3 Description
하나씩 풀이하면 다음과 같은 의미를 가집니다.
S3 버킷 접두사(Prefix)
- Redshift 프로비저닝 클러스터
- Cluster
- Cluster를 선택하고, Amazon Redshift 클러스터를 공개적으로 액세스할 수 있도록 구성하고 Amazon Data Firehose IP 주소 차단을 해제합니다.
- Authentication
- username : Amazon Redshift 클러스터에 액세스할 수 있는 권한이 있는 Amazon Redshift 사용자를 지정합니다. 이 사용자는 S3 버킷에서 Amazon Redshift 클러스터로 데이터를 복사하기 위한 Amazon Redshift 권한이 있어야 합니다.
- password : 클러스터에 액세스할 수 있는 권한이 있는 사용자의 비밀번호를 지정합니다.
- secret : Amazon Redshift에 액세스 권한이 있는 secret을 AWS Secrets Manager에서 선택합니다.
- Database : 데이터가 Copy되는 Amazon Redshift 데이터베이스입니다.
- Table : 데이터가 Copy되는 Amazon Redshift 테이블
- Column (Optinal) : Amazon S3 객체에 정의된 열 수가 Amazon Redshift 테이블 내의 열 수보다 적은 경우 이 옵션을 사용합니다.
- 중간 S3 목적지(Target)
- Firehose는 먼저 S3 버킷에 데이터를 전달한 다음 Amazon Redshift COPY 명령을 실행하여 데이터를 Amazon Redshift 클러스터에 로드합니다.
- 스트리밍 데이터를 전달해야 하는 본인이 소유한 S3 버킷을 지정합니다.
- Copy 명령 및 옵션 ⇒ https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html 참조
- 재시도 기간
- Amazon Redshift 클러스터로의 데이터 COPY가 실패할 경우 Firehose가 재시도하는 시간 기간(0~7200초)
- Firehose는 재시도 기간이 끝날 때까지 5분마다 재시도합니다.
- 재시도 기간을 0(영)초로 설정하면 Firehose는 COPY 명령이 실패할 때 재시도하지 않습니다.
4. 백업 설정 구성
Amazon Data Firehose는 Amazon S3를 사용하여 선택한 대상지에 전송하려고 시도한 모든 데이터 또는 전송에 실패한 데이터만 백업합니다.
- 백업 설정은 Firehose 스트림의 소스가 Direct PUT 또는 Kinesis Data Streams인 경우에만 지원됩니다.
- 제로 버퍼링 기능은 애플리케이션 대상에만 사용할 수 있으며 Amazon S3 백업 대상에는 사용할 수 없습니다.
- 다음 선택 사항 중 하나를 선택한 경우 Firehose 스트림에 대한 S3 백업 설정을 지정할 수 있습니다.
- Firehose 스트림의 대상으로 Amazon S3를 설정하고 데이터 레코드를 변환하기 위한 AWS Lambda 함수를 지정하거나 Firehose 스트림의 데이터 레코드 형식을 변환하기로 선택한 경우
- Firehose 스트림의 대상으로 Amazon Redshift를 설정하고 데이터 레코드를 변환하기 위한 AWS Lambda 함수를 지정하기로 선택한 경우
- 다음 서비스 중 하나를 Firehose 스트림의 대상으로 설정한 경우 - Amazon OpenSearch Service, Datadog, Dynatrace, HTTP Endpoint, LogicMonitor, MongoDB Cloud, New Relic, Splunk 또는 Sumo Logic, Snowflake, Apache Iceberg Tables.
5. 고급 설정 구성
소스 데이터 변환
Lambda에서 변환된 모든 레코드는 다음 매개변수를 포함해야 하며, 그렇지 않으면 Amazon Data Firehose에서 해당 레코드를 거부하고 데이터 변환 실패로 처리합니다.
데이터 변환에 필요한 매개변수
- Kinesis Data Streams 및 Direct PUT의 경우
- recordId
- 호출 중에 레코드 ID가 Amazon Data Firehose에서 Lambda로 전달됩니다. 변환된 레코드에는 동일한 레코드 ID가 포함되어야 합니다. 원래 레코드의 ID와 변환된 레코드의 ID가 일치하지 않으면 데이터 변환 실패로 처리됩니다.
- result
- 레코드의 데이터 변환 상태입니다. 가능한 값은 다음과 같습니다.
- Ok(레코드가 성공적으로 변환되었습니다),
- Dropped(레코드가 처리 논리에 의해 의도적으로 삭제되었습니다),
- ProcessingFailed(레코드를 변환할 수 없습니다).
- 레코드의 상태가 Ok또는 Dropped이면 Amazon Data Firehose는 성공적으로 처리된 것으로 간주합니다. 그렇지 않으면 Amazon Data Firehose는 처리에 실패한 것으로 간주합니다.
- data
- base64 인코딩 후 변환된 데이터 페이로드.
- MSK
- recordId
- 호출 중에 레코드 ID가 Amazon Data Firehose에서 Lambda로 전달됩니다. 변환된 레코드에는 동일한 레코드 ID가 포함되어야 합니다. 원래 레코드의 ID와 변환된 레코드의 ID가 일치하지 않으면 데이터 변환 실패로 처리됩니다.
- result
- 레코드의 데이터 변환 상태입니다. 가능한 값은 다음과 같습니다.
- Ok(레코드가 성공적으로 변환되었습니다),
- Dropped(레코드가 처리 논리에 의해 의도적으로 삭제되었습니다),
- ProcessingFailed(레코드를 변환할 수 없습니다).
- 레코드의 상태가 Ok또는 Dropped이면 Amazon Data Firehose는 성공적으로 처리된 것으로 간주합니다. 그렇지 않으면 Amazon Data Firehose는 처리에 실패한 것으로 간주합니다.
- kafkaRecordValue
- base64 인코딩 후 변환된 데이터 페이로드.
데이터 변환 실패 처리
Lambda 함수 호출이 네트워크 시간 초과나 Lambda 호출 제한에 도달하여 실패하는 경우 Amazon Data Firehose는 기본적으로 호출을 세 번 재시도합니다. 호출이 성공하지 못하면 Amazon Data Firehose는 해당 레코드 배치를 건너뜁니다. 건너뛴 레코드는 처리가 실패한 레코드로 처리됩니다. CreateDeliveryStream 또는 API를 사용하여 재시도 옵션을 지정하거나 재정의할 수 있습니다.
데이터 변환이 실패하면, 처리가 실패한 레코드는 폴더의 S3 버킷에 전달됩니다
processing-failed
.{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade
시도된 호출 요청 수.
arrivalTimestamp
Amazon Data Firehose가 레코드를 수신한 시간입니다.
errorCode
Lambda가 반환한 HTTP 오류 코드.
errorMessage
Lambda가 반환한 오류 메시지입니다.
attemptEndingTimestamp
Amazon Data Firehose가 Lambda 호출 시도를 중단한 시간입니다.
rawData
base64로 인코딩된 레코드 데이터.
lambdaArn
Lambda 함수의 Amazon 리소스 이름(ARN)입니다.
Share article