KAFKA

kafka setup producer, setup key and value

Naranjito 2020. 12. 21. 20:35

Then, let's put kafka producer and consumer in practice consecutively previous posts(2020/12/18 - [KAFKA] - install and execute kafka from AWS2020/12/18 - [KAFKA] - install and execute kafka on my MAC).

 

  • set up the producer

In my case, I used IntelliJ IDEA.

package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    private static String TOPIC_NAME = "joohyun";
    private static String BOOTSTRAP_SERVERS = " "; //my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
        //set up BOOTSTRAP_SERVERS with my AWS Public IP and port
        
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        StringSerializer.class.getName()); 
        //Serialization key to string with StringSerializer
        
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
        StringSerializer.class.getName()); 
        //Serialization value to string with StringSerializer

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs); 
        //let's create KafkaProducer containing configs what I setup as above

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index; 
            //let's produce the data from 0 to 9
            
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data); 
            //there is only value(data), no key
            
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data); 
                //let's contain the data into topic joohyun
                
                Thread.sleep(1000); //taking one second off
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

 

 

  • Result after run producer without key 

It has not setup the key so it returns as null.

null-This is record 0
null-This is record 1
null-This is record 2
null-This is record 3
null-This is record 5
null-This is record 4
null-This is record 6
null-This is record 8
null-This is record 7
null-This is record 9

 

 

  • set up the key and value of producer

Then, let's setup the key.

package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithKeyValue {
    private static String TOPIC_NAME = "joohyun";
    private static String BOOTSTRAP_SERVERS = " "; //my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            //key is Integer.toString(index), value is data
            
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

key.serializer : the class used in message key serialization

 

value.serializer : the class used in message value serialization

 

 

  • ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun

It is the result after run producer with key .

The role of key is delimiter to distinguish the messages. Otherwise the rold of value is the data to produce.

Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-consumer.sh 
--bootstrap-server my AWS Public IP:9092 --topic joohyun --property 
print.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
6-This is record 6
7-This is record 7
8-This is record 8
9-This is record 9

--bootstrap-server : list of broker in order to connect to kafka, request to connect this server

 

--property print.key=true : show me the key as a property

 

key.separator="-" : show me the value with separator("-")