Browse Source

交通断面统计

develop
xiepufeng 8 months ago
parent
commit
410ca9d444
  1. 13
      ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java
  2. 14
      zc-business/src/main/java/com/zc/business/constant/StatisticalRecoveryOffsetTime.java
  3. 24
      zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java
  4. 21
      zc-business/src/main/java/com/zc/business/domain/DcDevice.java
  5. 24
      zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java
  6. 19
      zc-business/src/main/java/com/zc/business/enums/DeviceDataCategory.java
  7. 28
      zc-business/src/main/java/com/zc/business/enums/IotProductPropertiesEnum.java
  8. 34
      zc-business/src/main/java/com/zc/business/enums/LaneDirection.java
  9. 8
      zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java
  10. 6
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  11. 4
      zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java
  12. 406
      zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java
  13. 5
      zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java
  14. 5
      zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java
  15. 5
      zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java
  16. 5
      zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java
  17. 2
      zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java

13
ruoyi-common/src/main/java/com/zc/common/core/httpclient/OkHttp.java

@ -11,6 +11,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@ -185,18 +186,16 @@ public class OkHttp
throw new HttpException("OkHttp: url 不存在!");
}
StringBuilder urlBuilder = new StringBuilder(url).append("?");
HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(url)).newBuilder();
if (requestParams != null)
{
requestParams.params.forEach((key, value) ->
{
urlBuilder.append(key).append("=").append(toJson(value)).append("&");
});
requestParams.params.forEach((key, value) -> urlBuilder.addQueryParameter(key, toJson(value)));
}
Call call = okHttpClient.newCall(requestBuilder.url(urlBuilder.substring(0, urlBuilder.length() - 1)).get().build());
String url = urlBuilder.build().toString();
Call call = okHttpClient.newCall(requestBuilder.url(url).get().build());
return call.execute();
}

14
zc-business/src/main/java/com/zc/business/constant/StatisticalRecoveryOffsetTime.java

@ -0,0 +1,14 @@
package com.zc.business.constant;
/**
* 统计恢复偏移时间类
* 用于定义与交通数据恢复相关的偏移时间常量
*
* @author xiepufeng
*/
public class StatisticalRecoveryOffsetTime {
// 定义交通数据段偏移的天数常量,表示偏移-10天
public static final int TRAFFIC_SECTION_DATA_OFFSET_DAY = -10;
}

24
zc-business/src/main/java/com/zc/business/controller/DcDeviceController.java

