BLOG

AWS Glue와 Amazon Athena을 사용한 Salesforce.com 데이터 추출 및 분석 방법
작성일: 2019-08-16

Salesforce는 인기 있는 CRM 플랫폼으로 현재도 많은 사람들에게 널리 사용되고 있습니다. 이 플랫폼을 통해 기업은 연락처 정보, 계정, 리드 및 영업 기회와 같은 잠재 고객 및 기존 고객 정보를 한 곳에서 저장하고 관리 할 수 ​​있습니다. 또한 Salesforce에 저장된 잠재 고객 정보를 데이터 레이크의 다른 정형 및 비정형 데이터와 결합하여 유용한 정보를 많이 얻을 수 있습니다.

 

오늘자 블로그에선 AWS Glue를 사용해 Salesforce.com 계정 객체에서 데이터를 추출하고 Amazon S3에 저장하는 방법을 알려드리겠습니다. 그런 다음 Amazon Athena를 통해 Salesforce.com의 계정 객체 데이터를 별도의 주문 관리 시스템의 주문 데이터와 결합하여 보고서를 생성해보겠습니다.

 

데이터 준비

일단 저는 Salesforce.com의 무료 계정을 하나 만들었습니다. 이 계정으로도 많은 Salesforce.com 개체가 채워진 몇 개의 샘플 레코드가 제공됩니다. 여러분은 본인이 속한 조직의 개발 계정으로 AWS Glue 코드에서 SOQL 쿼리를 수정하여 여러 객체에서 동시에 데이터를 가져올 수 있습니다. 이러한 객체에서 데이터 추출을 시연하려면 Account 객체 만 사용해 쿼리를 단순하게 유지하십시오.

 

주문 관리 시스템에서 주문을 보여주는 샘플 데이터 파일을 생성하면, Amazon Athena를 사용해 다른 시스템의 데이터와 Salesforce.com 데이터를 결합하는 것을 시연해볼 수 있습니다.

 

AWS Glue 작업 설정

오픈 소스 springml 라이브러리를 사용하여 Apache Spark와 Salesforce.com을 연결하십시오. 라이브러리에는 Apache Spark 프레임 워크를 사용하여 Salesforce.com 객체를 읽고 쓰고 업데이트 할 수 있는 편리한 기능이 많이 있습니다.

 

springml GitHub 저장소에서 jar를 컴파일하거나 Maven 저장소에서 종속성을 가지고 다운로드 할 수 있습니다. 이 JAR 파일을 S3 버킷에 업로드하고 각각에 대한 전체 경로를 기록하십시오.

forcepartnerapi40.0.0.jar

forcewsc40.0.0.jar sales

forcewaveapi1.0.9.jar

sparksalesforce_2.111.1.1.jar

 

AWS 관리 콘솔에서 서비스를 실행하려는 지역의 AWS Glue를 선택합니다. 다음으로 Jobs, Add Job 선택하고, 필요한 세부 사항을 작성하여 마법사를 따르십시오.

 

Security configuration, script libraries, and job parameters (optional) 섹션 아래의 Dependent jars path값으론 이전에 쉼표로 구분하여 나열된 4개의 JAR 파일 경로를 입력합니다.

 

이 작업에서는 Maximum capacity를 2”로 할당했습니다. 이 필드는 해당 작업이 실행될 때 시스템이 할당 할 수 있는 AWS Glue 데이터 처리 장치(DPUs)의 수를 정의합니다. DPU는 컴퓨팅 용량의 vCPU 4개와 16GB 메모리로 구성되는 상대적 처리 능력 측정값입니다. Apache Spark ETL 작업을 지정하면 2–100 DPU를 할당할 수 있습니다. 기본값은 10 DPU입니다.

 

Salesforce.com 객체에서 데이터 추출을 위한 AWS Glue 작업 실행

다음의 스칼라 코드는 Salesforce.com의 Account 객체에서 몇 개의 필드를 추출하여 Apache Parquet 파일 형식의 S3에 테이블로 씁니다.

 

import com.amazonaws.services.glue.util.GlueArgParser 

import com.amazonaws.services.glue.util.Job 

import com.amazonaws.services.glue.util.JsonOptions 

import com.amazonaws.services.glue.{DynamicFrame, GlueContext} 

import org.apache.spark.SparkContext 

import scala.collection.JavaConverters.mapAsJavaMapConverter 

 

object SfdcExtractData { 

  def main(sysArgs: Array[String]) { 

     

    val sparkContext: SparkContext = new SparkContext() 

    val glueContext: GlueContext = new GlueContext(sparkContext) 

    val sparkSession = glueContext.getSparkSession 

     

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq(“JOB_NAME”).toArray) 

    Job.init(args(“JOB_NAME”), glueContext, args.asJava) 

     

    val soql = “select name, accountnumber, industry, type, billingaddress, sic from account” 

    val df = sparkSession.read.format(“com.springml.spark.salesforce”).option(“soql”,soql).option(“username”, “username”).option(“password”,”password+securitytoken”).load()

    

