Browse Source

Merge remote-tracking branch 'origin/develop' into develop

develop
zhaoxianglong 11 months ago
parent
commit
35b8c92e38
  1. 5
      zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java
  2. 154
      zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java
  3. 49
      zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java
  4. 22
      zc-business/src/main/java/com/zc/business/mapper/DcTrafficSectionDataMapper.java
  5. 92
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  6. 6
      zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java
  7. 4
      zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java
  8. 4
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java
  9. 57
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java
  10. 4
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java
  11. 9
      zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java
  12. 4
      zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java
  13. 15
      zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java
  14. 431
      zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java
  15. 166
      zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java
  16. 52
      zc-business/src/main/java/com/zc/business/statistics/cache/AbstractTrafficStatisticsCache.java
  17. 137
      zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java
  18. 137
      zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java
  19. 142
      zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java
  20. 141
      zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java
  21. 70
      zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java
  22. 44
      zc-business/src/main/resources/mapper/business/DcTrafficSectionDataMapper.xml

5
zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java

@ -28,4 +28,9 @@ public class DeviceFunctionIdConstants {
* 可变信息标志 1B 功能码
*/
public static final String VARIABLE_INFORMATION_FLAG_1B = "1B";
/**
* 激光疲劳唤醒 SETTM 功能码
*/
public static final String VARIABLE_INFORMATION_FLAG_SETTM = "SETTM";
}

154
zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java

@ -0,0 +1,154 @@
package com.zc.business.domain;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import lombok.Data;
import org.apache.commons.codec.digest.DigestUtils;
import java.util.Date;
import java.util.Objects;
/**
* 交通断面数据统计定义
* @author xiepufeng
*/
@Data
public class DcTrafficSectionData {
/**
* 主键
*/
@TableId
private String id;
/**
* 车流量
*/
private Integer trafficVolume;
/**
* 平均速度
*/
private Integer averageSpeed;
/**
* 所属设备
*/
private Long deviceId;
/**
* 统计时间
*/
private Date statisticalDate;
/**
* 道路方向
*/
private Byte direction;
/**
* 时段类型
* 1- 2- 3- 4-
*/
private Byte periodType;
/**
* 所在桩号
*/
private Integer stakeMark;
/** 创建时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date createTime;
/** 更新时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date updateTime;
/**
* 设置交通数据的统计周期类型
* @param periodType 统计周期类型的枚举值
*/
public void setPeriodType(TrafficDataPeriodTypeEnum periodType) {
this.periodType = periodType.getCode(); // 将枚举类型转换为代码值存储
}
/**
* 重写equals方法用于比较两个对象是否相等
* @param o 要与当前对象比较的对象
* @return 如果两个对象相等则返回true否则返回false
*/
@Override
public boolean equals(Object o) {
if (this == o) return true; // 同一对象比较,直接返回true
if (o == null || getClass() != o.getClass()) return false; // 对象为空或类型不同,返回false
DcTrafficSectionData that = (DcTrafficSectionData) o; // 类型转换
return Objects.equals(deviceId, that.deviceId) &&
Objects.equals(statisticalDate, that.statisticalDate) &&
Objects.equals(direction, that.direction) &&
Objects.equals(periodType, that.periodType) &&
Objects.equals(stakeMark, that.stakeMark); // 比较各属性值
}
/**
* 重写hashCode方法基于对象的属性生成哈希码
* @return 对象的哈希码值
*/
@Override
public int hashCode() {
return Objects.hash(deviceId, statisticalDate, direction, periodType, stakeMark);
}
/**
* 设置统计日期根据不同的统计周期类型来调整日期使其对应周期的起始日期
* @param statisticalDate 统计日期原始日期
*/
public void setStatisticalDate(Date statisticalDate) {
TrafficDataPeriodTypeEnum typeEnum = TrafficDataPeriodTypeEnum.valueOfCode(periodType);
setStatisticalDate(statisticalDate, typeEnum);
}
/**
* 根据给定的统计周期类型和日期设置统计日期为相应周期的起始日期
* @param statisticalDate 原始统计日期
* @param typeEnum 统计周期类型
*/
public void setStatisticalDate(Date statisticalDate, TrafficDataPeriodTypeEnum typeEnum) {
switch (typeEnum) {
case DAY:
// 设置为当天的起始时间
this.statisticalDate = DateUtil.beginOfDay(statisticalDate);
break;
case MONTH:
// 设置为当月的起始日期
this.statisticalDate = DateUtil.beginOfMonth(statisticalDate);
break;
case QUARTER:
// 设置为当季度的起始日期
this.statisticalDate = DateUtil.beginOfQuarter(statisticalDate);
break;
case YEAR:
// 设置为当年的起始日期
this.statisticalDate = DateUtil.beginOfYear(statisticalDate);
break;
default:
// 如果不是预定义的周期类型,则不做任何处理
this.statisticalDate = statisticalDate;
}
}
/**
* 根据设备ID统计时间方向时段类型桩号生成一个唯一ID
*/
public void generateUniqueId() {
String combinedAttributes = deviceId + "_" + DateUtil.format(statisticalDate, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark;
this.id = DigestUtils.md5Hex(combinedAttributes);
}
}

49
zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java

@ -0,0 +1,49 @@
package com.zc.business.enums;
import lombok.Getter;
/**
* 定义一个枚举类型 TrafficDataPeriodTypeEnum用于表示周期类型
*/
@Getter
public enum TrafficDataPeriodTypeEnum {
// 枚举成员:年,关联字节型代码 1 和描述 "年"
YEAR((byte) 1, "年"),
// 枚举成员:月,关联字节型代码 2 和描述 "月"
MONTH((byte) 2, "月"),
// 枚举成员:季,关联字节型代码 3 和描述 "季"
QUARTER((byte) 3, "季"),
// 枚举成员:日,关联字节型代码 4 和描述 "日"
DAY((byte) 4, "日");
// 每个枚举成员的属性:代码,存储对应的字节型数值
private final Byte code;
// 每个枚举成员的属性:描述,存储该周期类型的中文描述
private final String description;
// 构造方法,初始化每个枚举成员的代码和描述信息
TrafficDataPeriodTypeEnum(Byte code, String description) {
this.code = code;
this.description = description;
}
// 反向查找方法,根据给定的字节型代码查找对应的枚举值
// 如果找到匹配项,则返回该枚举类型;否则抛出IllegalArgumentException异常
public static TrafficDataPeriodTypeEnum valueOfCode(Byte code) {
// 遍历所有PeriodTypeEnum枚举值
for (TrafficDataPeriodTypeEnum type : values()) {
// 如果当前枚举类型的code与输入的code相等
if (type.getCode().equals(code)) {
// 返回找到的枚举类型
return type;
}
}
// 如果循环结束都没有找到匹配的code,则抛出异常,说明提供的code非法
throw new IllegalArgumentException("无效的周期类型代码: " + code);
}
}

22
zc-business/src/main/java/com/zc/business/mapper/DcTrafficSectionDataMapper.java

@ -0,0 +1,22 @@
package com.zc.business.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zc.business.domain.DcTrafficSectionData;
import org.apache.ibatis.annotations.Mapper;
/**
* 交通断面数据统计Mapper接口
*
* @author xiepufeng
*/
@Mapper
public interface DcTrafficSectionDataMapper extends BaseMapper<DcTrafficSectionData> {
/**
* 插入或更新交通路段数据
*
* @param trafficSectionData 交通路段数据对象包含需要插入或更新的数据
* @return 返回一个布尔值表示操作是否成功true表示插入或更新成功false表示失败
*/
boolean insertOrUpdate(DcTrafficSectionData trafficSectionData);
}

92
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@ -1,32 +1,30 @@
package com.zc.business.message.device.handler;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.redis.RedisCache;
import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.DcWarning;
import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.IotProductEnum;
import com.zc.business.enums.WarningSourceEnum;
import com.zc.business.enums.WarningStateEnum;
import com.zc.business.enums.WarningSubclassEnum;
import com.zc.business.service.IDcDeviceService;
import com.zc.business.service.IDcWarningService;
import com.zc.business.service.IMiddleDatabaseService;
import com.zc.business.service.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -53,8 +51,14 @@ public class DeviceMessageHandler {
@Resource
private RedisCache redisCache;
@Value("${iot.address}")
private String iotAddress;
@Resource
private DcTrafficSectionDataService dcTrafficSectionDataService;
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Autowired
private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService;
/**
* 更新设备状态
@ -99,6 +103,18 @@ public class DeviceMessageHandler {
// 摄像头检测事件
if (IotProductEnum.CAMERA_DETECTION_EVENT.value().equals(productId)) {
this.cameraDetectionEventHandle(data);
return;
}
// 一站式情况调查站
if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)) {
oneStopDeviceMessageHandle(data);
return;
}
// 一站式情况调查站
if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) {
weatherDetectorMessageHandle(data);
}
}
@ -159,7 +175,7 @@ public class DeviceMessageHandler {
String title = stakeMarkDescription + WarningSubclassEnum.getDecorateInfo(warningSubclass);
// 标题
dcWarning.setWarningTitle(title);
dcWarning.setRemark(convertTimestampToString(captureTime) + " " + title);
dcWarning.setRemark(DateUtil.formatDateTime(DateUtil.date(captureTime)) + " " + title);
// 影响车道
dcWarning.setLane(String.valueOf(data.getInteger("relatedLaneNo")));
// 物联设备id
@ -309,23 +325,49 @@ public class DeviceMessageHandler {
}
/**
* 将毫秒级时间戳转换为"yyyy-MM-dd HH:mm:ss"格式的字符串
*
* @param timestampInMillis 毫秒级时间戳
* @return 格式化后的日期时间字符串
* 一站式情况调查站设备消息处理入口
* @param msg 设备消息
*/
public static String convertTimestampToString(long timestampInMillis) {
// 转换为Instant对象
Instant instant = Instant.ofEpochMilli(timestampInMillis);
// 转换为LocalDateTime(默认时区)
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
private void oneStopDeviceMessageHandle(JSONObject msg) {
dcTrafficSectionDataService.processRealtimeMessage(msg);
}
// 创建DateTimeFormatter实例并指定输出格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 格式化LocalDateTime对象为字符串
return localDateTime.format(formatter);
/**
* 气象检测器消息处理入口
* @param msg 设备消息
*/
private void weatherDetectorMessageHandle(JSONObject msg) {
DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) msg.get("properties");
meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
//设计院中间库 插入设备数据
MdDeviceData mdDeviceData = new MdDeviceData();
mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId());
mdDeviceData.setDevType("3");
mdDeviceData.setTimeStamp(new Date());
mdDeviceData.setCreatorUserId("自动存储");
Map<String,Object> expands = new HashMap<>();
expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量
expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速
expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向
expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度
expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度
expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压
expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑
expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态
expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度
expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度
expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度
expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度
expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况
mdDeviceData.setExpands(JSONObject.toJSONString(expands));
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
}
}

