BLOG

[사용 사례] 데이터 분석 플랫폼 구축을 위해 ‘FactSet’사가 택한 방법… Amazon DynamoDB에서 Amazon S3 Parquet로 데이터 추출 자동화하기
작성일: 2020-02-25

본 글의 원문FactSet사 의 수석 소프트웨어 엔지니어인 Arvind Godbole AWS Principal Solutions Architect의 Tarik Makota에 의해 작성되었습니다. 

FactSet은 전 세계 수만 명의 투자 전문가를 위해 유연하고 개방적인 데이터 및 소프트웨어 솔루션을 개발하여, 투자자가 중요한 결정을 내리는 데 사용하는 재무 데이터 및 분석에 즉시 액세스 할 수 있게 해주는 기업으로, 항상 제품이 제공하는 가치를 향상시키기 위해 노력하고 있습니다.

 

‘FactSet’사는 AWS 고객에 대한 검색 결과의 관련성을 살펴 보고자 했습니다. 고객의 사용 사례가 다양하고 하루 동안의 검색 횟수가 많다는 점을 감안하여, 익명화된 사용 데이터를 저장하고 그 데이터를 분석하여 사용자 지정 점수 알고리즘을 통해 결과를 높일 수 있는 플랫폼이 필요했습니다. Amazon EMR은 계산을 호스팅 하는데 있어선 확실한 선택이었지만, 익명화된 데이터를 사용할 수 있는 형식으로 만드는 방법에 대해서는 의문이 제기되었습니다. 이러한 이유로 FactSet사는 AWS와 협력하여 Amazon DynamoDB를 통해 Amazon EMR에서 사용할 데이터를 준비하기로 했습니다.

 

오늘 메가존 테크블로그에서는 FactSet가 어떻게 DynamoDB 테이블에서 데이터를 가져와 해당 데이터를 Apache Parquet로 변환했는지에 대해 알려드리도록 하겠습니다. Amazon EMR을 사용하여 거의 실시간으로 분석할 수 있도록 Parquet 파일을 Amazon S3에 저장하는 과정과, 그 과정에서 데이터 유형 변환과 관련된 문제가 발생했었는데 이를 어떻게 극복했는지를 자세히 살펴보겠습니다.

 

워크 플로우 개요

오늘 다룬 워크 플로우에는 다음의 단계들이 포함되어 있습니다.

  1. 익명 로그 데이터는 DynamoDB 테이블에 저장됩니다. 이러한 항목은 로그 생성 방법에 따라 다른 필드를 갖습니다. 테이블에서 항목을 생성할 때마다 DynamoDB 스트림을 사용하여 레코드를 작성합니다. 스트림 레코드에는 DynamoDB 테이블의 단일 항목 정보가 포함됩니다.
  2. AWS 람다 함수는 DynamoDB의 테이블에 저장된 새로운 아이템을 확보하는 DynamoDB 스트림에 걸려있습니다. 우리는 GitHub 의 lambda-streams-to-firehose 프로젝트에서 Lambda 함수를 구축하여 DynamoDB 스트림 이미지를 JSON으로 변환했습니다. 이를 Stringize 하고 Amazon Kinesis Data Firehose로 푸시합니다.
  3. Kinesis Data Firehose는 AWS Glue Data Catalog 테이블에 포함된 데이터를 사용하여 JSON 데이터를 Parquet로 변환합니다.
  4. Kinesis Data Firehose는 Par3 파일을 S3에 저장합니다.
  5. AWS Glue 크롤러는 DynamoDB 항목의 스키마를 감지하고 관련 메타 데이터를 데이터 카탈로그에 저장합니다.

 

다음은 방금 설명 드린 워크플로우를 다이어그램으로 나타낸 것입니다.

 

AWS Glue는 데이터 준비 및 분석에 도움이 되는 도구를 제공합니다. 크롤러는 DynamoDB 테이블에서 실행하여 테이블 데이터의 인벤토리를 가져 와서 해당 정보를 데이터 카탈로그에 저장할 수 있습니다. 다른 서비스는 데이터 카탈로그를 테이블 데이터의 위치, 스키마 및 유형에 대한 인덱스로 사용할 수 있습니다. 메타 데이터를 데이터 카탈로그에 추가하는 다른 방법이 있지만 핵심 아이디어는 메타 데이터를 쉽게 업데이트하고 수정할 수 있다는 것입니다. 자세한 내용은 AWS Glue 데이터 카탈로그 채우기를 참고해 주십시오.

 

문제: 데이터 유형 불일치

