From a65a61f4609fa4754d44698c320b72919edfbc41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=85=B4=E7=90=B3?= <1911390090@qq.com> Date: Fri, 18 Jul 2025 16:34:51 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/subscribe/KafkaTopicProducer.java | 74 +++++++++++-------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java index 417d447d..eae4044d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java +++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java @@ -1,57 +1,71 @@ package com.zc.business.message.device.subscribe; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.*; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; - +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Properties; import java.util.concurrent.ExecutionException; - /** - * + * Kafka 生产者组件,使用 Spring 管理为单例 */ @Slf4j -@Configuration +@Component // 改为 @Component,确保被 Spring 管理 public class KafkaTopicProducer { + @Value("${kafka.bootstrap-servers}") private String servers; + @Value("${kafka.producer.key-serializer}") private String keySerializer; + @Value("${kafka.producer.value-serializer}") private String valueSerializer; -/* @Value("${kafka.producer.topic}") - private String topic;*/ - public void KafkaTopicProducer(String event,String topic) { - // 配置生产者的属性 + + private KafkaProducer producer; + + /** + * 在 bean 初始化时创建 KafkaProducer 实例 + */ + @PostConstruct + public void init() { Properties props = new Properties(); - props.put("bootstrap.servers", servers); // Kafka broker地址 - props.put("key.serializer", keySerializer); - props.put("value.serializer",valueSerializer); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); - // 创建Kafka生产者实例 - KafkaProducer producer = new KafkaProducer<>(props); + // 可选配置 + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, "3"); - /* // 准备要发送的消息 - String topic = "eventAi"; - *//*String key = "key2";*//* - String value = "这是没有key的测试信息";*/ + this.producer = new KafkaProducer<>(props); + log.info("KafkaProducer初始化。"); + } + /** + * 发送消息的方法(线程安全) + */ + public void KafkaTopicProducer(String event, String topic) { try { - // 发送消息,并获取元数据(异步) - // ProducerRecord record = new ProducerRecord<>(topic, key, value); ProducerRecord record = new ProducerRecord<>(topic, event); RecordMetadata metadata = producer.send(record).get(); - // 数据信息 - // log.info(record.key(), record.value(), metadata.partition(), metadata.offset()); - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - } finally { - // 关闭生产者 + } catch (InterruptedException | ExecutionException e) { + log.error("发送消息给Kafka失败。", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } + } + + /** + * 在应用关闭时关闭 KafkaProducer + */ + @PreDestroy + public void destroy() { + if (producer != null) { producer.close(); + log.info("KafkaProducer关闭。"); } } -} +} \ No newline at end of file