|
|
@ -1,9 +1,9 @@ |
|
|
|
package com.zc.business.service.impl; |
|
|
|
|
|
|
|
import cn.hutool.core.date.DateUtil; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
|
|
|
import com.ruoyi.common.utils.DateUtils; |
|
|
|
import com.zc.business.domain.DcTrafficSectionData; |
|
|
|
import com.zc.business.enums.TrafficDataPeriodTypeEnum; |
|
|
|
import com.zc.business.statistics.cache.*; |
|
|
@ -13,12 +13,16 @@ import com.zc.business.statistics.handler.RealtimeTrafficStatistics; |
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
/** |
|
|
|
* 通断面数据服务实现类,负责处理实时设备消息、缓存数据、定时任务以及数据保存等功能。 |
|
|
|
* 交通断面数据服务实现类,负责处理实时设备消息、缓存数据、定时任务以及数据保存等功能。 |
|
|
|
* |
|
|
|
* @author xiepufeng |
|
|
|
*/ |
|
|
|
@Service |
|
|
@ -29,6 +33,60 @@ public class DcTrafficSectionDataServiceImpl |
|
|
|
@Resource |
|
|
|
private DcTrafficSectionDataMapper dcTrafficSectionDataMapper; |
|
|
|
|
|
|
|
/** |
|
|
|
* 初始化方法,用于在对象创建后恢复各种周期的交通数据缓存。 |
|
|
|
* 该方法标注了@PostConstruct注解,确保在依赖注入完成后调用。 |
|
|
|
*/ |
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
// TODO 恢复每天交通数据缓存(es获取数据)
|
|
|
|
recoveryMonthlyCache(); // 恢复每月交通数据缓存
|
|
|
|
recoveryQuarterlyCache(); // 恢复每季度交通数据缓存
|
|
|
|
recoveryYearlyCache(); // 恢复每年交通数据缓存
|
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 恢复每月交通数据缓存的方法。 |
|
|
|
* 通过查询当前月份至今的每日交通数据,并将其添加到每月交通统计缓存中。 |
|
|
|
*/ |
|
|
|
private void recoveryMonthlyCache() { |
|
|
|
// 构建查询条件,查询当前月份至今的每日交通数据
|
|
|
|
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>(); |
|
|
|
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.DAY); |
|
|
|
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfMonth(new Date()), new Date()); |
|
|
|
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper); |
|
|
|
// 遍历查询结果,将每日数据添加到每月交通统计缓存
|
|
|
|
dcTrafficSectionDataList.forEach(MonthlyTrafficStatisticsCache::addCacheData); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 恢复每季度交通数据缓存的方法。 |
|
|
|
* 通过查询当前季度至今的每月交通数据,并将其添加到每季度交通统计缓存中。 |
|
|
|
*/ |
|
|
|
private void recoveryQuarterlyCache() { |
|
|
|
// 构建查询条件,查询当前季度至今的每月交通数据
|
|
|
|
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>(); |
|
|
|
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.MONTH); |
|
|
|
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfQuarter(new Date()), new Date()); |
|
|
|
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper); |
|
|
|
// 遍历查询结果,将每月数据添加到每季度交通统计缓存
|
|
|
|
dcTrafficSectionDataList.forEach(QuarterlyTrafficStatisticsCache::addCacheData); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 恢复每年交通数据缓存的方法。 |
|
|
|
* 通过查询当前年份至今的每季度交通数据,并将其添加到每年交通统计缓存中。 |
|
|
|
*/ |
|
|
|
private void recoveryYearlyCache() { |
|
|
|
// 构建查询条件,查询当前年份至今的每季度交通数据
|
|
|
|
LambdaQueryWrapper<DcTrafficSectionData> queryWrapper = new LambdaQueryWrapper<>(); |
|
|
|
queryWrapper.eq(DcTrafficSectionData::getPeriodType, TrafficDataPeriodTypeEnum.QUARTER); |
|
|
|
queryWrapper.between(DcTrafficSectionData::getStatisticalDate, DateUtil.beginOfYear(new Date()), new Date()); |
|
|
|
List<DcTrafficSectionData> dcTrafficSectionDataList = this.list(queryWrapper); |
|
|
|
// 遍历查询结果,将每季度数据添加到每年交通统计缓存
|
|
|
|
dcTrafficSectionDataList.forEach(YearlyTrafficStatisticsCache::addCacheData); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 处理实时接收到的设备消息,并将其转换为交通断面统计数据对象并缓存。 |
|
|
|
* |
|
|
@ -79,7 +137,7 @@ public class DcTrafficSectionDataServiceImpl |
|
|
|
// 添加年交通断面数据到缓存中
|
|
|
|
persistAggregatedData(QuarterlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.QUARTER, YearlyTrafficStatisticsCache::addCacheData); |
|
|
|
// 将缓存中的数据按年统计后保存至数据库
|
|
|
|
persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {}); |
|
|
|
persistAggregatedData(YearlyTrafficStatisticsCache.getCache(), TrafficDataPeriodTypeEnum.YEAR, (a) -> {}); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
@ -105,69 +163,4 @@ public class DcTrafficSectionDataServiceImpl |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 将计算给定列表中所有交通断面数据的统计结果保存至数据库 |
|
|
|
*/ |
|
|
|
public boolean persistData(DcTrafficSectionData aggregatedData) { |
|
|
|
|
|
|
|
if (aggregatedData == null) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
// 创建更新条件封装器
|
|
|
|
LambdaUpdateWrapper<DcTrafficSectionData> updateWrapper = new LambdaUpdateWrapper<>(); |
|
|
|
|
|
|
|
// 设置更新条件:根据设备ID、方向、时段类型、桩号、统计时间查找记录
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getDeviceId, aggregatedData.getDeviceId()); |
|
|
|
// 方向
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getDirection, aggregatedData.getDirection()); |
|
|
|
// 时段类型
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getPeriodType, aggregatedData.getPeriodType()); |
|
|
|
// 桩号
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getStakeMark, aggregatedData.getStakeMark()); |
|
|
|
// 统计时间
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getStatisticalDate, aggregatedData.getStatisticalDate()); |
|
|
|
|
|
|
|
|
|
|
|
if (this.update(aggregatedData, updateWrapper)) { |
|
|
|
return true; |
|
|
|
} else { |
|
|
|
// 更新失败则尝试插入
|
|
|
|
aggregatedData.setCreateTime(DateUtils.getNowDate()); |
|
|
|
return this.save(aggregatedData); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* public boolean persistData(DcTrafficSectionData aggregatedData) { |
|
|
|
|
|
|
|
if (aggregatedData == null) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
// 创建更新条件封装器
|
|
|
|
LambdaUpdateWrapper<DcTrafficSectionData> updateWrapper = new LambdaUpdateWrapper<>(); |
|
|
|
|
|
|
|
// 设置更新条件:根据设备ID、方向、时段类型、桩号、统计时间查找记录
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getDeviceId, aggregatedData.getDeviceId()); |
|
|
|
// 方向
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getDirection, aggregatedData.getDirection()); |
|
|
|
// 时段类型
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getPeriodType, aggregatedData.getPeriodType()); |
|
|
|
// 桩号
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getStakeMark, aggregatedData.getStakeMark()); |
|
|
|
// 统计时间
|
|
|
|
updateWrapper.eq(DcTrafficSectionData::getStatisticalDate, aggregatedData.getStatisticalDate()); |
|
|
|
|
|
|
|
|
|
|
|
if (this.update(aggregatedData, updateWrapper)) { |
|
|
|
return true; |
|
|
|
} else { |
|
|
|
// 更新失败则尝试插入
|
|
|
|
aggregatedData.setCreateTime(DateUtils.getNowDate()); |
|
|
|
return this.save(aggregatedData); |
|
|
|
} |
|
|
|
}*/ |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|