다양한 기술을 사용하여 솔루션을 구축하려면 종종 이러한 기술 간에 데이터 유형을 매핑하고 변환해야 합니다. 클라우드도 예외는 아닙니다. 이 경우 DynamoDB에 저장된 로그 항목에는 String Set 유형의 속성이 포함되었습니다. Kinesis가 데이터를 Parquet으로 변환하려고 할 때 문자열 설정 값으로 인해 데이터 변환 예외가 발생했습니다. 문제를 조사한 결과, 다음을 발견했습니다.

  • 크롤러가 DynamoDB 테이블을 인덱싱할 때 Set 데이터 타입 ( StringSet, NumberSet)이 Glue 메타 데이터 카탈로그에 set<string>및  set<bigint>로 저장됩니다.
  • Kinesis Data Firehose는 Apache Parquet로 변환을 수행할 때 동일한 카탈로그를 사용합니다. 변환에는 유효한 Hive 데이터 유형이 필요합니다.
  • set<string>및 set<bigint>가 유효하지 않은 데이터 타입이므로 변환에 실패하게 되고, 변수가 발생하게 됩니다. 변수는 다음 코드와 유사합니다.

 

[{

   “lastErrorCode”: “DataFormatConversion.InvalidSchema”,

   “lastErrorMessage”: “The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of ‘array,used:bigint>>’ but ‘set’ is found.”

}]

 

솔루션: 데이터 매핑 구성

AWS 팀과 협력하면서 Kinesis Data Firehose 변환기가 데이터 카탈로그에서 유효한 Hive 데이터 유형이 필요하다는 것을 확인했습니다. 복잡한 데이터 유형과 관련하여 Hive는 set<data_type>을 지원하지 않지만 다음을 지원합니다.

  • ARRAY<data_type>
  • MAP<primitive_type, data_type
  • STRUCT<col_name : data_type [COMMENT col_comment], …>
  • UNIONTYPE<data_type, data_type, …>

 

이 경우, 반드시 set<string>와 set<bigint>를  array<string>와 array<bigint>로 변환해야 함을 뜻합니다. 첫 번째 단계는 데이터 카탈로그에서 직접 유형을 직접 변경하는 것이었습니다. 모든 항목을 set<data_type>로 변경하도록 데이터 카탈로그를 업데이트 한 후 array<data_type>Kinesis 변환을 Parquet로 성공적으로 완료했습니다.

 

AWS의 비즈니스 사례는 동일한 테이블에 다른 속성을 가진 항목을 저장하고 새로운 속성을 즉각 추가 할 수 있는 데이터 저장소를 요구합니다. 우리는 스키마가 없는 DynamoDB의 특성과 필요에 따라 확장 및 축소할 수 있는 기능을 활용하여 기본 인프라 관리가 아닌 기능에 집중할 수 있었습니다. 자세한 내용은 DynamoDB 테이블의 정규화 또는 비정규 화를 참고해 주십시오.

 

데이터에 정적 스키마가 있는 경우 수동 변경으로 충분합니다. 비즈니스 사례를 감안할 때 수동 솔루션은 확장되지 않았습니다. DynamoDB 테이블에 새로운 속성을 도입할 때마다 크롤러를 실행해야 했습니다. 크롤러는 메타 데이터를 다시 작성하고 변경 사항을 덮어 썼습니다.

 

서버리스 이벤트 아키텍처

데이터 카탈로그에 대한 데이터 유형 업데이트를 자동화하기 위해 Amazon EventBridge 및 Lambda를 사용하여 데이터 유형 매핑에 대한 수정 사항을 구현했습니다. EventBridge는 이벤트를 사용하여 애플리케이션을 연결하는 서버리스 이벤트 버스입니다. 이벤트는 데이터 카탈로그 테이블의 상태와 같이 시스템 상태가 변경되었다는 신호입니다.

 

다음 다이어그램은 새로운 아키텍처의 이전 워크 플로우를 보여줍니다.

 

  1. 롤러는 있는 그대로 유지하고 DynamoDB 테이블을 크롤링하여 메타 데이터를 얻습니다.
  2. 크롤러가 얻은 메타 데이터는 데이터 카탈로그에 저장됩니다. 이전 메타 데이터가 업데이트 또는 제거되고 변경 사항(수동 또는 자동)이 덮어쓰기 됩니다.
  3. EventBridge의 GlueTableChanged 이벤트는 데이터 카탈로그 테이블의 변경 사항을 청취합니다. 테이블이 변경되었다는 이벤트가 수신되면 Lambda함수가 트리거됩니다.
  4. set<data_type>발생을  array<data_type>로 대체하기 위해 람다 함수는 글루 카탈로그 테이블을 업데이트할 때 glue.update_table() API를 사용하는 AWS SDK를 사용합니다.

 