    val datasource0 = DynamicFrame(df, glueContext).withName(“datasource0“).withTransformationContext(“datasource0″

       

    val datasink1 = glueContext.getSinkWithFormat(connectionType = “s3“, options = JsonOptions(Map(“path” -> “s3://replace-with-your-s3-bucket/sfdc-output”, “partitionKeys” -> Seq(“Industry”))), format = “parquet”, transformationContext = “datasink1”).writeDynamicFrame(datasource0) 

 

    Job.commit() 

  } 

}

 

이 코드는 몇 가지 주요 구성 요소에 의존합니다.

 

val df = sparkSession.read.format(“com.springml.spark.salesforce”).option(“soql”,soql).option(username”, “username”).option(“password”,“password+securitytoken”).load()

 

해당 코드 예제에서는 Salesforce.com 연결을 설정하고, Account 객체에 대한 SOQL 호환 쿼리를 제출하며, 반환된 레코드를 Spark DataFrame에 로드합니다. 사용자 비밀번호와 프로필의 보안 토큰을 조합하여 username을 Salesforce.com의 사용자 이름과 비밀번호로 바꾸는 것을 잊지 마십시오.

 

하드 코딩 대신 AWS Secrets Manager를 사용하여 비밀번호를 저장하고 검색하는 것이 가장 좋은 방법이지만, 본 예제에서는 간단히 보여드리기 위해 하드 코딩했습니다.

 

해당 쿼리는 단순하며 소수의 레코드만 반환합니다. 데이터양이 많을 경우 쿼리에서 반환되는 결과를 제한하거나 대량 쿼리 및 청크 같은 다른 기술을 사용할 수 있습니다. Salesforce.com이 지원하는 기능에 대한 자세한 내용은 springml 페이지를 확인하십시오.

 

val datasink1 = glueContext.getSinkWithFormat(connectionType = “s3”, options = JsonOptions(Map(“path” -> “s3://replace-with-your-s3-bucket/sfdc-output”, “partitionKeys” -> Seq(“Industry”))), format = “parquet”, transformationContext = “datasink1”).writeDynamicFrame(datasource0)

 

이 코드는 S3 버킷에 대한 쓰기 작업을 모두 수행합니다. 본 예시에서는 Industry세그먼트별로 데이터를 집계해 보겠습니다. 즉, Industry 필드 별로 데이터를 분할해야 합니다.

또한 코드는 Parquet 형식으로 작성됩니다. Athena는 쿼리 당 검색된 데이터 양에 따라 요금을 청구합니다. 데이터를 분할, 압축 또는 Parquet와 같은 열 형식으로 변환하면, 비용을 절감하고 성능을 향상시킬 수 있습니다.

AWS Glue에서 해당 코드를 실행한 후 싱크 지점이 있는 S3 버킷으로 이동하면 다음과 같은 구조를 찾을 수 있습니다.

 

 

Athena로 데이터 쿼리

코드가 올바른 파티션 및 형식으로 Salesforce.com 데이터를 S3 버킷에 떨어뜨리면 AWS Glue가 데이터 세트를 크롤링할 수 있고, 이는 AWS Glue 데이터 카탈로그에 적절한 스키마를 생성합니다. AWS Glue가 테이블을 생성할 때까지 기다리십시오. 테이블 생성이 완료되면, Athena는 테이블을 쿼리하고 카탈로그의 다른 테이블과 조인 할 수 있습니다.

먼저 AWS Glue 크롤러를 사용하여 이전에 S3 버킷에 저장한 Salesforce.com 계정 데이터를 검색합니다. 크롤러 사용 방법에 대한 자세한 내용은 AWS Glue 데이터 카탈로그 채우기를 참고해 주십시오.

해당 예시에서는 Salesforce.com Account 데이터를 저장한 S3 출력 접두사로 크롤러를 가리키고 실행합니다. 크롤러는 마지막으로 중지하기 전에 새 카탈로그 테이블을 작성합니다.

 

AWS Glue 데이터 카탈로그 테이블은 사용된 모든 열 이름, 유형 및 파티션 열을 자동으로 캡처하고 모든 것을 S3 버킷에 Parquet 파일 형식으로 저장합니다. 이제 Athena로 이 테이블을 쿼리 할 수 ​​있습니다. 해당 테이블의 단순 SELECT쿼리는 S3 버킷에서 데이터를 스캔한 결과를 보여줍니다.

 

이제 Salesforce.com 데이터를 Athena에서 쿼리할 준비가 되었습니다. 본 예시에서는 해당 데이터를 S3의 샘플 주문 관리 시스템의 샘플 주문과 결합했습니다. AWS Glue 크롤러가 샘플 주문 데이터 카탈로그를 완성하면 Athena가 쿼리할 수 ​​있게 됩니다.

 

마지막으로 Athena를 사용하여 집계 쿼리에서 두 테이블을 조인합니다.

 

결론

이 글에서는 AWS Glue 및 Apache Spark를 사용하여 Salesforce.com 객체 데이터를 추출하고 S3에 저장하는 간단한 예를 보여드렸습니다. 그 후엔 AWS Glue Data Catalog에서 Athena가 S3 데이터를 쿼리할 수 있습니다. 즉 이러한 매커니즘을 사용하게 되면 Salesforce 데이터를 AWS 기반 데이터 레이크에 쉽게 통합 할 수 있게 됩니다.

 

 

 

원문 URL: https://aws.amazon.com/ko/blogs/big-data/extracting-salesforce-com-data-using-aws-glue-and-analyzing-with-amazon-athena/

 

** 메가존 클라우드 TechBlog는 AWS BLOG 영문 게재 글 중에서 한국 사용자들에게 유용한 정보 및 콘텐츠를 우선적으로 번역하여 내부 엔지니어 검수를 받아서, 정기적으로 게재하고 있습니다. 추가로 번역 및 게재를 희망하는 글에 대해서 관리자에게 메일 또는 SNS 페이지에 댓글을 남겨주시면, 우선적으로 번역해서 전달해드리도록 하겠습니다.