본문 바로가기
Programming/Redis, Kafka

카프카 kafka 토픽 + producer + consumer 예제 코드

by Renechoi 2023. 5. 30.

 

 

brew를 통해 설치했고 kafka를 실행했다면 .sh 확장자 없이 다음과 같은 명령어로도 카프카 토픽 실행이 가능하다.

 

카프카 토픽 

 

kafka-topics

 

 

 

 

 

자바 코드에서 다음과 같은 코드로 토픽을 생성할 수 있다. 

 

private static void createTopics() {
		String bootstrapServers = "127.0.0.1:9092";

		// Kafka 관리자 클라이언트 설정
		Properties adminProps = new Properties();
		adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		AdminClient adminClient = AdminClient.create(adminProps);

		// 생성할 토픽 설정
		String topicName = "demo_java";
		int numPartitions = 1;
		short replicationFactor = 1;

		// NewTopic 객체 생성
		NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

		// 토픽 생성
		adminClient.createTopics(Collections.singletonList(newTopic));
		
		// AdminClient 종료
		adminClient.close();
	}

 

CLI에서 생성된 토픽 확인하기 

 

 

 

 

producer를 만들어 메시지를 전송해보기 

 

private static void createProducer() {
   Properties properties = new Properties();
   properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

   // 프로듀서로 문자열이 들어오면 직렬화 한다
   properties.setProperty("key.serializer", StringSerializer.class.getName());
   properties.setProperty("value.serializer", StringSerializer.class.getName());

   KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

   ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_java", "hello world");

   // 전송하는 것은 비동기이지만 테스트를 위해 flush 해주어 바로 반응을 보도록 한다.
   producer.send(producerRecord);

   producer.flush();
   producer.close();
}

 

 

producer가 전송한 메시지를 다음과 같은 명령어로 확인할 수 있다. 

 

 

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic demo_java --from-beginning

 

 

 

 

여러 개의 메시지를 보내고 로그로 찍어보기 

 

   public static void main(String[] args) {
      log.info("I am a Kafka Producer!");

      // create Producer Properties
      Properties properties = new Properties();

      // connect to Localhost
             properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

      // connect to Conduktor Playground


      // set producer properties
      properties.setProperty("key.serializer", StringSerializer.class.getName());
      properties.setProperty("value.serializer", StringSerializer.class.getName());

      properties.setProperty("batch.size", "400");

      //        properties.setProperty("partitioner.class", RoundRobinPartitioner.class.getName());

      // create the Producer
      KafkaProducer<String, String> producer = new KafkaProducer<>(properties);


      for (int j=0; j<10; j++){

         for (int i=0; i<30; i++){

            // create a Producer Record
            ProducerRecord<String, String> producerRecord =
               new ProducerRecord<>("demo_java", "hello world " + i);

            // send data
            producer.send(producerRecord, new Callback() {
               @Override
               public void onCompletion(RecordMetadata metadata, Exception e) {
                  // executes every time a record successfully sent or an exception is thrown
                  if (e == null) {
                     // the record was successfully sent
                     log.info("Received new metadata \n" +
                        "Topic: " + metadata.topic() + "\n" +
                        "Partition: " + metadata.partition() + "\n" +
                        "Offset: " + metadata.offset() + "\n" +
                        "Timestamp: " + metadata.timestamp());
                  } else {
                     log.error("Error while producing", e);
                  }
               }
            });
         }

         try {
            Thread.sleep(500);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

      }


      // tell the producer to send all data and block until done -- synchronous
      producer.flush();

      // flush and close the producer
      producer.close();
   }
}

 

 

라운드로빈 방식으로 파티션에 들어가는 것을 확인할 수 있다. 

 

 

 

같은 키가 같은 파티션으로 가는지를 확인해보는 예제 

 

public class ProducerDemoKeys {

    private static final Logger log = LoggerFactory.getLogger(ProducerDemoKeys.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Producer!");

        // create Producer Properties
        Properties properties = new Properties();

        // connect to Localhost
       properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

        // connect to Conduktor Playground

        // set producer properties
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);


        for (int j=0; j<2; j++){

            for (int i=0; i<10; i++){

                String topic = "demo_java";
                String key = "id_" + i;
                String value = "hello world " + i;

                // create a Producer Record
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, key, value);

                // send data
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // executes every time a record successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            log.info("Key: " + key + " | Partition: " + metadata.partition());
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }



        // tell the producer to send all data and block until done -- synchronous
        producer.flush();

        // flush and close the producer
        producer.close();
    }
}

 

 

 

 

 

 

 

 

 

consumer를 만들어 consume 해보기 

 

public class ConsumerDemo {

   private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());

   public static void main(String[] args) {
      log.info("I am a Kafka Consumer!");

      String groupId = "my-java-application";
      String topic = "demo_java";

      // create Producer Properties
      Properties properties = new Properties();

      // connect to Localhost
      properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

      // connect to Conduktor Playground

      // create consumer configs
      properties.setProperty("key.deserializer", StringDeserializer.class.getName());
      properties.setProperty("value.deserializer", StringDeserializer.class.getName());
      properties.setProperty("group.id", groupId);

      // none: consumer 그룹이 없으면 시작 하지 않음
      // latest: 방금 보낸 것을 보기 원할 때
      // earliest : 전체를 보기 원할 때
      properties.setProperty("auto.offset.reset", "earliest");

      // create a consumer
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

      // subscribe to a topic
      consumer.subscribe(Arrays.asList(topic));

      // poll for data
      while (true) {

         log.info("Polling");

         ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(1000));

         for (ConsumerRecord<String, String> record : records) {
            log.info("Key: " + record.key() + ", Value: " + record.value());
            log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
         }

      }

   }
}

 

 

 

더이상 consume 할 것이 없으면 무한 루프를 돈다. 

 

 

 

 

 

 

메인 스레드가 종료되면 consumer를 명시적으로 close 할 수 있게 해주기 

 

 

public class ConsumerDemoWithShutdown {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer!");

        String groupId = "my-java-application";
        String topic = "demo_java";

        // create Producer Properties
        Properties properties = new Properties();

        // connect to Localhost
       properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

        // connect to Conduktor Playground

        // create consumer configs
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", "earliest");

        // create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // get a reference to the main thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                consumer.wakeup();


                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });



        try {
            // subscribe to a topic
            consumer.subscribe(Arrays.asList(topic));
            // poll for data
            while (true) {

                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }

            }

        } catch (WakeupException e) {
            // wakeup 메서드를 통해 일부러 발생시킨 예외
            log.info("Consumer is starting to shut down");

        } catch (Exception e) {
            log.error("Unexpected exception in the consumer", e);
        } finally {
            consumer.close(); // close the consumer, this will also commit offsets
            log.info("The consumer is now gracefully shut down");
        }


    }
}

 

Runtime.getRuntime().addShutdownHook: 프로그램이 종료되기 전에 실행되는 셧다운 후크를 추가한다.

 

이 경우, Consumer가 graceful shutdown을 수행하기 위해 consumer.wakeup()을 호출하고, 메인 스레드의 실행이 완료될 때까지 대기한다.

 

WakeupException: Consumer의 wakeup() 메서드가 호출되어 Consumer가 종료되는 상황을 처리한다. 


Exception: 예상치 못한 예외가 발생한 경우를 처리한다.


consumer.close(): Consumer를 닫고 오프셋을 커밋하여 올바르게 종료한다.


로깅을 통해 Consumer의 graceful shutdown이 수행되었음을 표시한다.

 

 

반응형