diff --git a/ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java b/ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java index 782d845e..6cd209aa 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java @@ -11,6 +11,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Type; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; /** @@ -185,18 +186,16 @@ public class OkHttp throw new HttpException("OkHttp: url 不存在!"); } - StringBuilder urlBuilder = new StringBuilder(url).append("?"); + HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(url)).newBuilder(); if (requestParams != null) { - requestParams.params.forEach((key, value) -> - { - urlBuilder.append(key).append("=").append(toJson(value)).append("&"); - - }); + requestParams.params.forEach((key, value) -> urlBuilder.addQueryParameter(key, toJson(value))); } - Call call = okHttpClient.newCall(requestBuilder.url(urlBuilder.substring(0, urlBuilder.length() - 1)).get().build()); + String url = urlBuilder.build().toString(); + + Call call = okHttpClient.newCall(requestBuilder.url(url).get().build()); return call.execute(); } @@ -437,4 +436,4 @@ public class OkHttp return gson.toJson(object, type); } -} \ No newline at end of file +} diff --git a/zc-business/src/main/java/com/zc/business/constant/StatisticalRecoveryOffsetTime.java b/zc-business/src/main/java/com/zc/business/constant/StatisticalRecoveryOffsetTime.java new file mode 100644 index 00000000..78117d8d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/constant/StatisticalRecoveryOffsetTime.java @@ -0,0 +1,14 @@ +package com.zc.business.constant; + +/** + * 统计恢复偏移时间类 + * 用于定义与交通数据恢复相关的偏移时间常量。 + * + * @author xiepufeng + */ +public class StatisticalRecoveryOffsetTime { + + // 定义交通数据段偏移的天数常量,表示偏移-10天 + public static final int TRAFFIC_SECTION_DATA_OFFSET_DAY = -10; +} + diff --git a/zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java b/zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java index 41b68888..1e6f55f3 100644 --- a/zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java +++ b/zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java @@ -161,6 +161,30 @@ public class DcDeviceController extends BaseController { //***********************************物联设备接口************************************** + + /** + * 根据物联产品id获取设备数据 + * + * @param productId 物联产品id + * @return 获取设备数据操作结果 + */ + @ApiOperation("根据物联产品id获取设备数据") + @GetMapping("/devices/{productId}") + public AjaxResult getDeviceByProductId(@PathVariable @Parameter(description = "产品ID") String productId) throws HttpException, IOException { + + if (!StringUtils.hasText(productId)) { + return AjaxResult.error("产品ID不能为空"); + } + + OkHttp okHttp = new OkHttp(); + Response response // 请求响应 + = okHttp + .url(iotAddress + "/api/iot/device/cache/" + productId) // 请求地址 + .get(); // 请求方法 + return JSON.parseObject(response.body().string(), AjaxResult.class); + } + + /** * 根据物联设备id获取最新属性数据 * diff --git a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java index ee3d8a32..dbcc7fe8 100644 --- a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java +++ b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java @@ -77,4 +77,25 @@ public class DcDevice { //设备厂商 @TableField(exist = false) private String manufacturer; + + public Integer stakeMarkToInt() { + if (stakeMark == null) { + return null; + } + + // 不区分大小写的正则表达式匹配 'k' 和 '+' + String[] parts = this.stakeMark.split("(?i)k|\\+"); + + // 提取出公里数部分和米数部分 + String kmStr = parts[1].trim(); + // 将公里数和米数转换为整数 + int km = Integer.parseInt(kmStr); + int m = 0; + if (parts.length == 3) { + String mStr = parts[2].trim(); + m = Integer.parseInt(mStr); + } + // 计算总米数 + return km * 1000 + m; + } } diff --git a/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java b/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java index 3edeb8e9..00c5efe9 100644 --- a/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java +++ b/zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java @@ -1,6 +1,7 @@ package com.zc.business.domain; import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.fasterxml.jackson.annotation.JsonFormat; import com.zc.business.enums.TrafficDataPeriodTypeEnum; @@ -43,6 +44,12 @@ public class DcTrafficSectionData { */ private Date statisticalDate; + /** + * 上报时间 + */ + @TableField(exist = false) + private Date reportTime; + /** * 道路方向 */ @@ -90,7 +97,7 @@ public class DcTrafficSectionData { if (o == null || getClass() != o.getClass()) return false; // 对象为空或类型不同,返回false DcTrafficSectionData that = (DcTrafficSectionData) o; // 类型转换 return Objects.equals(deviceId, that.deviceId) && - Objects.equals(statisticalDate, that.statisticalDate) && + Objects.equals(reportTime, that.reportTime) && Objects.equals(direction, that.direction) && Objects.equals(periodType, that.periodType) && Objects.equals(stakeMark, that.stakeMark); // 比较各属性值 @@ -102,16 +109,7 @@ public class DcTrafficSectionData { */ @Override public int hashCode() { - return Objects.hash(deviceId, statisticalDate, direction, periodType, stakeMark); - } - - /** - * 设置统计日期,根据不同的统计周期类型来调整日期,使其对应周期的起始日期。 - * @param statisticalDate 统计日期,原始日期。 - */ - public void setStatisticalDate(Date statisticalDate) { - TrafficDataPeriodTypeEnum typeEnum = TrafficDataPeriodTypeEnum.valueOfCode(periodType); - setStatisticalDate(statisticalDate, typeEnum); + return Objects.hash(deviceId, reportTime, direction, periodType, stakeMark); } /** @@ -144,10 +142,10 @@ public class DcTrafficSectionData { } /** - * 根据设备ID、统计时间、方向、时段类型、桩号生成一个唯一ID + * 根据设备ID、上报时间、方向、时段类型、桩号生成一个唯一ID */ public void generateUniqueId() { - String combinedAttributes = deviceId + "_" + DateUtil.format(statisticalDate, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark; + String combinedAttributes = deviceId + "_" + DateUtil.format(reportTime, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark; this.id = DigestUtils.md5Hex(combinedAttributes); } diff --git a/zc-business/src/main/java/com/zc/business/enums/DeviceDataCategory.java b/zc-business/src/main/java/com/zc/business/enums/DeviceDataCategory.java new file mode 100644 index 00000000..82bd185a --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/enums/DeviceDataCategory.java @@ -0,0 +1,19 @@ +package com.zc.business.enums; + +import lombok.Getter; + +/** + * @author xiepufeng + */ +@Getter +public enum DeviceDataCategory { + REAL_TIME("实时数据"), + HISTORY("历史数据"); + + private final String description; + + DeviceDataCategory(String description) { + this.description = description; + } + +} diff --git a/zc-business/src/main/java/com/zc/business/enums/IotProductPropertiesEnum.java b/zc-business/src/main/java/com/zc/business/enums/IotProductPropertiesEnum.java new file mode 100644 index 00000000..73658394 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/enums/IotProductPropertiesEnum.java @@ -0,0 +1,28 @@ +package com.zc.business.enums; + +/** + * 产品属性枚举 + * @author xiepufeng + */ +public enum IotProductPropertiesEnum { + + /** + * 一站式产品属性01 + */ + ONE_STOP_PRODUCT_01("01"), + + /** + * 一站式产品属性11 + */ + ONE_STOP_PRODUCT_11("11"); + + private final String number; + + IotProductPropertiesEnum(String number) { + this.number = number; + } + + public String getNumber() { + return this.number; + } +} diff --git a/zc-business/src/main/java/com/zc/business/enums/LaneDirection.java b/zc-business/src/main/java/com/zc/business/enums/LaneDirection.java new file mode 100644 index 00000000..ee51bfe8 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/enums/LaneDirection.java @@ -0,0 +1,34 @@ +package com.zc.business.enums; + + +import lombok.Getter; + +/** + * 道路方向 + * @author xiepufeng + **/ +@Getter +public enum LaneDirection { + + UPWARD((byte)1, "上行"), + BIDIRECTIONAL((byte) 2, "上下行(双向)"), + DOWNWARD((byte)3, "下行"); + + private final byte value; + private final String description; + + LaneDirection(byte value, String description) { + this.value = value; + this.description = description; + } + + + public static LaneDirection fromValue(int value) { + for (LaneDirection direction : values()) { + if (direction.getValue() == value) { + return direction; + } + } + throw new IllegalArgumentException("Invalid value for LaneDirection: " + value); + } +} diff --git a/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java b/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java index 82b9fe09..796e6311 100644 --- a/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java +++ b/zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java @@ -10,11 +10,11 @@ public enum TrafficDataPeriodTypeEnum { // 枚举成员:年,关联字节型代码 1 和描述 "年" YEAR((byte) 1, "年"), - // 枚举成员:月,关联字节型代码 2 和描述 "月" - MONTH((byte) 2, "月"), - // 枚举成员:季,关联字节型代码 3 和描述 "季" - QUARTER((byte) 3, "季"), + QUARTER((byte) 2, "季"), + + // 枚举成员:月,关联字节型代码 2 和描述 "月" + MONTH((byte) 3, "月"), // 枚举成员:日,关联字节型代码 4 和描述 "日" DAY((byte) 4, "日"); diff --git a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java index eec32d66..48cc74a7 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java +++ b/zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java @@ -57,8 +57,6 @@ public class DeviceMessageHandler { @Autowired private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; - @Autowired - private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService; /** * 更新设备状态 @@ -112,7 +110,7 @@ public class DeviceMessageHandler { return; } - // 一站式情况调查站 + // 气象检测器 if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) { weatherDetectorMessageHandle(data); } @@ -329,7 +327,7 @@ public class DeviceMessageHandler { * @param msg 设备消息 */ private void oneStopDeviceMessageHandle(JSONObject msg) { - dcTrafficSectionDataService.processRealtimeMessage(msg); + // dcTrafficSectionDataService.processRealtimeOneStopMessage(msg); } diff --git a/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java b/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java index 868f01ca..7e3db831 100644 --- a/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java +++ b/zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java @@ -7,9 +7,9 @@ import com.zc.business.domain.DcTrafficSectionData; public interface DcTrafficSectionDataService extends IService { /** - * 处理实时接收到的设备消息,并将其转换为交通断面统计数据对象并缓存。 + * 处理实时接收到的一类交流站设备消息,并将其转换为交通断面统计数据对象并缓存。 * * @param msg 设备发送的JSON格式实时消息 */ - void processRealtimeMessage(JSONObject msg); + void processRealtimeOneStopMessage(JSONObject msg); } diff --git a/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java b/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java index 431fe06b..0548a7f6 100644 --- a/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java +++ b/zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java @@ -1,23 +1,31 @@ package com.zc.business.service.impl; import cn.hutool.core.date.DateUtil; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ruoyi.common.core.redis.RedisCache; +import com.zc.business.constant.RedisKeyConstants; +import com.zc.business.constant.StatisticalRecoveryOffsetTime; +import com.zc.business.controller.DcDeviceController; +import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcTrafficSectionData; -import com.zc.business.enums.TrafficDataPeriodTypeEnum; +import com.zc.business.enums.*; import com.zc.business.statistics.cache.*; import com.zc.business.mapper.DcTrafficSectionDataMapper; import com.zc.business.service.DcTrafficSectionDataService; import com.zc.business.statistics.handler.RealtimeTrafficStatistics; +import com.zc.common.core.httpclient.exception.HttpException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.function.Consumer; /** @@ -30,21 +38,197 @@ public class DcTrafficSectionDataServiceImpl extends ServiceImpl implements DcTrafficSectionDataService { + // 日志记录器 + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Resource private DcTrafficSectionDataMapper dcTrafficSectionDataMapper; + @Resource + private RedisCache redisCache; + + @Resource + private DcDeviceController dcDeviceController; + /** * 初始化方法,用于在对象创建后恢复各种周期的交通数据缓存。 * 该方法标注了@PostConstruct注解,确保在依赖注入完成后调用。 */ @PostConstruct public void init() { - // TODO 恢复每天交通数据缓存(es获取数据) + recoveryDailyCache(); // 从es中恢复当天交通数据缓存 recoveryMonthlyCache(); // 恢复每月交通数据缓存 recoveryQuarterlyCache(); // 恢复每季度交通数据缓存 recoveryYearlyCache(); // 恢复每年交通数据缓存 } + /** + * 处理实时接收到的一类交流站设备消息,并将其转换为交通断面统计数据对象并缓存。 + * + * @param msg 设备发送的JSON格式实时消息 + */ + @Override + public void processRealtimeOneStopMessage(JSONObject msg) { + + // 1. 将设备消息转换为交通断面数据统计定义对象 + List dcTrafficSectionDataList = convertToTrafficStatistics(msg, DeviceDataCategory.REAL_TIME); + + if (dcTrafficSectionDataList != null && !dcTrafficSectionDataList.isEmpty()) { + // 2. 将转换后的数据添加到缓存中 + dcTrafficSectionDataList.forEach(DailyTrafficStatisticsCache::addCacheData); + } + } + + /** + * 定义每小时第20分钟执行的任务,用于清除过期缓存数据并将缓存中的数据整合后保存至数据库。 + */ + // @Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务 + public void performHourlyCleanupAndPersist() { + // 清除已过期的缓存数据 + DailyTrafficStatisticsCache.clearExpiredData(); + MonthlyTrafficStatisticsCache.clearExpiredData(); + QuarterlyTrafficStatisticsCache.clearExpiredData(); + YearlyTrafficStatisticsCache.clearExpiredData(); + // 整合缓存数据并保存至数据库 + // 将缓存中的数据按日统计后保存至数据库 + // 添加月交通断面数据到缓存中 + persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按月统计后保存至数据库 + // 添加季度交通断面数据到缓存中 + persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按季度统计后保存至数据库 + // 添加年交通断面数据到缓存中 + persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData); + // 将缓存中的数据按年统计后保存至数据库 + persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {}); + } + + /** + * 恢复每日缓存的函数。 + * 该方法尝试从物联平台获取所有设备信息,并对这些信息进行处理。 + * 如果获取信息失败或处理过程中发生异常,则记录错误信息。 + */ + private void recoveryDailyCache() { + + try { + // 尝试从指定产品ID获取设备信息 + Map oneStopDeviceMap = dcDeviceController.getDeviceByProductId(IotProductEnum.ONE_STOP_PRODUCT.value()); + + // 检查获取的设备信息是否为空 + if (oneStopDeviceMap == null || oneStopDeviceMap.get("data") == null) { + logger.error("获取一站式设备数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value()); + return; + } + + // 将获取的设备信息转换为JSON数组,并遍历处理每个设备的数据 + JSONArray deviceJsonArray = JSONArray.parseArray(oneStopDeviceMap.get("data").toString()); + deviceJsonArray.forEach(this::processDeviceData); + + } catch (HttpException | IOException e) { + // 记录处理设备数据时发生的异常 + logger.error("处理设备数据时发生异常", e); + } + } + + /** + * 处理设备数据的函数。 + * 该方法首先将传入的设备对象转换为JSON对象,然后从JSON对象中提取设备ID。 + * 如果设备ID有效,则查询设备的实时流量数据,并对获取的设备属性数据进行处理。 + * + * @param deviceObject 设备对象,需要是一个JSONObject,包含设备信息。 + */ + private void processDeviceData(Object deviceObject) { + JSONObject deviceJsonObject = (JSONObject)deviceObject; + // 提取设备ID + String deviceId = deviceJsonObject.getString("id"); + + // 检查设备ID的有效性 + if (deviceId == null || deviceId.isEmpty()) { + logger.error("获取一站式设备id失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value()); + return; + } + + // 查询并处理设备属性数据 + try { + HashMap props = buildPropertiesRequiredParameter(); + + // 查询设备的实时属性数据 + Map deviceProperties = dcDeviceController.queryDeviceProperties(deviceId, IotProductPropertiesEnum.ONE_STOP_PRODUCT_01.getNumber(), props); + + // 对获取的设备属性数据进行进一步处理 + processDeviceProperties(deviceProperties); + } catch (HttpException | IOException e) { + // 记录处理设备属性数据时发生的异常 + logger.error("处理设备属性数据时发生异常,设备id:{}", deviceId, e); + } + } + + + /** + * 构建必需的属性请求参数集合。 + * 该方法不接受任何参数,返回一个包含特定键值对的HashMap对象。 + * 键值对表达了查询条件,其中键是查询条件的结构化字符串表示,值是条件的具体值。 + * + * @return HashMap 包含查询条件的HashMap对象。 + */ + private HashMap buildPropertiesRequiredParameter() { + + // todo 数据库中查询出最大的时间 + + HashMap props = new HashMap<>(); + // 设置查询条件的键为“timestamp$BTW”,表示时间戳在一定范围内 + props.put("terms[0].column", "timestamp$BTW"); + ArrayList dateList = new ArrayList<>(); + // 添加当前日期的开始和结束时间到列表,用于设定时间范围 + dateList.add(DateUtil.beginOfDay(DateUtil.offsetDay(new Date(), StatisticalRecoveryOffsetTime.TRAFFIC_SECTION_DATA_OFFSET_DAY)).toString()); + dateList.add(DateUtil.endOfDay(new Date()).toString()); + // 将日期列表以逗号分隔并设置为查询条件的值 + props.put("terms[0].value", String.join(",", dateList)); + props.put("paging", false); + return props; + } + + /** + * 处理设备属性信息。 + * 该方法接收一个设备属性的映射,从中提取属性数据,并将其转换为交通断面数据统计定义对象列表,然后将这些数据添加到缓存中。 + * + * @param deviceProperties 包含设备属性数据的映射。该映射应包含一个名为"data"的键,其值是一个包含具体属性信息的JSON对象。 + */ + private void processDeviceProperties(Map deviceProperties) { + // 检查传入的设备属性映射是否为空,或者其中的"data"键对应的值是否为空 + if (deviceProperties == null || deviceProperties.get("data") == null) { + logger.error("获取一站式属性数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value()); + return; + } + + // 从设备属性映射的"data"值中解析出JSON对象 + JSONObject propertiesObject = JSONObject.parseObject(deviceProperties.get("data").toString()); + + // 检查解析出的JSON对象是否包含"data"键 + if (propertiesObject.get("data") == null) { + logger.error("获取一站式属性数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value()); + return; + } + + // 从解析出的JSON对象中获取设备属性数据列表 + JSONArray properties = JSONArray.parseArray(propertiesObject.get("data").toString()); + + // 遍历设备属性数据列表,对每个属性对象进行处理 + properties.forEach(propertyObject -> { + JSONObject propertyJsonObject = (JSONObject)propertyObject; + + // 将设备消息转换为交通断面数据统计定义对象 + List dcTrafficSectionDataList = convertToTrafficStatistics(propertyJsonObject, DeviceDataCategory.HISTORY); + + // 如果转换结果非空且不为空列表,则将转换后的数据添加到缓存中 + if (dcTrafficSectionDataList != null && !dcTrafficSectionDataList.isEmpty()) { + // 将转换后的数据添加到缓存中 + dcTrafficSectionDataList.forEach(DailyTrafficStatisticsCache::addCacheData); + } + + }); + } + /** * 恢复每月交通数据缓存的方法。 * 通过查询当前月份至今的每日交通数据,并将其添加到每月交通统计缓存中。 @@ -87,60 +271,202 @@ public class DcTrafficSectionDataServiceImpl dcTrafficSectionDataList.forEach(YearlyTrafficStatisticsCache::addCacheData); } + /** - * 处理实时接收到的设备消息,并将其转换为交通断面统计数据对象并缓存。 + * 将设备消息转换为交通断面数据统计定义对象 * - * @param msg 设备发送的JSON格式实时消息 + * @param msg JSON格式的设备实时消息 + * @param category 设备数据类型 + * @return 转换后的交通断面数据统计定义对象 */ - @Override - public void processRealtimeMessage(JSONObject msg) { - // 1. 将设备消息转换为交通断面数据统计定义对象 - DcTrafficSectionData dcTrafficSectionData = convertToTrafficStatistics(msg); + public List convertToTrafficStatistics(JSONObject msg, DeviceDataCategory category) { + // 获取属性 + JSONObject property; + + if (category == DeviceDataCategory.REAL_TIME) { + JSONObject properties = msg.getJSONObject("properties"); + + // 属性数据 + if (properties == null) { + logger.error("接收实时属性数据,属性数据不存在,请检查物联网一站式设备数据是否正常"); + return null; + } + + // 获取属性01的数据 + property = properties.getJSONObject(IotProductPropertiesEnum.ONE_STOP_PRODUCT_01.getNumber()); + } else { + property = msg.getJSONObject("value"); + } + + // 判断是否属性01的数据 + if (property == null) { + logger.error("非属性01的数据,无法处理,请检查物联网一站式设备数据是否正常"); + return null; + } + + // 物联设备id + String iotDeviceId = msg.getString("deviceId"); + + if (iotDeviceId == null || iotDeviceId.isEmpty()) { + logger.error("设备id为空,无法处理,请检查物联网平台一站式设备数据是否正常"); + return null; + } + + // 上报时间(时间戳) + Long timestamp = msg.getLong("timestamp"); + + if (timestamp == null || timestamp == 0L) { + logger.error("上报时间为空,无法处理,请检查物联网平台一站式设备数据是否正常"); + return null; + } + + // 从缓存中获取设备信息 + DcDevice dcDevice = redisCache.getCacheMapValue(RedisKeyConstants.DC_DEVICES, iotDeviceId); + + if (dcDevice == null) { + logger.error("设备信息不存在,请检查是否配置设备信息,物联平台设备id:{}无对应设备信息", iotDeviceId); + return null; + } - // 2. 将转换后的数据添加到缓存中 - // DailyTrafficStatisticsCache.addCacheData(dcTrafficSectionData); + // 上报时间 + Date reportTime = DateUtil.date(timestamp); + + // 车道信息 + JSONArray lanes = property.getJSONArray("lanes"); + + if (lanes == null || lanes.isEmpty()) { + logger.error("车道信息为空,无法处理,请检查物联网平台一站式设备数据是否正常"); + return null; + } + + // 分别处理上行和下行数据 + return new ArrayList<>(processLaneData(lanes, dcDevice, reportTime).values()); } /** - * 将设备实时消息转换为交通断面数据统计定义对象 + * 处理车道数据,计算上行和下行的车流量和累计平均速度。 * - * @param msg JSON格式的设备实时消息 - * @return 转换后的交通断面数据统计定义对象 + * @param lanes 车道数据的JSON数组,包含各个车道的信息。 + * @param dcDevice 设备信息,用于标识数据来源。 + * @param reportTime 报告时间,标识数据的时间戳。 + * @return 返回一个Map,包含上行和下行的交通段数据。 */ - public DcTrafficSectionData convertToTrafficStatistics(JSONObject msg) { + private Map processLaneData(JSONArray lanes, DcDevice dcDevice, Date reportTime) { + + Map resultMap = new HashMap<>(); + + // 初始化上行和下行的交通段数据 + DcTrafficSectionData upwardData = new DcTrafficSectionData(); + DcTrafficSectionData downwardData = new DcTrafficSectionData(); + + initializeTrafficSectionData(upwardData, dcDevice, reportTime, LaneDirection.UPWARD); + initializeTrafficSectionData(downwardData, dcDevice, reportTime, LaneDirection.DOWNWARD); + // 初始化上行和下行的车流量和累计平均速度 + int upwardTrafficVolume = 0, downwardTrafficVolume = 0; + double upwardCumulativeAverageSpeed = 0.0, downwardCumulativeAverageSpeed = 0.0; + // 遍历车道信息,计算上行和下行的车流量和累计平均速度 + for (Object lane : lanes) { + // 解析车道数据 + JSONObject laneData = (JSONObject) lane; + // 获取车道号,并判断是否为下行车道 + Integer laneNumber = laneData.getInteger("laneNumber"); + boolean isDownward = laneNumber >= 31 || laneNumber == 3; - DcTrafficSectionData dcTrafficSectionData = new DcTrafficSectionData(); + // 根据车道方向累加车流量和累计平均速度 + int totalTrafficFlow = laneData.getInteger("totalTrafficFlow"); + double cumulativeAverageSpeed = calculateCumulativeAverageSpeed(laneData); - // TODO - return dcTrafficSectionData; + if (isDownward) { + downwardTrafficVolume += totalTrafficFlow; + downwardCumulativeAverageSpeed += cumulativeAverageSpeed; + + } else { + upwardTrafficVolume += totalTrafficFlow; + upwardCumulativeAverageSpeed += cumulativeAverageSpeed; + } + } + // 设置上行和下行的车流量和累计平均速度 + setUpTrafficSectionData(upwardData, upwardTrafficVolume, upwardCumulativeAverageSpeed); + setUpTrafficSectionData(downwardData, downwardTrafficVolume, downwardCumulativeAverageSpeed); + + // 将上行和下行的交通段数据放入结果映射中 + resultMap.put(LaneDirection.UPWARD, upwardData); + resultMap.put(LaneDirection.DOWNWARD, downwardData); + + return resultMap; } /** - * 定义每小时第20分钟执行的任务,用于清除过期缓存数据并将缓存中的数据整合后保存至数据库。 + * 初始化交通段数据 + * @param data 交通段数据对象,用于存储交通数据 + * @param dcDevice 设备对象,包含设备的详细信息 + * @param reportTime 数据上报时间 + * @param direction 车道方向 */ -// @Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务 - public void performHourlyCleanupAndPersist() { - // 清除已过期的缓存数据 - DailyTrafficStatisticsCache.clearExpiredData(); - MonthlyTrafficStatisticsCache.clearExpiredData(); - QuarterlyTrafficStatisticsCache.clearExpiredData(); - YearlyTrafficStatisticsCache.clearExpiredData(); - // 整合缓存数据并保存至数据库 - // 将缓存中的数据按日统计后保存至数据库 - // 添加月交通断面数据到缓存中 - persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData); - // 将缓存中的数据按月统计后保存至数据库 - // 添加季度交通断面数据到缓存中 - persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData); - // 将缓存中的数据按季度统计后保存至数据库 - // 添加年交通断面数据到缓存中 - persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData); - // 将缓存中的数据按年统计后保存至数据库 - persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {}); + private void initializeTrafficSectionData(DcTrafficSectionData data, DcDevice dcDevice, Date reportTime, LaneDirection direction) { + + // 设置设备id + data.setDeviceId(dcDevice.getId()); + // 设置车道方向 + data.setDirection(direction.getValue()); + // 设置上报时间 + data.setReportTime(reportTime); + // 设置数据上报时间 + data.setStatisticalDate(reportTime); + // 设置统计时段类型为日 + data.setPeriodType(TrafficDataPeriodTypeEnum.DAY); + // 将设备的桩号转换为整数后设置 + data.setStakeMark(dcDevice.stakeMarkToInt()); + } + + + /** + * 设置交通路段数据。 + * 该方法用于根据给定的交通量和累积平均速度,设置交通路段的数据。 + * + * @param data 交通路段数据对象,用于存储交通路段的相关信息。 + * @param trafficVolume 该路段的交通量,单位通常为车辆数。 + * @param cumulativeAverageSpeed 该路段的累积平均速度,单位通常为千米/小时。 + */ + private void setUpTrafficSectionData(DcTrafficSectionData data, int trafficVolume, double cumulativeAverageSpeed) { + data.setTrafficVolume(trafficVolume); // 设置交通量 + data.setAverageSpeed(0); + if (trafficVolume != 0) { // 当交通量不为0时,计算并设置平均速度 + data.setAverageSpeed((int) Math.round(cumulativeAverageSpeed / trafficVolume)); // 平均速度 = 累积平均速度 / 交通量 + } + } + + /** + * 计算给定车道数据的累计平均速度。 + * 该函数根据laneData中提供的各种车辆类型的数量和平均速度,计算出累计的平均速度。 + * + * @param laneData 包含车道数据的JSONObject对象,其中包含了各种车辆类型的数量和平均速度等数据。 + * @return 返回计算出的累计平均速度。 + */ + private int calculateCumulativeAverageSpeed(JSONObject laneData) { + // 根据laneData中的数据计算累计平均速度 + // 累加平均速度(中小客车) + return laneData.getInteger("trafficNumberOfInAndSmall") * laneData.getInteger("inAndSmallAverageVehicleSpeed") + + // 累加平均速度(小型货车) + laneData.getInteger("trafficVolumeOfSmallTrucks") * laneData.getInteger("smallTrucksAverageVehicleSpeed") + + // 累加平均速度(大客车) + laneData.getInteger("busTrafficVolume") * laneData.getInteger("averageSpeedOfBus") + + // 累加平均速度(中型货车) + laneData.getInteger("mediumTruckTrafficVolume") * laneData.getInteger("averageSpeedOfMediumSizeTrucks") + + // 累加平均速度(大型货车) + laneData.getInteger("largeTruckTrafficVolume") * laneData.getInteger("averageSpeedOfLargeTrucks") + + // 累加平均速度(特大型货车) + laneData.getInteger("extraLargeTrucksTrafficVolume") * laneData.getInteger("averageSpeedOfExtraLargeTrucks") + + // 累加平均速度(集装箱车) + laneData.getInteger("containerTruckTrafficVolume") * laneData.getInteger("averageSpeedOfContainerTruck") + + // 累加平均速度(拖拉机) + laneData.getInteger("tractorTrafficVolume") * laneData.getInteger("averageSpeedOfTractor") + + // 累加平均速度(摩托车) + laneData.getInteger("motorcycleTrafficVolume") * laneData.getInteger("averageSpeedOfMotorcycle"); } /** diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java index f2cc4f26..043bbada 100644 --- a/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java @@ -63,6 +63,11 @@ public class DailyTrafficStatisticsCache extends AbstractTrafficStatisticsCache instance.setStatisticalDateStr(formattedDate); } + // 更新上报时间 + dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate()); + // 更新数据周期类型 + dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.DAY); + // 更新统计日期 dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.DAY); // 将新数据添加到数据列表中 diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java index 41defc79..0d3d5956 100644 --- a/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java @@ -63,6 +63,11 @@ public class MonthlyTrafficStatisticsCache extends AbstractTrafficStatisticsCach instance.setStatisticalDateStr(formattedDate); } + // 更新上报时间 + dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate()); + // 更新数据周期类型 + dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.MONTH); + // 更新统计日期 dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.MONTH); // 将新数据添加到数据列表中 diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java index 41f1d5ce..5b4d5ef0 100644 --- a/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java @@ -66,6 +66,11 @@ public class QuarterlyTrafficStatisticsCache extends AbstractTrafficStatisticsCa instance.setStatisticalDateStr(formattedDate); } + // 更新上报时间 + dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate()); + // 更新数据周期类型 + dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.QUARTER); + // 更新统计日期 dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.QUARTER); // 将新数据添加到数据列表中 diff --git a/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java b/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java index 907eecc2..8b21f675 100644 --- a/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java +++ b/zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java @@ -66,6 +66,11 @@ public class YearlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache instance.setStatisticalDateStr(formattedDate); } + // 更新上报时间 + dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate()); + // 更新数据周期类型 + dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.YEAR); + // 更新统计日期 dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.YEAR); // 将新数据添加到数据列表中 diff --git a/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java b/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java index 32be9330..e122cc0e 100644 --- a/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java +++ b/zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java @@ -46,6 +46,8 @@ public class RealtimeTrafficStatistics { aggregatedData.setStakeMark(firstDcTrafficSectionData.getStakeMark()); // 道路方向 aggregatedData.setDirection(firstDcTrafficSectionData.getDirection()); + // 上报时间 + aggregatedData.setReportTime(firstDcTrafficSectionData.getReportTime()); // 计算平均车速并设置到汇总统计对象中 if (trafficVolume != 0) {