6
zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java

@ -1,12 +1,11 @@
package com.zc.business.message.device.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -21,9 +20,8 @@ import javax.annotation.Resource;
@Component
public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Resource

4
zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java

@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -15,9 +16,8 @@ import org.springframework.stereotype.Component;
@Component
public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override

4
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java

@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -15,9 +16,8 @@ import org.springframework.stereotype.Component;
@Component
public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override

57
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java

@ -7,6 +7,7 @@ import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.IotProductEnum;
import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.business.service.IDcMeteorologicalDetectorDataService;
import com.zc.business.service.IMiddleDatabaseService;
import com.zc.business.service.IDcYzsqbdcHmbldDataService;
@ -14,6 +15,7 @@ import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -32,68 +34,21 @@ import java.util.Map;
@Component
public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Autowired
private IMiddleDatabaseService middleDatabaseService;
@Autowired
private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService;
@Resource
private DeviceMessageHandler deviceMessageHandler;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
threadPoolTaskExecutor.execute(() -> {
Map data = JSON.parseObject(message.getValue(), HashMap.class);
Map<String,Object> headers = (Map<String, Object>) data.get("headers");
if (headers.get("productId") != null){
String productId = headers.get("productId").toString();
//气象检测器
if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)){
DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties");
meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
//设计院中间库 插入设备数据
MdDeviceData mdDeviceData = new MdDeviceData();
mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId());
mdDeviceData.setDevType("3");
mdDeviceData.setTimeStamp(new Date());
mdDeviceData.setCreatorUserId("自动存储");
Map<String,Object> expands = new HashMap<>();
expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量
expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速
expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向
expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度
expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度
expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压
expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑
expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态
expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度
expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度
expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度
expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度
expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况
mdDeviceData.setExpands(JSONObject.toJSONString(expands));
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
} else if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)){
//交调
dcYzsqbdcHmbldDataService.addDcYzsqbdcHmbldDataList(data);
}
}
});
threadPoolTaskExecutor.execute(() -> deviceMessageHandler.handle(message.getValue()));
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));

4
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java

@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -15,9 +16,8 @@ import org.springframework.stereotype.Component;
@Component
public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override

9
zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java

@ -3,10 +3,11 @@ package com.zc.business.message.device.listener;
import com.alibaba.fastjson.JSON;
import com.zc.business.domain.DcDevice;
import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.business.service.IDcDeviceService;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -15,7 +16,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 离线消息监听
@ -23,9 +23,8 @@ import java.util.stream.Collectors;
@Component
public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class);
@Resource
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Resource

4
zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java

@ -8,6 +8,7 @@ import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@ -23,9 +24,8 @@ import java.util.List;
@Component
public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class);
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Resource

15
zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java

@ -0,0 +1,15 @@
package com.zc.business.service;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.IService;
import com.zc.business.domain.DcTrafficSectionData;
public interface DcTrafficSectionDataService extends IService<DcTrafficSectionData> {
/**
* 处理实时接收到的设备消息并将其转换为交通断面统计数据对象并缓存
*
* @param msg 设备发送的JSON格式实时消息
*/
void processRealtimeMessage(JSONObject msg);
}

431
zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java

