From 0afef0c2de1ad75f7368636766f6756f12ad68b1 Mon Sep 17 00:00:00 2001 From: xiepufeng <1072271977@qq.com> Date: Fri, 15 Mar 2024 18:52:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/handler/DeviceMessageHandler.java | 59 +++++++++++++++++-- .../device/listener/DeviceEventListener.java | 4 -- .../listener/DeviceFunctionReplyListener.java | 2 - .../DevicePropertyReadReplyListener.java | 2 - .../DevicePropertyReportListener.java | 55 ++--------------- .../DevicePropertyWriteReplyListener.java | 2 - .../listener/OfflineMessageListener.java | 2 - .../listener/OnlineMessageListener.java | 2 - 8 files changed, 59 insertions(+), 69 deletions(-) diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java index 99239c90..eec32d66 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java +++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java @@ -7,23 +7,24 @@ import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.core.redis.RedisCache; import com.zc.business.constant.RedisKeyConstants; import com.zc.business.domain.DcDevice; +import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.DcWarning; +import com.zc.business.domain.MdDeviceData; import com.zc.business.enums.IotProductEnum; import com.zc.business.enums.WarningSourceEnum; import com.zc.business.enums.WarningStateEnum; import com.zc.business.enums.WarningSubclassEnum; -import com.zc.business.service.DcTrafficSectionDataService; -import com.zc.business.service.IDcDeviceService; -import com.zc.business.service.IDcWarningService; -import com.zc.business.service.IMiddleDatabaseService; +import com.zc.business.service.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -53,6 +54,12 @@ public class DeviceMessageHandler { @Resource private DcTrafficSectionDataService dcTrafficSectionDataService; + @Autowired + private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; + + @Autowired + private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; + /** * 更新设备状态 * @@ -102,6 +109,12 @@ public class DeviceMessageHandler { // 一站式情况调查站 if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)) { oneStopDeviceMessageHandle(data); + return; + } + + // 一站式情况调查站 + if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) { + weatherDetectorMessageHandle(data); } } @@ -319,4 +332,42 @@ public class DeviceMessageHandler { dcTrafficSectionDataService.processRealtimeMessage(msg); } + + /** + * 气象检测器消息处理入口 + * @param msg 设备消息 + */ + private void weatherDetectorMessageHandle(JSONObject msg) { + + DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) msg.get("properties"); + meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString()); + meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); + + //设计院中间库 插入设备数据 + MdDeviceData mdDeviceData = new MdDeviceData(); + mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); + mdDeviceData.setDevType("3"); + mdDeviceData.setTimeStamp(new Date()); + mdDeviceData.setCreatorUserId("自动存储"); + + Map expands = new HashMap<>(); + expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量 + expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速 + expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向 + expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度 + expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度 + expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压 + expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑 + expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态 + expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度 + expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度 + expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度 + expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度 + expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况 + + mdDeviceData.setExpands(JSONObject.toJSONString(expands)); + + middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); + } + } diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java index 26202f28..214489bd 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java @@ -1,7 +1,5 @@ package com.zc.business.message.device.listener; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.zc.business.message.device.handler.DeviceMessageHandler; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; @@ -22,8 +20,6 @@ import javax.annotation.Resource; @Component public class DeviceEventListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java index d65eb9c3..c8b4b8fd 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java @@ -16,8 +16,6 @@ import org.springframework.stereotype.Component; @Component public class DeviceFunctionReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java index 2f10b1e7..fc411370 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java @@ -16,8 +16,6 @@ import org.springframework.stereotype.Component; @Component public class DevicePropertyReadReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java index f0b66d56..24dca16a 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java @@ -7,6 +7,7 @@ import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.MdDeviceData; import com.zc.business.enums.IotProductEnum; +import com.zc.business.message.device.handler.DeviceMessageHandler; import com.zc.business.service.IDcMeteorologicalDetectorDataService; import com.zc.business.service.IMiddleDatabaseService; import com.zc.business.service.IDcYzsqbdcHmbldDataService; @@ -33,69 +34,21 @@ import java.util.Map; @Component public class DevicePropertyReportListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; @Resource private ThreadPoolTaskExecutor threadPoolTaskExecutor; - @Autowired - private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; - @Autowired - private IMiddleDatabaseService middleDatabaseService; - @Autowired - private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; + @Resource + private DeviceMessageHandler deviceMessageHandler; @Override public void onMessage(ObjectRecord message) { String streamKay = message.getStream(); RecordId recordId = message.getId(); - - threadPoolTaskExecutor.execute(() -> { - Map data = JSON.parseObject(message.getValue(), HashMap.class); - Map headers = (Map) data.get("headers"); - if (headers.get("productId") != null){ - String productId = headers.get("productId").toString(); - //气象检测器 - if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)){ - DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties"); - meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString()); - meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); - - //设计院中间库 插入设备数据 - MdDeviceData mdDeviceData = new MdDeviceData(); - mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); - mdDeviceData.setDevType("3"); - mdDeviceData.setTimeStamp(new Date()); - mdDeviceData.setCreatorUserId("自动存储"); - - Map expands = new HashMap<>(); - expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量 - expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速 - expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向 - expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度 - expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度 - expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压 - expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑 - expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态 - expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度 - expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度 - expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度 - expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度 - expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况 - - mdDeviceData.setExpands(JSONObject.toJSONString(expands)); - - middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); - } else if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)){ - //交调 - dcYzsqbdcHmbldDataService.addDcYzsqbdcHmbldDataList(data); - - } - } - }); + threadPoolTaskExecutor.execute(() -> deviceMessageHandler.handle(message.getValue())); // 消费完后直接删除消息 redisStream.del(streamKay, String.valueOf(recordId)); diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java index 4438f61b..2c1c95c0 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java @@ -16,8 +16,6 @@ import org.springframework.stereotype.Component; @Component public class DevicePropertyWriteReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java index e9add800..a281364d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java @@ -23,8 +23,6 @@ import java.util.List; @Component public class OfflineMessageListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream; diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java index c21d1b19..b4978665 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java @@ -24,8 +24,6 @@ import java.util.List; @Component public class OnlineMessageListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); - @Autowired @Qualifier("iotRedisStream") private RedisStream redisStream;