KAFKA

kafka pipeline(local->telegraf->csv)

Naranjito 2020. 12. 22. 18:34

Now, make the pipeline.

 

  • consumer application

local CPU : my computer ram

practice server : AWS server

 

In my case, local CPU----->practice server----->telegraf----->csv

                                   input                        collect              load

 

 

  • ./kafka-topics.sh --create --bootstrap-server 3.35.49.20:9092 --replication-factor 1 --partitions 5 --topic joohyun-computer-metric

Let's create a topic named joohyun-computer-metric with 5 partitions and a replication factor of 1.

Joses-MacBook-Pro:bin joohyunyoon$ ./kafka-topics.sh --create --bootstrap-server IP:9092 --replication-factor 1 --partitions 5 --topic joohyun-computer-metric
Created topic joohyun-computer-metric.

 

 

  • brew install --build-from-source telegraf

Install telegraf over brew.

Joses-MacBook-Pro:~ joohyunyoon$ brew install --build-from-source telegraf
Updating Homebrew...
Warning: You are using macOS 10.13.

...

==> Summary
🍺  /usr/local/Cellar/telegraf/1.17.0: 7 files, 106.1MB, built in 2 minutes 26 seconds
==> Caveats
==> telegraf
To have launchd start telegraf now and restart at login:
  brew services start telegraf
Or, if you don't want/need a background service you can just run:
  telegraf -config /usr/local/etc/telegraf.conf

 

 

  • create telegraf.conf
Joses-MacBook-Pro:bin joohyunyoon$ vi telegraf.conf
[agent]
  interval = "10s"

[[outputs.kafka]]
  brokers = ["{aws ec2 public ip}:9092"]
  ## Kafka topic for producer messages
  topic = "joohyun-computer-metric"

[[inputs.cpu]]
  percpu = true
  totalcpu = true
  fielddrop = ["time_*"]

[[inputs.mem]]

 

 

  • ./telegraf -config telegraf.conf

Then, let's run telegraf, which will collect metrics every 10 seconds as configured above. 

Joses-MacBook-Pro:bin joohyunyoon$ ./telegraf -config telegraf.conf
2020-12-22T09:07:18Z I! Starting Telegraf 1.17.0
2020-12-22T09:07:18Z I! Loaded inputs: cpu mem
2020-12-22T09:07:18Z I! Loaded aggregators: 
2020-12-22T09:07:18Z I! Loaded processors: 
2020-12-22T09:07:18Z I! Loaded outputs: kafka
2020-12-22T09:07:18Z I! Tags enabled: host=Joses-MacBook-Pro.local
2020-12-22T09:07:18Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"Joses-MacBook-Pro.local", Flush Interval:10s

 

  • set up saving the data as csv
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Runnable Kafka consumer that polls one topic and appends every record's
 * value to a per-thread CSV file ({@code consumer-thread-N.csv}).
 *
 * <p>Stopped cooperatively: another thread calls {@link #shutdown()}, which
 * makes the blocked {@code poll()} throw {@link WakeupException}.
 */
public class ConsumerWorker implements Runnable {
    private final Properties prop;
    private final String topic;
    private final String threadName;
    private KafkaConsumer<String, String> consumer;

    /**
     * @param prop   consumer configuration handed straight to {@link KafkaConsumer}
     * @param topic  topic to subscribe to
     * @param number index used to build a unique thread/file name
     */
    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        // one CSV file per worker thread
        File file = new File(threadName + ".csv");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                StringBuilder fileWriteBuffer = new StringBuilder();
                for (ConsumerRecord<String, String> record : records) {
                    fileWriteBuffer.append(record.value()).append("\n");
                }
                // try-with-resources guarantees the writer is closed even when an
                // exception escapes the loop (the original leaked fw on WakeupException)
                try (FileWriter fw = new FileWriter(file, true)) {
                    fw.write(fileWriteBuffer.toString());
                }
                // commit only after the batch is safely on disk
                consumer.commitSync();
            }
        } catch (IOException e) {
            System.err.println(threadName + " IOException" + e);
        } catch (WakeupException e) {
            System.out.println(threadName + " WakeupException");
        } finally {
            // after wakeup(), commitSync() throws WakeupException again; swallow it
            // so consumer.close() still runs and the group rebalances promptly
            try {
                consumer.commitSync();
            } catch (WakeupException ignored) {
                // expected during shutdown via wakeup()
            }
            consumer.close();
        }
    }

    /** Interrupts the blocking poll() so run() can exit cleanly. */
    public void shutdown() {
        consumer.wakeup();
    }
}

 

 

  • run and check that the polled data is saved as csv
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point: starts one ConsumerWorker per partition so the 5-partition
 * topic is consumed in parallel, and registers a shutdown hook that asks
 * every worker to stop via wakeup().
 */
public class ConsumerSaveMetric {
    private static final String TOPIC_NAME = "joohyun-computer-metric";
    private static final String GROUP_ID = "metric-consumers";
    private static final String BOOTSTRAP_SERVERS = ""; // fill in broker IP:port
    // one consumer per partition: the topic was created with 5 partitions
    private static final int CONSUMER_COUNT = 5;

    private static final List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // commits are issued manually by the workers after each batch is written
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
        // no new tasks will be submitted; lets the pool terminate once the
        // workers exit after the shutdown hook wakes them up
        executorService.shutdown();
    }

    /** Shutdown hook: asks every worker to stop so offsets are committed and consumers closed. */
    static class ShutdownThread extends Thread {
        @Override
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}
Joses-MacBook-Pro:kafka-consumer-save-metric joohyunyoon$ ls
build                   consumer-thread-1.csv   consumer-thread-4.csv   gradlew.bat
build.gradle            consumer-thread-2.csv   gradle                  settings.gradle
consumer-thread-0.csv   consumer-thread-3.csv   gradlew                 src
(base) Joses-MacBook-Pro:kafka-consumer-save-metric joohyunyoon$ tail -f *.csv

==> consumer-thread-0.csv <==

==> consumer-thread-1.csv <==

...

==> consumer-thread-2.csv <==

==> consumer-thread-3.csv <==

==> consumer-thread-4.csv <==