Merhabalar bugünkü yazımda size pyspark ile kafkadan topicleri okuyup json formatına göre şemaya uygun düzenleyip delta lake tablosuna upsert işleminden bahsedeceğim.

Pyspark : Apache Spark Hadoop üzerinde uygulanan ve genellikle Java diline benzeyen Scala ile yazılır. Apache Spark’ın Python ile entegrasyonuna Pyspark deniyor. Yani Python geliştiricilerin Spark çerçevesiyle arabirim oluşturmasına, verilerin geniş ölçekte nasıl işleneceğini ve dağıtık (distributed) bir dosya sistemi üzerinden nesneler ve algoritmalarla nasıl çalışacağını öğrenmesine olanak tanır. Spark, Python veya Java gibi bir programlama dili değildir. Uygulama geliştiricileri ve veri bilimcileri, verileri uygun ölçekte hızla sorgulamak, analiz etmek ve dönüştürmek için genellikle Spark’ı uygulamalarına dahil eder

Apache Kafka : Bir çok kaynak sisteminiz ve bir çok hedef sisteminiz var. Bunlar birbirleriyle haberleşmek istediklerinde her bir sistemin birbiri ile verilerin nasıl aktarılacağından, verilerin formatı ve verilerin nasıl parse edecileği gibi bir sürü entegrasyonu gerekecektir.

Bununla beraber her yapılan entegrasyon sisteme yük getirecektir. Apache Kafka teknolojisi sayesinde sistemlerin birbirine bağımlılıklarını ortadan kaldırarak sistem üzerindeki yükleri de azaltacaktır. Apache Kafka dağıtık bir veri akış (streaming) platformudur.

Delta Lake : Apache Spark ve büyük veri iş yüklerinde ACID (Atomicity, tutarlılık, yalıtım ve dayanıklılık) işlemlerini getiren açık kaynaklı bir depolama katmanıdır. Delta lake aracığıyla Spark’a aldığımız veriler, seçilen bir bulut depolama sisteminde Parquet biçinde depolanır. Scala, Pyspark ve .Net dillerini destekler.

Şimdi kısaca bahsettiğimize göre bu 3 teknolojinin birleşimi ile neler yapabiliriz bakalım…

Kullandığımız Versiyonlar ;

  • Pyspark 3
  • Delta Lake 0.8.0
  • Python 3.6

[1]: # Gerekli kutuphaneleri import ediyoruz

import findspark
findspark.init("/anaconda/anaconda3/envs/test/spark-3.1.2-bin-hadoop3.2")

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
import json
import re
from pyspark.sql.functions import *
from pyspark.sql.types import *

from delta.tables import*

[2]: #Spark Session’ı ve delta lake için gerekli “io.delta.sql.DeltaSparkSessionExtension” ve “org.apache.spark.sql.delta.catalog.DeltaCatalog” configlerini oluşturuyoruz.

spark = SparkSession.builder.appName("testDelta")\
.config("spark.executor.memory", "15g") \
.config("spark.driver.memory","15g") \
.config("spark.executor.memoryOverhead", "20g") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()


[3]: #Batch data’mızı gormek ve delta tablosuna insert etmek için kafka topic’ini okuyan kodumuzu oluşturuyoruz.

df = spark.read.format("kafka") \
.option("kafka.bootstrap.servers", "xxxxx:9092") \
.option("subscribe", "delta_topic") \
.option("includeHeaders", "true") \
.load()

[4]: #Kafka topic’inden gelen yapımız bu şekilde bize value’lar lazım.

