|
|
@ -1,6 +1,8 @@ |
|
|
|
package com.zc.business.controller; |
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
import java.text.ParseException; |
|
|
|
import java.text.SimpleDateFormat; |
|
|
|
import java.util.*; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
import javax.annotation.Resource; |
|
|
@ -11,7 +13,9 @@ import cn.hutool.core.date.DateUtil; |
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.ruoyi.common.core.redis.RedisCache; |
|
|
|
import com.ruoyi.system.domain.SysLogininfor; |
|
|
|
import com.zc.business.constant.RedisKeyConstants; |
|
|
|
import com.zc.business.domain.*; |
|
|
|
import com.zc.business.enums.UniversalEnum; |
|
|
|
import com.zc.common.core.httpclient.exception.HttpException; |
|
|
@ -55,8 +59,20 @@ public class DcTrafficSurveyDataController extends BaseController |
|
|
|
@Resource |
|
|
|
private DcDeviceController dcDeviceController; |
|
|
|
|
|
|
|
@Scheduled(cron = "0 5 * * * ?") |
|
|
|
public void syncTrafficSectionData() throws HttpException, IOException { |
|
|
|
@Resource |
|
|
|
private RedisCache redisCache; |
|
|
|
|
|
|
|
/** |
|
|
|
* 每小时执行一次 |
|
|
|
* 如果请求失败则将当前设备、时间范围放入redis |
|
|
|
* 下次执行时先处理redis中之前未成功的,最多执行三次 |
|
|
|
*/ |
|
|
|
@Scheduled(cron = "0 13 * * * ?") |
|
|
|
public void syncTrafficSectionData() throws HttpException, IOException, ParseException { |
|
|
|
|
|
|
|
|
|
|
|
//先执行之前不成功的
|
|
|
|
executeFailedData(); |
|
|
|
|
|
|
|
HashMap<String, Object> props = new HashMap<>(); |
|
|
|
// 设置查询条件的键为“timestamp$BTW”,表示时间戳在一定范围内
|
|
|
@ -68,8 +84,10 @@ public class DcTrafficSurveyDataController extends BaseController |
|
|
|
Date lastHourStart = DateUtil.beginOfHour(DateUtil.offsetHour(now, -1)); |
|
|
|
Date lastHourEnd = DateUtil.endOfHour(DateUtil.offsetHour(now, -1)); |
|
|
|
// 将上一个小时的开始和结束时间添加到列表
|
|
|
|
dateList.add(DateUtil.format(lastHourStart, "yyyy-MM-dd HH:mm:ss")); |
|
|
|
dateList.add(DateUtil.format(lastHourEnd, "yyyy-MM-dd HH:mm:ss")); |
|
|
|
String startTime = DateUtil.format(lastHourStart, "yyyy-MM-dd HH:mm:ss"); |
|
|
|
String endTime = DateUtil.format(lastHourEnd, "yyyy-MM-dd HH:mm:ss"); |
|
|
|
dateList.add(startTime); |
|
|
|
dateList.add(endTime); |
|
|
|
// 将日期列表以逗号分隔并设置为查询条件的值
|
|
|
|
props.put("terms[0].value", String.join(UniversalEnum.COMMA.getValue(), dateList)); |
|
|
|
props.put("paging", false); |
|
|
@ -83,7 +101,21 @@ public class DcTrafficSurveyDataController extends BaseController |
|
|
|
List<DcTrafficSurveyData> batchData = new ArrayList<>(); |
|
|
|
for (DcDevice dcDevice : deviceList) { |
|
|
|
|
|
|
|
Object data = JSON.parseObject(dcDeviceController.queryDeviceProperties(dcDevice.getIotDeviceId(), propertyId, props).get("data").toString()).get("data"); |
|
|
|
Object data = null; |
|
|
|
AjaxResult ajaxResult = dcDeviceController.queryDeviceProperties(dcDevice.getIotDeviceId(), propertyId, props); |
|
|
|
if (ajaxResult.get("code").equals(UniversalEnum.TWO_HUNDRED.getNumber()) && ajaxResult.containsKey("data") && ajaxResult.get("data") != null) { |
|
|
|
data = JSON.parseObject(ajaxResult.get("data").toString()).get("data"); |
|
|
|
} else { |
|
|
|
//超时、错误等异常状态,放入缓存下次定时执行时再次请求一遍
|
|
|
|
Map<String,Object> cacheMap = new HashMap<>(); |
|
|
|
cacheMap.put("iotDeviceId",dcDevice.getIotDeviceId()); |
|
|
|
cacheMap.put("stakeMark",dcDevice.getStakeMark()); |
|
|
|
cacheMap.put("startTime",startTime); |
|
|
|
cacheMap.put("endTime",endTime); |
|
|
|
cacheMap.put("count",3); //剩余请求次数
|
|
|
|
redisCache.setCacheMapValue(RedisKeyConstants.TRAFFIC_SURVEY_HOURS_DATA,dcDevice.getIotDeviceId()+startTime,cacheMap); |
|
|
|
break; |
|
|
|
} |
|
|
|
JSONArray dataArray = JSON.parseArray(data.toString()); |
|
|
|
if (dataArray == null || dataArray.size() < 1){ |
|
|
|
break; |
|
|
@ -151,6 +183,122 @@ public class DcTrafficSurveyDataController extends BaseController |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* @Description 再次执行之前未成功请求的 |
|
|
|
* |
|
|
|
* @author liuwenge |
|
|
|
* @date 2024/12/25 9:36 |
|
|
|
* @param |
|
|
|
* @return void |
|
|
|
*/ |
|
|
|
public void executeFailedData() throws IOException, HttpException, ParseException { |
|
|
|
Map<String,Map<String,Object>> cache = redisCache.getCacheMap(RedisKeyConstants.TRAFFIC_SURVEY_HOURS_DATA); |
|
|
|
if (cache == null || cache.size() == 0) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
|
|
List<DcTrafficSurveyData> batchData = new ArrayList<>(); |
|
|
|
for (Map<String, Object> value : cache.values()) { |
|
|
|
|
|
|
|
String iotDeviceId = value.get("iotDeviceId").toString(); |
|
|
|
String stakeMark = value.get("stakeMark").toString(); |
|
|
|
String startTime = value.get("startTime").toString(); |
|
|
|
String endTime = value.get("endTime").toString(); |
|
|
|
int count = Integer.parseInt(value.get("count").toString()); |
|
|
|
|
|
|
|
HashMap<String, Object> props = new HashMap<>(); |
|
|
|
// 设置查询条件的键为“timestamp$BTW”,表示时间戳在一定范围内
|
|
|
|
props.put("terms[0].column", "timestamp$BTW"); |
|
|
|
ArrayList<String> dateList = new ArrayList<>(); |
|
|
|
dateList.add(startTime); |
|
|
|
dateList.add(endTime); |
|
|
|
// 将日期列表以逗号分隔并设置为查询条件的值
|
|
|
|
props.put("terms[0].value", String.join(UniversalEnum.COMMA.getValue(), dateList)); |
|
|
|
props.put("paging", false); |
|
|
|
props.put("sorts[0].order", "asc"); |
|
|
|
props.put("sorts[0].name", "timestamp"); |
|
|
|
|
|
|
|
Object data = null; |
|
|
|
AjaxResult ajaxResult = dcDeviceController.queryDeviceProperties(iotDeviceId, "01", props); |
|
|
|
if (ajaxResult.get("code").equals(UniversalEnum.TWO_HUNDRED.getNumber()) && ajaxResult.containsKey("data") && ajaxResult.get("data") != null) { |
|
|
|
data = JSON.parseObject(ajaxResult.get("data").toString()).get("data"); |
|
|
|
redisCache.delCacheMapValue(RedisKeyConstants.TRAFFIC_SURVEY_HOURS_DATA,iotDeviceId+startTime); |
|
|
|
} else { |
|
|
|
count--; |
|
|
|
if (count < 1){ |
|
|
|
redisCache.delCacheMapValue(RedisKeyConstants.TRAFFIC_SURVEY_HOURS_DATA,iotDeviceId+startTime); |
|
|
|
} else { |
|
|
|
value.put("count",count); |
|
|
|
redisCache.setCacheMapValue(RedisKeyConstants.TRAFFIC_SURVEY_HOURS_DATA,iotDeviceId+startTime,value); |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
JSONArray dataArray = JSON.parseArray(data.toString()); |
|
|
|
if (dataArray == null || dataArray.size() < 1){ |
|
|
|
break; |
|
|
|
} |
|
|
|
Integer hezeTotal = 0; |
|
|
|
Integer jinanTotal = 0; |
|
|
|
//一小时内所有的车道数据
|
|
|
|
JSONArray lanesData = new JSONArray(); |
|
|
|
for (Object o : dataArray) { |
|
|
|
JSONObject jsonObject = JSON.parseObject(o.toString()); |
|
|
|
JSONObject formatValue = JSON.parseObject(jsonObject.get("formatValue").toString()); |
|
|
|
hezeTotal += Integer.parseInt(formatValue.get("1").toString()); |
|
|
|
jinanTotal += Integer.parseInt(formatValue.get("3").toString()); |
|
|
|
|
|
|
|
//车道数据
|
|
|
|
JSONArray lanes = formatValue.getJSONArray("lanes"); |
|
|
|
lanesData.addAll(lanes); |
|
|
|
} |
|
|
|
|
|
|
|
// 转换类型 以便使用stream
|
|
|
|
List<JSONObject> lanesList = lanesData.toJavaList(JSONObject.class); |
|
|
|
Map<String, JSONArray> dataList = lanesList.stream() |
|
|
|
.collect(Collectors.groupingBy( |
|
|
|
item -> item.getString("laneNumber").substring(0, 1), // 提取 laneNumber 的第一个字符作为 key
|
|
|
|
Collectors.mapping(item -> item, |
|
|
|
Collectors.collectingAndThen( |
|
|
|
Collectors.toList(), |
|
|
|
list -> { // 然后将 List<JSONObject> 转换为 JSONArray
|
|
|
|
JSONArray jsonArray = new JSONArray(); |
|
|
|
jsonArray.addAll(list); |
|
|
|
return jsonArray; |
|
|
|
} |
|
|
|
) |
|
|
|
) |
|
|
|
)); |
|
|
|
|
|
|
|
//菏泽方向数据
|
|
|
|
DcTrafficSurveyData hezeData = new DcTrafficSurveyData(); |
|
|
|
hezeData.setIotDeviceId(iotDeviceId); |
|
|
|
hezeData.setStakeMark(stakeMark); |
|
|
|
hezeData.setDirection("1"); |
|
|
|
hezeData.setTimestamp(formatter.parse(startTime)); |
|
|
|
hezeData.setTrafficVolume(Long.valueOf(hezeTotal)); |
|
|
|
//济南方向数据
|
|
|
|
DcTrafficSurveyData jinanData = new DcTrafficSurveyData(); |
|
|
|
jinanData.setIotDeviceId(iotDeviceId); |
|
|
|
jinanData.setStakeMark(stakeMark); |
|
|
|
jinanData.setDirection("3"); |
|
|
|
jinanData.setTimestamp(formatter.parse(startTime)); |
|
|
|
jinanData.setTrafficVolume(Long.valueOf(jinanTotal)); |
|
|
|
//各方向车道级数据
|
|
|
|
if (dataList != null && dataList.size() > 0){ |
|
|
|
JSONArray hezeLanesData = dataList.get("1"); |
|
|
|
dcTrafficSurveyDataService.formatTrafficSurveyData(hezeData,hezeLanesData); |
|
|
|
JSONArray jinanLanesData = dataList.get("3"); |
|
|
|
dcTrafficSurveyDataService.formatTrafficSurveyData(jinanData,jinanLanesData); |
|
|
|
} |
|
|
|
|
|
|
|
batchData.add(hezeData); |
|
|
|
batchData.add(jinanData); |
|
|
|
} |
|
|
|
dcTrafficSurveyDataService.batchInsert(batchData); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 查询一类交调数据列表 |
|
|
|
*/ |
|
|
|