From 93b797d3f4e0176238ace2eff976c97537f601c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=85=B4=E7=90=B3?= <1911390090@qq.com> Date: Mon, 24 Feb 2025 16:34:03 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=8E=A8=E9=80=81=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=EF=BC=88AI=EF=BC=89=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yml | 7 +++ zc-business/pom.xml | 5 +- .../device/handler/DeviceMessageHandler.java | 16 ++++++ .../device/subscribe/KafkaTopicProducer.java | 55 +++++++++++++++++++ 4 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 551a37a7..52b5e91b 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -192,3 +192,10 @@ iot: # 允许访问的ip地址 allowed: ips: 10.0.111.11,10.168.73.54,10.168.71.194 + # Kafka配置 +kafka: + bootstrap-servers: 121.40.97.235:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + topic: eventAi diff --git a/zc-business/pom.xml b/zc-business/pom.xml index ce688960..e5adad4b 100644 --- a/zc-business/pom.xml +++ b/zc-business/pom.xml @@ -101,7 +101,10 @@ com.ruoyi ruoyi-quartz - + + org.springframework.kafka + spring-kafka + org.jsoup jsoup diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java index f9e8c0c6..4adb3332 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java +++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.gson.Gson; import com.ruoyi.common.core.redis.RedisCache; import com.zc.business.constant.RedisKeyConstants; import com.zc.business.domain.DcDevice; @@ -13,6 +14,7 @@ import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.DcWarning; import com.zc.business.domain.MdDeviceData; import com.zc.business.enums.*; +import com.zc.business.message.device.subscribe.KafkaTopicProducer; import com.zc.business.service.*; import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.constant.WebSocketEvent; @@ -62,6 +64,8 @@ public class DeviceMessageHandler { @Autowired private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; + @Resource + private KafkaTopicProducer kafkaTopicProducer; /** @@ -359,9 +363,20 @@ public class DeviceMessageHandler { int WarningLevel = data.getInteger("visibilityLevel"); if (WarningLevel != UniversalEnum.ZERO.getNumber()) { dcWarningService.insertDcWarning(dcWarning); + //kafka消息推送 + // 使用Gson将对象转换为JSON字符串 + Gson gson = new Gson(); + String jsonString = gson.toJson(dcWarning); + kafkaTopicProducer.KafkaTopicProducer(jsonString); } } else { dcWarningService.insertDcWarning(dcWarning); + //kafka消息推送 + // 使用Gson将对象转换为JSON字符串 + Gson gson = new Gson(); + String jsonString = gson.toJson(dcWarning); + kafkaTopicProducer.KafkaTopicProducer(jsonString); + } } @@ -727,4 +742,5 @@ public class DeviceMessageHandler { middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); } + } diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java new file mode 100644 index 00000000..5827937d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java @@ -0,0 +1,55 @@ +package com.zc.business.message.device.subscribe; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Value; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + + +/** + * + */ +@Slf4j +public class KafkaTopicProducer { + @Value("${kafka.bootstrap-servers}") + private String servers; + @Value("${kafka.producer.key-serializer}") + private String keySerializer; + @Value("${kafka.producer.value-serializer}") + private String valueSerializer; + @Value("${kafka.producer.topic}") + private String topic; + public void KafkaTopicProducer(String event) { + // 配置生产者的属性 + Properties props = new Properties(); + props.put("bootstrap.servers", servers); // Kafka broker地址 + props.put("key.serializer", keySerializer); + props.put("value.serializer",valueSerializer); + + // 创建Kafka生产者实例 + KafkaProducer producer = new KafkaProducer<>(props); + + /* // 准备要发送的消息 + String topic = "eventAi"; + *//*String key = "key2";*//* + String value = "这是没有key的测试信息";*/ + + try { + // 发送消息,并获取元数据(异步) + // ProducerRecord record = new ProducerRecord<>(topic, key, value); + ProducerRecord record = new ProducerRecord<>(topic, event); + RecordMetadata metadata = producer.send(record).get(); + // 数据信息 + log.info(record.key(), record.value(), metadata.partition(), metadata.offset()); + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } finally { + // 关闭生产者 + producer.close(); + } + } +}