df.show(2)
+----+--------------------+-----------+---------+------+--------------------+-------------+-------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|headers|
+----+--------------------+-----------+---------+------+--------------------+-------------+-------+
|null|[7B 22 74 61 62 6...|delta_topic|        1|     0|2021-10-15 09:30:...|            0|   null|
|null|[7B 22 74 61 62 6...|delta_topic|        0|     0|2021-10-15 09:31:...|            0|   null|
+----+--------------------+-----------+---------+------+--------------------+-------------+-------+

[5]: #Datamız hangi json formatında geliyormuş goruntuleyelım burada ilk olarak value kolonu olarak tek satır gelmesini istedik ve json’ı parse edeceğimiz için tum satırın strıng’e cast ettik.

df = df.selectExpr("CAST(value as STRING) as value")
df.show(2,False)
+----------------------------------------------------------------------------------------------------------+
|value|
+----------------------------------------------------------------------------------------------------------+
|{"table":"deneme","op_type":"I","op_ts":"2021-09-24 08:46:33.015841","current_ts":"2021-09-24T11:46:36",
"pos":"00000000460446574558","before": null,"after":{"ID":"1","AD":"MERT", "SOYAD":"OGURCU", "YAS":"26"}}|
|{"table":"deneme","op_type":"I","op_ts":"2021-09-24 08:46:33.015841","current_ts":"2021-09-24T11:46:36",
"pos":"00000000460446574558","before": null,"after":{"ID":"2","AD":"TOLGA", "SOYAD":"TEZEL", "YAS":"25"}}|
+----------------------------------------------------------------------------------------------------------+

 

[6]: #Json Formatın gelen value kolonunu parse ettik ve deneme tablosunu kullanacağımız için “table” kolonunu filtreledik. op_type’ı insert işlemi olduğu için before kısmı boş oldu. Bizim kullanacağımız ise after kolonu.

json_df = spark.read.json(df.rdd.map(lambda row: row.value)) \
.where(col("table") == "deneme") 
json_df.show()
+--------------------+------+--------------------+--------------------+-------+--------------------+------+
|               after|before|          current_ts|               op_ts|op_type|                 pos| table|
+--------------------+------+--------------------+--------------------+-------+--------------------+------+
|{MERT, 1, OGURCU,...|  null|2021-09-24T11:46:...|2021-09-24 08:46:...|      I|00000000460446574558|deneme|
|{TOLGA, 2, TEZEL,...|  null|2021-09-24T11:46:...|2021-09-24 08:46:...|      I|00000000460446574558|deneme|
+--------------------+------+--------------------+--------------------+-------+--------------------+------+

[7]: #After kolonunun butun alanlarını gormek istedik ve bize gelen insert verisini goruntuledik ve ardından işlem yapmak için sadece after kolonunu aldık.

json_df.select("after.*").show(2,False)
json_data_filter_insert_after = json_df.select("after.*")
+-----+---+------+---+
|AD   |ID |SOYAD |YAS|
+-----+---+------+---+
|MERT |1  |OGURCU|26 |
|TOLGA|2  |TEZEL |25 |
+-----+---+------+---+

[8]: #Burada df’mize 2 alan daha ekledik changeKey alanı Değişecek veriyi concat’leyip sha2 formatında tutuyor. Yani veri geldiği zaman changeKey ile farklılığı karşılaştırılıp upsert işlemine karar veriliyor. inserted alanı ise timestamp en son hangi veri gelmiş onu kontrol etmek için. 

new_df = json_data_filter_insert_after \
.withColumn("changeKey",expr("SHA2(concat_ws(',',ID,AD,SOYAD,YAS),512)")) \
.withColumn("inserted",current_timestamp()) 

new_df.show()
+-----+---+------+---+--------------------+--------------------+
|   AD| ID| SOYAD|YAS|           changeKey|            inserted|
+-----+---+------+---+--------------------+--------------------+
| MERT|  1|OGURCU| 26|7a555f78ee887a50a...|2021-10-15 09:31:...|
|TOLGA|  2| TEZEL| 25|530756a65e70cc224...|2021-10-15 09:31:...|
+-----+---+------+---+--------------------+--------------------+

[9]: #Batch datamızı delta formatında dizine yazıyoruz.

new_df.write.format("delta").mode("overwrite").save("/tmp/mert/deltaStreamTable")

[10]: #Yazdığımızı dizini belirterek delta formatında tablo oluşturuyoruz.

spark.sql("create table deltaStreamTable using delta location '/tmp/mert/deltaStreamTable'")

[11]: #Delta tablomuz oluştu ;

DataFrame[]

[12]: #Delta tablomuzdaki datamız bu şekilde ;

spark.sql("select ID,AD,SOYAD,YAS,changeKey,inserted from deltaStreamTable").show(2)
+---+-----+------+---+--------------------+--------------------+
| ID|   AD| SOYAD|YAS|           changeKey|            inserted|
+---+-----+------+---+--------------------+--------------------+
|  2|TOLGA| TEZEL| 25|530756a65e70cc224...|2021-10-15 09:31:...|
|  1| MERT|OGURCU| 26|7a555f78ee887a50a...|2021-10-15 09:31:...|
+---+-----+------+---+--------------------+--------------------+

[13]: #Stream olarak işleyeceğimiz için gelen format’a gore Schema’mızı oluşturuyoruz.

SchemaDeneme = StructType([
            
            StructField("after",
                            StructType([
                                StructField("ID",StringType(),True),
                                StructField("AD",StringType(),True),
                                StructField("SOYAD", StringType(), True),
                                StructField("YAS", StringType(), True)            
                            ])
                        ),
            StructField("current_ts", StringType(), False),
            StructField("op_ts", StringType(), False),
            StructField("op_type", StringType(), False),
            StructField("pos", StringType(), False),
            StructField("table", StringType(), False)
    
        ])

[14]: #Kafka readStream okumak için aşağıdaki yapıyı kullanıyoruz.

dfStream = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "xxxxx:9092") \
.option("subscribe", "delta_topic") \
.option("includeHeaders", "true") \
.load() \
.selectExpr("CAST(value AS STRING) value")

[15]: #Stream olan veriyi parse from_json(col(“value”), SchemaDeneme vererek parse ediyoruz. ve bu kolona value adını veriyoruz.Where koşulu ile deneme tablosunu filtreliyoruz. 

dfStreamFilter = dfStream.select(from_json(col("value"), SchemaDeneme).alias("value")) \
.where(col("value.table") == "deneme")

[16]: #Yine batch datadaki gibi changeKey ve inserted alanlarını ekliyoruz.

dfStreamFilter = dfStreamFilter \
.select("value.after.ID","value.after.AD","value.after.SOYAD","value.after.YAS") \
.withColumn("inserted",current_timestamp()) \
.withColumn("changeKey",expr("SHA2(concat_ws(',',ID,AD,SOYAD,YAS),512)"))

[17]: #Gelen datayı karşılaştırıp değiştireceğimiz için deltalake tablosunu df’ye atıyoruz.

deltaDF = DeltaTable.forName(spark,f"deltaStreamTable")

[18]: #mergetoDF fonksiyonu gelen data ile mevcut batch datayı bize uniq olarak gelen ID’ye gore eşleyip, changeKey’i farklıysa değiştirme işlemi yapıyor. Eğer ID tamamen farklıysa insert işlemi yapıyor. 

def mergetoDF(microdf, batchId):
    microdf = microdf.dropDuplicates(["ID","changeKey"])
    (deltaDF.alias("t")
    .merge(
        microdf.alias("s"),
        "s.ID = t.ID")
    .whenMatchedUpdateAll("s.changeKey <> t.changeKey")
    .whenNotMatchedInsertAll()
    .execute()
    )

[19]: #writeStream ile gelen dataframe’i foreachBatch işlemi yaparak belirlediğimiz /tmp/mert/deltaStream path’ine yazıyoruz. İşlem yarım kalırsa diye hdfs üzerine yazacak şekilde “checkpointLocation” path’i ekliyoruz.

dfStreamFilter.writeStream \
.format("delta") \
.outputMode("append") \
.foreachBatch(mergetoDF) \
.option("checkpointLocation","/tmp/mert/deltaStreamTable/_checkpoint") \
.start("/tmp/mert/deltaStreamTable")

[20]: #Kafka topic’inde yeni veri insert işlemi yaptık ve ID’si 1 olan verimizi güncelledik. Aşağıda en son tablomuzu goruntuluyoruz. 

spark.sql("SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable").show()
+---+-----+------+---+--------------------+--------------------+
| ID|   AD| SOYAD|YAS|            inserted|           changeKey|
+---+-----+------+---+--------------------+--------------------+
|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|
|  1| MERT|OGURCU| 26|2021-10-15 09:31:...|7a555f78ee887a50a...|
+---+-----+------+---+--------------------+--------------------+

[21]: #ID 1′ yaşını 27 olarak güncelledik ve değişimi gördük

spark.sql("SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable").show()
+---+-----+------+---+--------------------+--------------------+
| ID|   AD| SOYAD|YAS|            inserted|           changeKey|
+---+-----+------+---+--------------------+--------------------+
|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|
|  1| MERT|OGURCU| 27|2021-10-15 09:35:...|e1d754dcb473ea82d...|
+---+-----+------+---+--------------------+--------------------+

[22]: #ID 3 ekledik

spark.sql("SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable").show()
+---+-----+------+---+--------------------+--------------------+
| ID|   AD| SOYAD|YAS|            inserted|           changeKey|
+---+-----+------+---+--------------------+--------------------+
|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|
|  1| MERT|OGURCU| 27|2021-10-15 09:35:...|e1d754dcb473ea82d...|
|  3|  ALI|  VELI| 20|2021-10-15 09:36:...|932046c15b7aa5063...|
+---+-----+------+---+--------------------+--------------------+

[EK]: #Yapılan Kafka işlemleri 

#Kafka topic'i produce etmek için : kafka-console-producer --broker-list xxxxx:9092 --topic deltalake_stream

#Eklediğim offsetler ;

{"table":"deneme","op_type":"I","op_ts":"2021-09-24 08:46:33.015841","current_ts":"2021-09-24T11:46:36.730006",
"pos":"00000000460446574558","before": null,"after":{"ID":"1","AD":"MERT", "SOYAD":"OGURCU", "YAS":"26"}}

{"table":"deneme","op_type":"I","op_ts":"2021-09-24 08:46:33.015841","current_ts":"2021-09-24T11:46:36.730006",
"pos":"00000000460446574558","before": null,"after":{"ID":"2","AD":"TOLGA", "SOYAD":"TEZEL", "YAS":"25"}}

{"table":"deneme","op_type":"U","op_ts":"2021-09-24 08:48:33.015841","current_ts":"2021-09-24T11:48:36.730006",
"pos":"00000000460446574559", "before":{"ID":"3","AD":"ALI","SOYAD":"1-66676470433", "YAS":"26"},
"after":{"ID":"1","AD":"MERT", "SOYAD":"OGURCU", "YAS":"27"}}

{"table":"deneme","op_type":"I","op_ts":"2021-09-24 08:46:33.015841","current_ts":"2021-09-24T11:46:36.730006",
"pos":"00000000460446574558","before": null,"after":{"ID":"3","AD":"ALI", "SOYAD":"VELI", "YAS":"20"}}