@ -21,6 +21,8 @@ import com.zc.business.service.DcEmergencyPlansService;
import com.zc.business.service.DcExecuteActionService;
import com.zc.business.service.IDcDeviceService;
import com.zc.common.core.httpclient.exception.HttpException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -31,6 +33,7 @@ import java.util.*;
import java.util.stream.Collectors;
@Service
@Slf4j
public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
@Resource
@ -46,6 +49,9 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
@Resource
private EventPlanAssocMapper eventPlanAssocMapper;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* 查询事件预案
@ -103,7 +109,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
JSONObject triggerJson = JSONObject.parseObject(triggerMechanism);
String locationType = triggerJson.get("locationType").toString();
DcEventVehicleAccident dcEventVehicleAccident = event.getDcEventVehicleAccident();
String eventLocationType =dcEventVehicleAccident.getLocationType().toString();
String eventLocationType = dcEventVehicleAccident.getLocationType().toString();
return locationType.equals(eventLocationType);
})
.collect(Collectors.toList());
@ -141,7 +147,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
int warningType = Integer.parseInt(dcWarning.getWarningType().toString());
if (warningType == WarningTypeEnum.UNUSUAL_WEATHER.getCode()) {
if (warningType == WarningTypeEnum.UNUSUAL_WEATHER.getCode()) {
return dcEmergencyPlansList.stream()
.filter(dcEmergencyPlans -> {
String triggerMechanism = dcEmergencyPlans.getTriggerMechanism();
@ -163,13 +169,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
* 交通事件-情报板确认回显原始模板
*/
@Override
public Map<String,List<DcInfoBoardTemplate>> eventBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) {
public Map<String, List<DcInfoBoardTemplate>> eventBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) {
// 获取事件数据
DcEvent dcEvent = dcEventAnDcEmergencyPlans.getDcEvent();
// 方向
String direction = dcEvent.getDirection();
// 事件桩号
dcEvent.setStakeMark(dcEvent.getStakeMark().replace("K",""));
dcEvent.setStakeMark(dcEvent.getStakeMark().replace("K", ""));
String[] markArray = dcEvent.getStakeMark().split("\\+");
if (markArray[1].length() < 3) {
// 不足三位 补零
@ -184,13 +190,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
* 感知事件-情报板确认回显原始模板
*/
@Override
public Map<String,List<DcInfoBoardTemplate>> warningBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) {
public Map<String, List<DcInfoBoardTemplate>> warningBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) {
// 获取事件数据
DcWarning dcWarning = dcEventAnDcEmergencyPlans.getDcWarning();
// 方向
String direction = dcWarning.getDirection();
// 事件桩号
dcWarning.setStakeMark(dcWarning.getStakeMark().replace("K",""));
dcWarning.setStakeMark(dcWarning.getStakeMark().replace("K", ""));
String[] markArray = dcWarning.getStakeMark().split("\\+");
if (markArray[1].length() < 3) {
// 不足三位 补零
@ -204,48 +210,50 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
/**
* 情报板设备执行3A功能,获取模板
*/
public Map<String,List<DcInfoBoardTemplate>> getBoardTemplate(List<DcDevice> dcDevices) {
Map<String,List<DcInfoBoardTemplate>> map = new HashMap<>();
public Map<String, List<DcInfoBoardTemplate>> getBoardTemplate(List<DcDevice> dcDevices) {
Map<String, List<DcInfoBoardTemplate>> map = new HashMap<>();
dcDevices.forEach(dcDevice -> {
try {
if (StringUtils.isEmpty(dcDevice.getIotDeviceId())) {
return;
}
AjaxResult ajaxResult = dcDeviceController.getDeviceRealtimeProperty(dcDevice.getIotDeviceId(), "3A", new HashMap<>());
if (ajaxResult.get("code").equals(200)) {
JSONObject properties = JSON.parseObject(JSON.parseObject(ajaxResult.get("data").toString()).get("3A").toString());
JSONArray contentArray = JSONArray.parseArray(properties.get("content").toString());
List<DcInfoBoardTemplate> list = new ArrayList<>();
contentArray.forEach(content -> {
DcInfoBoardTemplate dcInfoBoardTemplate = new DcInfoBoardTemplate();
JSONObject jsonObject = JSON.parseObject(content.toString());
String displayAreaWidth = jsonObject.get("displayAreaWidth").toString();
String displayAreaHeight = jsonObject.get("displayAreaHeight").toString();
// 内容
dcInfoBoardTemplate.setContent(jsonObject.get("textContent").toString());
// 前景颜色
dcInfoBoardTemplate.setFontColor(jsonObject.get("foregroundColor").toString());
// 屏幕尺寸
dcInfoBoardTemplate.setScreenSize(displayAreaWidth+"*"+displayAreaHeight);
// 字号
dcInfoBoardTemplate.setFontSize(jsonObject.get("fontSize").toString());
// 字体风格
dcInfoBoardTemplate.setFontType(jsonObject.get("fontStyle").toString());
// 字距
dcInfoBoardTemplate.setFontSpacing(jsonObject.get("fontSpacing").toString());
// 停留时间
dcInfoBoardTemplate.setStopTime(jsonObject.get("residenceTime").toString());
// 入屏方式
dcInfoBoardTemplate.setInScreenMode(jsonObject.get("screenEntryMethod").toString());
// 水平对齐
dcInfoBoardTemplate.setFormatStyle(jsonObject.get("horizontalAlignment").toString());
list.add(dcInfoBoardTemplate);
});
map.put(dcDevice.getDeviceName(),list);
threadPoolTaskExecutor.execute(() -> {
try {
if (StringUtils.isEmpty(dcDevice.getIotDeviceId())) {
return;
}
AjaxResult ajaxResult = dcDeviceController.getDeviceRealtimeProperty(dcDevice.getIotDeviceId(), "3A", new HashMap<>());
if (ajaxResult.get("code").equals(200)) {
JSONObject properties = JSON.parseObject(JSON.parseObject(ajaxResult.get("data").toString()).get("3A").toString());
JSONArray contentArray = JSONArray.parseArray(properties.get("content").toString());
List<DcInfoBoardTemplate> list = new ArrayList<>();
contentArray.forEach(content -> {
DcInfoBoardTemplate dcInfoBoardTemplate = new DcInfoBoardTemplate();
JSONObject jsonObject = JSON.parseObject(content.toString());
String displayAreaWidth = jsonObject.get("displayAreaWidth").toString();
String displayAreaHeight = jsonObject.get("displayAreaHeight").toString();
// 内容
dcInfoBoardTemplate.setContent(jsonObject.get("textContent").toString());
// 前景颜色
dcInfoBoardTemplate.setFontColor(jsonObject.get("foregroundColor").toString());
// 屏幕尺寸
dcInfoBoardTemplate.setScreenSize(displayAreaWidth + "*" + displayAreaHeight);
// 字号
dcInfoBoardTemplate.setFontSize(jsonObject.get("fontSize").toString());
// 字体风格
dcInfoBoardTemplate.setFontType(jsonObject.get("fontStyle").toString());
// 字距
dcInfoBoardTemplate.setFontSpacing(jsonObject.get("fontSpacing").toString());
// 停留时间
dcInfoBoardTemplate.setStopTime(jsonObject.get("residenceTime").toString());
// 入屏方式
dcInfoBoardTemplate.setInScreenMode(jsonObject.get("screenEntryMethod").toString());
// 水平对齐
dcInfoBoardTemplate.setFormatStyle(jsonObject.get("horizontalAlignment").toString());
list.add(dcInfoBoardTemplate);
});
map.put(dcDevice.getDeviceName(), list);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
});
});
return map;
@ -253,12 +261,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
/**
* 执行操作中的规则筛选
*
* @param dcExecuteAction
* @param markArray
* @param direction
* @return
*/
public List<DcDevice> ruleFiltering(DcExecuteAction dcExecuteAction,String[] markArray,String direction){
public List<DcDevice> ruleFiltering(DcExecuteAction dcExecuteAction, String[] markArray, String direction) {
Integer searchRule = dcExecuteAction.getSearchRule();
List<String> start = new ArrayList<>();
@ -273,11 +282,10 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
// 根据设备id,获取设备集合
LambdaQueryWrapper<DcDevice> queryWrapper = new LambdaQueryWrapper<>();
JSONObject otherConfig = JSON.parseObject(dcExecuteAction.getOtherConfig());
List<String> deviceList = (List<String>)otherConfig.get("deviceList");
List<String> deviceList = (List<String>) otherConfig.get("deviceList");
queryWrapper.in(DcDevice::getIotDeviceId, deviceList);
dcDevices = dcDeviceService.list(queryWrapper);
}
else if (searchRule.equals(2)) {
} else if (searchRule.equals(2)) {
// 事件上游最近
if (direction.equals("1")) {
@ -296,7 +304,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
}
});
if (dcDevices.size() > 0) {
dcDevices = dcDevices.subList(0 , dcExecuteAction.getNumber());
dcDevices = dcDevices.subList(0, dcExecuteAction.getNumber());
}
} else {
// 下行 取最小的几个
@ -318,8 +326,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
}
}
}
else if (searchRule.equals(3)) {
} else if (searchRule.equals(3)) {
// 事件下游最近
if (direction.equals("1")) {
// 上行 取最大的几个
@ -360,8 +367,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
dcDevices = dcDevices.subList(0, dcExecuteAction.getNumber());
}
}
}
else {
} else {
// 最近公里数
Integer kilometers = Integer.parseInt(markArray[0].replaceAll("K", ""));
// 根据事件桩号、公里数 计算出 桩号范围
@ -395,11 +401,12 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
String direction = dcEvent.getDirection();
// 事件编号
String id = dcEvent.getId();
return executionConfirmation(dcEventAnDcEmergencyPlans,dcEvent.getStakeMark(),direction,id);
return executionConfirmation(dcEventAnDcEmergencyPlans, dcEvent.getStakeMark(), direction, id);
}
/**
* 感知事件-情报板自动生成
*
* @param dcEventAnDcEmergencyPlans
* @return
*/
@ -411,17 +418,17 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
Integer warningType = dcWarning.getWarningType();
if (warningType.equals(WarningTypeEnum.TRAFFIC_JAM.getCode())) {
// 交通拥堵
dcInfoBoardTemplate.setContent("前方"+WarningTypeEnum.TRAFFIC_JAM.getInfo()+"请谨慎驾驶");
}else if (warningType.equals(WarningTypeEnum.NON_MOTOR_VEHICLE.getCode())) {
dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.NON_MOTOR_VEHICLE.getInfo()+"请注意避让");
}else if (warningType.equals(WarningTypeEnum.PEDESTRIAN.getCode())) {
dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.PEDESTRIAN.getInfo()+"请注意避让");
}else if (warningType.equals(WarningTypeEnum.FIREWORKS.getCode())) {
dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.FIREWORKS.getInfo()+"请注意避让");
}else if (warningType.equals(WarningTypeEnum.OUTFALL.getCode())) {
dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.OUTFALL.getInfo()+"请注意避让");
dcInfoBoardTemplate.setContent("前方" + WarningTypeEnum.TRAFFIC_JAM.getInfo() + "请谨慎驾驶");
} else if (warningType.equals(WarningTypeEnum.NON_MOTOR_VEHICLE.getCode())) {
dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.NON_MOTOR_VEHICLE.getInfo() + "请注意避让");
} else if (warningType.equals(WarningTypeEnum.PEDESTRIAN.getCode())) {
dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.PEDESTRIAN.getInfo() + "请注意避让");
} else if (warningType.equals(WarningTypeEnum.FIREWORKS.getCode())) {
dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.FIREWORKS.getInfo() + "请注意避让");
} else if (warningType.equals(WarningTypeEnum.OUTFALL.getCode())) {
dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.OUTFALL.getInfo() + "请注意避让");
} else if (warningType.equals(WarningTypeEnum.VEHICLE_CONVERSE_RUNNING.getCode())) {
dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.OUTFALL.getInfo()+"请注意避让");
dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.OUTFALL.getInfo() + "请注意避让");
}
return dcInfoBoardTemplate;
@ -429,6 +436,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
/**
* 交通事件-情报板自动生成
*
* @return
*/
@Override
@ -475,7 +483,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
.orElse("前方存在限行或关闭");
dcInfoBoardTemplate.setContent(content);
}else {
} else {
// 施工建设
dcInfoBoardTemplate.setContent("前方施工请注意驾驶");
}
@ -484,6 +492,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
/**
* 感知事件确认
*
* @param dcEventAnDcEmergencyPlans 事件数据 事件预案数据
* @return
*/
@ -496,11 +505,12 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
String direction = dcWarning.getDirection();
// 事件编号
String id = dcWarning.getId();
return executionConfirmation(dcEventAnDcEmergencyPlans,dcWarning.getStakeMark(),direction,id);
return executionConfirmation(dcEventAnDcEmergencyPlans, dcWarning.getStakeMark(), direction, id);
}
/**
* 事件确认
*
* @param dcEventAnDcEmergencyPlans 事件数据 事件预案数据
* @return
*/
@ -513,7 +523,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
// 存储所有设备的执行结果
JSONArray resultArray = new JSONArray();
// 事件桩号
stakeMark = stakeMark.replace("K","");
stakeMark = stakeMark.replace("K", "");
String[] markArray = stakeMark.split("\\+");
if (markArray[1].length() < 3) {
// 不足三位 补零
@ -575,156 +585,175 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService {
/**
* 根据不通设备类型执行不通的功能操作
*/
public void invokedFunction(
Integer operationType,
public void invokedFunction(Integer operationType,
List<DcDevice> dcDevices,
JSONObject otherConfig,
JSONArray resultArray) throws HttpException, IOException {
String iotDeviceId = "";
String functionId = "";
JSONArray resultArray) {
for (DcDevice device : dcDevices) {
iotDeviceId = device.getIotDeviceId();
HashMap<String, Object> props = new HashMap<>();
if (device.getDeviceType().equals(DeviceTypeConstants.DRIVING_GUIDANCE)) {
// 行车诱导
functionId = DeviceFunctionIdConstants.DRIVING_GUIDANCE;
// 控制模式 1-手动 2-自动 3-万年历
String controlModel = otherConfig.get("controlModel").toString();
props.put("onWorkStatus", otherConfig.get("state").toString());
props.put("inWorkStatus", otherConfig.get("state").toString());
if (controlModel.equals("1")) {
props.put("mode", "00");
} else if (controlModel.equals("2")) {
String startTime = otherConfig.get("startTime").toString();
String endTime = otherConfig.get("endTime").toString();
props.put("mode", "01");
props.put("startDisplayTime", startTime);
props.put("endDisplayTime", endTime);
} else {
props.put("mode", "02");
}
AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
// 将调用结果存入到 resultArray(操作结果) 中
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", ajaxResult);
resultArray.add(result);
threadPoolTaskExecutor.execute(() -> {
String iotDeviceId = "";
String functionId = "";
iotDeviceId = device.getIotDeviceId();
HashMap<String, Object> props = new HashMap<>();
try {
if (device.getDeviceType().equals(DeviceTypeConstants.DRIVING_GUIDANCE)) {
// 行车诱导
functionId = DeviceFunctionIdConstants.DRIVING_GUIDANCE;
// 控制模式 1-手动 2-自动 3-万年历
String controlModel = otherConfig.get("controlModel").toString();
props.put("onWorkStatus", otherConfig.get("state").toString());
props.put("inWorkStatus", otherConfig.get("state").toString());
if (controlModel.equals("1")) {
props.put("mode", "00");
} else if (controlModel.equals("2")) {
String startTime = otherConfig.get("startTime").toString();
String endTime = otherConfig.get("endTime").toString();
props.put("mode", "01");
props.put("startDisplayTime", startTime);
props.put("endDisplayTime", endTime);
} else {
props.put("mode", "02");
}
}
else if (device.getDeviceType().equals(DeviceTypeConstants.VARIABLE_INFORMATION_FLAG)) {
if (operationType == 1) {
// 执行操作
// 可变信息标志 分三步
// 1:执行11功能码
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_11;
props.put("fileName","play011.lst");
props.put("size","65535");
AjaxResult ajaxResult11 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
if (ajaxResult11.get("code").equals(200)) {
// 2:执行13功能码
HashMap<String, Object> props11 = new HashMap<>();
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_13;
List<Map<String,String>> list = new ArrayList<>();
Map<String,String> parameters = new HashMap<>();
DcInfoBoardTemplate dcInfoBoardTemplate = JSON.parseObject(
JSON.toJSONString(otherConfig.get("dcInfoBoardTemplate")),
DcInfoBoardTemplate.class);
// stopTime
parameters.put("STAY",dcInfoBoardTemplate.getStopTime());
// inScreenMode
parameters.put("ACTION",dcInfoBoardTemplate.getInScreenMode());
// fontSpacing
parameters.put("SPEED",dcInfoBoardTemplate.getFontSpacing());
// fontColor
parameters.put("COLOR",dcInfoBoardTemplate.getFontColor());
// fontType
parameters.put("FONT",dcInfoBoardTemplate.getFontType());
// fontSize
parameters.put("FONT_SIZE",dcInfoBoardTemplate.getFontSize());
// content
parameters.put("CONTENT",dcInfoBoardTemplate.getContent());
// screenSize 768*64 宽度和高度
parameters.put("width",dcInfoBoardTemplate.getScreenSize().split("\\*")[0]);
parameters.put("height",dcInfoBoardTemplate.getScreenSize().split("\\*")[1]);
// formatStyle
parameters.put("formatStyle",dcInfoBoardTemplate.getFormatStyle());
list.add(parameters);
props11.put("parameters",list);
AjaxResult ajaxResult13 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props11);
AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
// 将调用结果存入到 resultArray(操作结果) 中
JSONObject result = new JSONObject();
if (ajaxResult13.get("code").equals(200)) {
HashMap<String, Object> props1B = new HashMap<>();
// 3: 执行1B功能码
result.put("device", device.getId());
result.put("result", ajaxResult);
resultArray.add(result);
}
else if (device.getDeviceType().equals(DeviceTypeConstants.VARIABLE_INFORMATION_FLAG)) {
if (operationType == 1) {
// 执行操作
// 可变信息标志 分三步
// 1:执行11功能码
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_11;
props.put("fileName", "play011.lst");
props.put("size", "65535");
AjaxResult ajaxResult11;
ajaxResult11 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
if (ajaxResult11.get("code").equals(200)) {
// 2:执行13功能码
HashMap<String, Object> props11 = new HashMap<>();
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_13;
List<Map<String, String>> list = new ArrayList<>();
Map<String, String> parameters = new HashMap<>();
DcInfoBoardTemplate dcInfoBoardTemplate = JSON.parseObject(
JSON.toJSONString(otherConfig.get("dcInfoBoardTemplate")),
DcInfoBoardTemplate.class);
// stopTime
parameters.put("STAY", dcInfoBoardTemplate.getStopTime());
// inScreenMode
parameters.put("ACTION", dcInfoBoardTemplate.getInScreenMode());
// fontSpacing
parameters.put("SPEED", dcInfoBoardTemplate.getFontSpacing());
// fontColor
parameters.put("COLOR", dcInfoBoardTemplate.getFontColor());
// fontType
parameters.put("FONT", dcInfoBoardTemplate.getFontType());
// fontSize
parameters.put("FONT_SIZE", dcInfoBoardTemplate.getFontSize());
// content
parameters.put("CONTENT", dcInfoBoardTemplate.getContent());
// screenSize 768*64 宽度和高度
parameters.put("width", dcInfoBoardTemplate.getScreenSize().split("\\*")[0]);
parameters.put("height", dcInfoBoardTemplate.getScreenSize().split("\\*")[1]);
// formatStyle
parameters.put("formatStyle", dcInfoBoardTemplate.getFormatStyle());
list.add(parameters);
props11.put("parameters", list);
AjaxResult ajaxResult13 = null;
ajaxResult13 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props11);
JSONObject result = new JSONObject();
if (ajaxResult13.get("code").equals(200)) {
HashMap<String, Object> props1B = new HashMap<>();
// 3: 执行1B功能码
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B;
props1B.put("fileId", "11");
AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props1B);
result.put("device", device.getId());
result.put("result", ajaxResult1B);
resultArray.add(result);
} else {
result.put("device", device.getId());
result.put("result", ajaxResult13);
resultArray.add(result);
}
}
} else {
// 恢复操作
props.put("fileId", "10");
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B;
props1B.put("fileId","11");
AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props1B);
AjaxResult ajaxResult1B;
ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", ajaxResult1B);
resultArray.add(result);
}else {
result.put("device", device.getId());
result.put("result", ajaxResult13);
resultArray.add(result);
}
}
}
else {
// 恢复操作
props.put("fileId","10");
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B;
AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props);
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", ajaxResult1B);
resultArray.add(result);
}
else if (device.getDeviceType().equals(DeviceTypeConstants.ROAD_SECTION_VOICE_BROADCASTING)) {
// 路段广播
JSONObject params = new JSONObject();
params.put("name", "task-event");
params.put("outVVol", "8");
params.put("priority", "1");
params.put("text", otherConfig.get("content"));
params.put("repeatTimes", "3");
params.put("functionType", "startPaTts");
JSONArray termList = new JSONArray();
termList.add(JSON.parseObject(device.getOtherConfig()));
params.put("termList", termList);
JSONObject returnResult = broadcastController.nearCamListDistance(params);
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", returnResult);
resultArray.add(result);
}
else if (device.getDeviceType().equals(DeviceTypeConstants.ROAD_SECTION_VOICE_BROADCASTING)) {
// 路段广播
JSONObject params = new JSONObject();
params.put("name","task-event");
params.put("outVVol","8");
params.put("priority","1");
params.put("text",otherConfig.get("content"));
params.put("repeatTimes","3");
params.put("functionType","startPaTts");
JSONArray termList = new JSONArray();
termList.add(JSON.parseObject(device.getOtherConfig()));
params.put("termList",termList);
JSONObject returnResult =broadcastController.nearCamListDistance(params);
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", returnResult);
resultArray.add(result);
}
else if (device.getDeviceType().equals(DeviceTypeConstants.LASER_FATIGUE_AWAKENING)) {
// 激光疲劳唤醒
functionId = otherConfig.get("state").toString();
AjaxResult ajaxResultState = dcDeviceController.invokedFunction(iotDeviceId, functionId, new HashMap<>());
// 将调用结果存入到 resultArray(操作结果) 中
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", ajaxResultState);
resultArray.add(result);
// 操作时长
String operationDuration = "SETTM" + otherConfig.get("operationDuration").toString();
AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, operationDuration, new HashMap<>());
JSONObject resultTime = new JSONObject();
resultTime.put("device", device.getId());
resultTime.put("result", ajaxResult);
resultArray.add(resultTime);
} else {
break;
}
}
else if (device.getDeviceType().equals(DeviceTypeConstants.LASER_FATIGUE_AWAKENING)) {
// 激光疲劳唤醒
functionId = otherConfig.get("state").toString();
AjaxResult ajaxResultState = dcDeviceController.invokedFunction(iotDeviceId, functionId, new HashMap<>());
JSONObject result = new JSONObject();
result.put("device", device.getId());
result.put("result", ajaxResultState);
resultArray.add(result);
// 操作时长
String operationDuration = "SETTM" + otherConfig.get("operationDuration").toString();
HashMap<String, Object> propsTime = new HashMap<>();
propsTime.put("SET", operationDuration);
functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_SETTM;
AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, propsTime);
JSONObject resultTime = new JSONObject();
resultTime.put("device", device.getId());
resultTime.put("result", ajaxResult);
resultArray.add(resultTime);
}
} catch (HttpException | IOException e) {
log.error(e.toString());
throw new RuntimeException(e);
}
});
}
}

