brew를 통해 설치했고 kafka를 실행했다면 .sh 확장자 없이 다음과 같은 명령어로도 카프카 토픽 실행이 가능하다.
카프카 토픽
kafka-topics
자바 코드에서 다음과 같은 코드로 토픽을 생성할 수 있다.
private static void createTopics() {
String bootstrapServers = "127.0.0.1:9092";
// Kafka 관리자 클라이언트 설정
Properties adminProps = new Properties();
adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(adminProps);
// 생성할 토픽 설정
String topicName = "demo_java";
int numPartitions = 1;
short replicationFactor = 1;
// NewTopic 객체 생성
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
// 토픽 생성
adminClient.createTopics(Collections.singletonList(newTopic));
// AdminClient 종료
adminClient.close();
}
CLI에서 생성된 토픽 확인하기
producer를 만들어 메시지를 전송해보기
private static void createProducer() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 프로듀서로 문자열이 들어오면 직렬화 한다
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_java", "hello world");
// 전송하는 것은 비동기이지만 테스트를 위해 flush 해주어 바로 반응을 보도록 한다.
producer.send(producerRecord);
producer.flush();
producer.close();
}
producer가 전송한 메시지를 다음과 같은 명령어로 확인할 수 있다.
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic demo_java --from-beginning
여러 개의 메시지를 보내고 로그로 찍어보기
public static void main(String[] args) {
log.info("I am a Kafka Producer!");
// create Producer Properties
Properties properties = new Properties();
// connect to Localhost
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// connect to Conduktor Playground
// set producer properties
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
properties.setProperty("batch.size", "400");
// properties.setProperty("partitioner.class", RoundRobinPartitioner.class.getName());
// create the Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int j=0; j<10; j++){
for (int i=0; i<30; i++){
// create a Producer Record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("demo_java", "hello world " + i);
// send data
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
// executes every time a record successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
log.info("Received new metadata \n" +
"Topic: " + metadata.topic() + "\n" +
"Partition: " + metadata.partition() + "\n" +
"Offset: " + metadata.offset() + "\n" +
"Timestamp: " + metadata.timestamp());
} else {
log.error("Error while producing", e);
}
}
});
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// tell the producer to send all data and block until done -- synchronous
producer.flush();
// flush and close the producer
producer.close();
}
}
라운드로빈 방식으로 파티션에 들어가는 것을 확인할 수 있다.
같은 키가 같은 파티션으로 가는지를 확인해보는 예제
public class ProducerDemoKeys {
private static final Logger log = LoggerFactory.getLogger(ProducerDemoKeys.class.getSimpleName());
public static void main(String[] args) {
log.info("I am a Kafka Producer!");
// create Producer Properties
Properties properties = new Properties();
// connect to Localhost
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// connect to Conduktor Playground
// set producer properties
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
// create the Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int j=0; j<2; j++){
for (int i=0; i<10; i++){
String topic = "demo_java";
String key = "id_" + i;
String value = "hello world " + i;
// create a Producer Record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topic, key, value);
// send data
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
// executes every time a record successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
log.info("Key: " + key + " | Partition: " + metadata.partition());
} else {
log.error("Error while producing", e);
}
}
});
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// tell the producer to send all data and block until done -- synchronous
producer.flush();
// flush and close the producer
producer.close();
}
}
consumer를 만들어 consume 해보기
public class ConsumerDemo {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());
public static void main(String[] args) {
log.info("I am a Kafka Consumer!");
String groupId = "my-java-application";
String topic = "demo_java";
// create Producer Properties
Properties properties = new Properties();
// connect to Localhost
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// connect to Conduktor Playground
// create consumer configs
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
// none: consumer 그룹이 없으면 시작 하지 않음
// latest: 방금 보낸 것을 보기 원할 때
// earliest : 전체를 보기 원할 때
properties.setProperty("auto.offset.reset", "earliest");
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// subscribe to a topic
consumer.subscribe(Arrays.asList(topic));
// poll for data
while (true) {
log.info("Polling");
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}
}
더이상 consume 할 것이 없으면 무한 루프를 돈다.
메인 스레드가 종료되면 consumer를 명시적으로 close 할 수 있게 해주기
public class ConsumerDemoWithShutdown {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());
public static void main(String[] args) {
log.info("I am a Kafka Consumer!");
String groupId = "my-java-application";
String topic = "demo_java";
// create Producer Properties
Properties properties = new Properties();
// connect to Localhost
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// connect to Conduktor Playground
// create consumer configs
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// get a reference to the main thread
final Thread mainThread = Thread.currentThread();
// adding the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// join the main thread to allow the execution of the code in the main thread
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// subscribe to a topic
consumer.subscribe(Arrays.asList(topic));
// poll for data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
// wakeup 메서드를 통해 일부러 발생시킨 예외
log.info("Consumer is starting to shut down");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close(); // close the consumer, this will also commit offsets
log.info("The consumer is now gracefully shut down");
}
}
}
Runtime.getRuntime().addShutdownHook: 프로그램이 종료되기 전에 실행되는 셧다운 후크를 추가한다.
이 경우, Consumer가 graceful shutdown을 수행하기 위해 consumer.wakeup()을 호출하고, 메인 스레드의 실행이 완료될 때까지 대기한다.
WakeupException: Consumer의 wakeup() 메서드가 호출되어 Consumer가 종료되는 상황을 처리한다.
Exception: 예상치 못한 예외가 발생한 경우를 처리한다.
consumer.close(): Consumer를 닫고 오프셋을 커밋하여 올바르게 종료한다.
로깅을 통해 Consumer의 graceful shutdown이 수행되었음을 표시한다.
'Programming > Redis, Kafka' 카테고리의 다른 글
ubuntu 환경에서 redis, master-slave, sentinel 설치 (0) | 2023.05.31 |
---|---|
Redis의 특징, 개념, 장점, 단점, 목적 (0) | 2023.05.31 |
이커머스에서 redis, kafka usecase (0) | 2023.05.31 |
맥 homebrew 를 통해 설치한 kafka 실행하기 (0) | 2023.05.30 |
레디스redis 자료구조 예시 코드 (Geospatial 이용 거리 찾기 예제 포함) (0) | 2023.05.29 |