Browse Source

交通流接口加入重试

develop
王兴琳 9 months ago
parent
commit
51e0f1bf80
  1. 64
      zc-business/src/main/java/com/zc/business/controller/FTPDeletion.java
  2. 198
      zc-business/src/main/java/com/zc/business/service/impl/DcTrafficStatisticsServiceImpl.java
  3. 53
      zc-business/src/main/java/com/zc/business/utils/RetryInterceptorUtil.java

64
zc-business/src/main/java/com/zc/business/controller/FTPDeletion.java

@ -1,10 +1,17 @@
package com.zc.business.controller; package com.zc.business.controller;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.system.service.ISysConfigService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPFile;
@ -13,11 +20,13 @@ import org.apache.commons.net.ftp.FTPFile;
* *
*/ */
@Component("FTPDeletion") @Component()
@Slf4j @Slf4j
public class FTPDeletion { public class FTPDeletion {
ISysConfigService configService = SpringUtils.getBean(ISysConfigService.class);
private static int retentionDays = 60; // 默认保存天数为60天 /*
private static int retentionDays = 90; // 默认保存天数为60天
public void deleteEventFile() { public void deleteEventFile() {
log.info("定时任务执行,当前保存天气设置天数:"+retentionDays+"当前时间:"+java.time.LocalTime.now()); log.info("定时任务执行,当前保存天气设置天数:"+retentionDays+"当前时间:"+java.time.LocalTime.now());
new FTPDeletion().remoteFileDeletion(); new FTPDeletion().remoteFileDeletion();
@ -30,28 +39,51 @@ public class FTPDeletion {
//获取当前保存天数 //获取当前保存天数
public int getRetentionDays() { public int getRetentionDays() {
return retentionDays; return retentionDays;
} }*/
// 递归方法来处理文件和目录的删除 // 递归方法来处理文件和目录的删除
private void deleteDirectoryRecursively(FTPClient ftpClient, String parentDirPath) throws IOException { private void deleteDirectoryRecursively(FTPClient ftpClient, String parentDirPath) throws IOException {
int retentionDays= Integer.parseInt(configService.selectConfigByKey("EVENT-TIME"));//密钥
ftpClient.setControlEncoding("GBK"); ftpClient.setControlEncoding("GBK");
FTPFile[] files = ftpClient.listFiles(parentDirPath); FTPFile[] files = ftpClient.listFiles(parentDirPath);
//选择要保留的天数 //选择要保留的天数
Instant thirtyDaysAgo = Instant.now().minus(retentionDays, ChronoUnit.DAYS); Instant thirtyDaysAgo = Instant.now().minus(retentionDays, ChronoUnit.DAYS);
log.info("当前设置的文件保存天数为:"+FTPDeletion.retentionDays+"当前时间:"+java.time.LocalTime.now());
// 转换为本地日期
LocalDate localDate = thirtyDaysAgo.atZone(ZoneOffset.UTC).toLocalDate();
// 获取 30 天前当天的零点时间
Instant thirtyDaysAgoStartOfDay = localDate.atStartOfDay(ZoneOffset.UTC).toInstant();
log.info("当前设置的文件保存天数为:"+retentionDays+"当前时间:"+java.time.LocalTime.now());
for (FTPFile file : files) { for (FTPFile file : files) {
String filePath = parentDirPath + "/" + file.getName(); String filePath = parentDirPath + "/" + file.getName();
if (file.isDirectory()) { if (file.isDirectory()) {
System.out.println("地址"+filePath);
// 如果是目录,则递归调用 // 如果是目录,则递归调用
// 删除30天前的文件
Instant lastModifiedTime = file.getTimestamp().toInstant();
String[] split = filePath.split("交通事件");
if (split.length>1){
if (!lastModifiedTime.isBefore(thirtyDaysAgoStartOfDay)){
System.out.println("跳回天====="+filePath);
continue;
}
}
if (filePath.contains("ATTACHFILE")){
continue;
}
deleteDirectoryRecursively(ftpClient, filePath); deleteDirectoryRecursively(ftpClient, filePath);
} else { } else {
// 排除包含特定关键词的文件名 // 排除包含特定关键词的文件名
if (!file.getName().contains("事故")) { if (!file.getName().contains("事故")) {
// 删除30天前的文件 // 删除30天前的文件
Instant lastModifiedTime = file.getTimestamp().toInstant(); Instant lastModifiedTime = file.getTimestamp().toInstant();
if (lastModifiedTime.isBefore(thirtyDaysAgo)) { if (lastModifiedTime.isBefore(thirtyDaysAgoStartOfDay)) {
boolean deleted = ftpClient.deleteFile(filePath); boolean deleted = ftpClient.deleteFile(filePath);
if (deleted) { if (deleted) {
log.info("已删除文件:"+filePath); log.info("已删除文件:"+filePath);
@ -60,6 +92,10 @@ public class FTPDeletion {
log.info("无法删除文件:"+filePath); log.info("无法删除文件:"+filePath);
} }
}else {
System.out.println("跳出循环-------------------------------------");
continue;
} }
} else { } else {
log.info("文件名包含关键词'事故',跳过删除: :"+file.getName()); log.info("文件名包含关键词'事故',跳过删除: :"+file.getName());
@ -81,13 +117,21 @@ public class FTPDeletion {
} }
} }
} }
public void remoteFileDeletion() { public void remoteFileDeletion() {
String server = "192.168.3.1"; //10.0.111.12
int port = 21; String server= configService.selectConfigByKey("FTP-IP");//密钥
String user = "1911390090@qq.com"; //String server = "192.168.3.1";
String password = "989878wxl";
// int port = 21;
int port= Integer.parseInt(configService.selectConfigByKey("FTP-PORT"));
//ftpuser
//String user = "1911390090@qq.com";
String user= configService.selectConfigByKey("FTP-USER");//密钥
//Dxc123!@#
String password= configService.selectConfigByKey("FPT-PASSWORD");//密钥
//String password = "989878wxl";
try { try {
FTPClient ftpClient = new FTPClient(); FTPClient ftpClient = new FTPClient();
ftpClient.setControlEncoding("GBK"); ftpClient.setControlEncoding("GBK");

198
zc-business/src/main/java/com/zc/business/service/impl/DcTrafficStatisticsServiceImpl.java

@ -1,10 +1,10 @@
package com.zc.business.service.impl; package com.zc.business.service.impl;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.gson.Gson;
import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.common.utils.spring.SpringUtils;
import com.zc.business.constant.RedisKeyConstants; import com.zc.business.constant.RedisKeyConstants;
@ -17,14 +17,12 @@ import com.zc.business.enums.UniversalEnum;
import com.zc.business.service.IDcFacilityService; import com.zc.business.service.IDcFacilityService;
import com.zc.business.service.IDcRoadSectionService; import com.zc.business.service.IDcRoadSectionService;
import com.zc.business.service.IDcTrafficStatisticsService; import com.zc.business.service.IDcTrafficStatisticsService;
import com.zc.business.utils.RetryInterceptorUtil;
import com.zc.business.utils.StakeMarkUtils; import com.zc.business.utils.StakeMarkUtils;
import com.zc.common.core.httpclient.OkHttp; import com.zc.common.core.httpclient.OkHttp;
import com.zc.common.core.httpclient.exception.HttpException; import com.zc.common.core.httpclient.exception.HttpException;
import com.zc.common.core.httpclient.request.RequestParams; import com.zc.common.core.httpclient.request.RequestParams;
import okhttp3.Call; import okhttp3.*;
import okhttp3.Callback;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -637,6 +635,7 @@ public class DcTrafficStatisticsServiceImpl implements IDcTrafficStatisticsServi
.data(requestParams) // 请求参数 .data(requestParams) // 请求参数
.post(); // 请求方法 .post(); // 请求方法
ResponseBody body = response.body(); ResponseBody body = response.body();
if (body != null) { if (body != null) {
String jsonString = body.string(); String jsonString = body.string();
@ -660,46 +659,98 @@ public class DcTrafficStatisticsServiceImpl implements IDcTrafficStatisticsServi
*/ */
@Override @Override
public JSONArray sectionHourlyTrafficFlow(String startDate, String endDate) throws HttpException, IOException { public JSONArray sectionHourlyTrafficFlow(String startDate, String endDate) throws HttpException, IOException {
OkHttp okHttp = new OkHttp();
RequestParams requestParams = new RequestParams();
//
//
// OkHttp okHttp = new OkHttp();
//
// RequestParams requestParams = new RequestParams();
//
// requestParams.put("sysid", sysid);
//
// JSONObject parameters = new JSONObject() {
// {
// put("start_date", startDate);
// put("end_date", endDate);
// }
// };
//
// requestParams.put("parameters", parameters.toJSONString());
//
// Map<String, String> headers = new HashMap<>();
// headers.put("Authorization", getAccessToken());
// try {
// Response response // 请求响应
// = okHttp
// .headers(headers)
// .url(baseUrl + UniversalEnum.SECTION_HOURLY_TRAFFIC_FLOW_BY_VEHICLE_TYPE.getValue()) // 请求地址
// .data(requestParams) // 请求参数
// .post(); // 请求方法
//
// ResponseBody body = response.body();
// if (body != null) {
// String jsonString = body.string();
// if (JSON.isValidArray(jsonString)) {
// return JSON.parseArray(jsonString);
// }else {
// return new JSONArray();
// }
// }
// return new JSONArray();
// } catch (IOException e) {
// // 处理异常
// e.printStackTrace();
// return new JSONArray();
// }
// 创建OkHttpClient.Builder实例
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
// 添加重试拦截器到OkHttpClient
httpClientBuilder.addInterceptor(new RetryInterceptorUtil());
// 构建最终的OkHttpClient实例
OkHttpClient okHttpClient = httpClientBuilder.build();
// 现在使用带有重试机制的OkHttpClient发起请求
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("sysid", sysid); requestParams.put("sysid", sysid);
JSONObject parameters = new JSONObject() { JSONObject parameters = new JSONObject();
{ parameters.put("start_date", startDate);
put("start_date", startDate); parameters.put("end_date", endDate);
put("end_date", endDate);
}
};
requestParams.put("parameters", parameters.toJSONString()); requestParams.put("parameters", parameters);
Map<String, String> headers = new HashMap<>(); Map<String, String> headers = new HashMap<>();
headers.put("Authorization", getAccessToken()); headers.put("Authorization", getAccessToken());
Gson gson = new Gson();
String requestParamsJson = gson.toJson(requestParams);
// 使用okHttpClient实例发起请求
try { try {
Response response // 请求响应 Request request = new Request.Builder()
= okHttp .url(baseUrl + UniversalEnum.SECTION_HOURLY_TRAFFIC_FLOW_BY_VEHICLE_TYPE.getValue())
.headers(headers) .headers(Headers.of(headers))
.url(baseUrl + UniversalEnum.SECTION_HOURLY_TRAFFIC_FLOW_BY_VEHICLE_TYPE.getValue()) // 请求地址 .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), requestParamsJson))
.data(requestParams) // 请求参数 .build();
.post(); // 请求方法
ResponseBody body = response.body(); Response response = okHttpClient.newCall(request).execute();
if (body != null) {
String jsonString = body.string(); ResponseBody body = response.body();
if (JSON.isValidArray(jsonString)) { if (body != null) {
return JSON.parseArray(jsonString); String jsonString = body.string();
}else { if (JSON.isValidArray(jsonString)) {
return new JSONArray(); return JSON.parseArray(jsonString);
} else {
return new JSONArray();
}
} }
} return new JSONArray();
return new JSONArray();
} catch (IOException e) { } catch (IOException e) {
// 处理异常
e.printStackTrace(); e.printStackTrace();
return new JSONArray(); return new JSONArray();
} }
} }
/** /**
@ -707,39 +758,38 @@ public class DcTrafficStatisticsServiceImpl implements IDcTrafficStatisticsServi
*/ */
@Override @Override
public List<Map<String, String>> trafficFlowAtTollStationEntrance(String startDate, String endDate, String stationType) throws HttpException, IOException { public List<Map<String, String>> trafficFlowAtTollStationEntrance(String startDate, String endDate, String stationType) throws HttpException, IOException {
OkHttp okHttp = new OkHttp(); // 创建OkHttpClient.Builder实例
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
RequestParams requestParams = new RequestParams(); // 添加重试拦截器到OkHttpClient
httpClientBuilder.addInterceptor(new RetryInterceptorUtil());
requestParams.put("sysid", sysid); // 构建最终的OkHttpClient实例
OkHttpClient okHttpClient = httpClientBuilder.build();
JSONObject parameters = new JSONObject() { // 现在使用带有重试机制的OkHttpClient发起请求
{ Map<String, Object> requestParams = new HashMap<>();
put("start_date", startDate); requestParams.put("sysid", sysid);
put("end_date", endDate); JSONObject parameters = new JSONObject();
put("station_type", stationType); parameters.put("start_date", startDate);
} parameters.put("end_date", endDate);
}; parameters.put("station_type", stationType);
requestParams.put("parameters", parameters.toJSONString());
requestParams.put("parameters", parameters);
Map<String, String> headers = new HashMap<>(); Map<String, String> headers = new HashMap<>();
headers.put("Authorization", getAccessToken()); headers.put("Authorization", getAccessToken());
Gson gson = new Gson();
String requestParamsJson = gson.toJson(requestParams);
try { try {
Response response = okHttp Request request = new Request.Builder()
.headers(headers)
.url(baseUrl + UniversalEnum.EACH_TOLL_STATION_ENTRANCE_BY_TYPE_OF_HOURLY_TRAFFIC_FLOW.getValue()) .url(baseUrl + UniversalEnum.EACH_TOLL_STATION_ENTRANCE_BY_TYPE_OF_HOURLY_TRAFFIC_FLOW.getValue())
.data(requestParams) .headers(Headers.of(headers))
.post(); .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), requestParamsJson))
// 确保响应成功 .build();
if (!response.isSuccessful()) { Response response = okHttpClient.newCall(request).execute();
throw new IOException("请求不成功,HTTP代码:" + response.code());
}
ResponseBody body = response.body(); ResponseBody body = response.body();
if (body != null) { if (body != null) {
String jsonString = body.string(); String jsonString = body.string();
System.out.println(jsonString);
if (JSON.isValidArray(jsonString)) { if (JSON.isValidArray(jsonString)) {
JSONArray jsonArray = JSON.parseArray(jsonString); JSONArray jsonArray = JSON.parseArray(jsonString);
Map<String, Integer> sumByName = new LinkedHashMap<>(); Map<String, Integer> sumByName = new LinkedHashMap<>();
@ -1189,32 +1239,36 @@ public class DcTrafficStatisticsServiceImpl implements IDcTrafficStatisticsServi
} }
private Response getResponseTrafficFlowAtToll(String startDate, String stationType) throws HttpException, IOException { private Response getResponseTrafficFlowAtToll(String startDate, String stationType) throws HttpException, IOException {
OkHttp okHttp = new OkHttp(); // 创建OkHttpClient.Builder实例
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
RequestParams requestParams = new RequestParams(); // 添加重试拦截器到OkHttpClient
httpClientBuilder.addInterceptor(new RetryInterceptorUtil());
requestParams.put("sysid", sysid); // 构建最终的OkHttpClient实例
OkHttpClient okHttpClient = httpClientBuilder.build();
JSONObject parameters = new JSONObject() { // 现在使用带有重试机制的OkHttpClient发起请求
{ Map<String, Object> requestParams = new HashMap<>();
put("start_date", startDate); requestParams.put("sysid", sysid);
put("end_date", startDate); JSONObject parameters = new JSONObject();
put("station_type", stationType); parameters.put("start_date", startDate);
} parameters.put("end_date", startDate);
}; parameters.put("station_type", stationType);
requestParams.put("parameters", parameters.toJSONString());
requestParams.put("parameters", parameters);
Map<String, String> headers = new HashMap<>(); Map<String, String> headers = new HashMap<>();
headers.put("Authorization", getAccessToken()); headers.put("Authorization", getAccessToken());
Gson gson = new Gson();
String requestParamsJson = gson.toJson(requestParams);
Request request = new Request.Builder()
.url(baseUrl + UniversalEnum.EACH_TOLL_STATION_ENTRANCE_BY_TYPE_OF_HOURLY_TRAFFIC_FLOW.getValue())
.headers(Headers.of(headers))
.post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), requestParamsJson))
.build();
Response response = okHttpClient.newCall(request).execute();
Response response // 请求响应
= okHttp
.headers(headers)
.url(baseUrl + UniversalEnum.EACH_TOLL_STATION_ENTRANCE_BY_TYPE_OF_HOURLY_TRAFFIC_FLOW.getValue()) // 请求地址
.data(requestParams) // 请求参数
.post(); // 请求方法
return response; return response;
} }

53
zc-business/src/main/java/com/zc/business/utils/RetryInterceptorUtil.java

@ -0,0 +1,53 @@
package com.zc.business.utils;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.rmi.UnknownHostException;
/**
*重试拦截器
*/
public class RetryInterceptorUtil implements Interceptor {
private static final int MAX_RETRIES = 3; // 最大重试次数
private int retryCount = 0;
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
while (retryCount <= MAX_RETRIES) {
try {
Response response = chain.proceed(request);
if (!response.isSuccessful() && retryCount < MAX_RETRIES) {
retryCount++;
System.out.println("请求失败的代码 " + response.code() + ". 重试 (" + retryCount + "/" + MAX_RETRIES + ")...");
response.close();
continue;
} else {
return response;
}
} catch (IOException e) { // 捕获所有IO异常,包括连接被拒绝
if (++retryCount <= MAX_RETRIES) {
System.out.println("IOException。重试 (" + retryCount + "/" + MAX_RETRIES + ")...");
// 可能需要短暂停顿后再重试,避免立即重试导致问题持续
try {
Thread.sleep(1000); // 暂停1秒后重试
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("重试睡眠期间中断", ie);
}
} else {
throw e; // 超过最大重试次数,重新抛出异常
}
}
}
return chain.proceed(request); // 如果没有异常或重试完毕,正常返回
}
}
Loading…
Cancel
Save