166
zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java

@ -0,0 +1,166 @@
package com.zc.business.service.impl;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import com.zc.business.statistics.cache.*;
import com.zc.business.mapper.DcTrafficSectionDataMapper;
import com.zc.business.service.DcTrafficSectionDataService;
import com.zc.business.statistics.handler.RealtimeTrafficStatistics;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
/**
* 交通断面数据服务实现类负责处理实时设备消息缓存数据定时任务以及数据保存等功能
*
* @author xiepufeng
*/
@Service
public class DcTrafficSectionDataServiceImpl
extends ServiceImpl<DcTrafficSectionDataMapper, DcTrafficSectionData>
implements DcTrafficSectionDataService {
@Resource
private DcTrafficSectionDataMapper dcTrafficSectionDataMapper;
/**
* 初始化方法用于在对象创建后恢复各种周期的交通数据缓存
* 该方法标注了@PostConstruct注解确保在依赖注入完成后调用
*/
@PostConstruct
public void init() {
// TODO 恢复每天交通数据缓存(es获取数据)
recoveryMonthlyCache(); // 恢复每月交通数据缓存
recoveryQuarterlyCache(); // 恢复每季度交通数据缓存
recoveryYearlyCache(); // 恢复每年交通数据缓存
}
/**
* 恢复每月交通数据缓存的方法
* 通过查询当前月份至今的每日交通数据并将其添加到每月交通统计缓存中
*/
private void recoveryMonthlyCache() {
// 构建查询条件,查询当前月份至今的每日交通数据
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.DAY);
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfMonth(new Date()), new Date());
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper);
// 遍历查询结果,将每日数据添加到每月交通统计缓存
dcTrafficSectionDataList.forEach(MonthlyTrafficStatisticsCache::addCacheData);
}
/**
* 恢复每季度交通数据缓存的方法
* 通过查询当前季度至今的每月交通数据并将其添加到每季度交通统计缓存中
*/
private void recoveryQuarterlyCache() {
// 构建查询条件,查询当前季度至今的每月交通数据
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.MONTH);
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfQuarter(new Date()), new Date());
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper);
// 遍历查询结果,将每月数据添加到每季度交通统计缓存
dcTrafficSectionDataList.forEach(QuarterlyTrafficStatisticsCache::addCacheData);
}
/**
* 恢复每年交通数据缓存的方法
* 通过查询当前年份至今的每季度交通数据并将其添加到每年交通统计缓存中
*/
private void recoveryYearlyCache() {
// 构建查询条件,查询当前年份至今的每季度交通数据
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.QUARTER);
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfYear(new Date()), new Date());
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper);
// 遍历查询结果,将每季度数据添加到每年交通统计缓存
dcTrafficSectionDataList.forEach(YearlyTrafficStatisticsCache::addCacheData);
}
/**
* 处理实时接收到的设备消息并将其转换为交通断面统计数据对象并缓存
*
* @param msg 设备发送的JSON格式实时消息
*/
@Override
public void processRealtimeMessage(JSONObject msg) {
// 1. 将设备消息转换为交通断面数据统计定义对象
DcTrafficSectionData dcTrafficSectionData = convertToTrafficStatistics(msg);
// 2. 将转换后的数据添加到缓存中
DailyTrafficStatisticsCache.addCacheData(dcTrafficSectionData);
}
/**
* 将设备实时消息转换为交通断面数据统计定义对象
*
* @param msg JSON格式的设备实时消息
* @return 转换后的交通断面数据统计定义对象
*/
public DcTrafficSectionData convertToTrafficStatistics(JSONObject msg) {
DcTrafficSectionData dcTrafficSectionData = new DcTrafficSectionData();
// TODO
return dcTrafficSectionData;
}
/**
* 定义每小时第20分钟执行的任务用于清除过期缓存数据并将缓存中的数据整合后保存至数据库
*/
@Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务
public void performHourlyCleanupAndPersist() {
// 清除已过期的缓存数据
DailyTrafficStatisticsCache.clearExpiredData();
MonthlyTrafficStatisticsCache.clearExpiredData();
QuarterlyTrafficStatisticsCache.clearExpiredData();
YearlyTrafficStatisticsCache.clearExpiredData();
// 整合缓存数据并保存至数据库
// 将缓存中的数据按日统计后保存至数据库
// 添加月交通断面数据到缓存中
persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按月统计后保存至数据库
// 添加季度交通断面数据到缓存中
persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按季度统计后保存至数据库
// 添加年交通断面数据到缓存中
persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按年统计后保存至数据库
persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {});
}
/**
* 将缓存中的数据统计后保存至数据库
*/
public void persistAggregatedData(
Map<String, ? extends AbstractTrafficStatisticsCache> cache,
TrafficDataPeriodTypeEnum periodType,
Consumer<DcTrafficSectionData> consumer
) {
for (AbstractTrafficStatisticsCache data : cache.values()) {
// 如果数据已经存储过,则跳过此次处理
if (data.isStored()) {
continue;
}
DcTrafficSectionData aggregatedData = RealtimeTrafficStatistics.trafficStatistics(data.getData(), periodType);
if (dcTrafficSectionDataMapper.insertOrUpdate(aggregatedData)) {
// 设置数据已存储状态
data.setStored(true);
// 调用回调函数
consumer.accept(aggregatedData);
}
}
}
}