EventBridge를 설정하기 위해 이벤트 패턴을 “서비스에 따라 사전 정의된 패턴”으로 설정했습니다. 서비스 제공 업체에 관해서는, AWS와 Glue를 서비스로 선택했습니다. 이벤트 타입으로 우리는“Glue Data Catalog Table State Change”를 선택했습니다. 다음 스크린 샷은 데이터 카탈로그를 업데이트하는 Lambda 함수로 이벤트를 보내는 EventBridge 구성을 보여줍니다.

 

다음은 기본 Lambda 코드입니다.

 

# This is NOT production worthy code please modify and implement error handling routines as appropriate

import json

import logging

import boto3

 

glue = boto3.client(‘glue’)

 

logger = logging.getLogger()

logger.setLevel(logging.INFO)

 

# Define subsegments manually

def table_contains_set(databaseName, tableName):

   

    # returns Glue Catalog description for Table structure

    response = glue.get_table( DatabaseName=databaseName,Name=tableName)

    logger.info(response) 

   

    # loop thru all the Columns of the table

    isModified = False

    for i in response[‘Table’][‘StorageDescriptor’][‘Columns’]:

        logger.info(## Column: ” + str(i[‘Name’]))

        # if Column datatype starts with set< then change it to array<

        if i[‘Type’].find(“set<“) != 1:

            i[‘Type’] = i[‘Type’].replace(“set<“, “array<“)

            isModified = True

            logger.info(i[‘Type’])

   

    if isModified:

        # following 3 statements simply clean up the response JSON so that update_table API call works

        del response[‘Table’][‘DatabaseName’]

        del response[‘Table’][‘CreateTime’]

        del response[‘Table’][‘UpdateTime’]

        glue.update_table(DatabaseName=databaseName,TableInput=response[‘Table’],SkipArchive=True)

       

    logger.info(============ ### =============”)

    logger.info(response)

   

    return True

   

def lambda_handler(event, context):

    logger.info(## EVENT’)

    # logger.info(event)

    # This is Sample of the event payload that would be received

    # { ‘version’: ‘0’,

    #   ‘id’: ‘2b402842-21f5-1d76-1a9a-c90076d1d7da’,

    #   ‘detail-type’: ‘Glue Data Catalog Table State Change’,

    #   ‘source’: ‘aws.glue’,

    #   ‘account’: ‘1111111111’,

    #   ‘time’: ‘2019-08-18T02:53:41Z’,

    #   ‘region’: ‘us-east-1’,

    #   ‘resources’: [‘arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample’],

    #   ‘detail’: {

    #           ‘databaseName’: ‘ddb-glue-fh’,

    #           ‘changedPartitions’: [],

    #           ‘typeOfChange’: ‘UpdateTable’,

    #           ‘tableName’: ‘ddb_glu_fh_sample’

    #    }

    # }

   

    # get the database and table name of the Glue table triggered the event

    databaseName = event[‘detail’][‘databaseName’]

    tableName = event[‘detail’][‘tableName’]

    logger.info(“DB: “ + databaseName + ” | Table: “ + tableName)

   

    table_contains_set(databaseName, tableName)

  

    # TODO implement and modify

    return {

        ‘statusCode’: 200,

        ‘body’: json.dumps(‘Hello from Lambda!’)

    }

Lambda 함수는 간단합니다. 이 게시물은 기본 프레임을 제공합니다. 이를 템플릿으로 사용하여 특정 데이터에 대한 고유한 기능을 구현할 수 있습니다.

 

결론

데이터 유형 변환 및 매핑과 같은 단순한 것들은 데이터가 서비스 경계를 ​​넘어설 때 예기치 않은 결과와 문제를 야기할 수 있습니다. AWS의 장점 중 하나는 필요에 맞게 강력하고 확장 가능한 솔루션을 만들 수 있는 다양한 도구입니다. 오늘 보여드린 바와 같이 이벤트 중심 아키텍처를 사용하여 데이터 유형 변환 오류를 해결하고 진행 과정에서 문제를 제거하기 위해 프로세스를 자동화하실 수 있습니다.

 

 


원문 URL: https://aws.amazon.com/ko/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

 

** 메가존 클라우드 TechBlog는 AWS BLOG 영문 게재 글 중에서 한국 사용자들에게 유용한 정보 및 콘텐츠를 우선적으로 번역하여 내부 엔지니어 검수를 받아서, 정기적으로 개제하고 있습니다. 추가로 번역 및 게재를 희망하는 글에 대해서 관리자에게 메일 또는 SNS 페이지에 댓글을 남겨주시면, 우선적으로 번역해서 전달드리도록 하겠습니다.