ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark + Iceberg - 1(소개 및 연동)
    Data Engineering 2024. 1. 16. 20:25
    반응형

     

    소개

     

    일반적으로 데이터 플랫폼에서는 HDFS, S3(데이터 저장) + Hive Metastore(메타데이터 저장) + Spark, Trino, Hive(쿼리 엔진) 조합하여 사용한다. 그러나 문제점이 있다. 여러 사용자가 HMS를 공유하기 때문에 OOM이 발생할 확률이 크다. 그래서 이것을 분리하기 위해서 Iceberg를 사용한다. 예를 들어서 사용자1이 TABLE_A에 접근하려고하고 사용자2가 TABLE_B에 접근하려고 하면 하나의 인스턴스를 공유하고 있는 예전과 달리 Iceberg는 각 Table마다 File로 저장하여 관리하기 때문에 서로 영향이 가지않는다. 또한 Hive에 대한 의존성이 없어진다. S3 + Spark만 있으면 실행이 가능하다는 장점이 있다. 결국 트랜잭션을 가진 Datalakehouse를 구성하는데 큰 기여를 한다.

     

     

     

    Spark + Iceberg 연동

    * Spark: 3.5.0

    * Iceberg: 1.4.3

     

     

    1. Jar Download

     

    아래 링크에서 iceberg-spark-runtime-3.5_2.13-1.4.3.jar 다운로드. 이후 ${SPARK_HOME}/jars 로 이동.

    만약 hadoop-aws 및 aws-java-sdk-bundle 없다면 같이 다운로드하자.

    https://iceberg.apache.org/releases/

     

    주의사항: 난 Scala 2.12 버전을 Spark Jars로 사용하고 있었다. 그래서 Scala "ClassNotFoundException"이 계속 발생하였다. 스칼라 버전도 맞춰서 다운로드하자. 결국 iceberg-spark-runtime-3.5_2.12-1.4.3.jar를 찾아서 다운로드했다.

     

     

    2. ${SPARK_HOME}/conf/spark-defaults.conf 수정

     

    아래와 같이 S3 설정 및 Iceberg 설정을 한다. 

    # S3 Configuration
    spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.access.key ${ACCESS_KEY}
    spark.hadoop.fs.s3a.secret.key ${SECRET_KEY}
    
    # Iceberg Configuration
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
    spark.sql.catalog.spark_catalog.type=hive

     

     

    3. Spark Thrift Server 시작

     

    ${SPARK_HOME}/sbin/start-thriftserver.sh

     

     

    4. CREATE TABLE

    하이브 메타스토어 및 S3 파일 시스템에 메타정보가 생겼다.

    CREATE TABLE icebergtbl (
        id bigint,
        name string
    ) USING iceberg LOCATION 's3a://martinispark/iceberg'

    {
      "format-version" : 2,
      "table-uuid" : "8913687e-597b-4312-b9a5-6f9e32395e65",
      "location" : "s3a://martinispark/iceberg",
      "last-sequence-number" : 0,
      "last-updated-ms" : 1705402838673,
      "last-column-id" : 2,
      "current-schema-id" : 0,
      "schemas" : [ {
        "type" : "struct",
        "schema-id" : 0,
        "fields" : [ {
          "id" : 1,
          "name" : "id",
          "required" : false,
          "type" : "long"
        }, {
          "id" : 2,
          "name" : "name",
          "required" : false,
          "type" : "string"
        } ]
      } ],
      "default-spec-id" : 0,
      "partition-specs" : [ {
        "spec-id" : 0,
        "fields" : [ ]
      } ],
      "last-partition-id" : 999,
      "default-sort-order-id" : 0,
      "sort-orders" : [ {
        "order-id" : 0,
        "fields" : [ ]
      } ],
      "properties" : {
        "owner" : "anonymous",
        "write.parquet.compression-codec" : "zstd"
      },
      "current-snapshot-id" : -1,
      "refs" : { },
      "snapshots" : [ ],
      "statistics" : [ ],
      "snapshot-log" : [ ],
      "metadata-log" : [ ]
    }

     

     

    5. INSERT INTO Data

     

    도중 에러가 발생했다. 난 JDK 17을 사용중이라 Scala-library 버전 업그레이드가 필요했다.

    참고: https://github.com/scala/bug/issues/12419

     

    LambdaDeserializer makes incorrect call to JDK's altMetafactory which raises an IllegalArgumentException on JDK 17 · Issue #124

    ✗ jabba install openjdk@17-ea+27=tgz+https://download.java.net/java/early_access/jdk17/27/GPL/openjdk-17-ea+27_macos-x64_bin.tar.gz ✗ jabba use openjdk@17-ea+27a ✗ SV=2.12.14; scalac --scala-versio...

    github.com

    INSERT INTO icebergtbl VALUES (1, 'martini')

     

    데이터를 넣으면 S3에 Data파일이 생긴다.

     

    파일 형식은 Parquet이다.

    반응형

    댓글

Designed by Tistory.