Selamlar herkese bu yazımda ELK stack yapısında bulunan Logstash’e giriş yapıp, teori ve uygulamalar ile uçtan uca Logstash pipelinelarını göstereceğim.

1. Logstash

Logstash, çok sayıda kaynaktan veri alabilen, dönüştüren ve daha sonra başka kaynaklara gönderebilen açık kaynaklı bir veri işleme hattıdır.

Şekil 1: Logstash[1]

1.1 Logstash Nasıl Çalışır?

Logstash pipeline’ı temel olarak 3 aşamadan oluşur. İnput, Filter ve Output.
İnputlar : Eventlerin gelmesi, oluşturulmasıdır.
Filterlar : Gelen eventlerin üzerinde aksiyonlar, transformasyonlar gerçekleştirirler.
Outputlar : Eventleri başka bir yere aktarırlar.

Şekil 2 : Logstash pipeline[2]

Örnek bir Logstash config dosyasının yapısı aşağıda gösterilmiştir. İlerleyen kısımlarda daha detaylı olarak değinilecektir.

input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

Logstash’in iki tür config dosyası vardır: Birincisi Standart conf dosyası, pipelinenin kendisini yani input, filter ve output’un tanımlandığı kısım. İkinicisi ise Logstash pipelinenin başlatılmasını ve çalışma şeklini etkileyen seçenekleri belirten yml conf dosyaları. Bu yazıda Logstash’in büyük bir kısmını oluşturan standart config dosyası üzerinden uygulamalar gösterilecektir.

1.2 Pluginler

Input Plugin : Datanın Logstash’e alındığı kısım. En çok kullanılan Logstash input pluginleri şunlardır.

File : Filesystem üzerindeki dosya/lardan logları okur.
Redis : Redis objesinden logları okur.
Stdin: Standart inputtan (command line) okur.
S3: Amazon S3’den logları okur.
Rabbitmq, Mongodb, Kafka gibi input pluginleri de mevcuttur.

Filter Plugin : Input kısmından gelen logların üzerinde işlemlerin(Dönüştürme, parse etme, silme vb) yapıldığı kısım. En çok kullanılan filter pluginleri şunlardır.

Grok: Gelen metni ayrıştırır ve yapılandırır. Grok, Logstash’de yapılandırılmamış logları yapılandırılmış ve sorgulanabilir bir şeye ayrıştırmanın en iyi yoludur. Logstash’de built in olarak bulunan yaklaşık 120 pattern vardır. İp arama, Sayı arama Hostname arama vs.
Aşağıda örnek 4 pattern gösterilmiştir. 
UNIXPATH (/([\w_%!$@:.,+~-]+|\.)*)+
CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})

Mutate: Fieldlerde genel dönüşümler yapabilir. Eventlerdeki fieldleri yeniden adlandırabilir, kaldırabilir, tiplerini değiştirebilirsiniz.

Outputs : Logstash pipelinenin son aşamasıdır. Bir event birden fazla outputtan geçebilir. Yaygın olarak kullanılan bazı outputlar şunlardır:
Elasticsearch : loglar/eventler Elasticsearch’e gönderir. Verilerinizi sorgulanabilir bir biçimde kaydetmeyi sağlar.
File : Dosya sisteminizdeki bir dosyaya yazar.
Stdout : Standart output’a yazar.
Kafka : Bir Kafka topic üzerine yazar.

Diğer pluginler
Azure event hubs : Azure’den eventler alır.
Cloudwatch : AWS Cloudwatch Api’dan eventleri alır.
Github : Github’dan eventler alır.
Graphite: Graphiteden metricleri alır.
http : http ve https üzerinden eventler alır.
Jdbc : Jdbc üzerinden eventleri alır.
Rabbitmq : Rabbitmq üzerinden eventleri alır.
Tcp-udp : tcp ve udp üzerinden eventler okunur veya yazılır.
Twitter : Twitter streaming api üzerinden eventler alınır.
Email : eventler email server üzerinden gönderilir.
İnfluxdb : eventler influxdb’ye yazılır.
Mongodb : eventler mongodb’ye yazılır.
Solr_http : loglar solr’da indexlenir ve saklanır.
Webhdfs : loglar hdfs’e yazılır.

