diff --git a/zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java b/zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java index 3231d3cd..46579be7 100644 --- a/zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java +++ b/zc-business/src/main/java/com/zc/business/constant/DeviceFunctionIdConstants.java @@ -28,4 +28,9 @@ public class DeviceFunctionIdConstants { * 可变信息标志 1B 功能码 */ public static final String VARIABLE_INFORMATION_FLAG_1B = "1B"; + + /** + * 激光疲劳唤醒 SETTM 功能码 + */ + public static final String VARIABLE_INFORMATION_FLAG_SETTM = "SETTM"; } diff --git a/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java b/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java new file mode 100644 index 00000000..3edeb8e9 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java @@ -0,0 +1,154 @@ +package com.zc.business.domain; + +import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.annotation.TableId; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import lombok.Data; +import org.apache.commons.codec.digest.DigestUtils; + +import java.util.Date; +import java.util.Objects; + +/** + * 交通断面数据统计定义 + * @author xiepufeng + */ +@Data +public class DcTrafficSectionData { + + /** + * 主键 + */ + @TableId + private String id; + + /** + * 车流量 + */ + private Integer trafficVolume; + + /** + * 平均速度 + */ + private Integer averageSpeed; + + /** + * 所属设备 + */ + private Long deviceId; + + /** + * 统计时间 + */ + private Date statisticalDate; + + /** + * 道路方向 + */ + private Byte direction; + + /** + * 时段类型 + * 1-年 2-月 3-季 4-日 + */ + private Byte periodType; + + + /** + * 所在桩号 + */ + private Integer stakeMark; + + + /** 创建时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date createTime; + + + /** 更新时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date updateTime; + + /** + * 设置交通数据的统计周期类型。 + * @param periodType 统计周期类型的枚举值。 + */ + public void setPeriodType(TrafficDataPeriodTypeEnum periodType) { + this.periodType = periodType.getCode(); // 将枚举类型转换为代码值存储 + } + + + /** + * 重写equals方法,用于比较两个对象是否相等。 + * @param o 要与当前对象比较的对象。 + * @return 如果两个对象相等,则返回true;否则返回false。 + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; // 同一对象比较,直接返回true + if (o == null || getClass() != o.getClass()) return false; // 对象为空或类型不同,返回false + DcTrafficSectionData that = (DcTrafficSectionData) o; // 类型转换 + return Objects.equals(deviceId, that.deviceId) && + Objects.equals(statisticalDate, that.statisticalDate) && + Objects.equals(direction, that.direction) && + Objects.equals(periodType, that.periodType) && + Objects.equals(stakeMark, that.stakeMark); // 比较各属性值 + } + + /** + * 重写hashCode方法,基于对象的属性生成哈希码。 + * @return 对象的哈希码值。 + */ + @Override + public int hashCode() { + return Objects.hash(deviceId, statisticalDate, direction, periodType, stakeMark); + } + + /** + * 设置统计日期,根据不同的统计周期类型来调整日期,使其对应周期的起始日期。 + * @param statisticalDate 统计日期,原始日期。 + */ + public void setStatisticalDate(Date statisticalDate) { + TrafficDataPeriodTypeEnum typeEnum = TrafficDataPeriodTypeEnum.valueOfCode(periodType); + setStatisticalDate(statisticalDate, typeEnum); + } + + /** + * 根据给定的统计周期类型和日期,设置统计日期为相应周期的起始日期。 + * @param statisticalDate 原始统计日期。 + * @param typeEnum 统计周期类型。 + */ + public void setStatisticalDate(Date statisticalDate, TrafficDataPeriodTypeEnum typeEnum) { + switch (typeEnum) { + case DAY: + // 设置为当天的起始时间 + this.statisticalDate = DateUtil.beginOfDay(statisticalDate); + break; + case MONTH: + // 设置为当月的起始日期 + this.statisticalDate = DateUtil.beginOfMonth(statisticalDate); + break; + case QUARTER: + // 设置为当季度的起始日期 + this.statisticalDate = DateUtil.beginOfQuarter(statisticalDate); + break; + case YEAR: + // 设置为当年的起始日期 + this.statisticalDate = DateUtil.beginOfYear(statisticalDate); + break; + default: + // 如果不是预定义的周期类型,则不做任何处理 + this.statisticalDate = statisticalDate; + } + } + + /** + * 根据设备ID、统计时间、方向、时段类型、桩号生成一个唯一ID + */ + public void generateUniqueId() { + String combinedAttributes = deviceId + "_" + DateUtil.format(statisticalDate, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark; + this.id = DigestUtils.md5Hex(combinedAttributes); + } + +} diff --git a/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java b/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java new file mode 100644 index 00000000..82b9fe09 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java @@ -0,0 +1,49 @@ +package com.zc.business.enums; + +import lombok.Getter; + +/** + * 定义一个枚举类型 TrafficDataPeriodTypeEnum,用于表示周期类型(年、月、季、日) + */ +@Getter +public enum TrafficDataPeriodTypeEnum { + // 枚举成员:年,关联字节型代码 1 和描述 "年" + YEAR((byte) 1, "年"), + + // 枚举成员:月,关联字节型代码 2 和描述 "月" + MONTH((byte) 2, "月"), + + // 枚举成员:季,关联字节型代码 3 和描述 "季" + QUARTER((byte) 3, "季"), + + // 枚举成员:日,关联字节型代码 4 和描述 "日" + DAY((byte) 4, "日"); + + // 每个枚举成员的属性:代码,存储对应的字节型数值 + private final Byte code; + + // 每个枚举成员的属性:描述,存储该周期类型的中文描述 + private final String description; + + // 构造方法,初始化每个枚举成员的代码和描述信息 + TrafficDataPeriodTypeEnum(Byte code, String description) { + this.code = code; + this.description = description; + } + + // 反向查找方法,根据给定的字节型代码查找对应的枚举值 + // 如果找到匹配项,则返回该枚举类型;否则抛出IllegalArgumentException异常 + public static TrafficDataPeriodTypeEnum valueOfCode(Byte code) { + // 遍历所有PeriodTypeEnum枚举值 + for (TrafficDataPeriodTypeEnum type : values()) { + // 如果当前枚举类型的code与输入的code相等 + if (type.getCode().equals(code)) { + // 返回找到的枚举类型 + return type; + } + } + + // 如果循环结束都没有找到匹配的code,则抛出异常,说明提供的code非法 + throw new IllegalArgumentException("无效的周期类型代码: " + code); + } +} diff --git a/zc-business/src/main/java/com/zc/business/mapper/DcTrafficSectionDataMapper.java b/zc-business/src/main/java/com/zc/business/mapper/DcTrafficSectionDataMapper.java new file mode 100644 index 00000000..7825c2c0 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/mapper/DcTrafficSectionDataMapper.java @@ -0,0 +1,22 @@ +package com.zc.business.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.zc.business.domain.DcTrafficSectionData; +import org.apache.ibatis.annotations.Mapper; + +/** + * 交通断面数据统计Mapper接口 + * + * @author xiepufeng + */ +@Mapper +public interface DcTrafficSectionDataMapper extends BaseMapper { + + /** + * 插入或更新交通路段数据。 + * + * @param trafficSectionData 交通路段数据对象,包含需要插入或更新的数据。 + * @return 返回一个布尔值,表示操作是否成功。true表示插入或更新成功,false表示失败。 + */ + boolean insertOrUpdate(DcTrafficSectionData trafficSectionData); +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java index 45213776..eec32d66 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java +++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java @@ -1,32 +1,30 @@ package com.zc.business.message.device.handler; +import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.core.redis.RedisCache; import com.zc.business.constant.RedisKeyConstants; import com.zc.business.domain.DcDevice; +import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.DcWarning; +import com.zc.business.domain.MdDeviceData; import com.zc.business.enums.IotProductEnum; import com.zc.business.enums.WarningSourceEnum; import com.zc.business.enums.WarningStateEnum; import com.zc.business.enums.WarningSubclassEnum; -import com.zc.business.service.IDcDeviceService; -import com.zc.business.service.IDcWarningService; -import com.zc.business.service.IMiddleDatabaseService; +import com.zc.business.service.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -53,8 +51,14 @@ public class DeviceMessageHandler { @Resource private RedisCache redisCache; - @Value("${iot.address}") - private String iotAddress; + @Resource + private DcTrafficSectionDataService dcTrafficSectionDataService; + + @Autowired + private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; + + @Autowired + private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; /** * 更新设备状态 @@ -99,6 +103,18 @@ public class DeviceMessageHandler { // 摄像头检测事件 if (IotProductEnum.CAMERA_DETECTION_EVENT.value().equals(productId)) { this.cameraDetectionEventHandle(data); + return; + } + + // 一站式情况调查站 + if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)) { + oneStopDeviceMessageHandle(data); + return; + } + + // 一站式情况调查站 + if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) { + weatherDetectorMessageHandle(data); } } @@ -159,7 +175,7 @@ public class DeviceMessageHandler { String title = stakeMarkDescription + WarningSubclassEnum.getDecorateInfo(warningSubclass); // 标题 dcWarning.setWarningTitle(title); - dcWarning.setRemark(convertTimestampToString(captureTime) + " " + title); + dcWarning.setRemark(DateUtil.formatDateTime(DateUtil.date(captureTime)) + " " + title); // 影响车道 dcWarning.setLane(String.valueOf(data.getInteger("relatedLaneNo"))); // 物联设备id @@ -309,23 +325,49 @@ public class DeviceMessageHandler { } /** - * 将毫秒级时间戳转换为"yyyy-MM-dd HH:mm:ss"格式的字符串 - * - * @param timestampInMillis 毫秒级时间戳 - * @return 格式化后的日期时间字符串 + * 一站式情况调查站设备消息处理入口 + * @param msg 设备消息 */ - public static String convertTimestampToString(long timestampInMillis) { - // 转换为Instant对象 - Instant instant = Instant.ofEpochMilli(timestampInMillis); - - // 转换为LocalDateTime(默认时区) - LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + private void oneStopDeviceMessageHandle(JSONObject msg) { + dcTrafficSectionDataService.processRealtimeMessage(msg); + } - // 创建DateTimeFormatter实例并指定输出格式 - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - // 格式化LocalDateTime对象为字符串 - return localDateTime.format(formatter); + /** + * 气象检测器消息处理入口 + * @param msg 设备消息 + */ + private void weatherDetectorMessageHandle(JSONObject msg) { + + DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) msg.get("properties"); + meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString()); + meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); + + //设计院中间库 插入设备数据 + MdDeviceData mdDeviceData = new MdDeviceData(); + mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); + mdDeviceData.setDevType("3"); + mdDeviceData.setTimeStamp(new Date()); + mdDeviceData.setCreatorUserId("自动存储"); + + Map expands = new HashMap<>(); + expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量 + expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速 + expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向 + expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度 + expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度 + expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压 + expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑 + expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态 + expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度 + expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度 + expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度 + expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度 + expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况 + + mdDeviceData.setExpands(JSONObject.toJSONString(expands)); + + middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); } } diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java index 67981ab2..214489bd 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java @@ -1,12 +1,11 @@ package com.zc.business.message.device.listener; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.zc.business.message.device.handler.DeviceMessageHandler; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -21,9 +20,8 @@ import javax.annotation.Resource; @Component public class DeviceEventListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); - @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Resource diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java index 11cb395e..c8b4b8fd 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DeviceFunctionReplyListener.java @@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -15,9 +16,8 @@ import org.springframework.stereotype.Component; @Component public class DeviceFunctionReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); - @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Override diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java index ddac0aa5..fc411370 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReadReplyListener.java @@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -15,9 +16,8 @@ import org.springframework.stereotype.Component; @Component public class DevicePropertyReadReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); - @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Override diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java index 49fe359c..24dca16a 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java @@ -7,6 +7,7 @@ import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcMeteorologicalDetectorData; import com.zc.business.domain.MdDeviceData; import com.zc.business.enums.IotProductEnum; +import com.zc.business.message.device.handler.DeviceMessageHandler; import com.zc.business.service.IDcMeteorologicalDetectorDataService; import com.zc.business.service.IMiddleDatabaseService; import com.zc.business.service.IDcYzsqbdcHmbldDataService; @@ -14,6 +15,7 @@ import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -32,68 +34,21 @@ import java.util.Map; @Component public class DevicePropertyReportListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Resource private ThreadPoolTaskExecutor threadPoolTaskExecutor; - @Autowired - private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; - @Autowired - private IMiddleDatabaseService middleDatabaseService; - @Autowired - private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; + @Resource + private DeviceMessageHandler deviceMessageHandler; @Override public void onMessage(ObjectRecord message) { String streamKay = message.getStream(); RecordId recordId = message.getId(); - - threadPoolTaskExecutor.execute(() -> { - Map data = JSON.parseObject(message.getValue(), HashMap.class); - Map headers = (Map) data.get("headers"); - if (headers.get("productId") != null){ - String productId = headers.get("productId").toString(); - //气象检测器 - if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)){ - DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties"); - meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString()); - meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); - - //设计院中间库 插入设备数据 - MdDeviceData mdDeviceData = new MdDeviceData(); - mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); - mdDeviceData.setDevType("3"); - mdDeviceData.setTimeStamp(new Date()); - mdDeviceData.setCreatorUserId("自动存储"); - - Map expands = new HashMap<>(); - expands.put("rainFall",meteorologicalDetectorData.getRainfall()); //雨量 - expands.put("windSpeed",meteorologicalDetectorData.getWindSpeed()); //风速 - expands.put("windDirection",meteorologicalDetectorData.getWindDirection()); //风向 - expands.put("temperature",meteorologicalDetectorData.getTemperature()); //大气温度 - expands.put("humidity",meteorologicalDetectorData.getHumidity()); //大气湿度 - expands.put("airPressure",meteorologicalDetectorData.getAtmosphericPressure()); //数字气压 - expands.put("wet",meteorologicalDetectorData.getWetSlipperyCoefficient()); //湿滑 - expands.put("rainXingTai",meteorologicalDetectorData.getPrecipitationType()); //雨量降水形态 - expands.put("visibility",meteorologicalDetectorData.getVisibility()); //能见度 - expands.put("pathContactLu",meteorologicalDetectorData.getRoadSurfaceTemperature()); //路面温度 - expands.put("pathContactBing",meteorologicalDetectorData.getFreezingPointTemperature()); //冰点温度 - expands.put("pathContactYan",meteorologicalDetectorData.getSalinityValue()); //路面盐度 - expands.put("pathContactZhuang",meteorologicalDetectorData.getRemoteRoadSurfaceStatus()); //路面状况 - - mdDeviceData.setExpands(JSONObject.toJSONString(expands)); - - middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); - } else if (IotProductEnum.ONE_STOP_PRODUCT.value().equals(productId)){ - //交调 - dcYzsqbdcHmbldDataService.addDcYzsqbdcHmbldDataList(data); - - } - } - }); + threadPoolTaskExecutor.execute(() -> deviceMessageHandler.handle(message.getValue())); // 消费完后直接删除消息 redisStream.del(streamKay, String.valueOf(recordId)); diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java index 0ef5b4f4..2c1c95c0 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyWriteReplyListener.java @@ -4,6 +4,7 @@ import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -15,9 +16,8 @@ import org.springframework.stereotype.Component; @Component public class DevicePropertyWriteReplyListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); - @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Override diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java index f4e24c6c..a281364d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/OfflineMessageListener.java @@ -3,10 +3,11 @@ package com.zc.business.message.device.listener; import com.alibaba.fastjson.JSON; import com.zc.business.domain.DcDevice; import com.zc.business.message.device.handler.DeviceMessageHandler; -import com.zc.business.service.IDcDeviceService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -15,7 +16,6 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; -import java.util.stream.Collectors; /** * 离线消息监听 @@ -23,9 +23,8 @@ import java.util.stream.Collectors; @Component public class OfflineMessageListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); - - @Resource + @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Resource diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java index 51123731..b4978665 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/OnlineMessageListener.java @@ -8,6 +8,7 @@ import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -23,9 +24,8 @@ import java.util.List; @Component public class OnlineMessageListener implements StreamListener> { - private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); - @Autowired + @Qualifier("iotRedisStream") private RedisStream redisStream; @Resource diff --git a/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java b/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java new file mode 100644 index 00000000..868f01ca --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java @@ -0,0 +1,15 @@ +package com.zc.business.service; + +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.extension.service.IService; +import com.zc.business.domain.DcTrafficSectionData; + +public interface DcTrafficSectionDataService extends IService { + + /** + * 处理实时接收到的设备消息,并将其转换为交通断面统计数据对象并缓存。 + * + * @param msg 设备发送的JSON格式实时消息 + */ + void processRealtimeMessage(JSONObject msg); +} diff --git a/zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java b/zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java index 48a6c287..4bae8457 100644 --- a/zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java +++ b/zc-business/src/main/java/com/zc/business/service/impl/DcEmergencyPlansServiceImpl.java @@ -21,6 +21,8 @@ import com.zc.business.service.DcEmergencyPlansService; import com.zc.business.service.DcExecuteActionService; import com.zc.business.service.IDcDeviceService; import com.zc.common.core.httpclient.exception.HttpException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -31,6 +33,7 @@ import java.util.*; import java.util.stream.Collectors; @Service +@Slf4j public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { @Resource @@ -46,6 +49,9 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { @Resource private EventPlanAssocMapper eventPlanAssocMapper; + @Resource + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + /** * 查询事件预案 @@ -103,7 +109,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { JSONObject triggerJson = JSONObject.parseObject(triggerMechanism); String locationType = triggerJson.get("locationType").toString(); DcEventVehicleAccident dcEventVehicleAccident = event.getDcEventVehicleAccident(); - String eventLocationType =dcEventVehicleAccident.getLocationType().toString(); + String eventLocationType = dcEventVehicleAccident.getLocationType().toString(); return locationType.equals(eventLocationType); }) .collect(Collectors.toList()); @@ -141,7 +147,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { int warningType = Integer.parseInt(dcWarning.getWarningType().toString()); - if (warningType == WarningTypeEnum.UNUSUAL_WEATHER.getCode()) { + if (warningType == WarningTypeEnum.UNUSUAL_WEATHER.getCode()) { return dcEmergencyPlansList.stream() .filter(dcEmergencyPlans -> { String triggerMechanism = dcEmergencyPlans.getTriggerMechanism(); @@ -163,13 +169,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { * 交通事件-情报板确认回显原始模板 */ @Override - public Map> eventBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) { + public Map> eventBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) { // 获取事件数据 DcEvent dcEvent = dcEventAnDcEmergencyPlans.getDcEvent(); // 方向 String direction = dcEvent.getDirection(); // 事件桩号 - dcEvent.setStakeMark(dcEvent.getStakeMark().replace("K","")); + dcEvent.setStakeMark(dcEvent.getStakeMark().replace("K", "")); String[] markArray = dcEvent.getStakeMark().split("\\+"); if (markArray[1].length() < 3) { // 不足三位 补零 @@ -184,13 +190,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { * 感知事件-情报板确认回显原始模板 */ @Override - public Map> warningBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) { + public Map> warningBoardConfirm(DcEventAnDcEmergencyPlans dcEventAnDcEmergencyPlans) { // 获取事件数据 DcWarning dcWarning = dcEventAnDcEmergencyPlans.getDcWarning(); // 方向 String direction = dcWarning.getDirection(); // 事件桩号 - dcWarning.setStakeMark(dcWarning.getStakeMark().replace("K","")); + dcWarning.setStakeMark(dcWarning.getStakeMark().replace("K", "")); String[] markArray = dcWarning.getStakeMark().split("\\+"); if (markArray[1].length() < 3) { // 不足三位 补零 @@ -204,48 +210,50 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { /** * 情报板设备执行3A功能,获取模板 */ - public Map> getBoardTemplate(List dcDevices) { - Map> map = new HashMap<>(); + public Map> getBoardTemplate(List dcDevices) { + Map> map = new HashMap<>(); dcDevices.forEach(dcDevice -> { - try { - if (StringUtils.isEmpty(dcDevice.getIotDeviceId())) { - return; - } - AjaxResult ajaxResult = dcDeviceController.getDeviceRealtimeProperty(dcDevice.getIotDeviceId(), "3A", new HashMap<>()); - if (ajaxResult.get("code").equals(200)) { - JSONObject properties = JSON.parseObject(JSON.parseObject(ajaxResult.get("data").toString()).get("3A").toString()); - JSONArray contentArray = JSONArray.parseArray(properties.get("content").toString()); - List list = new ArrayList<>(); - contentArray.forEach(content -> { - DcInfoBoardTemplate dcInfoBoardTemplate = new DcInfoBoardTemplate(); - JSONObject jsonObject = JSON.parseObject(content.toString()); - String displayAreaWidth = jsonObject.get("displayAreaWidth").toString(); - String displayAreaHeight = jsonObject.get("displayAreaHeight").toString(); - // 内容 - dcInfoBoardTemplate.setContent(jsonObject.get("textContent").toString()); - // 前景颜色 - dcInfoBoardTemplate.setFontColor(jsonObject.get("foregroundColor").toString()); - // 屏幕尺寸 - dcInfoBoardTemplate.setScreenSize(displayAreaWidth+"*"+displayAreaHeight); - // 字号 - dcInfoBoardTemplate.setFontSize(jsonObject.get("fontSize").toString()); - // 字体风格 - dcInfoBoardTemplate.setFontType(jsonObject.get("fontStyle").toString()); - // 字距 - dcInfoBoardTemplate.setFontSpacing(jsonObject.get("fontSpacing").toString()); - // 停留时间 - dcInfoBoardTemplate.setStopTime(jsonObject.get("residenceTime").toString()); - // 入屏方式 - dcInfoBoardTemplate.setInScreenMode(jsonObject.get("screenEntryMethod").toString()); - // 水平对齐 - dcInfoBoardTemplate.setFormatStyle(jsonObject.get("horizontalAlignment").toString()); - list.add(dcInfoBoardTemplate); - }); - map.put(dcDevice.getDeviceName(),list); + threadPoolTaskExecutor.execute(() -> { + try { + if (StringUtils.isEmpty(dcDevice.getIotDeviceId())) { + return; + } + AjaxResult ajaxResult = dcDeviceController.getDeviceRealtimeProperty(dcDevice.getIotDeviceId(), "3A", new HashMap<>()); + if (ajaxResult.get("code").equals(200)) { + JSONObject properties = JSON.parseObject(JSON.parseObject(ajaxResult.get("data").toString()).get("3A").toString()); + JSONArray contentArray = JSONArray.parseArray(properties.get("content").toString()); + List list = new ArrayList<>(); + contentArray.forEach(content -> { + DcInfoBoardTemplate dcInfoBoardTemplate = new DcInfoBoardTemplate(); + JSONObject jsonObject = JSON.parseObject(content.toString()); + String displayAreaWidth = jsonObject.get("displayAreaWidth").toString(); + String displayAreaHeight = jsonObject.get("displayAreaHeight").toString(); + // 内容 + dcInfoBoardTemplate.setContent(jsonObject.get("textContent").toString()); + // 前景颜色 + dcInfoBoardTemplate.setFontColor(jsonObject.get("foregroundColor").toString()); + // 屏幕尺寸 + dcInfoBoardTemplate.setScreenSize(displayAreaWidth + "*" + displayAreaHeight); + // 字号 + dcInfoBoardTemplate.setFontSize(jsonObject.get("fontSize").toString()); + // 字体风格 + dcInfoBoardTemplate.setFontType(jsonObject.get("fontStyle").toString()); + // 字距 + dcInfoBoardTemplate.setFontSpacing(jsonObject.get("fontSpacing").toString()); + // 停留时间 + dcInfoBoardTemplate.setStopTime(jsonObject.get("residenceTime").toString()); + // 入屏方式 + dcInfoBoardTemplate.setInScreenMode(jsonObject.get("screenEntryMethod").toString()); + // 水平对齐 + dcInfoBoardTemplate.setFormatStyle(jsonObject.get("horizontalAlignment").toString()); + list.add(dcInfoBoardTemplate); + }); + map.put(dcDevice.getDeviceName(), list); + } + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - e.printStackTrace(); - } + }); }); return map; @@ -253,12 +261,13 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { /** * 执行操作中的规则筛选 + * * @param dcExecuteAction * @param markArray * @param direction * @return */ - public List ruleFiltering(DcExecuteAction dcExecuteAction,String[] markArray,String direction){ + public List ruleFiltering(DcExecuteAction dcExecuteAction, String[] markArray, String direction) { Integer searchRule = dcExecuteAction.getSearchRule(); List start = new ArrayList<>(); @@ -273,11 +282,10 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { // 根据设备id,获取设备集合 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); JSONObject otherConfig = JSON.parseObject(dcExecuteAction.getOtherConfig()); - List deviceList = (List)otherConfig.get("deviceList"); + List deviceList = (List) otherConfig.get("deviceList"); queryWrapper.in(DcDevice::getIotDeviceId, deviceList); dcDevices = dcDeviceService.list(queryWrapper); - } - else if (searchRule.equals(2)) { + } else if (searchRule.equals(2)) { // 事件上游最近 if (direction.equals("1")) { @@ -296,7 +304,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { } }); if (dcDevices.size() > 0) { - dcDevices = dcDevices.subList(0 , dcExecuteAction.getNumber()); + dcDevices = dcDevices.subList(0, dcExecuteAction.getNumber()); } } else { // 下行 取最小的几个 @@ -318,8 +326,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { } } - } - else if (searchRule.equals(3)) { + } else if (searchRule.equals(3)) { // 事件下游最近 if (direction.equals("1")) { // 上行 取最大的几个 @@ -360,8 +367,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { dcDevices = dcDevices.subList(0, dcExecuteAction.getNumber()); } } - } - else { + } else { // 最近公里数 Integer kilometers = Integer.parseInt(markArray[0].replaceAll("K", "")); // 根据事件桩号、公里数 计算出 桩号范围 @@ -395,11 +401,12 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { String direction = dcEvent.getDirection(); // 事件编号 String id = dcEvent.getId(); - return executionConfirmation(dcEventAnDcEmergencyPlans,dcEvent.getStakeMark(),direction,id); + return executionConfirmation(dcEventAnDcEmergencyPlans, dcEvent.getStakeMark(), direction, id); } /** * 感知事件-情报板自动生成 + * * @param dcEventAnDcEmergencyPlans * @return */ @@ -411,17 +418,17 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { Integer warningType = dcWarning.getWarningType(); if (warningType.equals(WarningTypeEnum.TRAFFIC_JAM.getCode())) { // 交通拥堵 - dcInfoBoardTemplate.setContent("前方"+WarningTypeEnum.TRAFFIC_JAM.getInfo()+"请谨慎驾驶"); - }else if (warningType.equals(WarningTypeEnum.NON_MOTOR_VEHICLE.getCode())) { - dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.NON_MOTOR_VEHICLE.getInfo()+"请注意避让"); - }else if (warningType.equals(WarningTypeEnum.PEDESTRIAN.getCode())) { - dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.PEDESTRIAN.getInfo()+"请注意避让"); - }else if (warningType.equals(WarningTypeEnum.FIREWORKS.getCode())) { - dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.FIREWORKS.getInfo()+"请注意避让"); - }else if (warningType.equals(WarningTypeEnum.OUTFALL.getCode())) { - dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.OUTFALL.getInfo()+"请注意避让"); + dcInfoBoardTemplate.setContent("前方" + WarningTypeEnum.TRAFFIC_JAM.getInfo() + "请谨慎驾驶"); + } else if (warningType.equals(WarningTypeEnum.NON_MOTOR_VEHICLE.getCode())) { + dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.NON_MOTOR_VEHICLE.getInfo() + "请注意避让"); + } else if (warningType.equals(WarningTypeEnum.PEDESTRIAN.getCode())) { + dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.PEDESTRIAN.getInfo() + "请注意避让"); + } else if (warningType.equals(WarningTypeEnum.FIREWORKS.getCode())) { + dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.FIREWORKS.getInfo() + "请注意避让"); + } else if (warningType.equals(WarningTypeEnum.OUTFALL.getCode())) { + dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.OUTFALL.getInfo() + "请注意避让"); } else if (warningType.equals(WarningTypeEnum.VEHICLE_CONVERSE_RUNNING.getCode())) { - dcInfoBoardTemplate.setContent("前方出现"+WarningTypeEnum.OUTFALL.getInfo()+"请注意避让"); + dcInfoBoardTemplate.setContent("前方出现" + WarningTypeEnum.OUTFALL.getInfo() + "请注意避让"); } return dcInfoBoardTemplate; @@ -429,6 +436,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { /** * 交通事件-情报板自动生成 + * * @return */ @Override @@ -475,7 +483,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { .orElse("前方存在限行或关闭"); dcInfoBoardTemplate.setContent(content); - }else { + } else { // 施工建设 dcInfoBoardTemplate.setContent("前方施工请注意驾驶"); } @@ -484,6 +492,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { /** * 感知事件确认 + * * @param dcEventAnDcEmergencyPlans 事件数据 和 事件预案数据 * @return */ @@ -496,11 +505,12 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { String direction = dcWarning.getDirection(); // 事件编号 String id = dcWarning.getId(); - return executionConfirmation(dcEventAnDcEmergencyPlans,dcWarning.getStakeMark(),direction,id); + return executionConfirmation(dcEventAnDcEmergencyPlans, dcWarning.getStakeMark(), direction, id); } /** * 事件确认 + * * @param dcEventAnDcEmergencyPlans 事件数据 和 事件预案数据 * @return */ @@ -513,7 +523,7 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { // 存储所有设备的执行结果 JSONArray resultArray = new JSONArray(); // 事件桩号 - stakeMark = stakeMark.replace("K",""); + stakeMark = stakeMark.replace("K", ""); String[] markArray = stakeMark.split("\\+"); if (markArray[1].length() < 3) { // 不足三位 补零 @@ -575,156 +585,175 @@ public class DcEmergencyPlansServiceImpl implements DcEmergencyPlansService { /** * 根据不通设备类型,执行不通的功能操作 */ - public void invokedFunction( - Integer operationType, + public void invokedFunction(Integer operationType, List dcDevices, JSONObject otherConfig, - JSONArray resultArray) throws HttpException, IOException { - String iotDeviceId = ""; - String functionId = ""; + JSONArray resultArray) { + for (DcDevice device : dcDevices) { - iotDeviceId = device.getIotDeviceId(); - HashMap props = new HashMap<>(); - - if (device.getDeviceType().equals(DeviceTypeConstants.DRIVING_GUIDANCE)) { - // 行车诱导 - functionId = DeviceFunctionIdConstants.DRIVING_GUIDANCE; - // 控制模式 1-手动 2-自动 3-万年历 - String controlModel = otherConfig.get("controlModel").toString(); - props.put("onWorkStatus", otherConfig.get("state").toString()); - props.put("inWorkStatus", otherConfig.get("state").toString()); - if (controlModel.equals("1")) { - props.put("mode", "00"); - } else if (controlModel.equals("2")) { - String startTime = otherConfig.get("startTime").toString(); - String endTime = otherConfig.get("endTime").toString(); - props.put("mode", "01"); - props.put("startDisplayTime", startTime); - props.put("endDisplayTime", endTime); - } else { - props.put("mode", "02"); - } - AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); - // 将调用结果存入到 resultArray(操作结果) 中 - JSONObject result = new JSONObject(); - result.put("device", device.getId()); - result.put("result", ajaxResult); - resultArray.add(result); + threadPoolTaskExecutor.execute(() -> { + String iotDeviceId = ""; + String functionId = ""; + iotDeviceId = device.getIotDeviceId(); + HashMap props = new HashMap<>(); + try { + if (device.getDeviceType().equals(DeviceTypeConstants.DRIVING_GUIDANCE)) { + // 行车诱导 + functionId = DeviceFunctionIdConstants.DRIVING_GUIDANCE; + // 控制模式 1-手动 2-自动 3-万年历 + String controlModel = otherConfig.get("controlModel").toString(); + props.put("onWorkStatus", otherConfig.get("state").toString()); + props.put("inWorkStatus", otherConfig.get("state").toString()); + if (controlModel.equals("1")) { + props.put("mode", "00"); + } else if (controlModel.equals("2")) { + String startTime = otherConfig.get("startTime").toString(); + String endTime = otherConfig.get("endTime").toString(); + props.put("mode", "01"); + props.put("startDisplayTime", startTime); + props.put("endDisplayTime", endTime); + } else { + props.put("mode", "02"); + } - } - else if (device.getDeviceType().equals(DeviceTypeConstants.VARIABLE_INFORMATION_FLAG)) { - if (operationType == 1) { - // 执行操作 - // 可变信息标志 分三步 - // 1:执行11功能码 - functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_11; - props.put("fileName","play011.lst"); - props.put("size","65535"); - AjaxResult ajaxResult11 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); - if (ajaxResult11.get("code").equals(200)) { - // 2:执行13功能码 - HashMap props11 = new HashMap<>(); - functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_13; - List> list = new ArrayList<>(); - Map parameters = new HashMap<>(); - DcInfoBoardTemplate dcInfoBoardTemplate = JSON.parseObject( - JSON.toJSONString(otherConfig.get("dcInfoBoardTemplate")), - DcInfoBoardTemplate.class); - // stopTime - parameters.put("STAY",dcInfoBoardTemplate.getStopTime()); - // inScreenMode - parameters.put("ACTION",dcInfoBoardTemplate.getInScreenMode()); - // fontSpacing - parameters.put("SPEED",dcInfoBoardTemplate.getFontSpacing()); - // fontColor - parameters.put("COLOR",dcInfoBoardTemplate.getFontColor()); - // fontType - parameters.put("FONT",dcInfoBoardTemplate.getFontType()); - // fontSize - parameters.put("FONT_SIZE",dcInfoBoardTemplate.getFontSize()); - // content - parameters.put("CONTENT",dcInfoBoardTemplate.getContent()); - // screenSize 768*64 宽度和高度 - parameters.put("width",dcInfoBoardTemplate.getScreenSize().split("\\*")[0]); - parameters.put("height",dcInfoBoardTemplate.getScreenSize().split("\\*")[1]); - // formatStyle - parameters.put("formatStyle",dcInfoBoardTemplate.getFormatStyle()); - list.add(parameters); - props11.put("parameters",list); - AjaxResult ajaxResult13 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props11); + AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); + // 将调用结果存入到 resultArray(操作结果) 中 JSONObject result = new JSONObject(); - if (ajaxResult13.get("code").equals(200)) { - HashMap props1B = new HashMap<>(); - // 3: 执行1B功能码 + result.put("device", device.getId()); + result.put("result", ajaxResult); + resultArray.add(result); + + + } + else if (device.getDeviceType().equals(DeviceTypeConstants.VARIABLE_INFORMATION_FLAG)) { + if (operationType == 1) { + + // 执行操作 + // 可变信息标志 分三步 + // 1:执行11功能码 + functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_11; + props.put("fileName", "play011.lst"); + props.put("size", "65535"); + AjaxResult ajaxResult11; + ajaxResult11 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); + if (ajaxResult11.get("code").equals(200)) { + // 2:执行13功能码 + HashMap props11 = new HashMap<>(); + functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_13; + List> list = new ArrayList<>(); + Map parameters = new HashMap<>(); + DcInfoBoardTemplate dcInfoBoardTemplate = JSON.parseObject( + JSON.toJSONString(otherConfig.get("dcInfoBoardTemplate")), + DcInfoBoardTemplate.class); + // stopTime + parameters.put("STAY", dcInfoBoardTemplate.getStopTime()); + // inScreenMode + parameters.put("ACTION", dcInfoBoardTemplate.getInScreenMode()); + // fontSpacing + parameters.put("SPEED", dcInfoBoardTemplate.getFontSpacing()); + // fontColor + parameters.put("COLOR", dcInfoBoardTemplate.getFontColor()); + // fontType + parameters.put("FONT", dcInfoBoardTemplate.getFontType()); + // fontSize + parameters.put("FONT_SIZE", dcInfoBoardTemplate.getFontSize()); + // content + parameters.put("CONTENT", dcInfoBoardTemplate.getContent()); + // screenSize 768*64 宽度和高度 + parameters.put("width", dcInfoBoardTemplate.getScreenSize().split("\\*")[0]); + parameters.put("height", dcInfoBoardTemplate.getScreenSize().split("\\*")[1]); + // formatStyle + parameters.put("formatStyle", dcInfoBoardTemplate.getFormatStyle()); + list.add(parameters); + props11.put("parameters", list); + AjaxResult ajaxResult13 = null; + ajaxResult13 = dcDeviceController.invokedFunction(iotDeviceId, functionId, props11); + JSONObject result = new JSONObject(); + if (ajaxResult13.get("code").equals(200)) { + HashMap props1B = new HashMap<>(); + // 3: 执行1B功能码 + functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B; + props1B.put("fileId", "11"); + AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props1B); + result.put("device", device.getId()); + result.put("result", ajaxResult1B); + resultArray.add(result); + + } else { + result.put("device", device.getId()); + result.put("result", ajaxResult13); + resultArray.add(result); + } + } + + } else { + // 恢复操作 + props.put("fileId", "10"); functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B; - props1B.put("fileId","11"); - AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props1B); + AjaxResult ajaxResult1B; + ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); + + JSONObject result = new JSONObject(); result.put("device", device.getId()); result.put("result", ajaxResult1B); resultArray.add(result); - }else { - result.put("device", device.getId()); - result.put("result", ajaxResult13); - resultArray.add(result); } + } - } - else { - // 恢复操作 - props.put("fileId","10"); - functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_1B; - AjaxResult ajaxResult1B = dcDeviceController.invokedFunction(iotDeviceId, functionId, props); - JSONObject result = new JSONObject(); - result.put("device", device.getId()); - result.put("result", ajaxResult1B); - resultArray.add(result); - } + else if (device.getDeviceType().equals(DeviceTypeConstants.ROAD_SECTION_VOICE_BROADCASTING)) { + // 路段广播 + JSONObject params = new JSONObject(); + params.put("name", "task-event"); + params.put("outVVol", "8"); + params.put("priority", "1"); + params.put("text", otherConfig.get("content")); + params.put("repeatTimes", "3"); + params.put("functionType", "startPaTts"); + JSONArray termList = new JSONArray(); + termList.add(JSON.parseObject(device.getOtherConfig())); + params.put("termList", termList); + + JSONObject returnResult = broadcastController.nearCamListDistance(params); + JSONObject result = new JSONObject(); + result.put("device", device.getId()); + result.put("result", returnResult); + resultArray.add(result); - } - else if (device.getDeviceType().equals(DeviceTypeConstants.ROAD_SECTION_VOICE_BROADCASTING)) { - // 路段广播 - JSONObject params = new JSONObject(); - params.put("name","task-event"); - params.put("outVVol","8"); - params.put("priority","1"); - params.put("text",otherConfig.get("content")); - params.put("repeatTimes","3"); - params.put("functionType","startPaTts"); - JSONArray termList = new JSONArray(); - termList.add(JSON.parseObject(device.getOtherConfig())); - params.put("termList",termList); - JSONObject returnResult =broadcastController.nearCamListDistance(params); - JSONObject result = new JSONObject(); - result.put("device", device.getId()); - result.put("result", returnResult); - resultArray.add(result); - } - else if (device.getDeviceType().equals(DeviceTypeConstants.LASER_FATIGUE_AWAKENING)) { - // 激光疲劳唤醒 - functionId = otherConfig.get("state").toString(); - - AjaxResult ajaxResultState = dcDeviceController.invokedFunction(iotDeviceId, functionId, new HashMap<>()); - // 将调用结果存入到 resultArray(操作结果) 中 - JSONObject result = new JSONObject(); - result.put("device", device.getId()); - result.put("result", ajaxResultState); - resultArray.add(result); - // 操作时长 - String operationDuration = "SETTM" + otherConfig.get("operationDuration").toString(); - AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, operationDuration, new HashMap<>()); - JSONObject resultTime = new JSONObject(); - resultTime.put("device", device.getId()); - resultTime.put("result", ajaxResult); - resultArray.add(resultTime); - } else { - break; - } + } + else if (device.getDeviceType().equals(DeviceTypeConstants.LASER_FATIGUE_AWAKENING)) { + // 激光疲劳唤醒 + functionId = otherConfig.get("state").toString(); + + AjaxResult ajaxResultState = dcDeviceController.invokedFunction(iotDeviceId, functionId, new HashMap<>()); + JSONObject result = new JSONObject(); + result.put("device", device.getId()); + result.put("result", ajaxResultState); + resultArray.add(result); + + // 操作时长 + String operationDuration = "SETTM" + otherConfig.get("operationDuration").toString(); + HashMap propsTime = new HashMap<>(); + propsTime.put("SET", operationDuration); + functionId = DeviceFunctionIdConstants.VARIABLE_INFORMATION_FLAG_SETTM; + + AjaxResult ajaxResult = dcDeviceController.invokedFunction(iotDeviceId, functionId, propsTime); + JSONObject resultTime = new JSONObject(); + resultTime.put("device", device.getId()); + resultTime.put("result", ajaxResult); + resultArray.add(resultTime); + } + } catch (HttpException | IOException e) { + log.error(e.toString()); + throw new RuntimeException(e); + } + }); } + } diff --git a/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java b/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java new file mode 100644 index 00000000..1ee4e846 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java @@ -0,0 +1,166 @@ +package com.zc.business.service.impl; + +import cn.hutool.core.date.DateUtil; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import com.zc.business.statistics.cache.*; +import com.zc.business.mapper.DcTrafficSectionDataMapper; +import com.zc.business.service.DcTrafficSectionDataService; +import com.zc.business.statistics.handler.RealtimeTrafficStatistics; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +/** + * 交通断面数据服务实现类,负责处理实时设备消息、缓存数据、定时任务以及数据保存等功能。 + * + * @author xiepufeng + */ +@Service +public class DcTrafficSectionDataServiceImpl + extends ServiceImpl + implements DcTrafficSectionDataService { + + @Resource + private DcTrafficSectionDataMapper dcTrafficSectionDataMapper; + + /** + * 初始化方法,用于在对象创建后恢复各种周期的交通数据缓存。 + * 该方法标注了@PostConstruct注解,确保在依赖注入完成后调用。 + */ + @PostConstruct + public void init() { + // TODO 恢复每天交通数据缓存(es获取数据) + recoveryMonthlyCache(); // 恢复每月交通数据缓存 + recoveryQuarterlyCache(); // 恢复每季度交通数据缓存 + recoveryYearlyCache(); // 恢复每年交通数据缓存 + } + + /** + * 恢复每月交通数据缓存的方法。 + * 通过查询当前月份至今的每日交通数据,并将其添加到每月交通统计缓存中。 + */ + private void recoveryMonthlyCache() { + // 构建查询条件,查询当前月份至今的每日交通数据 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.DAY); + queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfMonth(new Date()), new Date()); + List dcTrafficSectionDataList = this.list(queryWrapper); + // 遍历查询结果,将每日数据添加到每月交通统计缓存 + dcTrafficSectionDataList.forEach(MonthlyTrafficStatisticsCache::addCacheData); + } + + /** + * 恢复每季度交通数据缓存的方法。 + * 通过查询当前季度至今的每月交通数据,并将其添加到每季度交通统计缓存中。 + */ + private void recoveryQuarterlyCache() { + // 构建查询条件,查询当前季度至今的每月交通数据 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.MONTH); + queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfQuarter(new Date()), new Date()); + List dcTrafficSectionDataList = this.list(queryWrapper); + // 遍历查询结果,将每月数据添加到每季度交通统计缓存 + dcTrafficSectionDataList.forEach(QuarterlyTrafficStatisticsCache::addCacheData); + } + + /** + * 恢复每年交通数据缓存的方法。 + * 通过查询当前年份至今的每季度交通数据,并将其添加到每年交通统计缓存中。 + */ + private void recoveryYearlyCache() { + // 构建查询条件,查询当前年份至今的每季度交通数据 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.QUARTER); + queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfYear(new Date()), new Date()); + List dcTrafficSectionDataList = this.list(queryWrapper); + // 遍历查询结果,将每季度数据添加到每年交通统计缓存 + dcTrafficSectionDataList.forEach(YearlyTrafficStatisticsCache::addCacheData); + } + + /** + * 处理实时接收到的设备消息,并将其转换为交通断面统计数据对象并缓存。 + * + * @param msg 设备发送的JSON格式实时消息 + */ + @Override + public void processRealtimeMessage(JSONObject msg) { + + // 1. 将设备消息转换为交通断面数据统计定义对象 + DcTrafficSectionData dcTrafficSectionData = convertToTrafficStatistics(msg); + + // 2. 将转换后的数据添加到缓存中 + DailyTrafficStatisticsCache.addCacheData(dcTrafficSectionData); + } + + /** + * 将设备实时消息转换为交通断面数据统计定义对象 + * + * @param msg JSON格式的设备实时消息 + * @return 转换后的交通断面数据统计定义对象 + */ + public DcTrafficSectionData convertToTrafficStatistics(JSONObject msg) { + DcTrafficSectionData dcTrafficSectionData = new DcTrafficSectionData(); + + // TODO + return dcTrafficSectionData; + } + + + /** + * 定义每小时第20分钟执行的任务,用于清除过期缓存数据并将缓存中的数据整合后保存至数据库。 + */ + @Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务 + public void performHourlyCleanupAndPersist() { + // 清除已过期的缓存数据 + DailyTrafficStatisticsCache.clearExpiredData(); + MonthlyTrafficStatisticsCache.clearExpiredData(); + QuarterlyTrafficStatisticsCache.clearExpiredData(); + YearlyTrafficStatisticsCache.clearExpiredData(); + // 整合缓存数据并保存至数据库 + // 将缓存中的数据按日统计后保存至数据库 + // 添加月交通断面数据到缓存中 + persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按月统计后保存至数据库 + // 添加季度交通断面数据到缓存中 + persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按季度统计后保存至数据库 + // 添加年交通断面数据到缓存中 + persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按年统计后保存至数据库 + persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {}); + } + + /** + * 将缓存中的数据统计后保存至数据库。 + */ + public void persistAggregatedData( + Map cache, + TrafficDataPeriodTypeEnum periodType, + Consumer consumer + ) { + for (AbstractTrafficStatisticsCache data : cache.values()) { + // 如果数据已经存储过,则跳过此次处理 + if (data.isStored()) { + continue; + } + DcTrafficSectionData aggregatedData = RealtimeTrafficStatistics.trafficStatistics(data.getData(), periodType); + if (dcTrafficSectionDataMapper.insertOrUpdate(aggregatedData)) { + // 设置数据已存储状态 + data.setStored(true); + // 调用回调函数 + consumer.accept(aggregatedData); + } + } + } + +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/AbstractTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/AbstractTrafficStatisticsCache.java new file mode 100644 index 00000000..c3e02751 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/AbstractTrafficStatisticsCache.java @@ -0,0 +1,52 @@ +package com.zc.business.statistics.cache; + +import com.zc.business.domain.DcTrafficSectionData; +import lombok.Getter; +import lombok.Setter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; + +/** + * 交通断面数据缓存定义 + * @author xiepufeng + */ +@Getter +@Setter +public abstract class AbstractTrafficStatisticsCache { + + // 日志记录器 + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** + * 缓存对象的键,由设备ID与统计日期组成 + */ + private String cacheKey; + + /** + * 设备上报日期字符串 + */ + private String statisticalDateStr; + + /** + * 数据最后添加到缓存中的时间 + */ + private Date lastAddedTime; + + /** + * 标记该缓存实例是否已存储过数据 + */ + private boolean stored; + + /** + * 存储具体交通断面数据的列表 + */ + private Collection data; + + public AbstractTrafficStatisticsCache() { + this.data = new HashSet<>(); + } +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java new file mode 100644 index 00000000..f2cc4f26 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java @@ -0,0 +1,137 @@ +package com.zc.business.statistics.cache; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import lombok.Getter; +import lombok.Setter; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * + * 以天为单位的交通交通断面数据缓存类,用于存储和管理设备上报的交通断面统计数据,同时提供了数据缓存的有效性管理和清理功能。 + * @author xiepufeng + */ +@Getter +@Setter +public class DailyTrafficStatisticsCache extends AbstractTrafficStatisticsCache { + + @Getter + // 静态缓存容器,使用ConcurrentHashMap保证线程安全 + private static final Map cache = new ConcurrentHashMap<>(); + + // 最大缓存时间(单位:秒) + private static final long MAX_CACHE_TIME = 25 * 60 * 60; // 缓存数据最长保留25小时 + + // 最大容量限制(一个设备),防止内存溢出 + private static final int MAX_CAPACITY = 60/5 * (24 + 1) + 1000; // 缓存的最大条目数 + + // 私有构造函数,确保只能通过静态方法获取实例 + private DailyTrafficStatisticsCache() { + } + + /** + * 添加交通断面数据到缓存中 + * + * @param dcTrafficSectionData 待添加的交通断面统计数据 + */ + public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) { + // 获取或新建对应的缓存实例 + DailyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData); + + // 检查缓存容量是否达到上限 + if (instance.getData().size() >= MAX_CAPACITY) { + instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId()); + return; + } + + // 更新最后添加时间 + instance.setLastAddedTime(DateUtil.date()); + + // 更新状态 + instance.setStored(false); + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + String key = generateCacheKey(dcTrafficSectionData); + + // 设置key和thatDay属性(仅在首次添加时) + if (instance.getCacheKey() == null) { + instance.setCacheKey(key); + instance.setStatisticalDateStr(formattedDate); + } + + dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.DAY); + + // 将新数据添加到数据列表中 + instance.getData().add(dcTrafficSectionData); + } + + /** + * 获取或创建对应设备与日期的DcTrafficSectionDataCache实例 + * + * @param dcTrafficSectionData 交通断面数据统计定义 + * @return 对应的交通断面数据缓存实例 + */ + private static DailyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) { + // 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例 + return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new DailyTrafficStatisticsCache()); + } + + /** + * 生成缓存键。 + * 该方法通过设备ID、统计日期和道路方向生成一个唯一的缓存键,用于存储或检索特定条件下的交通数据。 + * + * @param dcTrafficSectionData 包含设备ID、统计日期和道路方向的交通段数据对象。 + * @return 返回一个字符串形式的缓存键,由设备ID、格式化的统计日期和道路方向以"|"字符连接而成。 + */ + public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) { + // 获取设备ID + Long deviceId = dcTrafficSectionData.getDeviceId(); + + // 获取并格式化统计日期 + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + + // 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键 + return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection(); + } + + /** + * 清除所有过期的交通断面数据缓存项 + */ + public static void clearExpiredData() { + // 使用stream API找出所有过期的数据缓存项键 + Set keysToRemove = cache.keySet().stream() + .filter(DailyTrafficStatisticsCache::isCacheItemExpire) + .collect(Collectors.toSet()); + + // 安全地从缓存中删除这些过期项 + keysToRemove.forEach(cache::remove); + } + + /** + * 检查给定缓存键所对应的缓存项是否已经过期 + * + * @param key 缓存key + * @return 如果已过期则返回true,否则返回false + */ + private static boolean isCacheItemExpire(String key) { + Date lastAddedTime = cache.get(key).getLastAddedTime(); + Date currentTime = DateUtil.date(); + long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND); + return betweenSecond > MAX_CACHE_TIME; + } + + /** + * 将 Date 类型的日期格式化为指定格式的字符串。 + * + * @param date 需要格式化的 Date 对象。 + * @return 格式化后的日期字符串。 + */ + private static String formatDate(Date date) { + // 使用 DateUtil 工具类将 date 格式化为指定格式的字符串 + return DateUtil.formatDate(date); + } +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java new file mode 100644 index 00000000..41defc79 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java @@ -0,0 +1,137 @@ +package com.zc.business.statistics.cache; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import lombok.Getter; +import lombok.Setter; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * 以月为单位的交通交通断面数据缓存类,用于存储和管理设备上报的交通断面统计数据,同时提供了数据缓存的有效性管理和清理功能。 + * @author xiepufeng + */ +@Getter +@Setter +public class MonthlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache { + + // 静态缓存容器,使用ConcurrentHashMap保证线程安全 + @Getter + private static final Map cache = new ConcurrentHashMap<>(); + + // 最大缓存时间(单位:秒) + private static final long MAX_CACHE_TIME = (60 * 60) * (31 * 24 + 1); + + // 最大容量限制,防止内存溢出 + private static final int MAX_CAPACITY = 31 + 1000; // 缓存的最大条目数 + + + // 私有构造函数,确保只能通过静态方法获取实例 + private MonthlyTrafficStatisticsCache() { + } + + /** + * 添加交通断面数据到缓存中 + * + * @param dcTrafficSectionData 待添加的交通断面统计数据 + */ + public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) { + // 获取或新建对应的缓存实例 + MonthlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData); + + // 检查缓存容量是否达到上限 + if (instance.getData().size() >= MAX_CAPACITY) { + instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId()); + return; + } + + // 更新最后添加时间 + instance.setLastAddedTime(DateUtil.date()); + + // 更新状态 + instance.setStored(false); + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + String key = generateCacheKey(dcTrafficSectionData); + + // 设置key和thatDay属性(仅在首次添加时) + if (instance.getCacheKey() == null) { + instance.setCacheKey(key); + instance.setStatisticalDateStr(formattedDate); + } + + dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.MONTH); + + // 将新数据添加到数据列表中 + instance.getData().add(dcTrafficSectionData); + } + + /** + * 获取或创建对应设备与日期的DcTrafficSectionDataCache实例 + * + * @param dcTrafficSectionData 交通断面数据统计定义 + * @return 对应的交通断面数据缓存实例 + */ + private static MonthlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) { + // 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例 + return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new MonthlyTrafficStatisticsCache()); + } + + /** + * 生成缓存键。 + * 该方法通过设备ID、统计日期和道路方向生成一个唯一的缓存键,用于存储或检索特定条件下的交通数据。 + * + * @param dcTrafficSectionData 包含设备ID、统计日期和道路方向的交通段数据对象。 + * @return 返回一个字符串形式的缓存键,由设备ID、格式化的统计日期和道路方向以"|"字符连接而成。 + */ + public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) { + // 获取设备ID + Long deviceId = dcTrafficSectionData.getDeviceId(); + + // 获取并格式化统计日期 + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + + // 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键 + return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection(); + } + + /** + * 清除所有过期的交通断面数据缓存项 + */ + public static void clearExpiredData() { + // 使用stream API找出所有过期的数据缓存项键 + Set keysToRemove = cache.keySet().stream() + .filter(MonthlyTrafficStatisticsCache::isCacheItemExpire) + .collect(Collectors.toSet()); + + // 安全地从缓存中删除这些过期项 + keysToRemove.forEach(cache::remove); + } + + /** + * 检查给定缓存键所对应的缓存项是否已经过期 + * + * @param key 缓存key + * @return 如果已过期则返回true,否则返回false + */ + private static boolean isCacheItemExpire(String key) { + Date lastAddedTime = cache.get(key).getLastAddedTime(); + Date currentTime = DateUtil.date(); + long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND); + return betweenSecond > MAX_CACHE_TIME; + } + + /** + * 将 Date 类型的日期格式化为 "yyyy-MM-01" 格式的字符串。 + * @param date 需要格式化的日期对象。 + * @return 格式化后的日期字符串。 + */ + private static String formatDate(Date date) { + // 使用 DateUtil 工具类将 date 格式化为指定格式的字符串 + return DateUtil.format(date, "yyyy-MM-01"); + } + +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java new file mode 100644 index 00000000..41f1d5ce --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java @@ -0,0 +1,142 @@ +package com.zc.business.statistics.cache; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * 以季度为单位的交通交通断面数据缓存类,用于存储和管理设备上报的交通断面统计数据,同时提供了数据缓存的有效性管理和清理功能。 + * @author xiepufeng + */ +@Getter +@Setter +public class QuarterlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache { + + // 静态缓存容器,使用ConcurrentHashMap保证线程安全 + @Getter + private static final Map cache = new ConcurrentHashMap<>(); + + // 最大缓存时间(单位:秒) + private static final long MAX_CACHE_TIME = (60 * 60) * (3 * 31 * 24 + 1); + + // 最大容量限制,防止内存溢出 + private static final int MAX_CAPACITY = 3 + 1000; // 缓存的最大条目数 + + + // 私有构造函数,确保只能通过静态方法获取实例 + private QuarterlyTrafficStatisticsCache() { + } + + /** + * 添加交通断面数据到缓存中 + * + * @param dcTrafficSectionData 待添加的交通断面统计数据 + */ + public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) { + // 获取或新建对应的缓存实例 + QuarterlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData); + + // 检查缓存容量是否达到上限 + if (instance.getData().size() >= MAX_CAPACITY) { + instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId()); + return; + } + + // 更新最后添加时间 + instance.setLastAddedTime(DateUtil.date()); + + // 更新状态 + instance.setStored(false); + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + String key = generateCacheKey(dcTrafficSectionData); + + // 设置key和thatDay属性(仅在首次添加时) + if (instance.getCacheKey() == null) { + instance.setCacheKey(key); + instance.setStatisticalDateStr(formattedDate); + } + + dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.QUARTER); + + // 将新数据添加到数据列表中 + instance.getData().add(dcTrafficSectionData); + } + + /** + * 获取或创建对应设备与日期的DcTrafficSectionDataCache实例 + * + * @param dcTrafficSectionData 交通断面数据统计定义 + * @return 对应的交通断面数据缓存实例 + */ + private static QuarterlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) { + // 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例 + return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new QuarterlyTrafficStatisticsCache()); + } + + /** + * 生成缓存键。 + * 该方法通过设备ID、统计日期和道路方向生成一个唯一的缓存键,用于存储或检索特定条件下的交通数据。 + * + * @param dcTrafficSectionData 包含设备ID、统计日期和道路方向的交通段数据对象。 + * @return 返回一个字符串形式的缓存键,由设备ID、格式化的统计日期和道路方向以"|"字符连接而成。 + */ + public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) { + // 获取设备ID + Long deviceId = dcTrafficSectionData.getDeviceId(); + + // 获取并格式化统计日期 + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + + // 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键 + return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection(); + } + + /** + * 清除所有过期的交通断面数据缓存项 + */ + public static void clearExpiredData() { + // 使用stream API找出所有过期的数据缓存项键 + Set keysToRemove = cache.keySet().stream() + .filter(QuarterlyTrafficStatisticsCache::isCacheItemExpire) + .collect(Collectors.toSet()); + + // 安全地从缓存中删除这些过期项 + keysToRemove.forEach(cache::remove); + } + + /** + * 检查给定缓存键所对应的缓存项是否已经过期 + * + * @param key 缓存key + * @return 如果已过期则返回true,否则返回false + */ + private static boolean isCacheItemExpire(String key) { + Date lastAddedTime = cache.get(key).getLastAddedTime(); + Date currentTime = DateUtil.date(); + long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND); + return betweenSecond > MAX_CACHE_TIME; + } + + + /** + * 将 Date 类型的日期格式化为 "yyyy-MM-01" 格式字符串。 + * + * @param date 需要格式化的日期对象。 + * @return 格式化后的日期字符串,格式为 "yyyy-MM-01"。 + */ + private static String formatDate(Date date) { + // 使用 DateUtil 工具类将 date 格式化为指定格式的字符串 + return DateUtil.format(date, "yyyy-MM-01"); + } + +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java new file mode 100644 index 00000000..907eecc2 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java @@ -0,0 +1,141 @@ +package com.zc.business.statistics.cache; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * 以年为单位的交通交通断面数据缓存类,用于存储和管理设备上报的交通断面统计数据,同时提供了数据缓存的有效性管理和清理功能。 + * @author xiepufeng + */ +@Getter +@Setter +public class YearlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache { + + // 静态缓存容器,使用ConcurrentHashMap保证线程安全 + @Getter + private static final Map cache = new ConcurrentHashMap<>(); + + // 最大缓存时间(单位:秒) + private static final long MAX_CACHE_TIME = (60 * 60) * (366 * 24 + 1); + + // 最大容量限制,防止内存溢出 + private static final int MAX_CAPACITY = 4 + 1000; // 缓存的最大条目数 + + + // 私有构造函数,确保只能通过静态方法获取实例 + private YearlyTrafficStatisticsCache() { + } + + /** + * 添加交通断面数据到缓存中 + * + * @param dcTrafficSectionData 待添加的交通断面统计数据 + */ + public static void addCacheData(DcTrafficSectionData dcTrafficSectionData) { + // 获取或新建对应的缓存实例 + YearlyTrafficStatisticsCache instance = getInstance(dcTrafficSectionData); + + // 检查缓存容量是否达到上限 + if (instance.getData().size() >= MAX_CAPACITY) { + instance.getLogger().error("交通断面数据缓存出现异常,最大缓存量达到设定上线 {}, 当前设备是 {}", MAX_CAPACITY, dcTrafficSectionData.getDeviceId()); + return; + } + + // 更新最后添加时间 + instance.setLastAddedTime(DateUtil.date()); + + // 更新状态 + instance.setStored(false); + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + String key = generateCacheKey(dcTrafficSectionData); + + // 设置key和thatDay属性(仅在首次添加时) + if (instance.getCacheKey() == null) { + instance.setCacheKey(key); + instance.setStatisticalDateStr(formattedDate); + } + + dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.YEAR); + + // 将新数据添加到数据列表中 + instance.getData().add(dcTrafficSectionData); + } + + /** + * 获取或创建对应设备与日期的DcTrafficSectionDataCache实例 + * + * @param dcTrafficSectionData 交通断面数据统计定义 + * @return 对应的交通断面数据缓存实例 + */ + private static YearlyTrafficStatisticsCache getInstance(DcTrafficSectionData dcTrafficSectionData) { + // 使用toKey方法生成唯一键,并根据此键计算并返回缓存实例 + return cache.computeIfAbsent(generateCacheKey(dcTrafficSectionData), k -> new YearlyTrafficStatisticsCache()); + } + + /** + * 生成缓存键。 + * 该方法通过设备ID、统计日期和道路方向生成一个唯一的缓存键,用于存储或检索特定条件下的交通数据。 + * + * @param dcTrafficSectionData 包含设备ID、统计日期和道路方向的交通段数据对象。 + * @return 返回一个字符串形式的缓存键,由设备ID、格式化的统计日期和道路方向以"|"字符连接而成。 + */ + public static String generateCacheKey(DcTrafficSectionData dcTrafficSectionData) { + // 获取设备ID + Long deviceId = dcTrafficSectionData.getDeviceId(); + + // 获取并格式化统计日期 + String formattedDate = formatDate(dcTrafficSectionData.getStatisticalDate()); + + // 使用"|"字符连接设备ID、格式化的日期和道路方向,形成唯一键 + return deviceId + "|" + formattedDate + "|" + dcTrafficSectionData.getDirection(); + } + + /** + * 清除所有过期的交通断面数据缓存项 + */ + public static void clearExpiredData() { + // 使用stream API找出所有过期的数据缓存项键 + Set keysToRemove = cache.keySet().stream() + .filter(YearlyTrafficStatisticsCache::isCacheItemExpire) + .collect(Collectors.toSet()); + + // 安全地从缓存中删除这些过期项 + keysToRemove.forEach(cache::remove); + } + + /** + * 检查给定缓存键所对应的缓存项是否已经过期 + * + * @param key 缓存key + * @return 如果已过期则返回true,否则返回false + */ + private static boolean isCacheItemExpire(String key) { + Date lastAddedTime = cache.get(key).getLastAddedTime(); + Date currentTime = DateUtil.date(); + long betweenSecond = DateUtil.between(lastAddedTime, currentTime, DateUnit.SECOND); + return betweenSecond > MAX_CACHE_TIME; + } + + /** + * 将 Date 类型的日期格式化为 "yyyy-01-01" 格式的字符串。 + * + * @param date 需要格式化的日期对象。 + * @return 格式化后的日期字符串。 + */ + private static String formatDate(Date date) { + // 使用 DateUtil 工具类将 date 格式化为 "yyyy-01-01" 格式的字符串 + return DateUtil.format(date, "yyyy-01-01"); + } + +} diff --git a/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java b/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java new file mode 100644 index 00000000..32be9330 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java @@ -0,0 +1,70 @@ +package com.zc.business.statistics.handler; + +import com.ruoyi.common.utils.DateUtils; +import com.zc.business.domain.DcTrafficSectionData; +import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import org.springframework.util.CollectionUtils; + +import java.util.Collection; + +public class RealtimeTrafficStatistics { + + /** + * 对给定的交通数据集合进行统计分析,返回一个综合交通数据对象。 + * + * @param dataCollection 交通数据集合,不可为null或空。包含多个交通路段的详细数据。 + * @param trafficDataPeriodType 交通数据的时段类型,例如:小时、日、周等。 + * @return 综合交通数据对象,包含车流量总和、平均车速等统计结果。如果输入数据为空,则返回null。 + */ + public static DcTrafficSectionData trafficStatistics(Collection dataCollection, TrafficDataPeriodTypeEnum trafficDataPeriodType) { + + // 判断输入数据是否为空 + if (CollectionUtils.isEmpty(dataCollection)) { + return null; + } + + // 创建一个汇总统计用的对象 + DcTrafficSectionData aggregatedData = new DcTrafficSectionData(); + + // 初始化车流量总和和计算平均车速所需的分子部分 + int trafficVolume = 0; + double numerator = 0; + + // 遍历原始数据列表,累加车流量并计算平均车速 + for (DcTrafficSectionData data: dataCollection) { + // 累加车流量 + trafficVolume += data.getTrafficVolume(); + // 计算分子部分 + numerator += data.getAverageSpeed() * data.getTrafficVolume(); + } + + // 使用第一个数据项的信息填充汇总统计对象的基本属性(设备ID、桩号、方向) + DcTrafficSectionData firstDcTrafficSectionData = dataCollection.iterator().next(); + // 设备id + aggregatedData.setDeviceId(firstDcTrafficSectionData.getDeviceId()); + // 桩号 + aggregatedData.setStakeMark(firstDcTrafficSectionData.getStakeMark()); + // 道路方向 + aggregatedData.setDirection(firstDcTrafficSectionData.getDirection()); + + // 计算平均车速并设置到汇总统计对象中 + if (trafficVolume != 0) { + aggregatedData.setAverageSpeed((int) Math.round(numerator / trafficVolume)); + } else { + // 若车流量为0,则默认设置平均车速为0 + aggregatedData.setAverageSpeed(0); + } + // 时段类型 + aggregatedData.setPeriodType(trafficDataPeriodType); + // 设置统计时间 + aggregatedData.setStatisticalDate(firstDcTrafficSectionData.getStatisticalDate(), trafficDataPeriodType); + // 车流量 + aggregatedData.setTrafficVolume(trafficVolume); + // 更新或插入操作 + aggregatedData.setUpdateTime(DateUtils.getNowDate()); + // 生成主键 + aggregatedData.generateUniqueId(); + + return aggregatedData; + } +} diff --git a/zc-business/src/main/resources/mapper/business/DcTrafficSectionDataMapper.xml b/zc-business/src/main/resources/mapper/business/DcTrafficSectionDataMapper.xml new file mode 100644 index 00000000..da504afd --- /dev/null +++ b/zc-business/src/main/resources/mapper/business/DcTrafficSectionDataMapper.xml @@ -0,0 +1,44 @@ + + + + + + + INSERT INTO + dc_traffic_section_data + ( + id, + traffic_volume, + average_speed, + device_id, + statistical_date, + direction, + period_type, + stake_mark, + create_time + ) + VALUES + ( + #{id}, + #{trafficVolume}, + #{averageSpeed}, + #{deviceId}, + #{statisticalDate}, + #{direction}, + #{periodType}, + #{stakeMark}, + NOW()) + ON DUPLICATE KEY UPDATE + traffic_volume = VALUES(traffic_volume), + average_speed = VALUES(average_speed), + device_id = VALUES(device_id), + statistical_date = VALUES(statistical_date), + direction = VALUES(direction), + period_type = VALUES(period_type), + stake_mark = VALUES(stake_mark), + update_time = NOW() + + +