Following on from the previous posts (2020/12/18 - [KAFKA] - install and execute kafka from AWS, 2020/12/18 - [KAFKA] - install and execute kafka on my MAC), let's put the Kafka producer and consumer into practice.
- set up the producer
In my case, I used IntelliJ IDEA.
package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    private static final String TOPIC_NAME = "joohyun";
    private static final String BOOTSTRAP_SERVERS = " "; // my AWS public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        // Point the producer at the broker (my AWS public IP and port)
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serialize the key as a string with StringSerializer
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // Serialize the value as a string with StringSerializer
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // Create a KafkaProducer from the configs set up above
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // Produce ten records, numbered 0 to 9
        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            // Only a value (data) is given, so the record has no key
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
            try {
                // Send the record to the topic joohyun
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000); // wait one second between records
            } catch (Exception e) {
                System.out.println(e);
            }
        }
        // Send anything still buffered and release the producer's resources
        producer.close();
    }
}
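As a side note, the ProducerConfig constants used above are only names for plain string keys. The sketch below builds the same three settings with java.util.Properties alone, so it runs without the Kafka client on the classpath. The class name ProducerSettingsSketch and the address "localhost:9092" are placeholders made up for this illustration, not part of the original project.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the original project.
final class ProducerSettingsSketch {

    // Builds the three settings that SimpleProducer passes to KafkaProducer.
    static Properties build(String bootstrapServers) {
        Properties configs = new Properties();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        configs.put("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        configs.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address, not the author's server.
        Properties configs = build("localhost:9092");
        configs.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the ProducerConfig constants, as the post does, is usually the safer choice because a typo in a constant name fails at compile time, while a typo in a string key is only noticed at run time.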
- Result after running the producer without a key
No key was set, so the consumer prints the key as null.
null-This is record 0
null-This is record 1
null-This is record 2
null-This is record 3
null-This is record 5
null-This is record 4
null-This is record 6
null-This is record 8
null-This is record 7
null-This is record 9
- set up the key and value of the producer
Next, let's set a key for each record.
package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithKeyValue {
    private static final String TOPIC_NAME = "joohyun";
    private static final String BOOTSTRAP_SERVERS = " "; // my AWS public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            // The key is Integer.toString(index), the value is data
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000); // wait one second between records
            } catch (Exception e) {
                System.out.println(e);
            }
        }
        // Send anything still buffered and release the producer's resources
        producer.close();
    }
}
key.serializer : the class used to serialize the message key
value.serializer : the class used to serialize the message value
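To make "serialize" a little more concrete: Kafka sends keys and values over the network as byte arrays, and with its default settings StringSerializer turns a String into UTF-8 bytes. The sketch below imitates that behaviour with the standard library only. Utf8SerializerSketch is a hypothetical stand-in written for this post, not the Kafka class itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in that imitates what StringSerializer does by default.
final class Utf8SerializerSketch {

    // Turns a String into UTF-8 bytes; a null input stays null.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = serialize("0");
        byte[] value = serialize("This is record 0");
        System.out.println(Arrays.toString(key));     // prints [48]
        System.out.println(value.length);             // prints 16
        System.out.println(serialize(null) == null);  // prints true
    }
}
```

The consumer side needs the matching deserializer to turn these bytes back into strings.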
- ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun
This is the result of running the producer with a key.
The key identifies a message and distinguishes it from the others; Kafka's default partitioner also uses a non-null key to choose the partition. The value is the actual data being produced.
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-consumer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun --property print.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
6-This is record 6
7-This is record 7
8-This is record 8
9-This is record 9
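The keys in the output above are more than labels. With a non-null key, Kafka's default partitioner hashes the serialized key and takes the result modulo the number of partitions, so records with the same key always land in the same partition. Below is a rough sketch of that idea. Note the assumptions: Kafka really uses a murmur2 hash, while this sketch uses Arrays.hashCode as a stand-in, so the partition numbers it prints will not match a real cluster. The class name KeyPartitionSketch and the partition count of 3 are also made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of key-based partitioning; not Kafka's actual partitioner.
final class KeyPartitionSketch {

    // Maps a key to a partition number in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (Kafka uses murmur2); the mask keeps the result non-negative.
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only
        for (int index = 0; index < 10; index++) {
            String key = Integer.toString(index);
            System.out.println("key " + key + " -> partition "
                    + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, calling partitionFor twice with the same key gives the same partition, which is what keeps messages for one key in order.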
--bootstrap-server : the broker address (host:port) the consumer connects to in order to reach Kafka
--property print.key=true : print each message's key together with its value
--property key.separator="-" : put "-" between the key and the value (the default separator is a tab)
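The effect of these two properties on each printed line can be sketched in a few lines of Java. This is only an imitation of the formatting seen in the outputs of this post, not the console consumer's real code; ConsoleLineSketch and its format method are hypothetical names.

```java
// Hypothetical imitation of how the console consumer lays out one line.
final class ConsoleLineSketch {

    // Builds one output line from a record's key and value.
    static String format(String key, String value, boolean printKey, String separator) {
        if (!printKey) {
            return value; // without print.key=true only the value is shown
        }
        // A record without a key is shown with the literal text "null".
        String shownKey = (key == null) ? "null" : key;
        return shownKey + separator + value;
    }

    public static void main(String[] args) {
        // Record produced without a key
        System.out.println(format(null, "This is record 0", true, "-")); // prints null-This is record 0
        // Record produced with key "0"
        System.out.println(format("0", "This is record 0", true, "-"));  // prints 0-This is record 0
    }
}
```

The two printed lines correspond to the first line of each result shown earlier: one from the producer without a key and one from the producer with a key.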