1.3 Logstash Terimleri

Codec : Codec eklentisi bir verinin görünümünü değiştirir. Codec bileşenleri esasen bir inputun veya outputun parçası olarak çalışabilen stream filtreleridir.

Delimiter : Dosyadan okuma yapılırken delimiterin ne olacağını belirtir. Default delimiter new line ( \n ) karakteridir.

Path : Input veya Output Dosya yolunu belirttiğimiz kısım.

Start_position : Okuma esnasında okuma işlemine nereden başlanacağını bildiren keyword. Beginning ve end olmak üzere iki adet değer alır.

Tags : eventlere tag ekleyebilirsiniz. Bu tagleri daha sonra arama veya dönüşüm yapmak için kullanabilirsiniz

Örnek tags =>[“dilisim”]

filter{
if "dilisim" in tags[]{
	drop();
} }

Metadata : Eventlere metadata ekleyebilirsiniz. Bunlar eventlerin içinde doğrudan bulunmaz ancak yönlendirme yaparken kullanılabilir. Örnek: 3 inputunuz var (Kafka, file, tcp) ve 4 outputunuz var. Hangisi nereye gidecek bunu belirtmek için metadatalardan yardım alınabilir.

1.4 Logstash Şartlı İfadeler

Bazen bir eventi belirli şartlar altında filtrelemek veya outputa aktarmak istersiniz. Bunun için bir conditional ifadeleri kullanabilirsiniz. Logstash’in şartlı ifadeleri programlama dillerinde olduğu gibi çalışır. Temel syntaxı şu şekildedir.

Şekil 3 : Logstash Şartlı İfadeler[3]

Operatörler olarak programlama dillerinden de bildiğimiz ==, != ,< ,> , <= , >=, in, not in , and , or , nand , xor , ve ! kullanılıyor.

1.5 Logstash Kurulumu

İlk olarak https://www.elastic.co/downloads/logstash adresine girip istediğimiz Logstash versiyonunu ve işletim sistemimiz seçip indiriyoruz. Bu yazıda Logstash’in 6.8.0 sürümü kullanılmıştır. Dosyaları indirdikten sonra ilk olarak cmd üzerinden Logstash’ın olduğu klasöre gidip command line üzerinden “bin/logstash” komutu ile Logstash’i çalıştırıyoruz.

1.6 Logstash ile uygulamalar

1.6.1 Standart input – Standart output

Logstash ile yapacağımız ilk uygulamada command line üzerinden girdiğimiz eventi yine command line ile okuyacağız.

İlk olarak cmd ile Logstash’in olduğu directorye gidiyoruz ve istediğimiz yere logstash.conf isimli dosyayı oluşturuyoruz. Daha sonra conf dosyamızı açıp aşağıdaki satırları ekliyoruz.

input {
	stdin {

	}
}
#filter kısmını eklememize şu an için gerek yok. Filter kısmı olmasa bile logstash çalışabilmektedir. Aldığı veriyi olduğu gibi aktarır.

output {
	stdout {

	}
}

Daha sonra dosyamızı kaydedip çıkıyoruz ve 

bin/logstash -f config/pipelines/pipeline.conf

komutu ile ilk Logstash uygulamamızı çalıştırıyoruz. Logstash’in başlaması 45-50 saniye kadar sürebilmektedir. “Successfully started” yazısını gördükten sonra “Selamlar dilisim” yazıp gönderiyoruz. Logstash ise cevap olarak bize gönderdiğimiz mesajın yanı sıra 3 adet bilgi daha dönüyor. Bunlar Logstash’in çalıştığı host adı, bu işlemin yapıldığı tarih ve versiyon numarasıdır.

Şekil 4 : İlk uygulamamızın console görüntüsü

Peki plain texti düzgün bir şekilde iletti ya inputumuz json formatında olsaydı o zaman ne olacaktı ? Name fieldinin değeri Tolga olacak şekilde bir input gönderelim.

Şekil 5 : Logstash’in plain text ayarlarıyla Json gösterimi

