- import the kafka-clients dependency (build.gradle)
plugins {
    id 'java'
}

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}
compile group : the kafka-clients dependency provides the classes used to create consumer and producer client instances, so it has to be declared in dependencies.
version : keep the client version compatible with the broker version; matching them as closely as possible is the safest choice.
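As a quick sanity check (my own addition, not part of the original post), you can print which kafka-clients version actually ended up on the classpath. AppInfoParser is an internal utility of kafka-clients rather than stable public API, and the ClientVersionCheck class name is just for illustration:

package com.tacademy;

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionCheck {
    public static void main(String[] args) {
        // Prints the kafka-clients library version found on the classpath, e.g. "2.5.0"
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}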
- set up the consumer
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; // my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());   // deserialize the record key as a String
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // deserialize the record value as a String

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // poll for data, waiting up to 1 second
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
- ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun
Run the Kafka console producer.
Type records to interact with the consumer application.
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server my AWS Public IP:9092 --topic joohyun
>a
>b
>c
>d
- Result on the application
Interact with the console producer.
a
b
hello
world
This is record 0
This is record 2
This is record 3
This is record 9
This is record 3
This is record 5
This is record 8
kafka
This is record 1
This is record 5
This is record 7
This is record 8
This is record 0
This is record 2
This is record 7
This is record 9
e
hello
This is record 4
This is record 6
This is record 1
This is record 4
This is record 6
a
b
c
d
- set up consumer auto commit
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithAutoCommit {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; // my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // auto commit (true is the default): offsets are committed automatically in the background as poll() is called
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
        // commit every 60 seconds

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
ENABLE_AUTO_COMMIT_CONFIG, true : offsets are committed automatically at the configured interval as poll() is called.
- set up consumer sync commit
Commit synchronously to avoid re-consuming already processed records (duplicate data) after the consumer restarts.
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithSyncCommit {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; // my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // disable auto commit; offsets are committed synchronously in the loop below

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
                consumer.commitSync();
                // commit synchronously instead of auto commit
                record.offset(); // this record's offset (unused here; see the per-record commit sketch below)
            }
        }
    }
}
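One note on the loop above: commitSync() with no arguments commits the offsets of everything returned by the last poll(), so calling it once per record still commits the whole batch each time; the otherwise unused record.offset() call points at the finer-grained alternative. If you want to commit record by record, commitSync() also accepts an explicit offset map. The class below is my own illustration of that variant (the name ConsumerWithRecordCommit is not from the original post):

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class ConsumerWithRecordCommit {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; // my AWS Public IP:port

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
                // Commit only this record's partition, up to "this offset + 1"
                // (the next offset the group should read from that partition).
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}

Committing per record gives the strongest protection against re-consuming data, but every commit is a blocking round trip to the broker, so it trades throughput for safety.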
- Input on the console producer
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>hello hola
- Result on consumer application
hello hola
- Input on the console producer after stopping the consumer application and producing more data
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>hello hola
>hola
>mundo
- Result on the consumer application after restarting it
Because the offsets were committed synchronously, only the new data comes back when the consumer application is restarted.
hello hola
hola
mundo
- set up consumer multi thread
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiThread {
    private static String TOPIC_NAME = "joohyun";
    private static String GROUP_ID = "joohyungroup";
    private static String BOOTSTRAP_SERVERS = ""; // IP:port
    private static int CONSUMER_COUNT = 3;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());

        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        // newCachedThreadPool creates threads on demand and reuses idle ones;
        // threads that stay idle for 60 seconds are terminated
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i); // ConsumerWorker is sketched below
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}
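The ConsumerWorker class used above is not included in this post. The sketch below is my reconstruction under a few assumptions: each worker owns its own KafkaConsumer, prints records with a "consumer-thread-N >>" prefix (matching the output further down), and exposes shutdown(), which calls wakeup() so the blocked poll() can exit cleanly.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {
    private final Properties prop;
    private final String topic;
    private final String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync(); // manual commit, since ENABLE_AUTO_COMMIT_CONFIG is false
            }
        } catch (WakeupException e) {
            // wakeup() was called from the shutdown hook; fall through and close
            System.out.println(threadName + " shutting down");
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.wakeup(); // makes the blocked poll() throw WakeupException
        }
    }
}

Because all three workers share the same group id, Kafka assigns each of them a subset of the topic's partitions, which is why different records show up under different thread names in the result below.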
- Input on the console producer
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-console-producer.sh --bootstrap-server IP:9092 --topic joohyun
>a
>b
>c
>d
>hola hola
>mundo world
- Result on consumer application
consumer-thread-0 >> a
consumer-thread-0 >> b
consumer-thread-0 >> c
consumer-thread-0 >> d
consumer-thread-1 >> hola hola
consumer-thread-2 >> mundo world
- kill -term
Terminate the multi-threaded consumer safely by sending SIGTERM, so the shutdown hook runs.
Joses-MacBook-Pro:~ joohyunyoon$ jps
46400 ConsumerWithMultiThread
39793 Kafka
46673 Jps
45649 ConsoleProducer
27410 QuorumPeerMain
42887 ConsoleProducer
37710
46351 GradleDaemon
Joses-MacBook-Pro:~ joohyunyoon$ kill -term 46400