diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java index 417d447d..eae4044d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java +++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java @@ -1,57 +1,71 @@ package com.zc.business.message.device.subscribe; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.*; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; - +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Properties; import java.util.concurrent.ExecutionException; - /** - * + * Kafka 生产者组件,使用 Spring 管理为单例 */ @Slf4j -@Configuration +@Component // 改为 @Component,确保被 Spring 管理 public class KafkaTopicProducer { + @Value("${kafka.bootstrap-servers}") private String servers; + @Value("${kafka.producer.key-serializer}") private String keySerializer; + @Value("${kafka.producer.value-serializer}") private String valueSerializer; -/* @Value("${kafka.producer.topic}") - private String topic;*/ - public void KafkaTopicProducer(String event,String topic) { - // 配置生产者的属性 + + private KafkaProducer producer; + + /** + * 在 bean 初始化时创建 KafkaProducer 实例 + */ + @PostConstruct + public void init() { Properties props = new Properties(); - props.put("bootstrap.servers", servers); // Kafka broker地址 - props.put("key.serializer", keySerializer); - props.put("value.serializer",valueSerializer); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); - // 创建Kafka生产者实例 - KafkaProducer producer = new KafkaProducer<>(props); + // 可选配置 + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, "3"); - /* // 准备要发送的消息 - String topic = "eventAi"; - *//*String key = "key2";*//* - String value = "这是没有key的测试信息";*/ + this.producer = new KafkaProducer<>(props); + log.info("KafkaProducer初始化。"); + } + /** + * 发送消息的方法(线程安全) + */ + public void KafkaTopicProducer(String event, String topic) { try { - // 发送消息,并获取元数据(异步) - // ProducerRecord record = new ProducerRecord<>(topic, key, value); ProducerRecord record = new ProducerRecord<>(topic, event); RecordMetadata metadata = producer.send(record).get(); - // 数据信息 - // log.info(record.key(), record.value(), metadata.partition(), metadata.offset()); - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - } finally { - // 关闭生产者 + } catch (InterruptedException | ExecutionException e) { + log.error("发送消息给Kafka失败。", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } + } + + /** + * 在应用关闭时关闭 KafkaProducer + */ + @PreDestroy + public void destroy() { + if (producer != null) { producer.close(); + log.info("KafkaProducer关闭。"); } } -} +} \ No newline at end of file