Biz name’i bir field olarak istemiştik ancak bu direkt olarak mesajın içine kattı. Peki ne yapabiliriz ? Yukarıda bahsettiğim codec pluginini kullanabiliriz. Şimdi hem json formatında input alan hem de aldığı inputu hem command line üzerinden bize döndüren hem de bir text dosyasına kaydeden bir uygulama yapalım. Config dosyamızın içine girip aşağıdaki şekilde değişiklik yapalım.

input {
	stdin {
		codec => json
	}
}

output {
	stdout {

        }

	file {
		path => "tempoutput.txt"
	}
}

Codec kullanarak artık json tipindeki loglarımızı/eventlerimizi Logstash’e tanıtabilecek ve file plugin kullanarak bunları bir dosyaya kaydedebileceğiz. Şimdi tekrar Logstash pipelinemızı ayağa kaldıralım ve az önceki inputu tekrar girelim.

Şekil 6 : Logstash Json gösterimi

Gördüğünüz gibi bu sefer name adında bir field ve Tolga adında bu fielde ait bir value elde ettik. Son satırdaki infoya dikkat edelim. Burada bize belirtilen pathte bir dosya açıldığını ve dosyaya yazma işleminin yapıldığından bahsediyor. 

Şimdi bu sefer için içine başka uygulamalar katalım. Postman üzerinden json tipinde eventler yollayıp bunları yine cmdden izleyelim.

Bunun için yukarıdaki config dosyamızın input kısmına http pluginini ekliyoruz ve daha sonra uygulamamızı tekrar başlatıp Postman clientimizi açıyoruz.

http {
	 host => "127.0.0.1"
	 port => 8080
 }
#host ve port bilgimizi yukarıdaki şekilde ayarlıyoruz. Port istediğiniz bir değer olabilir ( başka bir uygulama tarafından kullanılmaması gerekir ).

Postman clientimizi açıktan sonra host ve port bilgilerini girip sağ taraftan jsonu seçiyoruz ve göndermek istediğimiz eventleri yazıyoruz. Name ve Peoplecount fieldlerinden oluşan bir json gönderdim.

Şekil 7 : Postmanın iç görüntüsü

Daha sonra console üzerinden kontrol ettiğimzde bu sefer yukarıdaki outputlardan çok daha fazla sayıda field olduğunu gözlemleyebiliriz. Bunun sebebi ise üçüncü parti bir uygulama/agent kullanmış olmamızdan dolayıdır.

Şekil 8 : Postman agent bilgileri

İlk iki satırda gönderdiğimiz değerleri görebiliyoruz. Geri kalan header kısmı ise bizim hangi agenti kullandığımız ve agent üzerinde hangi ayarları yaptığımız hakkında bize bilgi veriyor. Bu aslında çok yararlı bir bilgidir. Siz kendi serverinize Postman ya da X uygulaması üzerinden request atılmasını istemiyorsanız bunu filter kısmında if sorgusu ile kontrol ederek istemediğiniz yerlerden gelen requestleri engelleyebilirsiniz.

1.6.2 Logstash – TCP uygulaması

Bu uygulamada bir port üzerinden gelen veriyi localde bir dosyaya kaydedeceğiz. Bu sefer input, filter ve outputu parçalayarak göstereceğim. 

input {
		tcp {
			port => 5400
			codec => json
			}
}

5400 portuna gelen logları tcp üzerinden ile Logstash’e aktaracağız. Bunun için tcp pluginini kullanıyoruz. Json tipinde veriler aktaracağımız için codec belirttik ve son olarak port olarak 5400 seçtim. Siz başka bir port ile deneyebilirsiniz.

filter {
	mutate {
		convert => {
			"age" => "integer"
			"_id" => "integer"
		}
		remove_field => ["host","@version","@timestamp","port"]
		}
}

Filter kısmında ise bir takım işlemler yaptık. İlk olarak age ve _id fieldleri eğer string tipinde ( “x” ) geliyorsa bunları integer’a çevirdik. İlk uygulamamızda görmüştük Logstash bazı fieldler ekliyordu. Bunları bu sefer dışarda bırakmak istedim bunun için mutate plugininin remove_field özelliğinden faydalandım. Silmek istediğim alanları içine string olarak ekledim.

