AWS Kinesis - Firehose Introduce (2)

Partition streaming data & Convert input data format & etc …
김주혁's avatar
Sep 23, 2024
AWS Kinesis - Firehose Introduce (2)
 
 
Kinesis Data Firehose 2 Chapter에서는 데이터 파티셔닝과 형식 변환 등을 다루고 있습니다.
 
Kinesis To Redshift
Kinesis To Redshift

Data Partitioning

 
일반적으로 Kinesis Firehose를 사용하여 아무런 파티셔닝 키 지정 없이 저장하게 되면 다음과 같은 형태로
S3 / Buckets / S3 bucket name / Year / Month / Day / Hour / … 형태의 Prefix를 가지게 됩니다.
 
만약 특정 카테고리(항목) 혹은 특정한 id 값 기준으로 S3 prefix를 가지게 하고 싶을 때 사용하는 것이 바로 Firehose에서 지원하는 Dynamic Partitioning입니다.
 
동적 파티셔닝을 사용하면 데이터 내의 키(예: customer_id또는 transaction_id)를 사용하여 Firehose에서 스트리밍 데이터를 지속적으로 파티셔닝한 다음 이러한 키로 그룹화된 데이터를 해당 Amazon Simple Storage Service(Amazon S3) 접두사(Prefix)로 전달할 수 있습니다.
 
데이터를 분할하면 스캔되는 데이터 양이 최소화되고, 성능이 최적화되며, Amazon S3에서 분석 쿼리 비용이 절감됩니다.
 

Enable Dynamic Partitioning

 
Firehose를 생성할 때 동적 파티셔닝을 활성화 할 수 있습니다.
  • 단, 동적 파티셔닝이 이미 활성화되지 않은 기존 Firehose 스트림에 대해서는 동적 파티셔닝을 활성화할 수 없습니다.
  • Firehose 스트림에서 동적 분할을 활성화하면 해당 Firehose 스트림에서 동적 분할을 비활성화할 수 없습니다.
즉, 생성할 때 지정된 파티셔닝 활성화 옵션은 Firehose가 삭제될 때 까지 생명주기를 같이 하게 됩니다.
 

Understand Partitioning Key

 
Dynamic Partitioning이라고 하면 무언가 있어 보이는 말처럼 느껴지겠지만, 사실 S3에 저장할 때 특정 Prefix를 지정하여 저장하는 것일 뿐입니다. Prefix를 지정하여 저장하기 때문에 자연스럽게 데이터 셋이 분리되어 관리되는 장점을 가지고 있습니다.
 
예를들어, customer Id와 country에 따라 필터링해야 하는 경우(데이터를 고객과 국가로 분리하고 싶은 경우)
  • 첫 번째 파티셔닝 키로 customer_id 필드를 사용할 수 있습니다. 즉, 데이터에서 customer_id 필드를 기준으로 데이터를 나누어 저장합니다.
  • 두 번째 파티셔닝 키로 country 필드를 지정할 수 있습니다. 즉, 데이터를 고객의 국가 정보에 따라 추가적으로 나눌 수 있습니다.
  • 데이터를 어떻게 구분할지를 나타내는 표현식을 정의해야 합니다. 이 표현식은 유효한 JQ 표현식이나 다른 필터링 방법을 사용할 수 있으며, 데이터 필드에 따라 동적으로 데이터를 구분하기 위한 규칙을 설정하는 것입니다.
    • 이 표현식에 따라 S3 버킷 접두사(prefix)가 정의됩니다. 예를 들어, customer_idcountry를 파티셔닝 키로 사용할 경우, S3에 저장될 경로가 s3://your-bucket/customer_id=123/country=USA/와 같은 방식으로 설정됩니다. 이렇게 하면 S3에 저장된 데이터가 고객 ID국가별로 자연스럽게 구분됩니다.
 
