diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml
index 551a37a7..52b5e91b 100644
--- a/ruoyi-admin/src/main/resources/application.yml
+++ b/ruoyi-admin/src/main/resources/application.yml
@@ -192,3 +192,10 @@ iot:
# 允许访问的ip地址
allowed:
ips: 10.0.111.11,10.168.73.54,10.168.71.194
+ # Kafka配置
+kafka:
+ bootstrap-servers: 121.40.97.235:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ topic: eventAi
diff --git a/zc-business/pom.xml b/zc-business/pom.xml
index ce688960..e5adad4b 100644
--- a/zc-business/pom.xml
+++ b/zc-business/pom.xml
@@ -101,7 +101,10 @@
com.ruoyi
ruoyi-quartz
-
+
+ org.springframework.kafka
+ spring-kafka
+
org.jsoup
jsoup
diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
index f9e8c0c6..4adb3332 100644
--- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
+++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.gson.Gson;
import com.ruoyi.common.core.redis.RedisCache;
import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.domain.DcDevice;
@@ -13,6 +14,7 @@ import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.DcWarning;
import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.*;
+import com.zc.business.message.device.subscribe.KafkaTopicProducer;
import com.zc.business.service.*;
import com.zc.common.core.websocket.WebSocketService;
import com.zc.common.core.websocket.constant.WebSocketEvent;
@@ -62,6 +64,8 @@ public class DeviceMessageHandler {
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
+ @Resource
+ private KafkaTopicProducer kafkaTopicProducer;
/**
@@ -359,9 +363,20 @@ public class DeviceMessageHandler {
int WarningLevel = data.getInteger("visibilityLevel");
if (WarningLevel != UniversalEnum.ZERO.getNumber()) {
dcWarningService.insertDcWarning(dcWarning);
+ //kafka消息推送
+ // 使用Gson将对象转换为JSON字符串
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(dcWarning);
+ kafkaTopicProducer.KafkaTopicProducer(jsonString);
}
} else {
dcWarningService.insertDcWarning(dcWarning);
+ //kafka消息推送
+ // 使用Gson将对象转换为JSON字符串
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(dcWarning);
+ kafkaTopicProducer.KafkaTopicProducer(jsonString);
+
}
}
@@ -727,4 +742,5 @@ public class DeviceMessageHandler {
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
}
+
}
diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java
new file mode 100644
index 00000000..5827937d
--- /dev/null
+++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java
@@ -0,0 +1,55 @@
+package com.zc.business.message.device.subscribe;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ *
+ */
+@Slf4j
+public class KafkaTopicProducer {
+ @Value("${kafka.bootstrap-servers}")
+ private String servers;
+ @Value("${kafka.producer.key-serializer}")
+ private String keySerializer;
+ @Value("${kafka.producer.value-serializer}")
+ private String valueSerializer;
+ @Value("${kafka.producer.topic}")
+ private String topic;
+ public void KafkaTopicProducer(String event) {
+ // 配置生产者的属性
+ Properties props = new Properties();
+ props.put("bootstrap.servers", servers); // Kafka broker地址
+ props.put("key.serializer", keySerializer);
+ props.put("value.serializer",valueSerializer);
+
+ // 创建Kafka生产者实例
+ KafkaProducer producer = new KafkaProducer<>(props);
+
+ /* // 准备要发送的消息
+ String topic = "eventAi";
+ *//*String key = "key2";*//*
+ String value = "这是没有key的测试信息";*/
+
+ try {
+ // 发送消息,并获取元数据(异步)
+ // ProducerRecord record = new ProducerRecord<>(topic, key, value);
+ ProducerRecord record = new ProducerRecord<>(topic, event);
+ RecordMetadata metadata = producer.send(record).get();
+ // 数据信息
+ log.info(record.key(), record.value(), metadata.partition(), metadata.offset());
+ } catch (ExecutionException | InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ // 关闭生产者
+ producer.close();
+ }
+ }
+}