52
zc-business/src/main/java/com/zc/business/statistics/cache/AbstractTrafficStatisticsCache.java

@ -0,0 +1,52 @@
package com.zc.business.statistics.cache;
import com.zc.business.domain.DcTrafficSectionData;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
/**
* 交通断面数据缓存定义
* @author xiepufeng
*/
@Getter
@Setter
public abstract class AbstractTrafficStatisticsCache {
// 日志记录器
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 缓存对象的键由设备ID与统计日期组成
*/
private String cacheKey;
/**
* 设备上报日期字符串
*/
private String statisticalDateStr;
/**
* 数据最后添加到缓存中的时间
*/
private Date lastAddedTime;
/**
* 标记该缓存实例是否已存储过数据
*/
private boolean stored;
/**
* 存储具体交通断面数据的列表
*/
private Collection<DcTrafficSectionData> data;
public AbstractTrafficStatisticsCache() {
this.data = new HashSet<>();
}
}

137
zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java

@ -0,0 +1,137 @@
package com.zc.business.statistics.cache;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import lombok.Getter;
import lombok.Setter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
*
* 以天为单位的交通交通断面数据缓存类用于存储和管理设备上报的交通断面统计数据同时提供了数据缓存的有效性管理和清理功能
* @author xiepufeng
*/
@Getter
@Setter
public class DailyTrafficStatisticsCache extends AbstractTrafficStatisticsCache {
@Getter
// 静态缓存容器,使用ConcurrentHashMap保证线程安全
private static final Map<String, DailyTrafficStatisticsCache> cache = new ConcurrentHashMap<>();
// 最大缓存时间(单位:秒)
private static final long MAX_CACHE_TIME = 25 * 60 * 60; // 缓存数据最长保留25小时
// 最大容量限制(一个设备),防止内存溢出
private static final int MAX_CAPACITY = 60/5 * (24 + 1) + 1000; // 缓存的最大条目数
// 私有构造函数,确保只能通过静态方法获取实例
private DailyTrafficStatisticsCache() {
}
/**
* 添加交通断面数据到缓存中
*
* @param dcTrafficSectionData 待添加的交通断面统计数据
*/
public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) {
// 获取或新建对应的缓存实例
DailyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData);
// 检查缓存容量是否达到上限
if (instance.getData().size() >= MAX_CAPACITY) {
instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId());
return;
}
// 更新最后添加时间
instance.setLastAddedTime(DateUtil.date());
// 更新状态
instance.setStored(false);
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
String key = generateCacheKey(dcTrafficSectionData);
// 设置key和thatDay属性(仅在首次添加时)
if (instance.getCacheKey() == null) {
instance.setCacheKey(key);
instance.setStatisticalDateStr(formattedDate);
}
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.DAY);
// 将新数据添加到数据列表中
instance.getData().add(dcTrafficSectionData);
}
/**
* 获取或创建对应设备与日期的DcTrafficSectionDataCache实例
*
* @param dcTrafficSectionData 交通断面数据统计定义
* @return 对应的交通断面数据缓存实例
*/
private static DailyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) {
// 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例
return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new DailyTrafficStatisticsCache());
}
/**
* 生成缓存键
* 该方法通过设备ID统计日期和道路方向生成一个唯一的缓存键用于存储或检索特定条件下的交通数据
*
* @param dcTrafficSectionData 包含设备ID统计日期和道路方向的交通段数据对象
* @return 返回一个字符串形式的缓存键由设备ID格式化的统计日期和道路方向以"|"字符连接而成
*/
public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) {
// 获取设备ID
Long deviceId = dcTrafficSectionData.getDeviceId();
// 获取并格式化统计日期
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
// 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键
return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection();
}
/**
* 清除所有过期的交通断面数据缓存项
*/
public static void clearExpiredData() {
// 使用stream API找出所有过期的数据缓存项键
Set<String> keysToRemove = cache.keySet().stream()
.filter(DailyTrafficStatisticsCache::isCacheItemExpire)
.collect(Collectors.toSet());
// 安全地从缓存中删除这些过期项
keysToRemove.forEach(cache::remove);
}
/**
* 检查给定缓存键所对应的缓存项是否已经过期
*
* @param key 缓存key
* @return 如果已过期则返回true否则返回false
*/
private static boolean isCacheItemExpire(String key) {
Date lastAddedTime = cache.get(key).getLastAddedTime();
Date currentTime = DateUtil.date();
long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND);
return betweenSecond > MAX_CACHE_TIME;
}
/**
* Date 类型的日期格式化为指定格式的字符串
*
* @param date 需要格式化的 Date 对象
* @return 格式化后的日期字符串
*/
private static String formatDate(Date date) {
// 使用 DateUtil 工具类将 date 格式化为指定格式的字符串
return DateUtil.formatDate(date);
}
}