다음 방법으로 파티셔닝 키를 생성할 수 있습니다.
  • 인라인 파싱 - 이 방법은 Firehose 내장 지원 메커니즘인 jq 파서를 사용합니다., JSON 형식의 데이터 레코드에서 분할을 위한 키를 추출합니다. 현재는 jq 1.6 버전만 지원합니다.
    • JSON을 위한 인라인 구문 분석
       
      공식 문서에는 다음과 같이 설명되있습니다.
      위의 단계에서 소스 레코드를 변환하기 위해 AWS Lambda 함수를 지정한 경우 이 함수를 사용하여 S3에 바인딩된 데이터를 동적으로 분할할 수 있으며 인라인 파싱으로 여전히 분할 키를 만들 수 있습니다. 동적 분할을 사용하면 인라인 파싱 또는 AWS Lambda 함수를 사용하여 분할 키를 만들 수 있습니다. 또는 인라인 파싱과 AWS Lambda 함수를 동시에 사용하여 분할 키를 만들 수 있습니다. - Firehose Docs
       
      하나씩 풀이하면 이런 의미입니다.
       
      • 이 함수를 사용하여 S3에 바인딩된 데이터를 동적으로 분할할 수 있으며
        • Lambda 함수는 Amazon S3에 저장될 데이터를 동적 파티셔닝할 수 있습니다. 동적 파티셔닝은 데이터를 특정 키(날짜, 지역 등)에 따라 분리하여 S3의 여러 폴더로 나누는 과정입니다. 즉, Lambda 함수가 데이터를 변환할 뿐만 아니라 S3에 저장될 때 어떻게 데이터를 분할할지도 결정할 수 있습니다.
      • 인라인 파싱으로 여전히 분할 키를 만들 수 있습니다.
        • 인라인 파싱을 통해서도 데이터를 분석하고 분할 키를 생성할 수 있습니다. 인라인 파싱은 Lambda 함수를 사용하지 않고도 Firehose 자체적으로 데이터를 분석하여 분할 키(예: 날짜, 카테고리)를 만들 수 있는 기능입니다.
      • 동적 분할을 사용하면 인라인 파싱 또는 AWS Lambda 함수를 사용하여 분할 키를 만들 수 있습니다.
        • 동적 파티셔닝을 활성화할 경우, 데이터를 나누는 방식은 두 가지로 설정할 수 있습니다:
          • 인라인 파싱을 통해 Firehose가 자동으로 데이터를 분석하고 분할 키를 생성하는 방식
          • Lambda 함수를 통해 데이터를 변환하고 분할 키를 생성하는 방식 이 두 가지 중 하나를 선택해서 사용할 수 있습니다.
      • 또는 인라인 파싱과 AWS Lambda 함수를 동시에 사용하여 분할 키를 만들 수 있습니다.
        • 인라인 파싱과 Lambda 함수를 동시에 사용하여 데이터를 처리할 수도 있습니다. 즉, 일부 데이터를 인라인 파싱을 통해 처리하고, 다른 부분은 Lambda 함수를 사용해 처리할 수 있습니다.
      Dynamic Partitioning Key
       
      인라인 구문 분석이 활성화된 경우 성공적인 Firehose 스트림을 생성하려면 유효한 JQ 표현식을 지정하고 Amazon S3 버킷 접두사에 지정된 동적 파티셔닝 키를 사용해야 합니다. - AWS Console Firehose Target S3 Description
       
      하나씩 풀이하면 다음과 같은 의미를 가집니다.
       
      • (JSON) 인라인 구문 분석이 활성화된 경우
        • 위에서 언급된 JSON 인라인 구문 분석을 의미합니다. Kinesis Firehose가 들어오는 데이터를 미리 지정한 기준(예: JSON 구조)으로 자동 분석합니다.
      • 유효한 JQ 표현식을 지정하고
        • JQ 표현식은 JSON 데이터를 처리하고 변환하는 데 사용되는 일종의 필터링 언어입니다. 데이터를 분석하고 파싱할 때 사용할 수 있는 규칙을 정의한 것입니다.
      • Amazon S3 버킷 접두사에 지정된 동적 파티셔닝 키를 사용해야 합니다.
        • 동적 파티셔닝이란, 데이터를 특정 조건에 따라 여러 폴더(접두사)로 나눠 저장하는 기능입니다. Amazon S3에 데이터를 저장할 때 **접두사(prefix)**는 해당 데이터가 저장되는 폴더 경로의 일부를 의미합니다.
        • 동적 파티셔닝을 활성화하면 데이터를 구문 분석한 결과에 따라 데이터를 다양한 S3 폴더에 나눠서 저장할 수 있습니다. 이때 파티셔닝 키는 데이터를 어떻게 나눌지를 결정하는 기준이 됩니다. 예를 들어, 날짜, 지역, 또는 사용자 ID 같은 키를 기반으로 데이터를 나눌 수 있습니다.
  • AWS Lambda 함수 – 이 방법은 지정된 AWS Lambda 함수를 사용하여 파티셔닝에 필요한 데이터 필드를 추출하여 반환합니다.
 