output {
	stdout {
			
	}
}

Daha sonra tekrar commandline üzerinden görmek istediğim için output kısmını bu şekilde düzelttim.

Peki 5400 portuna nasıl veri göndereceğiz? Ben bunun için netcat’i kullandım. Netcat, ağı okuyan ve TCP veya UDP iletişim kurallarını kullanarak ağ bağlantılarını yazan bir hizmetdir. 

İlk olarak bir json dosyası oluşturup aşağıdaki satırları dosyanın içine ekleyip kaydettim.

{"_id":"1","name":"Ali","age":"10","city":"istanbul"}
{"_id":"2","name":"Veli","age":"22","city":"bursa"}
{"_id":"3","name":"Hasan","age":"34","city":"bayburt"}
{"_id":"4","name":"Elif","age":"11","city":"balıkesir"}
{"_id":"5","name":"Gizem","age":"53","city":"adana"}

Daha sonra tekrar bir cmd açıp yukarıdaki x.json dosyamızın olduğu konuma geldim ve aşağıdaki komut ile dosyamızı netcat ile 5400 portuna aktarmış oldum.

nc localhost 5400 < original.json

Daha sonra Logstash’i çalıştırdığım terminal ekranına döndüğüm zaman sonuçların ekrana 1 saniyeden kısa bir sürede gelmiş olduğunu gördüm.

Şekil 9 : Json dosyasının Tcp plugininde gösterimi

1.6.3 Logstash – Twitter Uygulaması

Bu uygulamamızda Logstash Twitter input plugini kullaranak Twitter API ile iletişime geçip belirttiğimiz hashtage ait tweetleri çekip bir dosyaya kaydedeceğiz.

Config dosyamız aşağıdaki gibidir. Bunun için ilk olarak twitter developer accountu açmalı ve aşağıdaki 4 adet key’e sahip olmalısınız. Ben kendi keylerimi paylaşamıyorum maalesef.

input {
	twitter {
		consumer_key => "xxxx"
		consumer_secret => "xxx"
		oauth_token => "xxxx"
		oauth_token_secret => "xxxx"
		keywords => [ "big data"]
		full_tweet => true
		ignore_retweets => true
}
}

output {
file {
	path => "tweetsnew.json"

}
}


Burada keywords fieldi aramak istediğimiz hashtagı, full_tweet fieldi o tweete air tüm detay bilgileri alan ve ignore_retweets ise retweetleri ignore etmemizi sağlayan fieldler. Retweetleri ignore ettik çünkü bir hashtage atılan binlerce retweet tweetler üzerinde bir analiz yapmak istediğimiz zaman bizi yanıltabilir.

Şekil 10 : Logstash Twitter Plugini

Daha sonra json dosyamıza girdiğimizde tweetlerin gelmiş olduğunu görebileceğiz. Çok fazla field içerdiğinden 1 tweete ait çekilen bilgileri aşağıda gösterebilirim.

