Browse Source

边坡监测事件接入

develop
lau572 1 month ago
parent
commit
6acef2ad99
  1. 304
      zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java

304
zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java

@ -2,203 +2,161 @@ package com.zc.business.controller;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.uuid.IdUtils;
import com.ruoyi.system.service.ISysConfigService; import com.ruoyi.system.service.ISysConfigService;
import com.zc.business.domain.DcWarning;
import com.zc.business.enums.UniversalEnum; import com.zc.business.enums.UniversalEnum;
import com.zc.common.core.httpclient.OkHttp; import com.zc.business.service.IDcWarningService;
import com.zc.common.core.httpclient.exception.HttpException;
import com.zc.common.core.httpclient.request.RequestParams;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import org.eclipse.paho.client.mqttv3.*;
import io.swagger.annotations.ApiParam; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.BufferedReader; import java.text.SimpleDateFormat;
import java.io.ByteArrayInputStream; import java.util.Date;
import java.io.IOException; import java.util.concurrent.Executors;
import java.io.InputStreamReader; import java.util.concurrent.ScheduledExecutorService;
import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
/** /**
* @Description 边坡报警Controller
* mqtt订阅 路基和边坡的报警数据
* *
* @author liuwenge
* @date 2024/10/23 09:38
*/ */
@Api(tags = "边坡数据接口") @Api(tags = "边坡报警")
@RestController @RestController
@Component @Component
@RequestMapping("/sideSlope") @RequestMapping("/sideSlope")
public class SideSlopeController extends BaseController { public class SideSlopeController extends BaseController {
@Resource @Resource
private RedisCache redisCache; private IDcWarningService dcWarningService;
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
@Autowired @Autowired
private ISysConfigService configService; private ISysConfigService configService;
@ApiOperation("获取token") private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@PostMapping("/authenticate")
public JSONObject authenticate() throws IOException {
OkHttpClient client = new OkHttpClient();
@PostConstruct
// 构造请求体 private AjaxResult mqttSubscription() throws Exception {
Map<String, String> map = new LinkedHashMap<>();
map.put("UserNameOrEmailAddress", configService.selectConfigByKey("UserNameOrEmailAddress"));//用户名 JSONObject sideSlopeConfig = JSONObject.parseObject(configService.selectConfigByKey("sideSlope"));
map.put("Password", configService.selectConfigByKey("Password"));//登录密码
ObjectMapper objectMapper = new ObjectMapper(); String clientId = "jhgs_"+ IdUtils.fastUUID();
String jsonInput = objectMapper.writeValueAsString(map); String serverURI = sideSlopeConfig.getString("serverURI");
RequestBody requestBody = RequestBody.create(JSON, jsonInput); String userName = sideSlopeConfig.getString("userName");
String password = sideSlopeConfig.getString("password");
// 创建请求 String topicName = sideSlopeConfig.getString("topic");
Request request = new Request.Builder() try {
.url(configService.selectConfigByKey("accessTokenApi")) // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
.post(requestBody) MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());
.build(); // MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 发送请求并处理响应 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
try (Response response = client.newCall(request).execute()) { options.setCleanSession(true);
if (!response.isSuccessful()) { // 设置连接的用户名
throw new IOException("Unexpected code " + response); options.setUserName(userName);
} // 设置连接的密码
// 正确解析响应体中的JSON数据 options.setPassword(password.toCharArray());
String responseBody = response.body().string(); // 设置超时时间 单位为秒
JSONObject jsonResult = JSONObject.parseObject(responseBody); options.setConnectionTimeout(UniversalEnum.TEN.getNumber());
return jsonResult; // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
} options.setKeepAliveInterval(UniversalEnum.TWENTY.getNumber());
} // 设置回调函数
client.setCallback(new MqttCallback() {
/**
* 获取测点列表 public void connectionLost(Throwable cause) {
* @return logger.info("边坡监测mqtt断开连接..........");
* @throws IOException // 延迟重连
* @throws HttpException scheduler.schedule(() -> {
*/ try {
@ApiOperation("获取边坡测点列表") mqttSubscription();
} catch (Exception e) {
@GetMapping("/GetMeasurePointList") throw new RuntimeException(e);
public AjaxResult GetMeasurePointList() throws IOException, HttpException { }
JSONObject jsonResult = null; }, UniversalEnum.TEN.getNumber(), TimeUnit.SECONDS);
OkHttp okHttp = new OkHttp(); }
RequestParams requestParams = new RequestParams(); public void messageArrived(String topic, MqttMessage message) {
requestParams.put("proCode", configService.selectConfigByKey("proCode")); logger.info("======边坡监测--监听到来自[" + topic + "]的消息======");
requestParams.put("unitCode", configService.selectConfigByKey("unitCode")); String jsonObjString = new String(message.getPayload());
JSONArray jsonArray = JSONObject.parseArray(jsonObjString);
Object accessToken = redisCache.getCacheObject("accessToken");
if (accessToken==null){ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
JSONObject authenticate = authenticate(); for (Object o : jsonArray) {
accessToken = authenticate.getJSONObject("result").getString("accessToken"); JSONObject jsonObject = (JSONObject) o;
redisCache.setCacheObject("accessToken",accessToken); DcWarning dcWarning = new DcWarning();
redisCache.expire("accessToken", UniversalEnum.THREE.getNumber() * UniversalEnum.TWENTY_FOUR.getNumber() * UniversalEnum.THREE_THOUSAND_SIX_HUNDRED.getNumber());//设置过期时间s秒 dcWarning.setStakeMark(jsonObject.getString("stakeMark"));
} dcWarning.setDirection(jsonObject.getString("direction"));
Date warningTime = new Date(jsonObject.getLong("warningTime"));
// http://jsgl.sdgsbim.com:8616/api/RoadMajorPlatform/GetMeasurePointList?proCode=JHGKJ&unitCode=60-01.0002.00.00 dcWarning.setWarningTime(warningTime);
Map<String, String> header = new HashMap<>(); dcWarning.setOtherConfig(jsonObject.toJSONString());
header.put("Authorization", "Bearer "+accessToken.toString()); dcWarning.setWarningType(99);
Response response // 请求响应 dcWarning.setWarningSubclass("99-1");
= okHttp dcWarning.setWarningSource(8);
.headers(header) dcWarning.setWarningState(1);
.url(configService.selectConfigByKey("GetMeasurePointListAPI")) // 请求地址 String timeStr = simpleDateFormat.format(warningTime);
.data(requestParams) // 请求参数 String direction = jsonObject.getString("direction").equals("1") ? "菏泽方向" : "济南方向";
.get(); // 请求方法 String warningLevel = "";
if (jsonObject.getString("direction").equals("1")){
if (response.body() != null) { warningLevel = "黄色预警";
jsonResult = JSONObject.parseObject(response.body().string()); } else if (jsonObject.getString("direction").equals("2")){
} warningLevel = "橙色预警";
// 正确解析响应体中的JSON数据 } else if (jsonObject.getString("direction").equals("3")){
return AjaxResult.success(jsonResult) ; warningLevel = "红色预警";
} }
JSONObject warningValueInfo = jsonObject.getJSONObject("warningValueInfo");
/** String warningInfo = "";
* 获取长历史数据 for (String key : warningValueInfo.keySet()) {
* @return warningInfo += key + ":" + warningValueInfo.getString(key) + jsonObject.getString("unit") + ",";
* @throws IOException }
* @throws HttpException if (warningInfo.length() > 0){
*/ warningInfo = warningInfo.substring(0,warningInfo.length() -1);
@ApiOperation("获取边坡历史数据") }
@GetMapping("/GetPointDataListAsync") //2024年10月23日 14:45:00 K066+666 菏泽方向 边坡监测GNSS位移计(JM)上报了一起黄色预警事件(水平位移:20.944475mm)
public AjaxResult GetPointDataListAsync( String remark = timeStr + " " + jsonObject.getString("stakeMark") + " " + direction
@ApiParam(value = "测点编号", name = "meaPointNum", required = true) String meaPointNum, + "边坡监测" + jsonObject.getString("seneorLabel") + "上报了一起" + warningLevel +"事件("
@ApiParam(value = "开始时间(时间戳)", name = "starttime", required = true) long starttime, + warningInfo + ")";
@ApiParam(value = "结束时间(时间戳)", name = "endtime", required = true) long endtime dcWarning.setRemark(remark);
dcWarningService.insertDcWarning(dcWarning);
) throws IOException, HttpException {
}
JSONObject jsonResult = null;
JSONArray jsonArray = null; }
OkHttp okHttp = new OkHttp();
public void deliveryComplete(IMqttDeliveryToken token) {
RequestParams requestParams = new RequestParams(); logger.info("deliveryComplete---------" + token.isComplete());
requestParams.put("projCode",configService.selectConfigByKey("proCode"));//项目编号 如 JHGKJ }
requestParams.put("unitCode", configService.selectConfigByKey("unitCode"));//项目单位工程编号 如 60-01.0002.00.00
requestParams.put("meaPointNum", meaPointNum);//测点编号如 PR-YLJ01-067441-05/05 });
requestParams.put("starttime", starttime);//开始时间如 1713369599000
requestParams.put("endtime", endtime);//结束时间 如 1713887999000 // 建立连接
try {
Object accessToken = redisCache.getCacheObject("accessToken"); client.connect(options);
if (accessToken==null){
System.out.println("null+++++++++++++++++++++++"); logger.info("边坡监测mqtt连接成功.");
JSONObject authenticate = authenticate();
accessToken = authenticate.getJSONObject("result").getString("accessToken"); client.subscribe(topicName, UniversalEnum.ONE.getNumber());
redisCache.setCacheObject("accessToken",accessToken); logger.info("边坡监测mqtt开始监听" + topicName);
redisCache.expire("accessToken", UniversalEnum.THREE.getNumber() * UniversalEnum.TWENTY_FOUR.getNumber() * UniversalEnum.THREE_THOUSAND_SIX_HUNDRED.getNumber());//设置过期时间s秒
} } catch (MqttException e) {
logger.warn("边坡监测mqtt连接异常");
// http://jsgl.sdgsbim.com:8616/api/RoadMajorPlatform/GetMeasurePointList?proCode=JHGKJ&unitCode=60-01.0002.00.00 mqttSubscription();
Map<String, String> header = new HashMap<>();
header.put("Authorization", "Bearer "+accessToken.toString());
Response response // 请求响应
= okHttp
.headers(header)
.url(configService.selectConfigByKey("GetPointDataListAsyncAPI")) // 请求地址
.data(requestParams) // 请求参数
.get(); // 请求方法
if (response.body() != null) {
jsonResult = JSONObject.parseObject(response.body().string());
String jsonObjec =jsonResult.getString("result").replace("\r\n", UniversalEnum.EMPTY_STRING.getValue());
jsonArray = extracted(jsonObjec);
}
// 正确解析响应体中的JSON数据
return AjaxResult.success(jsonArray);
}
/**
* Base64解码
*/
private JSONArray extracted(String base64EncodedCompressedData) throws IOException {
// Base64解码
byte[] compressedData = Base64.getDecoder().decode(base64EncodedCompressedData);
StringBuilder output = new StringBuilder();
// 解压缩
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
String line;
while ((line = bufferedReader.readLine()) != null) {
output.append(line);
} }
} catch (Exception e) {
e.printStackTrace();
} }
// 将StringBuilder转换为字符串 return AjaxResult.success();
String jsonString = output.toString();
// 将字符串转换为JSONObject
JSONArray jsonObject = JSONArray.parseArray(jsonString);
return jsonObject;
} }
} }

Loading…
Cancel
Save