Data Engineering

Spark + Iceberg - 1(소개 및 연동)

HOONY_612 2024. 1. 16. 20:25
반응형

 

소개

 

일반적으로 데이터 플랫폼에서는 HDFS, S3(데이터 저장) + Hive Metastore(메타데이터 저장) + Spark, Trino, Hive(쿼리 엔진) 조합하여 사용한다. 그러나 문제점이 있다. 여러 사용자가 HMS를 공유하기 때문에 OOM이 발생할 확률이 크다. 그래서 이것을 분리하기 위해서 Iceberg를 사용한다. 예를 들어서 사용자1이 TABLE_A에 접근하려고하고 사용자2가 TABLE_B에 접근하려고 하면 하나의 인스턴스를 공유하고 있는 예전과 달리 Iceberg는 각 Table마다 File로 저장하여 관리하기 때문에 서로 영향이 가지않는다. 또한 Hive에 대한 의존성이 없어진다. S3 + Spark만 있으면 실행이 가능하다는 장점이 있다. 결국 트랜잭션을 가진 Datalakehouse를 구성하는데 큰 기여를 한다.

 

 

 

Spark + Iceberg 연동

* Spark: 3.5.0

* Iceberg: 1.4.3

 

 

1. Jar Download

 

아래 링크에서 iceberg-spark-runtime-3.5_2.13-1.4.3.jar 다운로드. 이후 ${SPARK_HOME}/jars 로 이동.

만약 hadoop-aws 및 aws-java-sdk-bundle 없다면 같이 다운로드하자.

https://iceberg.apache.org/releases/

 

주의사항: 난 Scala 2.12 버전을 Spark Jars로 사용하고 있었다. 그래서 Scala "ClassNotFoundException"이 계속 발생하였다. 스칼라 버전도 맞춰서 다운로드하자. 결국 iceberg-spark-runtime-3.5_2.12-1.4.3.jar를 찾아서 다운로드했다.

 

 

2. ${SPARK_HOME}/conf/spark-defaults.conf 수정

 

아래와 같이 S3 설정 및 Iceberg 설정을 한다. 

# S3 Configuration
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key ${ACCESS_KEY}
spark.hadoop.fs.s3a.secret.key ${SECRET_KEY}

# Iceberg Configuration
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive

 

 

3. Spark Thrift Server 시작

 

${SPARK_HOME}/sbin/start-thriftserver.sh

 

 

4. CREATE TABLE

하이브 메타스토어 및 S3 파일 시스템에 메타정보가 생겼다.

CREATE TABLE icebergtbl (
    id bigint,
    name string
) USING iceberg LOCATION 's3a://martinispark/iceberg'

{
  "format-version" : 2,
  "table-uuid" : "8913687e-597b-4312-b9a5-6f9e32395e65",
  "location" : "s3a://martinispark/iceberg",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1705402838673,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "anonymous",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}

 

 

5. INSERT INTO Data

 

도중 에러가 발생했다. 난 JDK 17을 사용중이라 Scala-library 버전 업그레이드가 필요했다.

참고: https://github.com/scala/bug/issues/12419

 

LambdaDeserializer makes incorrect call to JDK's altMetafactory which raises an IllegalArgumentException on JDK 17 · Issue #124

✗ jabba install openjdk@17-ea+27=tgz+https://download.java.net/java/early_access/jdk17/27/GPL/openjdk-17-ea+27_macos-x64_bin.tar.gz ✗ jabba use openjdk@17-ea+27a ✗ SV=2.12.14; scalac --scala-versio...

github.com

INSERT INTO icebergtbl VALUES (1, 'martini')

 

데이터를 넣으면 S3에 Data파일이 생긴다.

 

파일 형식은 Parquet이다.

반응형