Browse Source

kafka优化

develop
王兴琳 15 hours ago
parent
commit
a65a61f460
  1. 74
      zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

74
zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

@ -1,57 +1,71 @@
package com.zc.business.message.device.subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
*
* Kafka 生产者组件使用 Spring 管理为单例
*/
@Slf4j
@Configuration
@Component // 改为 @Component,确保被 Spring 管理
public class KafkaTopicProducer {
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.producer.key-serializer}")
private String keySerializer;
@Value("${kafka.producer.value-serializer}")
private String valueSerializer;
/* @Value("${kafka.producer.topic}")
private String topic;*/
public void KafkaTopicProducer(String event,String topic) {
// 配置生产者的属性
private KafkaProducer<String, String> producer;
/**
* bean 初始化时创建 KafkaProducer 实例
*/
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", servers); // Kafka broker地址
props.put("key.serializer", keySerializer);
props.put("value.serializer",valueSerializer);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
// 创建Kafka生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 可选配置
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
/* // 准备要发送的消息
String topic = "eventAi";
*//*String key = "key2";*//*
String value = "这是没有key的测试信息";*/
this.producer = new KafkaProducer<>(props);
log.info("KafkaProducer初始化。");
}
/**
* 发送消息的方法线程安全
*/
public void KafkaTopicProducer(String event, String topic) {
try {
// 发送消息,并获取元数据(异步)
// ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, event);
RecordMetadata metadata = producer.send(record).get();
// 数据信息
// log.info(record.key(), record.value(), metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭生产者
} catch (InterruptedException | ExecutionException e) {
log.error("发送消息给Kafka失败。", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
/**
* 在应用关闭时关闭 KafkaProducer
*/
@PreDestroy
public void destroy() {
if (producer != null) {
producer.close();
log.info("KafkaProducer关闭。");
}
}
}
}
Loading…
Cancel
Save