Amazon S3 버킷 Prefix를 사용하여 데이터 전달

 
  • Firehose 스트림에서 Amazon S3 버킷 지정
    • Firehose 스트림을 만들 때, Firehose가 데이터를 저장할 Amazon S3 버킷을 지정해야 합니다.
    • 접두사(prefix)는 S3 버킷 내에서 데이터를 구분하는 데 사용되며, 이는 디렉토리처럼 작동해 관련 데이터를 그룹화합니다.
  • 동적 파티셔닝의 역할
    • 동적 파티셔닝을 사용하면 데이터를 자동으로 나눠서 저장할 수 있습니다. 파티셔닝 키를 설정하고, 해당 키에 따라 데이터를 각각 다른 S3 폴더에 저장합니다.
    • 예를 들어 고객 ID날짜와 같은 값을 기준으로 데이터를 나누어 저장할 수 있습니다.
  • 동적 파티셔닝을 활성화한 경우 접두사 지정이 필수
    • 동적 파티셔닝을 활성화하지 않으면 접두사를 지정하는 것은 선택 사항입니다. 그러나 동적 파티셔닝을 활성화한 경우, Firehose가 데이터를 저장할 때 사용할 S3 접두사를 반드시 지정해야 합니다. 이 접두사는 파티셔닝 키에 기반해 정의됩니다.
  • 표현식의 역할
    • S3 접두사는 파티셔닝 키에 기반한 표현식으로 구성됩니다. 이 표현식을 통해 S3에서 데이터가 저장될 경로가 자동으로 만들어집니다.
    • 예시에서 파티셔닝 키로 고객 ID, 디바이스, 연도, , , 시간을 지정한 경우, Firehose는 이러한 키 값을 사용해 S3에 저장될 폴더 경로를 자동으로 생성합니다.
  • S3 접두사 표현식 형식
    • 동적 파티셔닝을 사용하면, 파티셔닝 키에 따라 S3 접두사를 자동으로 설정하는 표현식을 사용해야 합니다.
    • !{namespace:value}라는 형식을 사용해 파티셔닝 키를 지정합니다. 여기서 namespace인라인 파싱을 통해 데이터를 구분하는지 또는 Lambda 함수를 사용해 데이터를 구분하는지에 따라 partitionKeyFromQuery 또는 partitionKeyFromLambda가 됩니다.
  • 예시
    • "ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }
    • 이 설정에 따라, Firehose는 customer_id, device, year, month, day, hour 값을 기반으로 데이터를 구분해 S3에 저장합니다.
    • 예를 들어, customer_id=1234567890, device=mobile, year=2019, month=08, day=09, hour=20이라는 값이 주어진다면, Firehose는 데이터를 다음과 같은 S3 경로에 저장합니다.
      • s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa
 
  • 새 줄 구분 기호
    •  
      이는 Amazon S3에서 객체를 구문 분석하는 데 도움이 될 수 있습니다. 또한 동적 분할이 집계된 데이터에 적용될 때 특히 유용한데, 다중 레코드 분해(동적으로 분할되기 전에 집계된 데이터에 적용해야 함)는 구문 분석 프로세스의 일부로 레코드에서 새 줄을 제거하기 때문입니다.
 