137
zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java

@ -0,0 +1,137 @@
package com.zc.business.statistics.cache;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import lombok.Getter;
import lombok.Setter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 以月为单位的交通交通断面数据缓存类用于存储和管理设备上报的交通断面统计数据同时提供了数据缓存的有效性管理和清理功能
* @author xiepufeng
*/
@Getter
@Setter
public class MonthlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache {
// 静态缓存容器,使用ConcurrentHashMap保证线程安全
@Getter
private static final Map<String, MonthlyTrafficStatisticsCache> cache = new ConcurrentHashMap<>();
// 最大缓存时间(单位:秒)
private static final long MAX_CACHE_TIME = (60 * 60) * (31 * 24 + 1);
// 最大容量限制,防止内存溢出
private static final int MAX_CAPACITY = 31 + 1000; // 缓存的最大条目数
// 私有构造函数,确保只能通过静态方法获取实例
private MonthlyTrafficStatisticsCache() {
}
/**
* 添加交通断面数据到缓存中
*
* @param dcTrafficSectionData 待添加的交通断面统计数据
*/
public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) {
// 获取或新建对应的缓存实例
MonthlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData);
// 检查缓存容量是否达到上限
if (instance.getData().size() >= MAX_CAPACITY) {
instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId());
return;
}
// 更新最后添加时间
instance.setLastAddedTime(DateUtil.date());
// 更新状态
instance.setStored(false);
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
String key = generateCacheKey(dcTrafficSectionData);
// 设置key和thatDay属性(仅在首次添加时)
if (instance.getCacheKey() == null) {
instance.setCacheKey(key);
instance.setStatisticalDateStr(formattedDate);
}
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.MONTH);
// 将新数据添加到数据列表中
instance.getData().add(dcTrafficSectionData);
}
/**
* 获取或创建对应设备与日期的DcTrafficSectionDataCache实例
*
* @param dcTrafficSectionData 交通断面数据统计定义
* @return 对应的交通断面数据缓存实例
*/
private static MonthlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) {
// 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例
return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new MonthlyTrafficStatisticsCache());
}
/**
* 生成缓存键
* 该方法通过设备ID统计日期和道路方向生成一个唯一的缓存键用于存储或检索特定条件下的交通数据
*
* @param dcTrafficSectionData 包含设备ID统计日期和道路方向的交通段数据对象
* @return 返回一个字符串形式的缓存键由设备ID格式化的统计日期和道路方向以"|"字符连接而成
*/
public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) {
// 获取设备ID
Long deviceId = dcTrafficSectionData.getDeviceId();
// 获取并格式化统计日期
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
// 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键
return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection();
}
/**
* 清除所有过期的交通断面数据缓存项
*/
public static void clearExpiredData() {
// 使用stream API找出所有过期的数据缓存项键
Set<String> keysToRemove = cache.keySet().stream()
.filter(MonthlyTrafficStatisticsCache::isCacheItemExpire)
.collect(Collectors.toSet());
// 安全地从缓存中删除这些过期项
keysToRemove.forEach(cache::remove);
}
/**
* 检查给定缓存键所对应的缓存项是否已经过期
*
* @param key 缓存key
* @return 如果已过期则返回true否则返回false
*/
private static boolean isCacheItemExpire(String key) {
Date lastAddedTime = cache.get(key).getLastAddedTime();
Date currentTime = DateUtil.date();
long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND);
return betweenSecond > MAX_CACHE_TIME;
}
/**
* Date 类型的日期格式化为 "yyyy-MM-01" 格式的字符串
* @param date 需要格式化的日期对象
* @return 格式化后的日期字符串
*/
private static String formatDate(Date date) {
// 使用 DateUtil 工具类将 date 格式化为指定格式的字符串
return DateUtil.format(date, "yyyy-MM-01");
}
}

