Browse Source

代码优化

develop
xiepufeng 9 months ago
parent
commit
0afef0c2de
  1. 59
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  2. 4
      zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java
  3. 2
      zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java
  4. 2
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java
  5. 55
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java
  6. 2
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java
  7. 2
      zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java
  8. 2
      zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java

59
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@ -7,23 +7,24 @@ import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.core.redis.RedisCache;
import com.zc.business.constant.RedisKeyConstants; import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.DcWarning; import com.zc.business.domain.DcWarning;
import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.IotProductEnum; import com.zc.business.enums.IotProductEnum;
import com.zc.business.enums.WarningSourceEnum; import com.zc.business.enums.WarningSourceEnum;
import com.zc.business.enums.WarningStateEnum; import com.zc.business.enums.WarningStateEnum;
import com.zc.business.enums.WarningSubclassEnum; import com.zc.business.enums.WarningSubclassEnum;
import com.zc.business.service.DcTrafficSectionDataService; import com.zc.business.service.*;
import com.zc.business.service.IDcDeviceService;
import com.zc.business.service.IDcWarningService;
import com.zc.business.service.IMiddleDatabaseService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -53,6 +54,12 @@ public class DeviceMessageHandler {
@Resource @Resource
private DcTrafficSectionDataService dcTrafficSectionDataService; private DcTrafficSectionDataService dcTrafficSectionDataService;
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Autowired
private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService;
/** /**
* 更新设备状态 * 更新设备状态
* *
@ -102,6 +109,12 @@ public class DeviceMessageHandler {
// 一站式情况调查站 // 一站式情况调查站
if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)) { if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)) {
oneStopDeviceMessageHandle(data); oneStopDeviceMessageHandle(data);
return;
}
// 一站式情况调查站
if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) {
weatherDetectorMessageHandle(data);
} }
} }
@ -319,4 +332,42 @@ public class DeviceMessageHandler {
dcTrafficSectionDataService.processRealtimeMessage(msg); dcTrafficSectionDataService.processRealtimeMessage(msg);
} }
/**
* 气象检测器消息处理入口
* @param msg 设备消息
*/
private void weatherDetectorMessageHandle(JSONObject msg) {
DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) msg.get("properties");
meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
//设计院中间库 插入设备数据
MdDeviceData mdDeviceData = new MdDeviceData();
mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId());
mdDeviceData.setDevType("3");
mdDeviceData.setTimeStamp(new Date());
mdDeviceData.setCreatorUserId("自动存储");
Map<String,Object> expands = new HashMap<>();
expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量
expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速
expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向
expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度
expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度
expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压
expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑
expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态
expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度
expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度
expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度
expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度
expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况
mdDeviceData.setExpands(JSONObject.toJSONString(expands));
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
}
} }

4
zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java

@ -1,7 +1,5 @@
package com.zc.business.message.device.listener; package com.zc.business.message.device.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zc.business.message.device.handler.DeviceMessageHandler; import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.common.core.redis.stream.RedisStream; import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -22,8 +20,6 @@ import javax.annotation.Resource;
@Component @Component
public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>> public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

2
zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java

@ -16,8 +16,6 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>> public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

2
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java

@ -16,8 +16,6 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>> public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

55
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java

@ -7,6 +7,7 @@ import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.MdDeviceData; import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.IotProductEnum; import com.zc.business.enums.IotProductEnum;
import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.business.service.IDcMeteorologicalDetectorDataService; import com.zc.business.service.IDcMeteorologicalDetectorDataService;
import com.zc.business.service.IMiddleDatabaseService; import com.zc.business.service.IMiddleDatabaseService;
import com.zc.business.service.IDcYzsqbdcHmbldDataService; import com.zc.business.service.IDcYzsqbdcHmbldDataService;
@ -33,69 +34,21 @@ import java.util.Map;
@Component @Component
public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>> public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;
@Resource @Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor; private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Autowired
private IMiddleDatabaseService middleDatabaseService;
@Autowired @Resource
private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; private DeviceMessageHandler deviceMessageHandler;
@Override @Override
public void onMessage(ObjectRecord<String, String> message) { public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream(); String streamKay = message.getStream();
RecordId recordId = message.getId(); RecordId recordId = message.getId();
threadPoolTaskExecutor.execute(() -> deviceMessageHandler.handle(message.getValue()));
threadPoolTaskExecutor.execute(() -> {
Map data = JSON.parseObject(message.getValue(), HashMap.class);
Map<String,Object> headers = (Map<String, Object>) data.get("headers");
if (headers.get("productId") != null){
String productId = headers.get("productId").toString();
//气象检测器
if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)){
DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties");
meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
//设计院中间库 插入设备数据
MdDeviceData mdDeviceData = new MdDeviceData();
mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId());
mdDeviceData.setDevType("3");
mdDeviceData.setTimeStamp(new Date());
mdDeviceData.setCreatorUserId("自动存储");
Map<String,Object> expands = new HashMap<>();
expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量
expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速
expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向
expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度
expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度
expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压
expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑
expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态
expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度
expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度
expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度
expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度
expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况
mdDeviceData.setExpands(JSONObject.toJSONString(expands));
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
} else if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)){
//交调
dcYzsqbdcHmbldDataService.addDcYzsqbdcHmbldDataList(data);
}
}
});
// 消费完后直接删除消息 // 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId)); redisStream.del(streamKay, String.valueOf(recordId));

2
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java

@ -16,8 +16,6 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>> public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

2
zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java

@ -23,8 +23,6 @@ import java.util.List;
@Component @Component
public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>> public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

2
zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java

@ -24,8 +24,6 @@ import java.util.List;
@Component @Component
public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>> public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{ {
private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class);
@Autowired @Autowired
@Qualifier("iotRedisStream") @Qualifier("iotRedisStream")
private RedisStream redisStream; private RedisStream redisStream;

Loading…
Cancel
Save