집계된 데이터(aggregated data)에 Dynamic Partitioning 적용

 
  • 집계된 데이터
    • 여러 이벤트, 로그 또는 레코드를 한 번의 API 호출로 집계(aggregation)하여 보낼 수 있습니다. 예를 들어, 여러 개의 로그 데이터를 하나의 요청에 묶어서 전송하는 방식입니다.
  • 동적 파티셔닝을 적용하기 위한 집계 해제
    • 동적 파티셔닝은 데이터를 특정 키에 따라 구분해 저장하는 기능입니다. 하지만 집계된 데이터에 동적 파티셔닝을 적용하려면, 먼저 데이터를 집계 해제(deaggregation)해야 합니다. 즉, 묶여있는 데이터를 각각의 개별 레코드로 풀어야만 파티셔닝 키를 적용할 수 있습니다.
  • 다중 레코드 분리 유형
    • 집계된 데이터를 해제할 때 두 가지 방식으로 데이터를 분리할 수 있습니다:
        1. JSON 기반 분리: 연속된 JSON 객체를 기반으로 데이터를 나누는 방식입니다. 예를 들어, 여러 JSON 레코드가 연속된 경우 이를 각각의 JSON 객체로 분리합니다.
        1. 구분 기호(Delimited) 기반 분리: 사용자가 지정한 구분 기호(예: ####)를 기준으로 데이터를 나누는 방식입니다. 이 구분 기호는 base-64로 인코딩된 문자열로 지정해야 합니다. 예를 들어, ####base-64로 인코딩하면 IyMjIw==로 변환됩니다.
  • 레코드 분리 제한
    • Firehose는 JSON 또는 구분 기호로 데이터를 분리할 때, 한 번의 API 호출로 최대 500개의 레코드를 분리할 수 있습니다. 즉, 집계 해제된 데이터는 최대 500개의 개별 레코드로 나뉩니다.
  • 올바른 JSON 형식 요구
    • JSON 데이터의 경우 유효한 형식이어야만 집계 해제를 할 수 있습니다. 예를 들어, 연속된 JSON 객체 또는 **줄바꿈으로 구분된 JSON 객체(JSONL)**는 올바른 형식입니다. 그러나 JSON 배열은 올바른 입력 형식이 아닙니다.
      • 올바른 예: {"a":1}{"a":2} 또는 {"a":1}\n{"a":2}
      • 잘못된 예: [{"a":1}, {"a":2}] (JSON 배열)
  • 집계 해제 후 파티셔닝
    • 데이터를 집계 해제한 후에야 동적 파티셔닝을 적용할 수 있습니다. 즉, 여러 개의 레코드를 개별적으로 풀어낸 다음, 파티셔닝 키에 따라 데이터를 나누어 저장할 수 있습니다.
  • 데이터 처리 순서
    • Firehose는 데이터를 처리할 때 다음과 같은 순서로 처리합니다:
        1. 분해(deaggregation): 집계된 데이터를 풀어서 개별 레코드로 분리합니다.
        1. Lambda를 통한 데이터 변환: Lambda 함수를 사용해 데이터를 변환할 수 있습니다. 예를 들어, 데이터를 다른 형식으로 변환하거나 추가적인 처리를 할 수 있습니다.
        1. 파티셔닝 키 적용: 변환된 데이터에 파티셔닝 키를 적용하여, S3 버킷에 데이터를 구분해 저장합니다.
 

Dynamic Partitioning을 위한 버퍼 데이터

 
Amazon Data Firehose는 수신 스트리밍 데이터를 특정 크기와 기간 동안 버퍼링한 다음 지정된 대상에 전달합니다. 버퍼 크기는 MB로 측정되고 버퍼 간격은 초로 측정됩니다.
 
💡
동적 분할에는 제로 버퍼링 기능을 사용할 수 없습니다.
 
동적 파티셔닝이 활성화되면 Firehose는 Amazon S3 버킷에 이러한 레코드를 전달하기 전에 구성된 버퍼링 힌트(크기 및 시간)에 따라 지정된 파티션에 속하는 레코드를 내부적으로 버퍼링합니다. 최대 크기의 객체를 전달하기 위해 Firehose는 내부적으로 다단계 버퍼링을 사용합니다. 따라서 레코드 배치의 엔드투엔드 지연은 구성된 버퍼링 힌트 시간의 1.5배가 될 수 있습니다.
 
  • 활성 파티션 수는 전달 버퍼 내의 활성 파티션의 총 수
    • 예를 들어, 동적 파티셔닝 쿼리가 초당 3개의 파티션을 구성하고 60초마다 전달을 트리거하는 버퍼 힌트 구성이 있는 경우 평균적으로 활성 파티션은 180개가 됩니다.
  • S3 접두사가 레코드 데이터 필드와 S3 접두사 표현식을 기반으로 새 값으로 평가되면 새 파티션이 생성됩니다. 각 활성 파티션에 대해 새 버퍼가 생성됩니다.
  • 버퍼가 버퍼 크기 제한이나 버퍼 시간 간격을 충족하면 Firehose는 버퍼 데이터로 객체를 생성하여 지정된 Amazon S3 Prefix로 전달합니다. 객체가 전달되면 해당 파티션의 버퍼와 파티션 자체가 삭제되고 활성 파티션 수에서 제거됩니다.
  • Firehose는 각 파티션에 대해 버퍼 크기 또는 간격이 개별적으로 충족되면 각 버퍼 데이터를 단일 객체로 전달합니다. 활성 파티션 수가 Firehose 스트림당 500개의 제한에 도달하면 Firehose 스트림의 나머지 레코드는 지정된 S3 오류 버킷 접두사(activePartitionExceeded)로 전달됩니다.
    • 이 값은 늘릴 수 있습니다.
 

레코드 형식 변환 활성화

 
레코드 형식 변환을 활성화하면 Amazon Data Firehose 대상을 Amazon OpenSearch Service, Amazon Redshift 또는 Splunk로 설정할 수 없습니다. 형식 변환을 활성화하면 Amazon S3가 Firehose 스트림에 사용할 수 있는 유일한 대상입니다. 또한 형식 변환을 활성화하면 Amazon S3 압축이 비활성화됩니다.
 

그 외

 

데이터 전달 이해

 
 

Apache Iceberg Tables

 
Apache Iceberg는 빅데이터 분석을 수행하기 위한 고성능 오픈소스 테이블 형식입니다. Apache Iceberg는 SQL 테이블의 안정성과 단순성을 Amazon S3 데이터 레이크에 제공하며, Spark, Flink, Trino, Hive, Impala와 같은 오픈소스 분석 엔진이 동일한 데이터로 동시에 작업할 수 있도록 합니다
 

태그

 
 

보안

 
 

모니터링

 
 

 
등이 있습니다.
Share article
RSSPowered by inblog