{"id":6125,"date":"2021-11-18T11:53:50","date_gmt":"2021-11-18T11:53:50","guid":{"rendered":"https:\/\/bentego.com\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/"},"modified":"2025-10-20T16:09:28","modified_gmt":"2025-10-20T16:09:28","slug":"spark-streaming-ile-kafka-delta-lake-upsert-islemi","status":"publish","type":"post","link":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/","title":{"rendered":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi"},"content":{"rendered":"<p align=\"left\">Merhabalar bug\u00fcnk\u00fc yaz\u0131mda size pyspark ile kafkadan topicleri okuyup json format\u0131na g\u00f6re \u015femaya uygun d\u00fczenleyip delta lake tablosuna upsert i\u015fleminden bahsedece\u011fim.<\/p>\n<p align=\"left\"><a name=\"tw-target-text\"><\/a><strong> Pyspark :<\/strong> Apache <span lang=\"en-US\">Spark Hadoop \u00fczerinde uygulanan ve genellikle Java diline benzeyen Scala ile yaz\u0131l\u0131r. Apache Spark\u2019\u0131n Python ile entegrasyonuna Pyspark deniyor. Yani Python geli\u015ftiricilerin Spark \u00e7er\u00e7evesiyle arabirim olu\u015fturmas\u0131na, verilerin geni\u015f \u00f6l\u00e7ekte nas\u0131l i\u015flenece\u011fini ve da\u011f\u0131t\u0131k (<\/span><span lang=\"en-US\">distributed<\/span><span lang=\"en-US\">) bir dosya sistemi \u00fczerinden nesneler ve algoritmalarla nas\u0131l \u00e7al\u0131\u015faca\u011f\u0131n\u0131 \u00f6\u011frenmesine olanak tan\u0131r. Spark, Python veya Java gibi bir programlama dili de\u011fildir. <\/span><span lang=\"en-US\">Uygulama geli\u015ftiricileri ve veri bilimcileri, verileri uygun \u00f6l\u00e7ekte h\u0131zla sorgulamak, analiz etmek ve d\u00f6n\u00fc\u015ft\u00fcrmek i\u00e7in genellikle Spark&#8217;\u0131 uygulamalar\u0131na dahil eder<\/span><\/p>\n<p align=\"left\"><strong>Apache Kafka :<\/strong> Bir \u00e7ok kaynak sisteminiz ve bir \u00e7ok hedef sisteminiz var. Bunlar birbirleriyle haberle\u015fmek istediklerinde her bir sistemin birbiri ile verilerin nas\u0131l aktar\u0131laca\u011f\u0131ndan, verilerin format\u0131 ve verilerin nas\u0131l parse edecile\u011fi gibi bir s\u00fcr\u00fc entegrasyonu gerekecektir.<\/p>\n<p align=\"left\">Bununla beraber her yap\u0131lan entegrasyon sisteme y\u00fck getirecektir. Apache Kafka teknolojisi sayesinde sistemlerin birbirine ba\u011f\u0131ml\u0131l\u0131klar\u0131n\u0131 ortadan kald\u0131rarak sistem \u00fczerindeki y\u00fckleri de azaltacakt\u0131r. Apache Kafka da\u011f\u0131t\u0131k bir veri ak\u0131\u015f (streaming) platformudur.<\/p>\n<p align=\"left\"><strong>Delta Lake :<\/strong> Apache Spark ve b\u00fcy\u00fck veri i\u015f y\u00fcklerinde ACID (Atomicity, tutarl\u0131l\u0131k, yal\u0131t\u0131m ve dayan\u0131kl\u0131l\u0131k) i\u015flemlerini getiren a\u00e7\u0131k kaynakl\u0131 bir depolama katman\u0131d\u0131r. Delta lake arac\u0131\u011f\u0131yla Spark\u2019a ald\u0131\u011f\u0131m\u0131z veriler, se\u00e7ilen bir bulut depolama sisteminde Parquet bi\u00e7inde depolan\u0131r. Scala, Pyspark ve .Net dillerini destekler.<\/p>\n<p align=\"left\">\u015eimdi k\u0131saca bahsetti\u011fimize g\u00f6re bu 3 teknolojinin birle\u015fimi ile neler yapabiliriz bakal\u0131m\u2026<\/p>\n<p align=\"left\">Kulland\u0131\u011f\u0131m\u0131z Versiyonlar ;<\/p>\n<ul>\n<li>Pyspark 3<\/li>\n<li>Delta Lake 0.8.0<\/li>\n<li>Python 3.6<\/li>\n<\/ul>\n<p>[1]: <strong># Gerekli kutuphaneleri import ediyoruz<\/strong><\/p>\n<blockquote>\n<pre>import findspark\nfindspark.init(\"\/anaconda\/anaconda3\/envs\/test\/spark-3.1.2-bin-hadoop3.2\")\n\nfrom pyspark.sql.functions import *\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql import HiveContext\nimport json\nimport re\nfrom pyspark.sql.functions import *\nfrom pyspark.sql.types import *\n\nfrom delta.tables import*\n\n<\/pre>\n<\/blockquote>\n<p>[2]: <strong>#Spark Session&#8217;\u0131 ve delta lake i\u00e7in gerekli &#8220;io.delta.sql.DeltaSparkSessionExtension&#8221; ve &#8220;org.apache.spark.sql.delta.catalog.DeltaCatalog&#8221; configlerini olu\u015fturuyoruz.<\/strong><\/p>\n<blockquote>\n<pre>spark = SparkSession.builder.appName(\"testDelta\")\\\n.config(\"spark.executor.memory\", \"15g\") \\\n.config(\"spark.driver.memory\",\"15g\") \\\n.config(\"spark.executor.memoryOverhead\", \"20g\") \\\n.config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n.config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n.getOrCreate()\n\n<\/pre>\n<\/blockquote>\n<pre><\/pre>\n<p>[3]: <strong>#Batch data&#8217;m\u0131z\u0131 gormek ve delta tablosuna insert etmek i\u00e7in kafka topic&#8217;ini okuyan kodumuzu olu\u015fturuyoruz. <\/strong><\/p>\n<blockquote>\n<pre>df = spark.read.format(\"kafka\") \\\n.option(\"kafka.bootstrap.servers\", \"xxxxx:9092\") \\\n.option(\"subscribe\", \"delta_topic\") \\\n.option(\"includeHeaders\", \"true\") \\\n.load()\n<\/pre>\n<\/blockquote>\n<p>[4]: <strong>#Kafka topic&#8217;inden gelen yap\u0131m\u0131z bu \u015fekilde bize value&#8217;lar laz\u0131m.<\/strong><\/p>\n<blockquote>\n<pre>df.show(2)\n+----+--------------------+-----------+---------+------+--------------------+-------------+-------+\n| key|               value|      topic|partition|offset|           timestamp|timestampType|headers|\n+----+--------------------+-----------+---------+------+--------------------+-------------+-------+\n|null|[7B 22 74 61 62 6...|delta_topic|        1|     0|2021-10-15 09:30:...|            0|   null|\n|null|[7B 22 74 61 62 6...|delta_topic|        0|     0|2021-10-15 09:31:...|            0|   null|\n+----+--------------------+-----------+---------+------+--------------------+-------------+-------+\n\n<\/pre>\n<\/blockquote>\n<p>[5]: <strong>#Datam\u0131z hangi json format\u0131nda geliyormu\u015f goruntuleyel\u0131m burada ilk olarak value kolonu olarak tek sat\u0131r gelmesini istedik ve json&#8217;\u0131 parse edece\u011fimiz i\u00e7in tum sat\u0131r\u0131n str\u0131ng&#8217;e cast ettik.<\/strong><\/p>\n<blockquote>\n<pre>df = df.selectExpr(\"CAST(value as STRING) as value\")\ndf.show(2,False)\n+----------------------------------------------------------------------------------------------------------+\n|value|\n+----------------------------------------------------------------------------------------------------------+\n|{\"table\":\"deneme\",\"op_type\":\"I\",\"op_ts\":\"2021-09-24 08:46:33.015841\",\"current_ts\":\"2021-09-24T11:46:36\",\n\"pos\":\"00000000460446574558\",\"before\": null,\"after\":{\"ID\":\"1\",\"AD\":\"MERT\", \"SOYAD\":\"OGURCU\", \"YAS\":\"26\"}}|\n|{\"table\":\"deneme\",\"op_type\":\"I\",\"op_ts\":\"2021-09-24 08:46:33.015841\",\"current_ts\":\"2021-09-24T11:46:36\",\n\"pos\":\"00000000460446574558\",\"before\": null,\"after\":{\"ID\":\"2\",\"AD\":\"TOLGA\", \"SOYAD\":\"TEZEL\", \"YAS\":\"25\"}}|\n+----------------------------------------------------------------------------------------------------------+\n<\/pre>\n<\/blockquote>\n<p>&nbsp;<\/p>\n<p>[6]: <strong>#Json Format\u0131n gelen value kolonunu parse ettik ve deneme tablosunu kullanaca\u011f\u0131m\u0131z i\u00e7in &#8220;table&#8221; kolonunu filtreledik. op_type&#8217;\u0131 insert i\u015flemi oldu\u011fu i\u00e7in before k\u0131sm\u0131 bo\u015f oldu. Bizim kullanaca\u011f\u0131m\u0131z ise after kolonu.<\/strong><\/p>\n<blockquote>\n<pre>json_df = spark.read.json(df.rdd.map(lambda row: row.value)) \\\n.where(col(\"table\") == \"deneme\") \njson_df.show()\n+--------------------+------+--------------------+--------------------+-------+--------------------+------+\n|               after|before|          current_ts|               op_ts|op_type|                 pos| table|\n+--------------------+------+--------------------+--------------------+-------+--------------------+------+\n|{MERT, 1, OGURCU,...|  null|2021-09-24T11:46:...|2021-09-24 08:46:...|      I|00000000460446574558|deneme|\n|{TOLGA, 2, TEZEL,...|  null|2021-09-24T11:46:...|2021-09-24 08:46:...|      I|00000000460446574558|deneme|\n+--------------------+------+--------------------+--------------------+-------+--------------------+------+\n<\/pre>\n<\/blockquote>\n<p>[7]: <strong>#After kolonunun butun alanlar\u0131n\u0131 gormek istedik ve bize gelen insert verisini goruntuledik ve ard\u0131ndan i\u015flem yapmak i\u00e7in sadece after kolonunu ald\u0131k.<\/strong><\/p>\n<blockquote>\n<pre>json_df.select(\"after.*\").show(2,False)\njson_data_filter_insert_after = json_df.select(\"after.*\")\n+-----+---+------+---+\n|AD   |ID |SOYAD |YAS|\n+-----+---+------+---+\n|MERT |1  |OGURCU|26 |\n|TOLGA|2  |TEZEL |25 |\n+-----+---+------+---+\n\n<\/pre>\n<\/blockquote>\n<p>[8]: <strong>#Burada df&#8217;mize 2 alan daha ekledik changeKey alan\u0131 De\u011fi\u015fecek veriyi concat&#8217;leyip sha2 format\u0131nda tutuyor. Yani veri geldi\u011fi zaman changeKey ile farkl\u0131l\u0131\u011f\u0131 kar\u015f\u0131la\u015ft\u0131r\u0131l\u0131p upsert i\u015flemine karar veriliyor. inserted alan\u0131 ise timestamp en son hangi veri gelmi\u015f onu kontrol etmek i\u00e7in.\u00a0<\/strong><\/p>\n<blockquote>\n<pre>new_df = json_data_filter_insert_after \\\n.withColumn(\"changeKey\",expr(\"SHA2(concat_ws(',',ID,AD,SOYAD,YAS),512)\")) \\\n.withColumn(\"inserted\",current_timestamp()) \n\nnew_df.show()\n+-----+---+------+---+--------------------+--------------------+\n|   AD| ID| SOYAD|YAS|           changeKey|            inserted|\n+-----+---+------+---+--------------------+--------------------+\n| MERT|  1|OGURCU| 26|7a555f78ee887a50a...|2021-10-15 09:31:...|\n|TOLGA|  2| TEZEL| 25|530756a65e70cc224...|2021-10-15 09:31:...|\n+-----+---+------+---+--------------------+--------------------+\n<\/pre>\n<\/blockquote>\n<p>[9]: <strong>#Batch datam\u0131z\u0131 delta format\u0131nda dizine yaz\u0131yoruz.<\/strong><\/p>\n<blockquote>\n<pre>new_df.write.format(\"delta\").mode(\"overwrite\").save(\"\/tmp\/mert\/deltaStreamTable\")\n<\/pre>\n<\/blockquote>\n<p>[10]: <strong>#Yazd\u0131\u011f\u0131m\u0131z\u0131 dizini belirterek delta format\u0131nda tablo olu\u015fturuyoruz.<\/strong><\/p>\n<blockquote>\n<pre>spark.sql(\"create table deltaStreamTable using delta location '\/tmp\/mert\/deltaStreamTable'\")\n<\/pre>\n<\/blockquote>\n<p>[11]: <strong>#Delta tablomuz olu\u015ftu ;<\/strong><\/p>\n<blockquote>\n<pre>DataFrame[]\n<\/pre>\n<\/blockquote>\n<p>[12]: <strong>#Delta tablomuzdaki datam\u0131z bu \u015fekilde ;<\/strong><\/p>\n<blockquote>\n<pre>spark.sql(\"select ID,AD,SOYAD,YAS,changeKey,inserted from deltaStreamTable\").show(2)\n+---+-----+------+---+--------------------+--------------------+\n| ID|   AD| SOYAD|YAS|           changeKey|            inserted|\n+---+-----+------+---+--------------------+--------------------+\n|  2|TOLGA| TEZEL| 25|530756a65e70cc224...|2021-10-15 09:31:...|\n|  1| MERT|OGURCU| 26|7a555f78ee887a50a...|2021-10-15 09:31:...|\n+---+-----+------+---+--------------------+--------------------+\n<\/pre>\n<\/blockquote>\n<p>[13]: <strong>#Stream olarak i\u015fleyece\u011fimiz i\u00e7in gelen format&#8217;a gore Schema&#8217;m\u0131z\u0131 olu\u015fturuyoruz.<\/strong><\/p>\n<blockquote>\n<pre>SchemaDeneme = StructType([\n            \n            StructField(\"after\",\n                            StructType([\n                                StructField(\"ID\",StringType(),True),\n                                StructField(\"AD\",StringType(),True),\n                                StructField(\"SOYAD\", StringType(), True),\n                                StructField(\"YAS\", StringType(), True)            \n                            ])\n                        ),\n            StructField(\"current_ts\", StringType(), False),\n            StructField(\"op_ts\", StringType(), False),\n            StructField(\"op_type\", StringType(), False),\n            StructField(\"pos\", StringType(), False),\n            StructField(\"table\", StringType(), False)\n    \n        ])\n<\/pre>\n<\/blockquote>\n<p>[14]: <strong>#Kafka readStream okumak i\u00e7in a\u015fa\u011f\u0131daki yap\u0131y\u0131 kullan\u0131yoruz.<\/strong><\/p>\n<blockquote>\n<pre>dfStream = spark.readStream.format(\"kafka\") \\\n.option(\"kafka.bootstrap.servers\", \"xxxxx:9092\") \\\n.option(\"subscribe\", \"delta_topic\") \\\n.option(\"includeHeaders\", \"true\") \\\n.load() \\\n.selectExpr(\"CAST(value AS STRING) value\")\n\n<\/pre>\n<\/blockquote>\n<p>[15]: <strong>#Stream olan veriyi parse from_json(col(&#8220;value&#8221;), SchemaDeneme vererek parse ediyoruz. ve bu kolona value ad\u0131n\u0131 veriyoruz.Where ko\u015fulu ile deneme tablosunu filtreliyoruz.\u00a0<\/strong><\/p>\n<blockquote>\n<pre>dfStreamFilter = dfStream.select(from_json(col(\"value\"), SchemaDeneme).alias(\"value\")) \\\n.where(col(\"value.table\") == \"deneme\")\n\n<\/pre>\n<\/blockquote>\n<p>[16]: <strong>#Yine batch datadaki gibi changeKey ve inserted alanlar\u0131n\u0131 ekliyoruz.<\/strong><\/p>\n<blockquote>\n<pre>dfStreamFilter = dfStreamFilter \\\n.select(\"value.after.ID\",\"value.after.AD\",\"value.after.SOYAD\",\"value.after.YAS\") \\\n.withColumn(\"inserted\",current_timestamp()) \\\n.withColumn(\"changeKey\",expr(\"SHA2(concat_ws(',',ID,AD,SOYAD,YAS),512)\"))\n<\/pre>\n<\/blockquote>\n<p>[17]: <strong>#Gelen datay\u0131 kar\u015f\u0131la\u015ft\u0131r\u0131p de\u011fi\u015ftirece\u011fimiz i\u00e7in deltalake tablosunu df&#8217;ye at\u0131yoruz.<\/strong><\/p>\n<blockquote>\n<pre>deltaDF = DeltaTable.forName(spark,f\"deltaStreamTable\")\n<\/pre>\n<\/blockquote>\n<p>[18]: <strong>#mergetoDF fonksiyonu gelen data ile mevcut batch datay\u0131 bize uniq olarak gelen ID&#8217;ye gore e\u015fleyip, changeKey&#8217;i farkl\u0131ysa de\u011fi\u015ftirme i\u015flemi yap\u0131yor. E\u011fer ID tamamen farkl\u0131ysa insert i\u015flemi yap\u0131yor.\u00a0<\/strong><\/p>\n<blockquote>\n<pre>def mergetoDF(microdf, batchId):\n    microdf = microdf.dropDuplicates([\"ID\",\"changeKey\"])\n    (deltaDF.alias(\"t\")\n    .merge(\n        microdf.alias(\"s\"),\n        \"s.ID = t.ID\")\n    .whenMatchedUpdateAll(\"s.changeKey &lt;&gt; t.changeKey\")\n    .whenNotMatchedInsertAll()\n    .execute()\n    )\n<\/pre>\n<\/blockquote>\n<p>[19]: <strong>#writeStream ile gelen dataframe&#8217;i foreachBatch i\u015flemi yaparak belirledi\u011fimiz \/tmp\/mert\/deltaStream path&#8217;ine yaz\u0131yoruz. \u0130\u015flem yar\u0131m kal\u0131rsa diye hdfs \u00fczerine yazacak \u015fekilde &#8220;<em>checkpointLocation<\/em>&#8221; path&#8217;i ekliyoruz.<\/strong><\/p>\n<blockquote>\n<pre>dfStreamFilter.writeStream \\\n.format(\"delta\") \\\n.outputMode(\"append\") \\\n.foreachBatch(mergetoDF) \\\n.option(\"checkpointLocation\",\"\/tmp\/mert\/deltaStreamTable\/_checkpoint\") \\\n.start(\"\/tmp\/mert\/deltaStreamTable\")\n\n<\/pre>\n<\/blockquote>\n<p>[20]: <strong>#Kafka topic&#8217;inde yeni veri insert i\u015flemi yapt\u0131k ve ID&#8217;si 1 olan verimizi g\u00fcncelledik. A\u015fa\u011f\u0131da en son tablomuzu goruntuluyoruz.\u00a0<\/strong><\/p>\n<blockquote>\n<pre>spark.sql(\"SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable\").show()\n+---+-----+------+---+--------------------+--------------------+\n| ID|   AD| SOYAD|YAS|            inserted|           changeKey|\n+---+-----+------+---+--------------------+--------------------+\n|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|\n|  1| MERT|OGURCU| 26|2021-10-15 09:31:...|7a555f78ee887a50a...|\n+---+-----+------+---+--------------------+--------------------+\n<\/pre>\n<\/blockquote>\n<p>[21]: <strong>#ID 1&#8242; ya\u015f\u0131n\u0131 27 olarak g\u00fcncelledik ve de\u011fi\u015fimi g\u00f6rd\u00fck<\/strong><\/p>\n<blockquote>\n<pre>spark.sql(\"SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable\").show()\n+---+-----+------+---+--------------------+--------------------+\n| ID|   AD| SOYAD|YAS|            inserted|           changeKey|\n+---+-----+------+---+--------------------+--------------------+\n|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|\n|  1| MERT|OGURCU| 27|2021-10-15 09:35:...|e1d754dcb473ea82d...|\n+---+-----+------+---+--------------------+--------------------+\n<\/pre>\n<\/blockquote>\n<p>[22]: <strong>#ID 3 ekledik<\/strong><\/p>\n<blockquote>\n<pre>spark.sql(\"SELECT ID,AD,SOYAD,YAS,inserted,changeKey FROM default.deltaStreamTable\").show()\n+---+-----+------+---+--------------------+--------------------+\n| ID|   AD| SOYAD|YAS|            inserted|           changeKey|\n+---+-----+------+---+--------------------+--------------------+\n|  2|TOLGA| TEZEL| 25|2021-10-15 09:31:...|530756a65e70cc224...|\n|  1| MERT|OGURCU| 27|2021-10-15 09:35:...|e1d754dcb473ea82d...|\n|  3|  ALI|  VELI| 20|2021-10-15 09:36:...|932046c15b7aa5063...|\n+---+-----+------+---+--------------------+--------------------+\n<\/pre>\n<\/blockquote>\n<p>[EK]: <strong>#Yap\u0131lan Kafka i\u015flemleri\u00a0<\/strong><\/p>\n<pre>#Kafka topic'i produce etmek i\u00e7in : kafka-console-producer --broker-list xxxxx:9092 --topic deltalake_stream\n\n#Ekledi\u011fim offsetler ;\n\n{\"table\":\"deneme\",\"op_type\":\"I\",\"op_ts\":\"2021-09-24 08:46:33.015841\",\"current_ts\":\"2021-09-24T11:46:36.730006\",\n\"pos\":\"00000000460446574558\",\"before\": null,\"after\":{\"ID\":\"1\",\"AD\":\"MERT\", \"SOYAD\":\"OGURCU\", \"YAS\":\"26\"}}\n\n{\"table\":\"deneme\",\"op_type\":\"I\",\"op_ts\":\"2021-09-24 08:46:33.015841\",\"current_ts\":\"2021-09-24T11:46:36.730006\",\n\"pos\":\"00000000460446574558\",\"before\": null,\"after\":{\"ID\":\"2\",\"AD\":\"TOLGA\", \"SOYAD\":\"TEZEL\", \"YAS\":\"25\"}}\n\n{\"table\":\"deneme\",\"op_type\":\"U\",\"op_ts\":\"2021-09-24 08:48:33.015841\",\"current_ts\":\"2021-09-24T11:48:36.730006\",\n\"pos\":\"00000000460446574559\", \"before\":{\"ID\":\"3\",\"AD\":\"ALI\",\"SOYAD\":\"1-66676470433\", \"YAS\":\"26\"},\n\"after\":{\"ID\":\"1\",\"AD\":\"MERT\", \"SOYAD\":\"OGURCU\", \"YAS\":\"27\"}}\n\n{\"table\":\"deneme\",\"op_type\":\"I\",\"op_ts\":\"2021-09-24 08:46:33.015841\",\"current_ts\":\"2021-09-24T11:46:36.730006\",\n\"pos\":\"00000000460446574558\",\"before\": null,\"after\":{\"ID\":\"3\",\"AD\":\"ALI\", \"SOYAD\":\"VELI\", \"YAS\":\"20\"}}<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Merhabalar bug\u00fcnk\u00fc yaz\u0131mda size pyspark ile kafkadan topicleri okuyup json format\u0131na g\u00f6re \u015femaya uygun d\u00fczenleyip delta lake tablosuna upsert i\u015fleminden bahsedece\u011fim. Pyspark [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":5923,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"inline_featured_image":false,"footnotes":""},"categories":[87],"tags":[142,115,116],"class_list":["post-6125","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-blog","tag-deltalake-tr","tag-kafka-tr","tag-spark-tr"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v25.4 - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\" \/>\n<meta property=\"og:locale\" content=\"tr_TR\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego\" \/>\n<meta property=\"og:description\" content=\"Merhabalar bug\u00fcnk\u00fc yaz\u0131mda size pyspark ile kafkadan topicleri okuyup json format\u0131na g\u00f6re \u015femaya uygun d\u00fczenleyip delta lake tablosuna upsert i\u015fleminden bahsedece\u011fim. Pyspark [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\" \/>\n<meta property=\"og:site_name\" content=\"Bentego\" \/>\n<meta property=\"article:published_time\" content=\"2021-11-18T11:53:50+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2025-10-20T16:09:28+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png\" \/>\n\t<meta property=\"og:image:width\" content=\"2400\" \/>\n\t<meta property=\"og:image:height\" content=\"1600\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Bentego\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:label1\" content=\"Yazan:\" \/>\n\t<meta name=\"twitter:data1\" content=\"Bentego\" \/>\n\t<meta name=\"twitter:label2\" content=\"Tahmini okuma s\u00fcresi\" \/>\n\t<meta name=\"twitter:data2\" content=\"5 dakika\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\"},\"author\":{\"name\":\"Bentego\",\"@id\":\"https:\/\/bentego.com\/tr\/#\/schema\/person\/0348418b7b0cbca83fdd7a899d54821e\"},\"headline\":\"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi\",\"datePublished\":\"2021-11-18T11:53:50+00:00\",\"dateModified\":\"2025-10-20T16:09:28+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\"},\"wordCount\":754,\"publisher\":{\"@id\":\"https:\/\/bentego.com\/tr\/#organization\"},\"image\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png\",\"keywords\":[\"deltalake\",\"kafka\",\"spark\"],\"articleSection\":[\"Blog\"],\"inLanguage\":\"tr\"},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\",\"url\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\",\"name\":\"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego\",\"isPartOf\":{\"@id\":\"https:\/\/bentego.com\/tr\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage\"},\"image\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png\",\"datePublished\":\"2021-11-18T11:53:50+00:00\",\"dateModified\":\"2025-10-20T16:09:28+00:00\",\"breadcrumb\":{\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#breadcrumb\"},\"inLanguage\":\"tr\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"tr\",\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage\",\"url\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png\",\"contentUrl\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png\",\"width\":2400,\"height\":1600},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\/\/bentego.com\/tr\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/bentego.com\/tr\/#website\",\"url\":\"https:\/\/bentego.com\/tr\/\",\"name\":\"Bentego\",\"description\":\"Turning data into enterprise value\",\"publisher\":{\"@id\":\"https:\/\/bentego.com\/tr\/#organization\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/bentego.com\/tr\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"tr\"},{\"@type\":\"Organization\",\"@id\":\"https:\/\/bentego.com\/tr\/#organization\",\"name\":\"Bentego\",\"url\":\"https:\/\/bentego.com\/tr\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"tr\",\"@id\":\"https:\/\/bentego.com\/tr\/#\/schema\/logo\/image\/\",\"url\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/05\/logo-bentego.svg\",\"contentUrl\":\"https:\/\/bentego.com\/wp-content\/uploads\/2025\/05\/logo-bentego.svg\",\"width\":433,\"height\":109,\"caption\":\"Bentego\"},\"image\":{\"@id\":\"https:\/\/bentego.com\/tr\/#\/schema\/logo\/image\/\"}},{\"@type\":\"Person\",\"@id\":\"https:\/\/bentego.com\/tr\/#\/schema\/person\/0348418b7b0cbca83fdd7a899d54821e\",\"name\":\"Bentego\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/","og_locale":"tr_TR","og_type":"article","og_title":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego","og_description":"Merhabalar bug\u00fcnk\u00fc yaz\u0131mda size pyspark ile kafkadan topicleri okuyup json format\u0131na g\u00f6re \u015femaya uygun d\u00fczenleyip delta lake tablosuna upsert i\u015fleminden bahsedece\u011fim. Pyspark [&hellip;]","og_url":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/","og_site_name":"Bentego","article_published_time":"2021-11-18T11:53:50+00:00","article_modified_time":"2025-10-20T16:09:28+00:00","og_image":[{"width":2400,"height":1600,"url":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png","type":"image\/png"}],"author":"Bentego","twitter_card":"summary_large_image","twitter_misc":{"Yazan:":"Bentego","Tahmini okuma s\u00fcresi":"5 dakika"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#article","isPartOf":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/"},"author":{"name":"Bentego","@id":"https:\/\/bentego.com\/tr\/#\/schema\/person\/0348418b7b0cbca83fdd7a899d54821e"},"headline":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi","datePublished":"2021-11-18T11:53:50+00:00","dateModified":"2025-10-20T16:09:28+00:00","mainEntityOfPage":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/"},"wordCount":754,"publisher":{"@id":"https:\/\/bentego.com\/tr\/#organization"},"image":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage"},"thumbnailUrl":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png","keywords":["deltalake","kafka","spark"],"articleSection":["Blog"],"inLanguage":"tr"},{"@type":"WebPage","@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/","url":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/","name":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi - Bentego","isPartOf":{"@id":"https:\/\/bentego.com\/tr\/#website"},"primaryImageOfPage":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage"},"image":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage"},"thumbnailUrl":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png","datePublished":"2021-11-18T11:53:50+00:00","dateModified":"2025-10-20T16:09:28+00:00","breadcrumb":{"@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#breadcrumb"},"inLanguage":"tr","potentialAction":[{"@type":"ReadAction","target":["https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/"]}]},{"@type":"ImageObject","inLanguage":"tr","@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#primaryimage","url":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png","contentUrl":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/06\/Frame-83__.png","width":2400,"height":1600},{"@type":"BreadcrumbList","@id":"https:\/\/bentego.com\/tr\/spark-streaming-ile-kafka-delta-lake-upsert-islemi\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/bentego.com\/tr\/"},{"@type":"ListItem","position":2,"name":"Spark Streaming ile Kafka + Delta Lake Upsert \u0130\u015flemi"}]},{"@type":"WebSite","@id":"https:\/\/bentego.com\/tr\/#website","url":"https:\/\/bentego.com\/tr\/","name":"Bentego","description":"Turning data into enterprise value","publisher":{"@id":"https:\/\/bentego.com\/tr\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/bentego.com\/tr\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"tr"},{"@type":"Organization","@id":"https:\/\/bentego.com\/tr\/#organization","name":"Bentego","url":"https:\/\/bentego.com\/tr\/","logo":{"@type":"ImageObject","inLanguage":"tr","@id":"https:\/\/bentego.com\/tr\/#\/schema\/logo\/image\/","url":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/05\/logo-bentego.svg","contentUrl":"https:\/\/bentego.com\/wp-content\/uploads\/2025\/05\/logo-bentego.svg","width":433,"height":109,"caption":"Bentego"},"image":{"@id":"https:\/\/bentego.com\/tr\/#\/schema\/logo\/image\/"}},{"@type":"Person","@id":"https:\/\/bentego.com\/tr\/#\/schema\/person\/0348418b7b0cbca83fdd7a899d54821e","name":"Bentego"}]}},"_links":{"self":[{"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/posts\/6125","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/comments?post=6125"}],"version-history":[{"count":1,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/posts\/6125\/revisions"}],"predecessor-version":[{"id":6249,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/posts\/6125\/revisions\/6249"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/media\/5923"}],"wp:attachment":[{"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/media?parent=6125"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/categories?post=6125"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/bentego.com\/tr\/wp-json\/wp\/v2\/tags?post=6125"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}