{"quote_count":0,
"in_reply_to_screen_name":null,
"filter_level":"low",
"possibly_sensitive":false,
"created_at":"Thu May 07 11:24:49 +0000 2020","retweet_count":0,"@version":"1","@timestamp":"2020-05-07T11:24:49.000Z",
"is_quote_status":false,
"truncated":false,
"place":null,
"entities":{"urls":[{"url":"https://t.co/T6Mvytsqse","expanded_url":"https://bit.ly/3fcor94","display_url":"bit.ly/3fcor94","indices":[81,104]}],"user_mentions":[],"symbols":[],"hashtags":[]},"coordinates":null,"id_str":"1258357111345553412","timestamp_ms":"1588850689146","in_reply_to_status_id_str":null,
"source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","in_reply_to_user_id":null,
"text":"Top Ways in Which Big Data Has Fortified Geolocation Apps - Data Science Central https://t.co/T6Mvytsqse","reply_count":0,"favorite_count":0,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"geo":null,"id":1258357111345553412,"contributors":null,"retweeted":false,"user":{"description":null,"url":"http://about.me/nivanecu","protected":false,"profile_background_image_url_https":"https://abs.twimg.com/images/themes/theme1/bg.png","statuses_count":549,"followers_count":31,"contributors_enabled":false,"profile_use_background_image":false,"created_at":"Sat May 28 00:01:42 +0000 2011","is_translator":false,"location":"Quito, Ecuador","default_profile":false,"translator_type":"none","profile_background_image_url":"http://abs.twimg.com/images/themes/theme1/bg.png","screen_name":"nivanecu","profile_banner_url":"https://pbs.twimg.com/profile_banners/306491885/1431542810","profile_link_color":"0084B4","notifications":null,"profile_text_color":"000000","id_str":"306491885","following":null,"name":"Nelson Ivan Herrera","utc_offset":null,"listed_count":2,"profile_sidebar_border_color":"000000","verified":false,"favourites_count":83,"time_zone":null,"profile_image_url_https":"https://pbs.twimg.com/profile_images/1252324954667978752/7vofA9XA_normal.jpg","profile_image_url":"http://pbs.twimg.com/profile_images/1252324954667978752/7vofA9XA_normal.jpg","profile_background_color":"000000","id":306491885,"default_profile_image":false,"follow_request_sent":null,"profile_background_tile":false,"lang":null,"friends_count":43,"profile_sidebar_fill_color":"000000","geo_enabled":true},"lang":"en","favorited":false}

1.6.4 ELK ( Elasticsearch, Logstash, Kibana ) ve Kafka uygulaması 

Bu uygulamada local disk üzerinde bulunan bir csv dosyasını alıp önce Elasticsearch’e gönderip Kibana üzerinden kontrol edeceğiz daha sonra aynı veriyi bir Kafka topicine yazacağız.

Elimizde “islem(str)”,”islemtarihi(str)”,”yapankisi(int)”,”basari_kodu(str)”,”islemsuresi(int)”,”operator_server_definer(str)” sütunlarından oluşan 3 adet csv dosyası var. Bu csv dosyalarının adı daily_date.csv şeklinde burada date önemli çünkü Elasticsearch’te indexlerken bu isimleri kullanacağız.

input {
	file {
		path => "/Users/tolgadilisim/Desktop/event-data/daily_*.csv"
		start_position => "beginning"
        }
}

Config dosyasını yine üçe bölerek açıklayacağım. İlk olarak input kısmında file plugini kullanarak path’imizi ve okumaya dosyanın neresinden başlayacağımızı belirtiyoruz. Dosyalarımızın hepsi daily_date şeklinde olduğu için daily_* notasyonu bize tüm dosyalarımızı görmemizi sağlayacak.

filter {
		csv {
			separator => ","
			columns => ["islem","islemtarihi","yapankisi","basari_kodu","islemsuresi","operator_server_definer"]
		}
		ruby {
	            code => "
	                event.set('index_type',event.get('path').split('/')[-1].gsub('.csv',''))
	            "
	        }

		mutate {
			convert => {
				"yapankisi" => "integer"
				"islemsuresi" => "integer"
			}
		}

	        mutate { add_field => { "[@metadata][mesaj]" => "%{message}" } }
		mutate { add_field => { "[@metadata][index_type]" => "%{index_type}" } }
                mutate { remove_field => ["index_type"] }

		mutate {
			 remove_field => ["host","message","@version","@timestamp","path"]

		 }



}

Burada ilk olarak csv pluginini kullanarak csv dosyalarımızın columnlarını ve seperatoru belirtiyoruz. Ruby plugini kullanarak her bir csv dosyası için dosya pathini alıyoruz sondan / işaretine göre bölüp .csv ifadesini “” ile değiştiriyoruz yani geriye sadece dosya adı kalıyor ve bunu bir değişkene aktarıyoruz.

Örnek : daily_22-04-2020.csv –> daily_22-04-2020 olacak ve biz daha sonra bu değişkenimizi Elasticsearch’te index oluştururken kullanacağız.