142
zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java

@ -0,0 +1,142 @@
package com.zc.business.statistics.cache;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 以季度为单位的交通交通断面数据缓存类用于存储和管理设备上报的交通断面统计数据同时提供了数据缓存的有效性管理和清理功能
* @author xiepufeng
*/
@Getter
@Setter
public class QuarterlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache {
// 静态缓存容器,使用ConcurrentHashMap保证线程安全
@Getter
private static final Map<String, QuarterlyTrafficStatisticsCache> cache = new ConcurrentHashMap<>();
// 最大缓存时间(单位:秒)
private static final long MAX_CACHE_TIME = (60 * 60) * (3 * 31 * 24 + 1);
// 最大容量限制,防止内存溢出
private static final int MAX_CAPACITY = 3 + 1000; // 缓存的最大条目数
// 私有构造函数,确保只能通过静态方法获取实例
private QuarterlyTrafficStatisticsCache() {
}
/**
* 添加交通断面数据到缓存中
*
* @param dcTrafficSectionData 待添加的交通断面统计数据
*/
public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) {
// 获取或新建对应的缓存实例
QuarterlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData);
// 检查缓存容量是否达到上限
if (instance.getData().size() >= MAX_CAPACITY) {
instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId());
return;
}
// 更新最后添加时间
instance.setLastAddedTime(DateUtil.date());
// 更新状态
instance.setStored(false);
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
String key = generateCacheKey(dcTrafficSectionData);
// 设置key和thatDay属性(仅在首次添加时)
if (instance.getCacheKey() == null) {
instance.setCacheKey(key);
instance.setStatisticalDateStr(formattedDate);
}
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.QUARTER);
// 将新数据添加到数据列表中
instance.getData().add(dcTrafficSectionData);
}
/**
* 获取或创建对应设备与日期的DcTrafficSectionDataCache实例
*
* @param dcTrafficSectionData 交通断面数据统计定义
* @return 对应的交通断面数据缓存实例
*/
private static QuarterlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) {
// 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例
return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new QuarterlyTrafficStatisticsCache());
}
/**
* 生成缓存键
* 该方法通过设备ID统计日期和道路方向生成一个唯一的缓存键用于存储或检索特定条件下的交通数据
*
* @param dcTrafficSectionData 包含设备ID统计日期和道路方向的交通段数据对象
* @return 返回一个字符串形式的缓存键由设备ID格式化的统计日期和道路方向以"|"字符连接而成
*/
public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) {
// 获取设备ID
Long deviceId = dcTrafficSectionData.getDeviceId();
// 获取并格式化统计日期
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
// 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键
return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection();
}
/**
* 清除所有过期的交通断面数据缓存项
*/
public static void clearExpiredData() {
// 使用stream API找出所有过期的数据缓存项键
Set<String> keysToRemove = cache.keySet().stream()
.filter(QuarterlyTrafficStatisticsCache::isCacheItemExpire)
.collect(Collectors.toSet());
// 安全地从缓存中删除这些过期项
keysToRemove.forEach(cache::remove);
}
/**
* 检查给定缓存键所对应的缓存项是否已经过期
*
* @param key 缓存key
* @return 如果已过期则返回true否则返回false
*/
private static boolean isCacheItemExpire(String key) {
Date lastAddedTime = cache.get(key).getLastAddedTime();
Date currentTime = DateUtil.date();
long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND);
return betweenSecond > MAX_CACHE_TIME;
}
/**
* Date 类型的日期格式化为 "yyyy-MM-01" 格式字符串
*
* @param date 需要格式化的日期对象
* @return 格式化后的日期字符串格式为 "yyyy-MM-01"
*/
private static String formatDate(Date date) {
// 使用 DateUtil 工具类将 date 格式化为指定格式的字符串
return DateUtil.format(date, "yyyy-MM-01");
}
}