@ -161,6 +161,30 @@ public class DcDeviceController extends BaseController {
//***********************************物联设备接口**************************************
/**
* 根据物联产品id获取设备数据
*
* @param productId 物联产品id
* @return 获取设备数据操作结果
*/
@ApiOperation("根据物联产品id获取设备数据")
@GetMapping("/devices/{productId}")
public AjaxResult getDeviceByProductId(@PathVariable @Parameter(description = "产品ID") String productId) throws HttpException, IOException {
if (!StringUtils.hasText(productId)) {
return AjaxResult.error("产品ID不能为空");
}
OkHttp okHttp = new OkHttp();
Response response // 请求响应
= okHttp
.url(iotAddress + "/api/iot/device/cache/" + productId) // 请求地址
.get(); // 请求方法
return JSON.parseObject(response.body().string(), AjaxResult.class);
}
/**
* 根据物联设备id获取最新属性数据
*

21
zc-business/src/main/java/com/zc/business/domain/DcDevice.java

@ -77,4 +77,25 @@ public class DcDevice {
//设备厂商
@TableField(exist = false)
private String manufacturer;
public Integer stakeMarkToInt() {
if (stakeMark == null) {
return null;
}
// 不区分大小写的正则表达式匹配 'k' 和 '+'
String[] parts = this.stakeMark.split("(?i)k|\\+");
// 提取出公里数部分和米数部分
String kmStr = parts[1].trim();
// 将公里数和米数转换为整数
int km = Integer.parseInt(kmStr);
int m = 0;
if (parts.length == 3) {
String mStr = parts[2].trim();
m = Integer.parseInt(mStr);
}
// 计算总米数
return km * 1000 + m;
}
}

24
zc-business/src/main/java/com/zc/business/domain/DcTrafficSectionData.java

@ -1,6 +1,7 @@
package com.zc.business.domain;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
@ -43,6 +44,12 @@ public class DcTrafficSectionData {
*/
private Date statisticalDate;
/**
* 上报时间
*/
@TableField(exist = false)
private Date reportTime;
/**
* 道路方向
*/
@ -90,7 +97,7 @@ public class DcTrafficSectionData {
if (o == null || getClass() != o.getClass()) return false; // 对象为空或类型不同,返回false
DcTrafficSectionData that = (DcTrafficSectionData) o; // 类型转换
return Objects.equals(deviceId, that.deviceId) &&
Objects.equals(statisticalDate, that.statisticalDate) &&
Objects.equals(reportTime, that.reportTime) &&
Objects.equals(direction, that.direction) &&
Objects.equals(periodType, that.periodType) &&
Objects.equals(stakeMark, that.stakeMark); // 比较各属性值
@ -102,16 +109,7 @@ public class DcTrafficSectionData {
*/
@Override
public int hashCode() {
return Objects.hash(deviceId, statisticalDate, direction, periodType, stakeMark);
}
/**
* 设置统计日期根据不同的统计周期类型来调整日期使其对应周期的起始日期
* @param statisticalDate 统计日期原始日期
*/
public void setStatisticalDate(Date statisticalDate) {
TrafficDataPeriodTypeEnum typeEnum = TrafficDataPeriodTypeEnum.valueOfCode(periodType);
setStatisticalDate(statisticalDate, typeEnum);
return Objects.hash(deviceId, reportTime, direction, periodType, stakeMark);
}
/**
@ -144,10 +142,10 @@ public class DcTrafficSectionData {
}
/**
* 根据设备ID统计时间方向时段类型桩号生成一个唯一ID
* 根据设备ID上报时间方向时段类型桩号生成一个唯一ID
*/
public void generateUniqueId() {
String combinedAttributes = deviceId + "_" + DateUtil.format(statisticalDate, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark;
String combinedAttributes = deviceId + "_" + DateUtil.format(reportTime, "yyyyMMdd_HHmmss") + "_" + direction + "_" + periodType + "_" + stakeMark;
this.id = DigestUtils.md5Hex(combinedAttributes);
}

19
zc-business/src/main/java/com/zc/business/enums/DeviceDataCategory.java

@ -0,0 +1,19 @@
package com.zc.business.enums;
import lombok.Getter;
/**
* @author xiepufeng
*/
@Getter
public enum DeviceDataCategory {
REAL_TIME("实时数据"),
HISTORY("历史数据");
private final String description;
DeviceDataCategory(String description) {
this.description = description;
}
}

28
zc-business/src/main/java/com/zc/business/enums/IotProductPropertiesEnum.java

@ -0,0 +1,28 @@
package com.zc.business.enums;
/**
* 产品属性枚举
* @author xiepufeng
*/
public enum IotProductPropertiesEnum {
/**
* 一站式产品属性01
*/
ONE_STOP_PRODUCT_01("01"),
/**
* 一站式产品属性11
*/
ONE_STOP_PRODUCT_11("11");
private final String number;
IotProductPropertiesEnum(String number) {
this.number = number;
}
public String getNumber() {
return this.number;
}
}

34
zc-business/src/main/java/com/zc/business/enums/LaneDirection.java

@ -0,0 +1,34 @@
package com.zc.business.enums;
import lombok.Getter;
/**
* 道路方向
* @author xiepufeng
**/
@Getter
public enum LaneDirection {
UPWARD((byte)1, "上行"),
BIDIRECTIONAL((byte) 2, "上下行(双向)"),
DOWNWARD((byte)3, "下行");
private final byte value;
private final String description;
LaneDirection(byte value, String description) {
this.value = value;
this.description = description;
}
public static LaneDirection fromValue(int value) {
for (LaneDirection direction : values()) {
if (direction.getValue() == value) {
return direction;
}
}
throw new IllegalArgumentException("Invalid value for LaneDirection: " + value);
}
}

8
zc-business/src/main/java/com/zc/business/enums/TrafficDataPeriodTypeEnum.java

@ -10,11 +10,11 @@ public enum TrafficDataPeriodTypeEnum {
// 枚举成员:年,关联字节型代码 1 和描述 "年"
YEAR((byte) 1, "年"),
// 枚举成员:月,关联字节型代码 2 和描述 "月"
MONTH((byte) 2, "月"),
// 枚举成员:季,关联字节型代码 3 和描述 "季"
QUARTER((byte) 3, "季"),
QUARTER((byte) 2, "季"),
// 枚举成员:月,关联字节型代码 2 和描述 "月"
MONTH((byte) 3, "月"),
// 枚举成员:日,关联字节型代码 4 和描述 "日"
DAY((byte) 4, "日");

6
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@ -57,8 +57,6 @@ public class DeviceMessageHandler {
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Autowired
private IDcYzsqbdcHmbldDataService dcYzsqbdcHmbldDataService;
/**
* 更新设备状态
@ -112,7 +110,7 @@ public class DeviceMessageHandler {
return;
}
// 一站式情况调查站
// 气象检测器
if (IotProductEnum.WEATHER_DETECTOR.value().equals(productId)) {
weatherDetectorMessageHandle(data);
}
@ -329,7 +327,7 @@ public class DeviceMessageHandler {
* @param msg 设备消息
*/
private void oneStopDeviceMessageHandle(JSONObject msg) {
dcTrafficSectionDataService.processRealtimeMessage(msg);
// dcTrafficSectionDataService.processRealtimeOneStopMessage(msg);
}

4
zc-business/src/main/java/com/zc/business/service/DcTrafficSectionDataService.java

@ -7,9 +7,9 @@ import com.zc.business.domain.DcTrafficSectionData;
public interface DcTrafficSectionDataService extends IService<DcTrafficSectionData> {
/**
* 处理实时接收到的设备消息并将其转换为交通断面统计数据对象并缓存
* 处理实时接收到的一类交流站设备消息并将其转换为交通断面统计数据对象并缓存
*
* @param msg 设备发送的JSON格式实时消息
*/
void processRealtimeMessage(JSONObject msg);
void processRealtimeOneStopMessage(JSONObject msg);
}

406
zc-business/src/main/java/com/zc/business/service/impl/DcTrafficSectionDataServiceImpl.java

@ -1,23 +1,31 @@
package com.zc.business.service.impl;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.redis.RedisCache;
import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.constant.StatisticalRecoveryOffsetTime;
import com.zc.business.controller.DcDeviceController;
import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcTrafficSectionData;
import com.zc.business.enums.TrafficDataPeriodTypeEnum;
import com.zc.business.enums.*;
import com.zc.business.statistics.cache.*;
import com.zc.business.mapper.DcTrafficSectionDataMapper;
import com.zc.business.service.DcTrafficSectionDataService;
import com.zc.business.statistics.handler.RealtimeTrafficStatistics;
import com.zc.common.core.httpclient.exception.HttpException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
/**
@ -30,21 +38,197 @@ public class DcTrafficSectionDataServiceImpl
extends ServiceImpl<DcTrafficSectionDataMapper, DcTrafficSectionData>
implements DcTrafficSectionDataService {
// 日志记录器
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private DcTrafficSectionDataMapper dcTrafficSectionDataMapper;
@Resource
private RedisCache redisCache;
@Resource
private DcDeviceController dcDeviceController;
/**
* 初始化方法用于在对象创建后恢复各种周期的交通数据缓存
* 该方法标注了@PostConstruct注解确保在依赖注入完成后调用
*/
@PostConstruct
public void init() {
// TODO 恢复每天交通数据缓存(es获取数据)
recoveryDailyCache(); // 从es中恢复当天交通数据缓存
recoveryMonthlyCache(); // 恢复每月交通数据缓存
recoveryQuarterlyCache(); // 恢复每季度交通数据缓存
recoveryYearlyCache(); // 恢复每年交通数据缓存
}
/**
* 处理实时接收到的一类交流站设备消息并将其转换为交通断面统计数据对象并缓存
*
* @param msg 设备发送的JSON格式实时消息
*/
@Override
public void processRealtimeOneStopMessage(JSONObject msg) {
// 1. 将设备消息转换为交通断面数据统计定义对象
List<DcTrafficSectionData> dcTrafficSectionDataList = convertToTrafficStatistics(msg, DeviceDataCategory.REAL_TIME);
if (dcTrafficSectionDataList != null && !dcTrafficSectionDataList.isEmpty()) {
// 2. 将转换后的数据添加到缓存中
dcTrafficSectionDataList.forEach(DailyTrafficStatisticsCache::addCacheData);
}
}
/**
* 定义每小时第20分钟执行的任务用于清除过期缓存数据并将缓存中的数据整合后保存至数据库
*/
// @Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务
public void performHourlyCleanupAndPersist() {
// 清除已过期的缓存数据
DailyTrafficStatisticsCache.clearExpiredData();
MonthlyTrafficStatisticsCache.clearExpiredData();
QuarterlyTrafficStatisticsCache.clearExpiredData();
YearlyTrafficStatisticsCache.clearExpiredData();
// 整合缓存数据并保存至数据库
// 将缓存中的数据按日统计后保存至数据库
// 添加月交通断面数据到缓存中
persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按月统计后保存至数据库
// 添加季度交通断面数据到缓存中
persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按季度统计后保存至数据库
// 添加年交通断面数据到缓存中
persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按年统计后保存至数据库
persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {});
}
/**
* 恢复每日缓存的函数
* 该方法尝试从物联平台获取所有设备信息并对这些信息进行处理
* 如果获取信息失败或处理过程中发生异常则记录错误信息
*/
private void recoveryDailyCache() {
try {
// 尝试从指定产品ID获取设备信息
Map<String, Object> oneStopDeviceMap = dcDeviceController.getDeviceByProductId(IotProductEnum.ONE_STOP_PRODUCT.value());
// 检查获取的设备信息是否为空
if (oneStopDeviceMap == null || oneStopDeviceMap.get("data") == null) {
logger.error("获取一站式设备数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value());
return;
}
// 将获取的设备信息转换为JSON数组,并遍历处理每个设备的数据
JSONArray deviceJsonArray = JSONArray.parseArray(oneStopDeviceMap.get("data").toString());
deviceJsonArray.forEach(this::processDeviceData);
} catch (HttpException | IOException e) {
// 记录处理设备数据时发生的异常
logger.error("处理设备数据时发生异常", e);
}
}
/**
* 处理设备数据的函数
* 该方法首先将传入的设备对象转换为JSON对象然后从JSON对象中提取设备ID
* 如果设备ID有效则查询设备的实时流量数据并对获取的设备属性数据进行处理
*
* @param deviceObject 设备对象需要是一个JSONObject包含设备信息
*/
private void processDeviceData(Object deviceObject) {
JSONObject deviceJsonObject = (JSONObject)deviceObject;
// 提取设备ID
String deviceId = deviceJsonObject.getString("id");
// 检查设备ID的有效性
if (deviceId == null || deviceId.isEmpty()) {
logger.error("获取一站式设备id失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value());
return;
}
// 查询并处理设备属性数据
try {
HashMap<String, Object> props = buildPropertiesRequiredParameter();
// 查询设备的实时属性数据
Map<String, Object> deviceProperties = dcDeviceController.queryDeviceProperties(deviceId, IotProductPropertiesEnum.ONE_STOP_PRODUCT_01.getNumber(), props);
// 对获取的设备属性数据进行进一步处理
processDeviceProperties(deviceProperties);
} catch (HttpException | IOException e) {
// 记录处理设备属性数据时发生的异常
logger.error("处理设备属性数据时发生异常,设备id:{}", deviceId, e);
}
}
/**
* 构建必需的属性请求参数集合
* 该方法不接受任何参数返回一个包含特定键值对的HashMap对象
* 键值对表达了查询条件其中键是查询条件的结构化字符串表示值是条件的具体值
*
* @return HashMap<String, Object> 包含查询条件的HashMap对象
*/
private HashMap<String, Object> buildPropertiesRequiredParameter() {
// todo 数据库中查询出最大的时间
HashMap<String, Object> props = new HashMap<>();
// 设置查询条件的键为“timestamp$BTW”,表示时间戳在一定范围内
props.put("terms[0].column", "timestamp$BTW");
ArrayList<String> dateList = new ArrayList<>();
// 添加当前日期的开始和结束时间到列表,用于设定时间范围
dateList.add(DateUtil.beginOfDay(DateUtil.offsetDay(new Date(), StatisticalRecoveryOffsetTime.TRAFFIC_SECTION_DATA_OFFSET_DAY)).toString());
dateList.add(DateUtil.endOfDay(new Date()).toString());
// 将日期列表以逗号分隔并设置为查询条件的值
props.put("terms[0].value", String.join(",", dateList));
props.put("paging", false);
return props;
}
/**
* 处理设备属性信息
* 该方法接收一个设备属性的映射从中提取属性数据并将其转换为交通断面数据统计定义对象列表然后将这些数据添加到缓存中
*
* @param deviceProperties 包含设备属性数据的映射该映射应包含一个名为"data"的键其值是一个包含具体属性信息的JSON对象
*/
private void processDeviceProperties(Map<String, Object> deviceProperties) {
// 检查传入的设备属性映射是否为空,或者其中的"data"键对应的值是否为空
if (deviceProperties == null || deviceProperties.get("data") == null) {
logger.error("获取一站式属性数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value());
return;
}
// 从设备属性映射的"data"值中解析出JSON对象
JSONObject propertiesObject = JSONObject.parseObject(deviceProperties.get("data").toString());
// 检查解析出的JSON对象是否包含"data"键
if (propertiesObject.get("data") == null) {
logger.error("获取一站式属性数据失败,产品id:{}", IotProductEnum.ONE_STOP_PRODUCT.value());
return;
}
// 从解析出的JSON对象中获取设备属性数据列表
JSONArray properties = JSONArray.parseArray(propertiesObject.get("data").toString());
// 遍历设备属性数据列表,对每个属性对象进行处理
properties.forEach(propertyObject -> {
JSONObject propertyJsonObject = (JSONObject)propertyObject;
// 将设备消息转换为交通断面数据统计定义对象
List<DcTrafficSectionData> dcTrafficSectionDataList = convertToTrafficStatistics(propertyJsonObject, DeviceDataCategory.HISTORY);
// 如果转换结果非空且不为空列表,则将转换后的数据添加到缓存中
if (dcTrafficSectionDataList != null && !dcTrafficSectionDataList.isEmpty()) {
// 将转换后的数据添加到缓存中
dcTrafficSectionDataList.forEach(DailyTrafficStatisticsCache::addCacheData);
}
});
}
/**
* 恢复每月交通数据缓存的方法
* 通过查询当前月份至今的每日交通数据并将其添加到每月交通统计缓存中
@ -87,60 +271,202 @@ public class DcTrafficSectionDataServiceImpl
dcTrafficSectionDataList.forEach(YearlyTrafficStatisticsCache::addCacheData);
}
/**
* 处理实时接收到的设备消息并将其转换为交通断面统计数据对象并缓存
* 将设备消息转换为交通断面数据统计定义对象
*
* @param msg 设备发送的JSON格式实时消息
* @param msg JSON格式的设备实时消息
* @param category 设备数据类型
* @return 转换后的交通断面数据统计定义对象
*/
@Override
public void processRealtimeMessage(JSONObject msg) {
// 1. 将设备消息转换为交通断面数据统计定义对象
DcTrafficSectionData dcTrafficSectionData = convertToTrafficStatistics(msg);
public List<DcTrafficSectionData> convertToTrafficStatistics(JSONObject msg, DeviceDataCategory category) {
// 获取属性
JSONObject property;
if (category == DeviceDataCategory.REAL_TIME) {
JSONObject properties = msg.getJSONObject("properties");
// 属性数据
if (properties == null) {
logger.error("接收实时属性数据,属性数据不存在,请检查物联网一站式设备数据是否正常");
return null;
}
// 获取属性01的数据
property = properties.getJSONObject(IotProductPropertiesEnum.ONE_STOP_PRODUCT_01.getNumber());
} else {
property = msg.getJSONObject("value");
}
// 判断是否属性01的数据
if (property == null) {
logger.error("非属性01的数据,无法处理,请检查物联网一站式设备数据是否正常");
return null;
}
// 物联设备id
String iotDeviceId = msg.getString("deviceId");
if (iotDeviceId == null || iotDeviceId.isEmpty()) {
logger.error("设备id为空,无法处理,请检查物联网平台一站式设备数据是否正常");
return null;
}
// 上报时间(时间戳)
Long timestamp = msg.getLong("timestamp");
if (timestamp == null || timestamp == 0L) {
logger.error("上报时间为空,无法处理,请检查物联网平台一站式设备数据是否正常");
return null;
}
// 从缓存中获取设备信息
DcDevice dcDevice = redisCache.getCacheMapValue(RedisKeyConstants.DC_DEVICES, iotDeviceId);
if (dcDevice == null) {
logger.error("设备信息不存在,请检查是否配置设备信息,物联平台设备id:{}无对应设备信息", iotDeviceId);
return null;
}
// 2. 将转换后的数据添加到缓存中
// DailyTrafficStatisticsCache.addCacheData(dcTrafficSectionData);
// 上报时间
Date reportTime = DateUtil.date(timestamp);
// 车道信息
JSONArray lanes = property.getJSONArray("lanes");
if (lanes == null || lanes.isEmpty()) {
logger.error("车道信息为空,无法处理,请检查物联网平台一站式设备数据是否正常");
return null;
}
// 分别处理上行和下行数据
return new ArrayList<>(processLaneData(lanes, dcDevice, reportTime).values());
}
/**
* 将设备实时消息转换为交通断面数据统计定义对象
* 处理车道数据计算上行和下行的车流量和累计平均速度
*
* @param msg JSON格式的设备实时消息
* @return 转换后的交通断面数据统计定义对象
* @param lanes 车道数据的JSON数组包含各个车道的信息
* @param dcDevice 设备信息用于标识数据来源
* @param reportTime 报告时间标识数据的时间戳
* @return 返回一个Map包含上行和下行的交通段数据
*/
public DcTrafficSectionData convertToTrafficStatistics(JSONObject msg) {
private Map<LaneDirection, DcTrafficSectionData> processLaneData(JSONArray lanes, DcDevice dcDevice, Date reportTime) {
Map<LaneDirection, DcTrafficSectionData> resultMap = new HashMap<>();
// 初始化上行和下行的交通段数据
DcTrafficSectionData upwardData = new DcTrafficSectionData();
DcTrafficSectionData downwardData = new DcTrafficSectionData();
initializeTrafficSectionData(upwardData, dcDevice, reportTime, LaneDirection.UPWARD);
initializeTrafficSectionData(downwardData, dcDevice, reportTime, LaneDirection.DOWNWARD);
// 初始化上行和下行的车流量和累计平均速度
int upwardTrafficVolume = 0, downwardTrafficVolume = 0;
double upwardCumulativeAverageSpeed = 0.0, downwardCumulativeAverageSpeed = 0.0;
// 遍历车道信息,计算上行和下行的车流量和累计平均速度
for (Object lane : lanes) {
// 解析车道数据
JSONObject laneData = (JSONObject) lane;
// 获取车道号,并判断是否为下行车道
Integer laneNumber = laneData.getInteger("laneNumber");
boolean isDownward = laneNumber >= 31 || laneNumber == 3;
DcTrafficSectionData dcTrafficSectionData = new DcTrafficSectionData();
// 根据车道方向累加车流量和累计平均速度
int totalTrafficFlow = laneData.getInteger("totalTrafficFlow");
double cumulativeAverageSpeed = calculateCumulativeAverageSpeed(laneData);
// TODO
return dcTrafficSectionData;
if (isDownward) {
downwardTrafficVolume += totalTrafficFlow;
downwardCumulativeAverageSpeed += cumulativeAverageSpeed;
} else {
upwardTrafficVolume += totalTrafficFlow;
upwardCumulativeAverageSpeed += cumulativeAverageSpeed;
}
}
// 设置上行和下行的车流量和累计平均速度
setUpTrafficSectionData(upwardData, upwardTrafficVolume, upwardCumulativeAverageSpeed);
setUpTrafficSectionData(downwardData, downwardTrafficVolume, downwardCumulativeAverageSpeed);
// 将上行和下行的交通段数据放入结果映射中
resultMap.put(LaneDirection.UPWARD, upwardData);
resultMap.put(LaneDirection.DOWNWARD, downwardData);
return resultMap;
}
/**
* 定义每小时第20分钟执行的任务用于清除过期缓存数据并将缓存中的数据整合后保存至数据库
* 初始化交通段数据
* @param data 交通段数据对象用于存储交通数据
* @param dcDevice 设备对象包含设备的详细信息
* @param reportTime 数据上报时间
* @param direction 车道方向
*/
// @Scheduled(cron = "0 20 * * * ?") // 每小时的20分整点执行该任务
public void performHourlyCleanupAndPersist() {
// 清除已过期的缓存数据
DailyTrafficStatisticsCache.clearExpiredData();
MonthlyTrafficStatisticsCache.clearExpiredData();
QuarterlyTrafficStatisticsCache.clearExpiredData();
YearlyTrafficStatisticsCache.clearExpiredData();
// 整合缓存数据并保存至数据库
// 将缓存中的数据按日统计后保存至数据库
// 添加月交通断面数据到缓存中
persistAggregatedData(DailyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.DAY, MonthlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按月统计后保存至数据库
// 添加季度交通断面数据到缓存中
persistAggregatedData(MonthlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.MONTH, QuarterlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按季度统计后保存至数据库
// 添加年交通断面数据到缓存中
persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData);
// 将缓存中的数据按年统计后保存至数据库
persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {});
private void initializeTrafficSectionData(DcTrafficSectionData data, DcDevice dcDevice, Date reportTime, LaneDirection direction) {
// 设置设备id
data.setDeviceId(dcDevice.getId());
// 设置车道方向
data.setDirection(direction.getValue());
// 设置上报时间
data.setReportTime(reportTime);
// 设置数据上报时间
data.setStatisticalDate(reportTime);
// 设置统计时段类型为日
data.setPeriodType(TrafficDataPeriodTypeEnum.DAY);
// 将设备的桩号转换为整数后设置
data.setStakeMark(dcDevice.stakeMarkToInt());
}
/**
* 设置交通路段数据
* 该方法用于根据给定的交通量和累积平均速度设置交通路段的数据
*
* @param data 交通路段数据对象用于存储交通路段的相关信息
* @param trafficVolume 该路段的交通量单位通常为车辆数
* @param cumulativeAverageSpeed 该路段的累积平均速度单位通常为千米/小时
*/
private void setUpTrafficSectionData(DcTrafficSectionData data, int trafficVolume, double cumulativeAverageSpeed) {
data.setTrafficVolume(trafficVolume); // 设置交通量
data.setAverageSpeed(0);
if (trafficVolume != 0) { // 当交通量不为0时,计算并设置平均速度
data.setAverageSpeed((int) Math.round(cumulativeAverageSpeed / trafficVolume)); // 平均速度 = 累积平均速度 / 交通量
}
}
/**
* 计算给定车道数据的累计平均速度
* 该函数根据laneData中提供的各种车辆类型的数量和平均速度计算出累计的平均速度
*
* @param laneData 包含车道数据的JSONObject对象其中包含了各种车辆类型的数量和平均速度等数据
* @return 返回计算出的累计平均速度
*/
private int calculateCumulativeAverageSpeed(JSONObject laneData) {
// 根据laneData中的数据计算累计平均速度
// 累加平均速度(中小客车)
return laneData.getInteger("trafficNumberOfInAndSmall") * laneData.getInteger("inAndSmallAverageVehicleSpeed") +
// 累加平均速度(小型货车)
laneData.getInteger("trafficVolumeOfSmallTrucks") * laneData.getInteger("smallTrucksAverageVehicleSpeed") +
// 累加平均速度(大客车)
laneData.getInteger("busTrafficVolume") * laneData.getInteger("averageSpeedOfBus") +
// 累加平均速度(中型货车)
laneData.getInteger("mediumTruckTrafficVolume") * laneData.getInteger("averageSpeedOfMediumSizeTrucks") +
// 累加平均速度(大型货车)
laneData.getInteger("largeTruckTrafficVolume") * laneData.getInteger("averageSpeedOfLargeTrucks") +
// 累加平均速度(特大型货车)
laneData.getInteger("extraLargeTrucksTrafficVolume") * laneData.getInteger("averageSpeedOfExtraLargeTrucks") +
// 累加平均速度(集装箱车)
laneData.getInteger("containerTruckTrafficVolume") * laneData.getInteger("averageSpeedOfContainerTruck") +
// 累加平均速度(拖拉机)
laneData.getInteger("tractorTrafficVolume") * laneData.getInteger("averageSpeedOfTractor") +
// 累加平均速度(摩托车)
laneData.getInteger("motorcycleTrafficVolume") * laneData.getInteger("averageSpeedOfMotorcycle");
}
/**

5
zc-business/src/main/java/com/zc/business/statistics/cache/DailyTrafficStatisticsCache.java

@ -63,6 +63,11 @@ public class DailyTrafficStatisticsCache extends AbstractTrafficStatisticsCache
instance.setStatisticalDateStr(formattedDate);
}
// 更新上报时间
dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate());
// 更新数据周期类型
dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.DAY);
// 更新统计日期
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.DAY);
// 将新数据添加到数据列表中

5
zc-business/src/main/java/com/zc/business/statistics/cache/MonthlyTrafficStatisticsCache.java

@ -63,6 +63,11 @@ public class MonthlyTrafficStatisticsCache extends AbstractTrafficStatisticsCach
instance.setStatisticalDateStr(formattedDate);
}
// 更新上报时间
dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate());
// 更新数据周期类型
dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.MONTH);
// 更新统计日期
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.MONTH);
// 将新数据添加到数据列表中

5
zc-business/src/main/java/com/zc/business/statistics/cache/QuarterlyTrafficStatisticsCache.java

@ -66,6 +66,11 @@ public class QuarterlyTrafficStatisticsCache extends AbstractTrafficStatisticsCa
instance.setStatisticalDateStr(formattedDate);
}
// 更新上报时间
dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate());
// 更新数据周期类型
dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.QUARTER);
// 更新统计日期
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.QUARTER);
// 将新数据添加到数据列表中

5
zc-business/src/main/java/com/zc/business/statistics/cache/YearlyTrafficStatisticsCache.java

@ -66,6 +66,11 @@ public class YearlyTrafficStatisticsCache extends AbstractTrafficStatisticsCache
instance.setStatisticalDateStr(formattedDate);
}
// 更新上报时间
dcTrafficSectionData.setReportTime(dcTrafficSectionData.getStatisticalDate());
// 更新数据周期类型
dcTrafficSectionData.setPeriodType(TrafficDataPeriodTypeEnum.YEAR);
// 更新统计日期
dcTrafficSectionData.setStatisticalDate(dcTrafficSectionData.getStatisticalDate(), TrafficDataPeriodTypeEnum.YEAR);
// 将新数据添加到数据列表中

2
zc-business/src/main/java/com/zc/business/statistics/handler/RealtimeTrafficStatistics.java

@ -46,6 +46,8 @@ public class RealtimeTrafficStatistics {
aggregatedData.setStakeMark(firstDcTrafficSectionData.getStakeMark());
// 道路方向
aggregatedData.setDirection(firstDcTrafficSectionData.getDirection());
// 上报时间
aggregatedData.setReportTime(firstDcTrafficSectionData.getReportTime());
// 计算平均车速并设置到汇总统计对象中
if (trafficVolume != 0) {

Loading…
Cancel
Save