Burada çok fazla mutate kullandık bunları beraber ve ayrı ayrı kullanabiliriz. Burada ayrı ayrı kullanımını göstermek için bu kadar uzattım. İlk mutate plugininde yapankisi ve islemsüresini integer’a çevirdik. Bunların tipi zaten intti ama olur da veri içinde yanlışlıkla string gelirse diye önlem aldık. Daha sonra bunları Kafka’da göstermek için hepsini mesaj isimli bir metadata değişkenine aktardık. Metadatalar datanın içinde doğrudan yer almazlar ama message değişkeni doğrudan yer alıyordu. Hem onu ortadan kaldırmış olduk hem de tekrar elimizde messagenin sahip olduğu valueyi tuttuk. Diğer mutatelerde ise istemediğimiz, Logstash’in kendi eklediği fieldleri sildik.

output {
		elasticsearch {
			hosts => ["localhost:9200"]
			index => "%{[@metadata][index_type]}"
			document_type => "_doc"
			http_compression => true
				}

		kafka {
			topic_id => "tolgakafka"
			bootstrap_servers => "localhost:9092"

			codec => plain {
				format => "%{[@metadata][mesaj]}"
			}
		}
}

Geldik son kısmımıza, burada ilk başta Elasticsearch pluginini kullanarak Elasticsearch’in çalıştığı adresi belirttik. Daha sonra yukarıda dosya isimlerini kesmiştik hatırlarsınız. O kestiğimiz isimleri burada index adı olarak kullanıyoruz. Yani bu ne demek ? Her bir csv dosyası kendi adını taşıyan bir indexi oluşturacak ve veriler ayrı ayrı indexlerde tutulacak.

Document type ise default Elasticsearch doküman tipidir. Güncel versiyonlarda _docdur ancak eski Elasticsearch versiyonları kullanıyorsanız doc gibi farklı olabilir değiştirmeniz gerekli bu yüzden ayrıca belirttim şu an kullanmama gerek yok çünkü ben güncel Elasticsearch versiyonu kullanıyorum. 

Kafka tarafında ise yine Kafka plugini yardımıyla veriyi göndereceğimiz Kafka topicinin adı, Kafka serverlerimizin bilgileri ve ne göndereceğimizi codec kısmında belirtiyoruz. Bu da üstteki message -> metadata mesaj dönüşümü yaptığımız mesaj yani verinin kendisiydi direkt virgüllerle ayrılmış şekilde veriyi Kafka’ya basıyoruz.

Şimdi yukarıda anlattıklarımızı adım adım uygulayalım;

İlk olarak aşağıdaki komut ile bir Kafka topici yaratalım.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic tolgakafka

Daha sonra uygulamamızı başlatabiliriz sırasıyla şu uygulamaları aşağıdaki komutlarla başlatıyoruz: Elasticsearch -> Kibana -> Zookeeper -> Kafka -> Logstash -> Kafka topic

  1. bin/elasticsearch
  2. bin/kibana
  3. bin/zookeeper-server-start.sh config/zookeeper.properties
  4. bin/kafka-server-start.sh config/server.properties
  5. bin/logstash -f config/pipelines/pipeline.conf
  6. bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic tolgakafka

İlk olarak Kibana arayüzümüze girip “get indices” ile indexlerimizi sorguladığımızda daily dosyalarımızın isminde indexlerimizin oluştuğunu görüyoruz.

Şekil 11 : Elasticsearch indexlerinin Kibana’da gösterimi

Daha sonra indexlerimizin içi dolu mu diye sorgu attığımızda ise csv dosyalarımızın içeriklerinin başarılı bir şekilde Elasticsearch üzerinden aranabilir hale geldiğini gözlemliyoruz.

Şekil 12 : Kibana sorgu sonucu

Son olarak Kafka topicimize bakalım. Kafka plugini kullanarak csv dosyalarının içeriğini Kafka topicimize de göndermiş olduk.

Şekil 13 : Csv’nin Kafka Topic’e yazılması

Kaynakça
[1] : elastic.co/guide/en/logstash/current/introduction.html 
[2] : tutorialspoint.com/logstash/images/logstash_internal_architecture.jpg
[3] : elastic.co/guide/en/logstash/current/event-dependent-configuration.html

Bu yazım buraya kadardı. Bir sonraki yazımda görüşmek üzere ?