141
zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java

@ -0,0 +1,141 @@
package com.zc.business.statistics.cache;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 以年为单位的交通交通断面数据缓存类用于存储和管理设备上报的交通断面统计数据同时提供了数据缓存的有效性管理和清理功能
* @author xiepufeng
*/
@Getter
@Setter
public class YearlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache {
// 静态缓存容器,使用ConcurrentHashMap保证线程安全
@Getter
private static final Map<String, YearlyTrafficStatisticsCache> cache = new ConcurrentHashMap<>();
// 最大缓存时间(单位:秒)
private static final long MAX_CACHE_TIME = (60 * 60) * (366 * 24 + 1);
// 最大容量限制,防止内存溢出
private static final int MAX_CAPACITY = 4 + 1000; // 缓存的最大条目数
// 私有构造函数,确保只能通过静态方法获取实例
private YearlyTrafficStatisticsCache() {
}
/**
* 添加交通断面数据到缓存中
*
* @param dcTrafficSectionData 待添加的交通断面统计数据
*/
public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) {
// 获取或新建对应的缓存实例
YearlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData);
// 检查缓存容量是否达到上限
if (instance.getData().size() >= MAX_CAPACITY) {
instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId());
return;
}
// 更新最后添加时间
instance.setLastAddedTime(DateUtil.date());
// 更新状态
instance.setStored(false);
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
String key = generateCacheKey(dcTrafficSectionData);
// 设置key和thatDay属性(仅在首次添加时)
if (instance.getCacheKey() == null) {
instance.setCacheKey(key);
instance.setStatisticalDateStr(formattedDate);
}
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.YEAR);
// 将新数据添加到数据列表中
instance.getData().add(dcTrafficSectionData);
}
/**
* 获取或创建对应设备与日期的DcTrafficSectionDataCache实例
*
* @param dcTrafficSectionData 交通断面数据统计定义
* @return 对应的交通断面数据缓存实例
*/
private static YearlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) {
// 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例
return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new YearlyTrafficStatisticsCache());
}
/**
* 生成缓存键
* 该方法通过设备ID统计日期和道路方向生成一个唯一的缓存键用于存储或检索特定条件下的交通数据
*
* @param dcTrafficSectionData 包含设备ID统计日期和道路方向的交通段数据对象
* @return 返回一个字符串形式的缓存键由设备ID格式化的统计日期和道路方向以"|"字符连接而成
*/
public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) {
// 获取设备ID
Long deviceId = dcTrafficSectionData.getDeviceId();
// 获取并格式化统计日期
String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate());
// 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键
return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection();
}
/**
* 清除所有过期的交通断面数据缓存项
*/
public static void clearExpiredData() {
// 使用stream API找出所有过期的数据缓存项键
Set<String> keysToRemove = cache.keySet().stream()
.filter(YearlyTrafficStatisticsCache::isCacheItemExpire)
.collect(Collectors.toSet());
// 安全地从缓存中删除这些过期项
keysToRemove.forEach(cache::remove);
}
/**
* 检查给定缓存键所对应的缓存项是否已经过期
*
* @param key 缓存key
* @return 如果已过期则返回true否则返回false
*/
private static boolean isCacheItemExpire(String key) {
Date lastAddedTime = cache.get(key).getLastAddedTime();
Date currentTime = DateUtil.date();
long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND);
return betweenSecond > MAX_CACHE_TIME;
}
/**
* Date 类型的日期格式化为 "yyyy-01-01" 格式的字符串
*
* @param date 需要格式化的日期对象
* @return 格式化后的日期字符串
*/
private static String formatDate(Date date) {
// 使用 DateUtil 工具类将 date 格式化为 "yyyy-01-01" 格式的字符串
return DateUtil.format(date, "yyyy-01-01");
}
}

70
zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java

@ -0,0 +1,70 @@
package com.zc.business.statistics.handler;
import com.ruoyi.common.utils.DateUtils;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
public class RealtimeTrafficStatistics {
/**
* 对给定的交通数据集合进行统计分析返回一个综合交通数据对象
*
* @param dataCollection 交通数据集合不可为null或空包含多个交通路段的详细数据
* @param trafficDataPeriodType 交通数据的时段类型例如小时周等
* @return 综合交通数据对象包含车流量总和平均车速等统计结果如果输入数据为空则返回null
*/
public static DcTrafficSectionData trafficStatistics(Collection<DcTrafficSectionData> dataCollection, TrafficDataPeriodTypeEnum trafficDataPeriodType) {
// 判断输入数据是否为空
if (CollectionUtils.isEmpty(dataCollection)) {
return null;
}
// 创建一个汇总统计用的对象
DcTrafficSectionData aggregatedData = new DcTrafficSectionData();
// 初始化车流量总和和计算平均车速所需的分子部分
int trafficVolume = 0;
double numerator = 0;
// 遍历原始数据列表,累加车流量并计算平均车速
for (DcTrafficSectionData data: dataCollection) {
// 累加车流量
trafficVolume += data.getTrafficVolume();
// 计算分子部分
numerator += data.getAverageSpeed() * data.getTrafficVolume();
}
// 使用第一个数据项的信息填充汇总统计对象的基本属性(设备ID、桩号、方向)
DcTrafficSectionData firstDcTrafficSectionData = dataCollection.iterator().next();
// 设备id
aggregatedData.setDeviceId(firstDcTrafficSectionData.getDeviceId());
// 桩号
aggregatedData.setStakeMark(firstDcTrafficSectionData.getStakeMark());
// 道路方向
aggregatedData.setDirection(firstDcTrafficSectionData.getDirection());
// 计算平均车速并设置到汇总统计对象中
if (trafficVolume != 0) {
aggregatedData.setAverageSpeed((int) Math.round(numerator / trafficVolume));
} else {
// 若车流量为0,则默认设置平均车速为0
aggregatedData.setAverageSpeed(0);
}
// 时段类型
aggregatedData.setPeriodType(trafficDataPeriodType);
// 设置统计时间
aggregatedData.setStatisticalDate(firstDcTrafficSectionData.getStatisticalDate(), trafficDataPeriodType);
// 车流量
aggregatedData.setTrafficVolume(trafficVolume);
// 更新或插入操作
aggregatedData.setUpdateTime(DateUtils.getNowDate());
// 生成主键
aggregatedData.generateUniqueId();
return aggregatedData;
}
}

44
zc-business/src/main/resources/mapper/business/DcTrafficSectionDataMapper.xml

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.zc.business.mapper.DcTrafficSectionDataMapper">
<!-- 插入或更新交通路段数据 -->
<insert id="insertOrUpdate" parameterType="com.zc.business.domain.DcTrafficSectionData">
INSERT INTO
dc_traffic_section_data
(
id,
traffic_volume,
average_speed,
device_id,
statistical_date,
direction,
period_type,
stake_mark,
create_time
)
VALUES
(
#{id},
#{trafficVolume},
#{averageSpeed},
#{deviceId},
#{statisticalDate},
#{direction},
#{periodType},
#{stakeMark},
NOW())
ON DUPLICATE KEY UPDATE
traffic_volume = VALUES(traffic_volume),
average_speed = VALUES(average_speed),
device_id = VALUES(device_id),
statistical_date = VALUES(statistical_date),
direction = VALUES(direction),
period_type = VALUES(period_type),
stake_mark = VALUES(stake_mark),
update_time = NOW()
</insert>
</mapper>
Loading…
Cancel
Save