-
Spark + Iceberg - 1(소개 및 연동)Data Engineering 2024. 1. 16. 20:25반응형
소개
일반적으로 데이터 플랫폼에서는 HDFS, S3(데이터 저장) + Hive Metastore(메타데이터 저장) + Spark, Trino, Hive(쿼리 엔진) 조합하여 사용한다. 그러나 문제점이 있다. 여러 사용자가 HMS를 공유하기 때문에 OOM이 발생할 확률이 크다. 그래서 이것을 분리하기 위해서 Iceberg를 사용한다. 예를 들어서 사용자1이 TABLE_A에 접근하려고하고 사용자2가 TABLE_B에 접근하려고 하면 하나의 인스턴스를 공유하고 있는 예전과 달리 Iceberg는 각 Table마다 File로 저장하여 관리하기 때문에 서로 영향이 가지않는다. 또한 Hive에 대한 의존성이 없어진다. S3 + Spark만 있으면 실행이 가능하다는 장점이 있다. 결국 트랜잭션을 가진 Datalakehouse를 구성하는데 큰 기여를 한다.
Spark + Iceberg 연동
* Spark: 3.5.0
* Iceberg: 1.4.3
1. Jar Download
아래 링크에서 iceberg-spark-runtime-3.5_2.13-1.4.3.jar 다운로드. 이후 ${SPARK_HOME}/jars 로 이동.
만약 hadoop-aws 및 aws-java-sdk-bundle 없다면 같이 다운로드하자.
https://iceberg.apache.org/releases/
주의사항: 난 Scala 2.12 버전을 Spark Jars로 사용하고 있었다. 그래서 Scala "ClassNotFoundException"이 계속 발생하였다. 스칼라 버전도 맞춰서 다운로드하자. 결국 iceberg-spark-runtime-3.5_2.12-1.4.3.jar를 찾아서 다운로드했다.
2. ${SPARK_HOME}/conf/spark-defaults.conf 수정
아래와 같이 S3 설정 및 Iceberg 설정을 한다.
# S3 Configuration spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem spark.hadoop.fs.s3a.access.key ${ACCESS_KEY} spark.hadoop.fs.s3a.secret.key ${SECRET_KEY} # Iceberg Configuration spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog spark.sql.catalog.spark_catalog.type=hive
3. Spark Thrift Server 시작
${SPARK_HOME}/sbin/start-thriftserver.sh
4. CREATE TABLE
하이브 메타스토어 및 S3 파일 시스템에 메타정보가 생겼다.
CREATE TABLE icebergtbl ( id bigint, name string ) USING iceberg LOCATION 's3a://martinispark/iceberg'
{ "format-version" : 2, "table-uuid" : "8913687e-597b-4312-b9a5-6f9e32395e65", "location" : "s3a://martinispark/iceberg", "last-sequence-number" : 0, "last-updated-ms" : 1705402838673, "last-column-id" : 2, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "name", "required" : false, "type" : "string" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "anonymous", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }
5. INSERT INTO Data
도중 에러가 발생했다. 난 JDK 17을 사용중이라 Scala-library 버전 업그레이드가 필요했다.
참고: https://github.com/scala/bug/issues/12419
LambdaDeserializer makes incorrect call to JDK's altMetafactory which raises an IllegalArgumentException on JDK 17 · Issue #124
✗ jabba install openjdk@17-ea+27=tgz+https://download.java.net/java/early_access/jdk17/27/GPL/openjdk-17-ea+27_macos-x64_bin.tar.gz ✗ jabba use openjdk@17-ea+27a ✗ SV=2.12.14; scalac --scala-versio...
github.com
INSERT INTO icebergtbl VALUES (1, 'martini')
데이터를 넣으면 S3에 Data파일이 생긴다.
파일 형식은 Parquet이다.
반응형'Data Engineering' 카테고리의 다른 글
Spark + Iceberg - 3(Hidden Partitioning) (0) 2024.01.17 Spark + Iceberg - 2(Partition, Schema evolution) (0) 2024.01.16 SparkSQL CLI 대신 Spark Thrift Server (0) 2024.01.15 Spark + S3 연동하기 (0) 2024.01.15 Hive Metastore & SparkSQL & Local FileSystem (0) 2024.01.14