Programming/Redis, Kafka

카프카 kafka 토픽 + producer + consumer 예제 코드

by Renechoi 2023. 5. 30.



brew를 통해 설치했고 kafka를 실행했다면 .sh 확장자 없이 다음과 같은 명령어로도 카프카 토픽 실행이 가능하다.


카프카 토픽 








자바 코드에서 다음과 같은 코드로 토픽을 생성할 수 있다. 


private static void createTopics() {
		String bootstrapServers = "";

		// Kafka 관리자 클라이언트 설정
		Properties adminProps = new Properties();
		adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		AdminClient adminClient = AdminClient.create(adminProps);

		// 생성할 토픽 설정
		String topicName = "demo_java";
		int numPartitions = 1;
		short replicationFactor = 1;

		// NewTopic 객체 생성
		NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

		// 토픽 생성
		// AdminClient 종료


CLI에서 생성된 토픽 확인하기 





producer를 만들어 메시지를 전송해보기 


private static void createProducer() {
   Properties properties = new Properties();
   properties.setProperty("bootstrap.servers", "");

   // 프로듀서로 문자열이 들어오면 직렬화 한다
   properties.setProperty("key.serializer", StringSerializer.class.getName());
   properties.setProperty("value.serializer", StringSerializer.class.getName());

   KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

   ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_java", "hello world");

   // 전송하는 것은 비동기이지만 테스트를 위해 flush 해주어 바로 반응을 보도록 한다.




producer가 전송한 메시지를 다음과 같은 명령어로 확인할 수 있다. 



kafka-console-consumer --bootstrap-server --topic demo_java --from-beginning





여러 개의 메시지를 보내고 로그로 찍어보기 


   public static void main(String[] args) {
      log.info("I am a Kafka Producer!");

      // create Producer Properties
      Properties properties = new Properties();

      // connect to Localhost
             properties.setProperty("bootstrap.servers", "");

      // connect to Conduktor Playground

      // set producer properties
      properties.setProperty("key.serializer", StringSerializer.class.getName());
      properties.setProperty("value.serializer", StringSerializer.class.getName());

      properties.setProperty("batch.size", "400");

      //        properties.setProperty("partitioner.class", RoundRobinPartitioner.class.getName());

      // create the Producer
      KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

      for (int j=0; j<10; j++){

         for (int i=0; i<30; i++){

            // create a Producer Record
            ProducerRecord<String, String> producerRecord =
               new ProducerRecord<>("demo_java", "hello world " + i);

            // send data
            producer.send(producerRecord, new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                  // executes every time a record successfully sent or an exception is thrown
                  if (e == null) {
                     // the record was successfully sent
                     log.info("Received new metadata \n" +
                        "Topic: " + metadata.topic() + "\n" +
                        "Partition: " + metadata.partition() + "\n" +
                        "Offset: " + metadata.offset() + "\n" +
                        "Timestamp: " + metadata.timestamp());
                  } else {
                     log.error("Error while producing", e);

         try {
         } catch (InterruptedException e) {


      // tell the producer to send all data and block until done -- synchronous

      // flush and close the producer



라운드로빈 방식으로 파티션에 들어가는 것을 확인할 수 있다. 




같은 키가 같은 파티션으로 가는지를 확인해보는 예제 


public class ProducerDemoKeys {

    private static final Logger log = LoggerFactory.getLogger(ProducerDemoKeys.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Producer!");

        // create Producer Properties
        Properties properties = new Properties();

        // connect to Localhost
       properties.setProperty("bootstrap.servers", "");

        // connect to Conduktor Playground

        // set producer properties
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int j=0; j<2; j++){

            for (int i=0; i<10; i++){

                String topic = "demo_java";
                String key = "id_" + i;
                String value = "hello world " + i;

                // create a Producer Record
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, key, value);

                // send data
                producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // executes every time a record successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            log.info("Key: " + key + " | Partition: " + metadata.partition());
                        } else {
                            log.error("Error while producing", e);

            try {
            } catch (InterruptedException e) {

        // tell the producer to send all data and block until done -- synchronous

        // flush and close the producer










consumer를 만들어 consume 해보기 


public class ConsumerDemo {

   private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());

   public static void main(String[] args) {
      log.info("I am a Kafka Consumer!");

      String groupId = "my-java-application";
      String topic = "demo_java";

      // create Producer Properties
      Properties properties = new Properties();

      // connect to Localhost
      properties.setProperty("bootstrap.servers", "");

      // connect to Conduktor Playground

      // create consumer configs
      properties.setProperty("key.deserializer", StringDeserializer.class.getName());
      properties.setProperty("value.deserializer", StringDeserializer.class.getName());
      properties.setProperty("group.id", groupId);

      // none: consumer 그룹이 없으면 시작 하지 않음
      // latest: 방금 보낸 것을 보기 원할 때
      // earliest : 전체를 보기 원할 때
      properties.setProperty("auto.offset.reset", "earliest");

      // create a consumer
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

      // subscribe to a topic

      // poll for data
      while (true) {


         ConsumerRecords<String, String> records =

         for (ConsumerRecord<String, String> record : records) {
            log.info("Key: " + record.key() + ", Value: " + record.value());
            log.info("Partition: " + record.partition() + ", Offset: " + record.offset());






더이상 consume 할 것이 없으면 무한 루프를 돈다. 







메인 스레드가 종료되면 consumer를 명시적으로 close 할 수 있게 해주기 



public class ConsumerDemoWithShutdown {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer!");

        String groupId = "my-java-application";
        String topic = "demo_java";

        // create Producer Properties
        Properties properties = new Properties();

        // connect to Localhost
       properties.setProperty("bootstrap.servers", "");

        // connect to Conduktor Playground

        // create consumer configs
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", "earliest");

        // create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // get a reference to the main thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");

                // join the main thread to allow the execution of the code in the main thread
                try {
                } catch (InterruptedException e) {

        try {
            // subscribe to a topic
            // poll for data
            while (true) {

                ConsumerRecords<String, String> records =

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset: " + record.offset());


        } catch (WakeupException e) {
            // wakeup 메서드를 통해 일부러 발생시킨 예외
            log.info("Consumer is starting to shut down");

        } catch (Exception e) {
            log.error("Unexpected exception in the consumer", e);
        } finally {
            consumer.close(); // close the consumer, this will also commit offsets
            log.info("The consumer is now gracefully shut down");



Runtime.getRuntime().addShutdownHook: 프로그램이 종료되기 전에 실행되는 셧다운 후크를 추가한다.


이 경우, Consumer가 graceful shutdown을 수행하기 위해 consumer.wakeup()을 호출하고, 메인 스레드의 실행이 완료될 때까지 대기한다.


WakeupException: Consumer의 wakeup() 메서드가 호출되어 Consumer가 종료되는 상황을 처리한다. 

Exception: 예상치 못한 예외가 발생한 경우를 처리한다.

consumer.close(): Consumer를 닫고 오프셋을 커밋하여 올바르게 종료한다.

로깅을 통해 Consumer의 graceful shutdown이 수행되었음을 표시한다.


