Spark + Iceberg - 1(소개 및 연동)
소개
일반적으로 데이터 플랫폼에서는 HDFS, S3(데이터 저장) + Hive Metastore(메타데이터 저장) + Spark, Trino, Hive(쿼리 엔진) 조합하여 사용한다. 그러나 문제점이 있다. 여러 사용자가 HMS를 공유하기 때문에 OOM이 발생할 확률이 크다. 그래서 이것을 분리하기 위해서 Iceberg를 사용한다. 예를 들어서 사용자1이 TABLE_A에 접근하려고하고 사용자2가 TABLE_B에 접근하려고 하면 하나의 인스턴스를 공유하고 있는 예전과 달리 Iceberg는 각 Table마다 File로 저장하여 관리하기 때문에 서로 영향이 가지않는다. 또한 Hive에 대한 의존성이 없어진다. S3 + Spark만 있으면 실행이 가능하다는 장점이 있다. 결국 트랜잭션을 가진 Datalakehouse를 구성하는데 큰 기여를 한다.
Spark + Iceberg 연동
* Spark: 3.5.0
* Iceberg: 1.4.3
1. Jar Download
아래 링크에서 iceberg-spark-runtime-3.5_2.13-1.4.3.jar 다운로드. 이후 ${SPARK_HOME}/jars 로 이동.
만약 hadoop-aws 및 aws-java-sdk-bundle 없다면 같이 다운로드하자.
https://iceberg.apache.org/releases/
주의사항: 난 Scala 2.12 버전을 Spark Jars로 사용하고 있었다. 그래서 Scala "ClassNotFoundException"이 계속 발생하였다. 스칼라 버전도 맞춰서 다운로드하자. 결국 iceberg-spark-runtime-3.5_2.12-1.4.3.jar를 찾아서 다운로드했다.
2. ${SPARK_HOME}/conf/spark-defaults.conf 수정
아래와 같이 S3 설정 및 Iceberg 설정을 한다.
# S3 Configuration
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key ${ACCESS_KEY}
spark.hadoop.fs.s3a.secret.key ${SECRET_KEY}
# Iceberg Configuration
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
3. Spark Thrift Server 시작
${SPARK_HOME}/sbin/start-thriftserver.sh
4. CREATE TABLE
하이브 메타스토어 및 S3 파일 시스템에 메타정보가 생겼다.
CREATE TABLE icebergtbl (
id bigint,
name string
) USING iceberg LOCATION 's3a://martinispark/iceberg'
{
"format-version" : 2,
"table-uuid" : "8913687e-597b-4312-b9a5-6f9e32395e65",
"location" : "s3a://martinispark/iceberg",
"last-sequence-number" : 0,
"last-updated-ms" : 1705402838673,
"last-column-id" : 2,
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "name",
"required" : false,
"type" : "string"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "anonymous",
"write.parquet.compression-codec" : "zstd"
},
"current-snapshot-id" : -1,
"refs" : { },
"snapshots" : [ ],
"statistics" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ ]
}
5. INSERT INTO Data
도중 에러가 발생했다. 난 JDK 17을 사용중이라 Scala-library 버전 업그레이드가 필요했다.
참고: https://github.com/scala/bug/issues/12419
LambdaDeserializer makes incorrect call to JDK's altMetafactory which raises an IllegalArgumentException on JDK 17 · Issue #124
✗ jabba install openjdk@17-ea+27=tgz+https://download.java.net/java/early_access/jdk17/27/GPL/openjdk-17-ea+27_macos-x64_bin.tar.gz ✗ jabba use openjdk@17-ea+27a ✗ SV=2.12.14; scalac --scala-versio...
github.com
INSERT INTO icebergtbl VALUES (1, 'martini')
데이터를 넣으면 S3에 Data파일이 생긴다.
파일 형식은 Parquet이다.