KAFKA

Kafka consumer setup: auto commit, sync commit, multi-thread

Naranjito 2020. 12. 22. 17:24
  • import the kafka-clients dependency (compile group)
plugins {
    id 'java'
}

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

compile group : the kafka-clients dependency provides the classes used to create producer and consumer client instances, so it must be declared under dependencies.

 

version : the client version should be compatible with the broker version; keeping them matched (or close) is recommended.

 

  • set up the consumer
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; //my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // keys and values arrive as bytes and are deserialized into Strings

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // poll() blocks for up to 1 second waiting for records
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

 

 

  • ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun

Records typed into the console producer; the consumer application picks them up.

Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun
>a
>b
>c
>d

 

 

  • Result in the consumer application

Records received from the console producer (mixed with records from earlier producer runs).

a
b
hello
world
This is record 0
This is record 2
This is record 3
This is record 9
This is record 3
This is record 5
This is record 8
kafka
This is record 1
This is record 5
This is record 7
This is record 8
This is record 0
This is record 2
This is record 7
This is record 9
e
hello
This is record 4
This is record 6
This is record 1
This is record 4
This is record 6
a
b
c
d

 

  • set up consumer auto commit
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithAutoCommit {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; //my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // enable auto commit (true is the default); offsets are committed
        // automatically in the background while poll() is being called
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
        // commit interval: 60,000 ms (every 60 seconds)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

ENABLE_AUTO_COMMIT_CONFIG, true : offsets are committed automatically at the configured interval, as a side effect of calling poll().
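
As a rough way to see when the auto commit actually happens, the offset committed for a partition can be read back with committed(). The helper below is only a sketch; the class name, the use of partition 0, and where it is called from are my assumptions, not part of the original example.

package com.tacademy;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

// Sketch: print the offset currently committed for partition 0 of a topic.
// With auto commit enabled, this value advances roughly once per commit interval
// (while poll() keeps being called), not on every poll.
public class CommittedOffsetCheck {
    static void printCommitted(KafkaConsumer<String, String> consumer, String topic) {
        TopicPartition partition = new TopicPartition(topic, 0); // partition 0 is an assumption
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(partition));
        OffsetAndMetadata offset = committed.get(partition);
        System.out.println("committed offset: " + (offset == null ? "none" : offset.offset()));
    }
}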

 

 

  • set up consumer sync commit

Committing offsets synchronously after processing narrows the window in which records can be consumed twice (duplicated) after a restart or rebalance, compared with auto commit.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithSyncCommit {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; //my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // disable auto commit; offsets are committed manually with commitSync() below

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            consumer.commitSync();
            // commit synchronously instead of relying on auto commit; commitSync() with no
            // arguments commits the offsets of the batch returned by the last poll()
        }
    }
}
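
commitSync() with no arguments commits the offsets of the whole batch returned by the last poll(). When finer control is needed, the offsets to commit can be passed explicitly per partition. The variant below is only a sketch of that pattern; the class and method names are mine, and the committed offset is record.offset() + 1 (the next record to read).

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Sketch: commit the exact offset after each processed record.
public class SyncCommitPerRecord {
    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            consumer.commitSync(offsetsToCommit); // commit only what has been processed so far
        }
    }
}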

 

  • Input on the producer console
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>hello hola

 

 

  • Result on consumer application
hello hola

 

 

  • Input on the producer console after stopping the consumer application and producing more records
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>hello hola
>hola
>mundo

 

  • Result in the consumer application after producing more data and restarting it

Only the new records are returned after the consumer application is restarted, because its offsets were committed synchronously.

hello hola
hola
mundo

 

 

  • set up consumer multi thread
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiThread {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; //IP:port
    private static int CONSUMER_COUNT = 3;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        // newCachedThreadPool creates threads on demand and reuses idle ones;
        // threads idle for 60 seconds are terminated
        
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}
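
The ConsumerWorker class used above is not shown in the post. The class below is a minimal sketch of what such a worker could look like, assuming each worker owns its own KafkaConsumer (a KafkaConsumer instance is not thread safe), prints records prefixed with its thread number, and is stopped from the shutdown hook via wakeup(); the actual implementation behind the post may differ.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// Sketch of a worker: one KafkaConsumer per thread; shutdown() calls wakeup(),
// which makes the blocked poll() throw WakeupException so the loop can exit.
public class ConsumerWorker implements Runnable {
    private final Properties configs;
    private final String topic;
    private final String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties configs, String topic, int number) {
        this.configs = configs;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync(); // auto commit is disabled in the main class
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " is closing");
        } finally {
            consumer.close();
        }
    }

    void shutdown() {
        if (consumer != null) {
            consumer.wakeup(); // wakeup() is the only KafkaConsumer method safe to call from another thread
        }
    }
}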

 

 

  • Input on the producer console
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>a
>b
>c
>d
>hola hola
>mundo world

 

 

  • Result on consumer application
consumer-thread-0 >> a
consumer-thread-0 >> b
consumer-thread-0 >> c
consumer-thread-0 >> d
consumer-thread-1 >> hola hola
consumer-thread-2 >> mundo world

 

 

  • kill -TERM

Sending SIGTERM terminates the multi-threaded consumer safely: the JVM runs the registered shutdown hook, which calls shutdown() on each worker.

Joses-MacBook-Pro:~ joohyunyoon$ jps
46400 ConsumerWithMultiThread
39793 Kafka
46673 Jps
45649 ConsoleProducer
27410 QuorumPeerMain
42887 ConsoleProducer
37710 
46351 GradleDaemon
Joses-MacBook-Pro:~ joohyunyoon$ kill -term 46400