Now, let's build the pipeline.
consumer application : the Java program below that loads the data into csv files
local CPU : my computer's CPU and memory metrics
practice server : a Kafka broker running on an AWS EC2 instance
In my case the data flows: local CPU (input) --telegraf (collect)--> practice server --consumer--> csv (load)
- ./kafka-topics.sh --create --bootstrap-server 3.35.49.20:9092 --replication-factor 1 --partitions 5 --topic joohyun-computer-metric
Let's create a topic named joohyun-computer-metric with 5 partitions and replication factor 1.
Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-topics.sh --create --bootstrap-server IP:9092 --replication-factor 1 --partitions 5 --topic joohyun-computer-metric
Created topic joohyun-computer-metric.
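To double-check the partition count and replication factor, you can describe the topic (a verification step I'm adding here, not part of the original transcript; same broker address as above):
./kafka-topics.sh --describe --bootstrap-server IP:9092 --topic joohyun-computer-metric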
- brew install --build-from-source telegraf
Install Telegraf via Homebrew.
Joses-MacBook-Pro:~ joohyunyoon$ brew install --build-from-source telegraf
Updating Homebrew...
Warning: You are using macOS 10.13.
...
==> Summary
🍺 /usr/local/Cellar/telegraf/1.17.0: 7 files, 106.1MB, built in 2 minutes 26 seconds
==> Caveats
==> telegraf
To have launchd start telegraf now and restart at login:
brew services start telegraf
Or, if you don't want/need a background service you can just run:
telegraf -config /usr/local/etc/telegraf.conf
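Before configuring anything, a quick sanity check that the install worked (my addition, not from the original post):
telegraf --version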
- create telegraf.conf
Joses-MacBook-Pro:bin joohyunyoon$ vi telegraf.conf
[agent]
  interval = "10s"
[[outputs.kafka]]
  brokers = ["{aws ec2 public ip}:9092"]
  ## Kafka topic for producer messages
  topic = "joohyun-computer-metric"   # must match the topic we created above
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  fielddrop = ["time_*"]
[[inputs.mem]]
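Before pointing it at Kafka, you can dry-run the config: Telegraf's -test flag gathers each input once and prints the metrics to stdout instead of sending them to the output.
./telegraf -config telegraf.conf -test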
- ./telegraf -config telegraf.conf
Then let's run Telegraf, which will collect and send metrics every 10 seconds as configured above.
Joses-MacBook-Pro:bin joohyunyoon$ ./telegraf -config telegraf.conf
2020-12-22T09:07:18Z I! Starting Telegraf 1.17.0
2020-12-22T09:07:18Z I! Loaded inputs: cpu mem
2020-12-22T09:07:18Z I! Loaded aggregators:
2020-12-22T09:07:18Z I! Loaded processors:
2020-12-22T09:07:18Z I! Loaded outputs: kafka
2020-12-22T09:07:18Z I! Tags enabled: host=Joses-MacBook-Pro.local
2020-12-22T09:07:18Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"Joses-MacBook-Pro.local", Flush Interval:10s
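The metrics should now be flowing into the broker every 10 seconds. To eyeball them before writing any consumer code, the console consumer that ships with Kafka works (same IP placeholder as above):
./kafka-console-consumer.sh --bootstrap-server IP:9092 --topic joohyun-computer-metric --from-beginning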
- set up a consumer to save the data as csv
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        // one csv file per consumer thread
        File file = new File(threadName + ".csv");
        try {
            while (true) {
                // append to the csv file with FileWriter
                FileWriter fw = new FileWriter(file, true);
                StringBuilder fileWriteBuffer = new StringBuilder();
                // poll the records and buffer one line per record
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    fileWriteBuffer.append(record.value()).append("\n");
                }
                fw.write(fileWriteBuffer.toString());
                consumer.commitSync();
                fw.close();
            }
        } catch (IOException e) {
            System.err.println(threadName + " IOException: " + e);
        } catch (WakeupException e) {
            System.out.println(threadName + " WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}
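A note on the commit strategy: enable.auto.commit is turned off in the main class below, so commitSync() here is what marks records as processed, and it runs only after the polled records have been handed to the FileWriter. If a worker dies mid-loop, uncommitted records are re-read on restart rather than lost.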
- run and check whether the polled data is saved as csv
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerSaveMetric {
    private static String TOPIC_NAME = "joohyun-computer-metric";
    private static String GROUP_ID = "metric-consumers";
    private static String BOOTSTRAP_SERVERS = ""; //IP:port
    // one consumer per partition, because I created 5 partitions
    private static int CONSUMER_COUNT = 5;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // commit manually in ConsumerWorker after the csv write
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}
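One caveat before running it: the shutdown hook above only calls wakeup(), and the JVM can exit before each worker's finally block finishes committing and closing. A minimal sketch of a more patient hook, assuming executorService is promoted to a static field (my variant, not the original code):

static class ShutdownThread extends Thread {
    public void run() {
        workerThreads.forEach(ConsumerWorker::shutdown);
        // stop accepting new work, then give the workers time to commit and close
        executorService.shutdown();
        try {
            executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Bye");
    }
}

With that noted, run the application and check the working directory: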
Joses-MacBook-Pro:kafka-consumer-save-metric joohyunyoon$ ls
build consumer-thread-1.csv consumer-thread-4.csv gradlew.bat
build.gradle consumer-thread-2.csv gradle settings.gradle
consumer-thread-0.csv consumer-thread-3.csv gradlew src
(base) Joses-MacBook-Pro:kafka-consumer-save-metric joohyunyoon$ tail -f *.csv
==> consumer-thread-0.csv <==
==> consumer-thread-1.csv <==
...
==> consumer-thread-2.csv <==
==> consumer-thread-3.csv <==
==> consumer-thread-4.csv <==
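The files fill up with whatever Telegraf wrote as record values. With the Kafka output's default data format (influx line protocol), each row looks roughly like this illustrative example (not captured output):
cpu,cpu=cpu-total,host=Joses-MacBook-Pro.local usage_user=3.2,usage_idle=95.1 1608628038000000000
So despite the .csv extension these are line-protocol records, not comma-separated columns; turning them into a real CSV would need a parsing step.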