From 84f0c6cb98d9d3060fb6ef79f49352a6d0e546a2 Mon Sep 17 00:00:00 2001
From: anujgaud <146576725+anujgaud@users.noreply.github.com>
Date: Wed, 15 Oct 2025 15:24:30 +0530
Subject: [PATCH 1/3] Add DeltaLake

---
 .../java/com/baeldung/delta/DeltaLake.java    | 60 +++++++++++++++++++
 1 file changed, 60 insertions(+)
 create mode 100644 apache-spark-2/src/main/java/com/baeldung/delta/DeltaLake.java

diff --git a/apache-spark-2/src/main/java/com/baeldung/delta/DeltaLake.java b/apache-spark-2/src/main/java/com/baeldung/delta/DeltaLake.java
new file mode 100644
index 000000000000..cf1a3fed7b33
--- /dev/null
+++ b/apache-spark-2/src/main/java/com/baeldung/delta/DeltaLake.java
@@ -0,0 +1,60 @@
+package com.baeldung.delta;
+
+import org.apache.spark.sql.*;
+import java.io.Serializable;
+import java.nio.file.Files;
+
+public class DeltaLake {
+    public static SparkSession createSession() {
+        return SparkSession.builder()
+            .appName("DeltaLake")
+            .master("local[*]")
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+            .getOrCreate();
+    }
+
+    public static String preparePeopleTable(SparkSession spark) {
+        try {
+            String tablePath = Files.createTempDirectory("delta-table-").toAbsolutePath().toString();
+
+            Dataset<Row> data = spark.createDataFrame(
+                java.util.Arrays.asList(
+                    new Person(1, "Alice"),
+                    new Person(2, "Bob")
+                ),
+                Person.class
+            );
+
+            data.write().format("delta").mode("overwrite").save(tablePath);
+            spark.sql("DROP TABLE IF EXISTS people");
+            spark.sql("CREATE TABLE IF NOT EXISTS people USING DELTA LOCATION '" + tablePath + "'");
+            return tablePath;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void cleanupPeopleTable(SparkSession spark) {
+        spark.sql("DROP TABLE IF EXISTS people");
+    }
+
+    public static void stopSession(SparkSession spark) {
+        if (spark != null) {
+            spark.stop();
+        }
+    }
+
+    public static class Person implements Serializable {
+        private int id;
+        private String name;
+
+        public Person() {}
+        public Person(int id, String name) { this.id = id; this.name = name; }
+
+        public int getId() { return id; }
+        public void setId(int id) { this.id = id; }
+        public String getName() { return name; }
+        public void setName(String name) { this.name = name; }
+    }
+}

From 5a717b031e753f9ea0b352538894a072dc666faf Mon Sep 17 00:00:00 2001
From: anujgaud <146576725+anujgaud@users.noreply.github.com>
Date: Wed, 15 Oct 2025 15:28:07 +0530
Subject: [PATCH 2/3] Add DeltaLakeUnitTest.java

---
 .../com/baeldung/delta/DeltaLakeUnitTest.java | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)
 create mode 100644 apache-spark-2/src/test/java/com/baeldung/delta/DeltaLakeUnitTest.java

diff --git a/apache-spark-2/src/test/java/com/baeldung/delta/DeltaLakeUnitTest.java b/apache-spark-2/src/test/java/com/baeldung/delta/DeltaLakeUnitTest.java
new file mode 100644
index 000000000000..68ee84bc6e3a
--- /dev/null
+++ b/apache-spark-2/src/test/java/com/baeldung/delta/DeltaLakeUnitTest.java
@@ -0,0 +1,43 @@
+package com.baeldung.delta;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DeltaLakeUnitTest {
+
+    private static SparkSession spark;
+    private static String tablePath;
+
+    @BeforeAll
+    static void setUp() {
+        spark = DeltaLake.createSession();
+        tablePath = DeltaLake.preparePeopleTable(spark);
+    }
+
+    @AfterAll
+    static void tearDown() {
+        try {
+            DeltaLake.cleanupPeopleTable(spark);
+        } finally {
+            DeltaLake.stopSession(spark);
+        }
+    }
+
+    @Test
+    void givenDeltaLake_whenUsingDeltaFormat_thenPrintAndValidate() {
+        Dataset<Row> df = spark.sql("DESCRIBE DETAIL people");
+        df.show(false);
+
+        Row row = df.first();
+        assertEquals("file:" + tablePath, row.getAs("location"));
+        assertEquals("delta", row.getAs("format"));
+        assertTrue(row.<Long>getAs("numFiles") >= 1);
+    }
+}

From c38318659f8f5272a567580d411be81d275c3c30 Mon Sep 17 00:00:00 2001
From: anujgaud <146576725+anujgaud@users.noreply.github.com>
Date: Wed, 15 Oct 2025 15:29:36 +0530
Subject: [PATCH 3/3] Create pom.xml

---
 apache-spark-2/pom.xml | 70 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 apache-spark-2/pom.xml

diff --git a/apache-spark-2/pom.xml b/apache-spark-2/pom.xml
new file mode 100644
index 000000000000..74e26c68d539
--- /dev/null
+++ b/apache-spark-2/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>apache-spark-2</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>apache-spark</name>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-modules</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-core_2.12</artifactId>
+            <version>${delta-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${org.apache.spark.spark-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${org.apache.spark.spark-sql.version}</version>
+        </dependency>
+    </dependencies>

+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <repositories>
+        <repository>
+            <id>SparkPackagesRepo</id>
+            <url>https://repos.spark-packages.org</url>
+        </repository>
+    </repositories>
+
+    <properties>
+        <delta-core.version>2.4.0</delta-core.version>
+        <org.apache.spark.spark-core.version>3.4.0</org.apache.spark.spark-core.version>
+        <org.apache.spark.spark-sql.version>3.4.0</org.apache.spark.spark-sql.version>
+        <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
+    </properties>
+</project>
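
For reference, a minimal usage sketch (not part of the three patches above): it strings together the helper methods added in PATCH 1/3, assuming the Delta Lake and Spark dependencies declared in PATCH 3/3 are on the classpath. The DeltaLakeExample class name and its queries are hypothetical; every DeltaLake.* call comes from DeltaLake.java as patched.

import org.apache.spark.sql.SparkSession;

import com.baeldung.delta.DeltaLake;

// Hypothetical driver class, shown only to illustrate how the patched helpers fit together.
public class DeltaLakeExample {

    public static void main(String[] args) {
        // Local Spark session with the Delta SQL extension and catalog enabled (PATCH 1/3).
        SparkSession spark = DeltaLake.createSession();
        try {
            // Writes two sample rows in Delta format and registers them as the "people" table.
            String tablePath = DeltaLake.preparePeopleTable(spark);

            // Read the files back directly through the Delta data source.
            spark.read().format("delta").load(tablePath).show();

            // Query the registered catalog table, which the unit test inspects via DESCRIBE DETAIL.
            spark.sql("SELECT id, name FROM people ORDER BY id").show();
        } finally {
            // Drop the table and stop the session, mirroring the test's tearDown.
            DeltaLake.cleanupPeopleTable(spark);
            DeltaLake.stopSession(spark);
        }
    }
}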