Merhabalar. Flume ve Kafka akan veri işlemenin önde gelen araçlarından. Her ne kadar Flume eskimeye yüz tutsa da hala aktif olarak kullanılıyor ve iyi işler çıkarıyor. Bir akan veri uygulaması mimarisinde eğer Flume ve Kafka birlikte kullanılıyorsa genelde veri kaynağından Flume ile Kafka’ya doğru bir akış tasarlanır. Kafka’ya giden mesaj iki bölümden oluşur: key ve value. Kafka’ya key göndermez isek key null değeri alır. Ancak Kafka’ya key göndermek bize ilave fonksiyonalite sağlayabilir. Bu yazımızdaFlume ile Spooldir kaynağından okunan kayıtların Kafka’ya gönderilmesi esnasında mesajın içinden key alanını(eğer varsa) alıp Kafka’ya nasıl key adı altında gönderebileceğimizi göreceğiz. 

Spooldir ile örnek verecek olursak her bir satır bir event’ı temsil eder. Aşağıdaki Şekil-1’de gördüğümüz Payload kısmı satırdan oluşur. Ancak bir de başlık kısmı vardır. Headers, yönlendirme kararları vermek veya diğer yapılandırılmış bilgileri (event zaman damgası veya event’a ait sunucunun adı gibi) taşımak için kullanılabilen anahtar / değer çiftleridir. 

Şekil-1: Flume Event Yapısı

Flume içinde bir çok sink vardır. Bunlardan birisi de Kafka’dır. Ancak Kafka sink bize mesaj içinden key alaını alıp kullanmamıza imkan vermiyor. Bu sebeple bu key değerini mesajın içinden interceptors ile alabiliriz. Flume içinde bir kaç farklı Interceptors var bunlardan birisi de regex_extractor. Regex_extractor mesaj içinden java düzenli ifadeye oygun olanını alıp belirlediğimiz bir isim ile Flume header içinde tutuyor. Eğer biz bu değeri key isminde tutar da Kafka Sink’i içinde a1.sinks.k1.kafka.KafkaKeyInterceptor= key şeklinde tanımlarsak bu düzenli ifade ile yakaladığımız değeri Kafka’ya key olarak gönderebiliriz. 

Şimdi örneğe geçelim:

Kullanacağımız kaynak dosya içeriği şu şekildedir:

0,5.1,3.5,1.4,0.2,Iris-setosa,2020-06-23 09:36:22.954788
1,4.9,3.0,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.055103
2,4.7,3.2,1.3,0.2,Iris-setosa,2020-06-23 09:36:23.155118
3,4.6,3.1,1.5,0.2,Iris-setosa,2020-06-23 09:36:23.255456
4,5.0,3.6,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.355829
5,5.4,3.9,1.7,0.4,Iris-setosa,2020-06-23 09:36:23.455884
6,4.6,3.4,1.4,0.3,Iris-setosa,2020-06-23 09:36:23.555917
7,5.0,3.4,1.5,0.2,Iris-setosa,2020-06-23 09:36:23.656288
8,4.4,2.9,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.756695
9,4.9,3.1,1.5,0.1,Iris-setosa,2020-06-23 09:36:23.857249

Biz yukarıdaki dosyada ilk alanın primary key’i temsil ettiğini biliyoruz ve bunu Kafka’ya key olarak göndermek istiyoruz. flume.conf dosyamı aşağıdaki şekilde tasarlıyorum.

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/murat/flume_deneme/spool
a1.sources.r1.fileHeader = true
a1.sources.r1.deletePolicy=immediate

# key extractor for kafka
a1.sources.r1.interceptors=KafkaKeyInterceptor
a1.sources.r1.interceptors.KafkaKeyInterceptor.type=regex_extractor
a1.sources.r1.interceptors.KafkaKeyInterceptor.regex=(\\d+),
a1.sources.r1.interceptors.KafkaKeyInterceptor.serializers=ser1
a1.sources.r1.interceptors.KafkaKeyInterceptor.serializers.ser1.name=key

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = irisnocompaction
a1.sinks.k1.kafka.key = key
a1.sinks.k1.kafka.bootstrap.servers = cloudera:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Yukarıda püf nokta key extractor for kafka bloğu. Bu blokta:

Spooldirectory kaynağı olan r1’e bir interceptor tanımlıyoruz ve adına KafkaKeyInterceptor diyoruz.

a1.sources.r1.interceptors=KafkaKeyInterceptor

KafkaKeyInterceptor adındaki interceptor’a tür belirliyoruz: regex_extractor.

a1.sources.r1.interceptors.KafkaKeyInterceptor.type=regex_extractor

Aşağıdaki düzenli ifade virgülden önceki rakamları alıyor. Yani ilk alanı.

a1.sources.r1.interceptors.KafkaKeyInterceptor.regex=(\\d+),

Aşağıda ise serileştirme ve Flume header içinde hangi ad ile ulaşılacağını belirtiyoruz.

a1.sources.r1.interceptors.KafkaKeyInterceptor.serializers=ser1
a1.sources.r1.interceptors.KafkaKeyInterceptor.serializers.ser1.name=key

Bundan sonra geriye sadece Kafka sink bloğunda

a1.sinks.k1.kafka.key = key

satırını eklemek kalıyor.

Flume agent’i çalıştırdığımızda Kafka’ya gidecek mesaj aşağıdaki gibi olacaktır.

0       0,5.1,3.5,1.4,0.2,Iris-setosa,2020-06-23 09:36:22.954788
2       2,4.7,3.2,1.3,0.2,Iris-setosa,2020-06-23 09:36:23.155118
3       3,4.6,3.1,1.5,0.2,Iris-setosa,2020-06-23 09:36:23.255456
1       1,4.9,3.0,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.055103
4       4,5.0,3.6,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.355829
9       9,4.9,3.1,1.5,0.1,Iris-setosa,2020-06-23 09:36:23.857249
5       5,5.4,3.9,1.7,0.4,Iris-setosa,2020-06-23 09:36:23.455884
7       7,5.0,3.4,1.5,0.2,Iris-setosa,2020-06-23 09:36:23.656288
8       8,4.4,2.9,1.4,0.2,Iris-setosa,2020-06-23 09:36:23.756695
6       6,4.6,3.4,1.4,0.3,Iris-setosa,2020-06-23 09:36:23.555917

Yukarıdaki kafka consumer çıktısından gördüğümüz gibi Kafka’ya event içindeki bir alanı key olarak göndermeyi başardık.

Başka bir yazıda görüşmek dileğiyle hoşçakalın.