From e5318b5bfa696d88d114c3c3cdbe4679591deaee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=85=B4=E7=90=B3?= <1911390090@qq.com> Date: Mon, 24 Mar 2025 14:18:17 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=8E=A8=E9=80=81=E6=89=AB=E7=A0=81?= =?UTF-8?q?=E6=8A=A5=E8=AD=A6=EF=BC=8C=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81?= =?UTF-8?q?=EF=BC=8C=E6=B0=94=E8=B1=A1=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CodeScanningAlarmController.java | 10 ++++++++ .../java/com/zc/business/enums/KafkEnum.java | 24 ++++++++++++++++++ .../device/handler/DeviceMessageHandler.java | 25 +++++++++++++------ .../device/subscribe/KafkaTopicProducer.java | 6 ++--- 4 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 zc-business/src/main/java/com/zc/business/enums/KafkEnum.java diff --git a/zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java b/zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java index 27962b4b..e8ad62b0 100644 --- a/zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java +++ b/zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java @@ -2,13 +2,17 @@ package com.zc.business.controller; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.google.gson.Gson; import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.StringUtils; +import com.ruoyi.common.utils.spring.SpringUtils; import com.zc.business.domain.DcEventProcess; import com.zc.business.domain.DcWarning; +import com.zc.business.enums.KafkEnum; import com.zc.business.enums.UniversalEnum; import com.zc.business.mapper.DcEventProcessMapper; +import com.zc.business.message.device.subscribe.KafkaTopicProducer; import com.zc.business.service.IDcWarningService; import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.constant.WebSocketEvent; @@ -98,6 +102,12 @@ public class CodeScanningAlarmController extends BaseController { dcWarning.setRemark(simpleDateFormat.format(dcWarning.getWarningTime()) + warningTitle); dcWarning.setWarningSource(UniversalEnum.FIVE.getNumber()); dcWarning.setWarningState(UniversalEnum.ONE.getNumber()); + //todo 推送kafak扫码报警事件 + Gson gson = new Gson(); + String jsonString = gson.toJson(dcWarning); + KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class); + kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.scanAlarm.value()); + return AjaxResult.success(dcWarningService.insertDcWarning(dcWarning)); } else { dcWarning.setId(oldData.get("id").toString()); diff --git a/zc-business/src/main/java/com/zc/business/enums/KafkEnum.java b/zc-business/src/main/java/com/zc/business/enums/KafkEnum.java new file mode 100644 index 00000000..49481bce --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/enums/KafkEnum.java @@ -0,0 +1,24 @@ +package com.zc.business.enums; + +/** + * + */ +public enum KafkEnum { +//气象数据 + weather("weatherData"), + // 扫码报警 + scanAlarm("scanCodeData"), + //设备状态 + deviceStatus("deviceState"), + // 视频AI + videoAI("eventAi"); + private final String value; + + KafkEnum(String value) { + this.value = value; + } + + public String value() { + return this.value; + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java index ce7a5f41..d23e9c69 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java +++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java @@ -1,14 +1,11 @@ package com.zc.business.message.device.handler; -import cn.hutool.core.date.DateUtil; -import cn.hutool.core.thread.NamedThreadFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.gson.Gson; import com.ruoyi.common.core.redis.RedisCache; -import com.ruoyi.common.utils.SecurityUtils; import com.ruoyi.common.utils.spring.SpringUtils; import com.zc.business.constant.RedisKeyConstants; import com.zc.business.domain.DcDevice; @@ -20,7 +17,6 @@ import com.zc.business.message.device.subscribe.KafkaTopicProducer; import com.zc.business.service.*; import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.constant.WebSocketEvent; -import org.apache.catalina.security.SecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -68,6 +64,7 @@ public class DeviceMessageHandler { @Autowired private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; + KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class); /** @@ -96,7 +93,13 @@ public class DeviceMessageHandler { }); WebSocketService.broadcast(WebSocketEvent.DEVICE_STATE, dcDevices); //推送设备状态更新 - // 批量更新中间库设备状态 + //kafka消息推送 + //state 设备状态 0 异常 1 正常 + Gson gson = new Gson(); + String jsonString = gson.toJson(dcDevices); + kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.deviceStatus.value()); + + // 批量更新中间库设备状态 middleDatabaseService.updateMiddleDatabaseDeviceByList(dcDevices); } @@ -361,7 +364,6 @@ public class DeviceMessageHandler { combinedData.put("address", otherConfig);*/ dcWarning.setOtherConfig(otherConfig.toString()); //异常天气等级 过滤 - KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class); if (data.getInteger("warningType") == VISIBILITY_LEVEL) { int WarningLevel = data.getInteger("visibilityLevel"); if (WarningLevel != UniversalEnum.ZERO.getNumber()) { @@ -370,7 +372,7 @@ public class DeviceMessageHandler { // 使用Gson将对象转换为JSON字符串 Gson gson = new Gson(); String jsonString = gson.toJson(dcWarning); - kafkaTopicProducer.KafkaTopicProducer(jsonString); + kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.videoAI.value()); } } else { dcWarningService.insertDcWarning(dcWarning); @@ -378,7 +380,7 @@ public class DeviceMessageHandler { // 使用Gson将对象转换为JSON字符串 Gson gson = new Gson(); String jsonString = gson.toJson(dcWarning); - kafkaTopicProducer.KafkaTopicProducer(jsonString); + kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.videoAI.value()); } @@ -716,6 +718,13 @@ public class DeviceMessageHandler { meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString()); meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); + //todo 推送kafak事件消息 气象检测器 + //kafka消息推送 + // 使用Gson将对象转换为JSON字符串 + Gson gson = new Gson(); + String jsonString = gson.toJson(meteorologicalDetectorData); + kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.weather.value()); + //设计院中间库 插入设备数据 MdDeviceData mdDeviceData = new MdDeviceData(); mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java index f0eddb7c..bd2b0351 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java +++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java @@ -23,9 +23,9 @@ public class KafkaTopicProducer { private String keySerializer; @Value("${kafka.producer.value-serializer}") private String valueSerializer; - @Value("${kafka.producer.topic}") - private String topic; - public void KafkaTopicProducer(String event) { +/* @Value("${kafka.producer.topic}") + private String topic;*/ + public void KafkaTopicProducer(String event,String topic) { // 配置生产者的属性 Properties props = new Properties(); props.put("bootstrap.servers", servers); // Kafka broker地址