diff --git a/zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java b/zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java index b9e6cdca..08cd4995 100644 --- a/zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java +++ b/zc-business/src/main/java/com/zc/business/controller/SideSlopeController.java @@ -2,203 +2,161 @@ package com.zc.business.controller; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.domain.AjaxResult; -import com.ruoyi.common.core.redis.RedisCache; +import com.ruoyi.common.utils.uuid.IdUtils; import com.ruoyi.system.service.ISysConfigService; +import com.zc.business.domain.DcWarning; import com.zc.business.enums.UniversalEnum; -import com.zc.common.core.httpclient.OkHttp; -import com.zc.common.core.httpclient.exception.HttpException; -import com.zc.common.core.httpclient.request.RequestParams; +import com.zc.business.service.IDcWarningService; import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import okhttp3.*; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.zip.GZIPInputStream; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** + * @Description 边坡报警Controller + * mqtt订阅 路基和边坡的报警数据 * + * @author liuwenge + * @date 2024/10/23 09:38 */ -@Api(tags = "边坡数据接口") +@Api(tags = "边坡报警") @RestController @Component @RequestMapping("/sideSlope") public class SideSlopeController extends BaseController { + @Resource - private RedisCache redisCache; - public static final MediaType JSON = MediaType.get("application/json; charset=utf-8"); + private IDcWarningService dcWarningService; @Autowired private ISysConfigService configService; - @ApiOperation("获取token") - @PostMapping("/authenticate") - public JSONObject authenticate() throws IOException { - OkHttpClient client = new OkHttpClient(); - - // 构造请求体 - Map map = new LinkedHashMap<>(); - map.put("UserNameOrEmailAddress", configService.selectConfigByKey("UserNameOrEmailAddress"));//用户名 - map.put("Password", configService.selectConfigByKey("Password"));//登录密码 - ObjectMapper objectMapper = new ObjectMapper(); - String jsonInput = objectMapper.writeValueAsString(map); - RequestBody requestBody = RequestBody.create(JSON, jsonInput); - - // 创建请求 - Request request = new Request.Builder() - .url(configService.selectConfigByKey("accessTokenApi")) - .post(requestBody) - .build(); - - // 发送请求并处理响应 - try (Response response = client.newCall(request).execute()) { - if (!response.isSuccessful()) { - throw new IOException("Unexpected code " + response); - } - // 正确解析响应体中的JSON数据 - String responseBody = response.body().string(); - JSONObject jsonResult = JSONObject.parseObject(responseBody); - return jsonResult; - } - } - - /** - * 获取测点列表 - * @return - * @throws IOException - * @throws HttpException - */ - @ApiOperation("获取边坡测点列表") - - @GetMapping("/GetMeasurePointList") - public AjaxResult GetMeasurePointList() throws IOException, HttpException { - JSONObject jsonResult = null; - OkHttp okHttp = new OkHttp(); - - RequestParams requestParams = new RequestParams(); - requestParams.put("proCode", configService.selectConfigByKey("proCode")); - requestParams.put("unitCode", configService.selectConfigByKey("unitCode")); - - Object accessToken = redisCache.getCacheObject("accessToken"); - if (accessToken==null){ - JSONObject authenticate = authenticate(); - accessToken = authenticate.getJSONObject("result").getString("accessToken"); - redisCache.setCacheObject("accessToken",accessToken); - redisCache.expire("accessToken", UniversalEnum.THREE.getNumber() * UniversalEnum.TWENTY_FOUR.getNumber() * UniversalEnum.THREE_THOUSAND_SIX_HUNDRED.getNumber());//设置过期时间s秒 - } - - // http://jsgl.sdgsbim.com:8616/api/RoadMajorPlatform/GetMeasurePointList?proCode=JHGKJ&unitCode=60-01.0002.00.00 - Map header = new HashMap<>(); - header.put("Authorization", "Bearer "+accessToken.toString()); - Response response // 请求响应 - = okHttp - .headers(header) - .url(configService.selectConfigByKey("GetMeasurePointListAPI")) // 请求地址 - .data(requestParams) // 请求参数 - .get(); // 请求方法 - - if (response.body() != null) { - jsonResult = JSONObject.parseObject(response.body().string()); - } - // 正确解析响应体中的JSON数据 - return AjaxResult.success(jsonResult) ; - } - - /** - * 获取长历史数据 - * @return - * @throws IOException - * @throws HttpException - */ - @ApiOperation("获取边坡历史数据") - @GetMapping("/GetPointDataListAsync") - public AjaxResult GetPointDataListAsync( - @ApiParam(value = "测点编号", name = "meaPointNum", required = true) String meaPointNum, - @ApiParam(value = "开始时间(时间戳)", name = "starttime", required = true) long starttime, - @ApiParam(value = "结束时间(时间戳)", name = "endtime", required = true) long endtime - - ) throws IOException, HttpException { - - JSONObject jsonResult = null; - JSONArray jsonArray = null; - OkHttp okHttp = new OkHttp(); - - RequestParams requestParams = new RequestParams(); - requestParams.put("projCode",configService.selectConfigByKey("proCode"));//项目编号 如 JHGKJ - requestParams.put("unitCode", configService.selectConfigByKey("unitCode"));//项目单位工程编号 如 60-01.0002.00.00 - requestParams.put("meaPointNum", meaPointNum);//测点编号如 PR-YLJ01-067441-05/05 - requestParams.put("starttime", starttime);//开始时间如 1713369599000 - requestParams.put("endtime", endtime);//结束时间 如 1713887999000 - - Object accessToken = redisCache.getCacheObject("accessToken"); - if (accessToken==null){ - System.out.println("null+++++++++++++++++++++++"); - JSONObject authenticate = authenticate(); - accessToken = authenticate.getJSONObject("result").getString("accessToken"); - redisCache.setCacheObject("accessToken",accessToken); - redisCache.expire("accessToken", UniversalEnum.THREE.getNumber() * UniversalEnum.TWENTY_FOUR.getNumber() * UniversalEnum.THREE_THOUSAND_SIX_HUNDRED.getNumber());//设置过期时间s秒 - } - - // http://jsgl.sdgsbim.com:8616/api/RoadMajorPlatform/GetMeasurePointList?proCode=JHGKJ&unitCode=60-01.0002.00.00 - Map header = new HashMap<>(); - header.put("Authorization", "Bearer "+accessToken.toString()); - Response response // 请求响应 - = okHttp - .headers(header) - .url(configService.selectConfigByKey("GetPointDataListAsyncAPI")) // 请求地址 - .data(requestParams) // 请求参数 - .get(); // 请求方法 - - if (response.body() != null) { - jsonResult = JSONObject.parseObject(response.body().string()); - String jsonObjec =jsonResult.getString("result").replace("\r\n", UniversalEnum.EMPTY_STRING.getValue()); - jsonArray = extracted(jsonObjec); - } - // 正确解析响应体中的JSON数据 - return AjaxResult.success(jsonArray); - } - /** - * Base64解码 - */ - private JSONArray extracted(String base64EncodedCompressedData) throws IOException { - // Base64解码 - byte[] compressedData = Base64.getDecoder().decode(base64EncodedCompressedData); - StringBuilder output = new StringBuilder(); - // 解压缩 - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData); - GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); - InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8); - BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { - - String line; - while ((line = bufferedReader.readLine()) != null) { - output.append(line); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + + + @PostConstruct + private AjaxResult mqttSubscription() throws Exception { + + JSONObject sideSlopeConfig = JSONObject.parseObject(configService.selectConfigByKey("sideSlope")); + + String clientId = "jhgs_"+ IdUtils.fastUUID(); + String serverURI = sideSlopeConfig.getString("serverURI"); + String userName = sideSlopeConfig.getString("userName"); + String password = sideSlopeConfig.getString("password"); + String topicName = sideSlopeConfig.getString("topic"); + try { + // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 + MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence()); + // MQTT的连接设置 + MqttConnectOptions options = new MqttConnectOptions(); + // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 + options.setCleanSession(true); + // 设置连接的用户名 + options.setUserName(userName); + // 设置连接的密码 + options.setPassword(password.toCharArray()); + // 设置超时时间 单位为秒 + options.setConnectionTimeout(UniversalEnum.TEN.getNumber()); + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 + options.setKeepAliveInterval(UniversalEnum.TWENTY.getNumber()); + // 设置回调函数 + client.setCallback(new MqttCallback() { + + public void connectionLost(Throwable cause) { + logger.info("边坡监测mqtt断开连接.........."); + // 延迟重连 + scheduler.schedule(() -> { + try { + mqttSubscription(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, UniversalEnum.TEN.getNumber(), TimeUnit.SECONDS); + } + + public void messageArrived(String topic, MqttMessage message) { + logger.info("======边坡监测--监听到来自[" + topic + "]的消息======"); + String jsonObjString = new String(message.getPayload()); + JSONArray jsonArray = JSONObject.parseArray(jsonObjString); + + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss"); + for (Object o : jsonArray) { + JSONObject jsonObject = (JSONObject) o; + DcWarning dcWarning = new DcWarning(); + dcWarning.setStakeMark(jsonObject.getString("stakeMark")); + dcWarning.setDirection(jsonObject.getString("direction")); + Date warningTime = new Date(jsonObject.getLong("warningTime")); + dcWarning.setWarningTime(warningTime); + dcWarning.setOtherConfig(jsonObject.toJSONString()); + dcWarning.setWarningType(99); + dcWarning.setWarningSubclass("99-1"); + dcWarning.setWarningSource(8); + dcWarning.setWarningState(1); + String timeStr = simpleDateFormat.format(warningTime); + String direction = jsonObject.getString("direction").equals("1") ? "菏泽方向" : "济南方向"; + String warningLevel = ""; + if (jsonObject.getString("direction").equals("1")){ + warningLevel = "黄色预警"; + } else if (jsonObject.getString("direction").equals("2")){ + warningLevel = "橙色预警"; + } else if (jsonObject.getString("direction").equals("3")){ + warningLevel = "红色预警"; + } + JSONObject warningValueInfo = jsonObject.getJSONObject("warningValueInfo"); + String warningInfo = ""; + for (String key : warningValueInfo.keySet()) { + warningInfo += key + ":" + warningValueInfo.getString(key) + jsonObject.getString("unit") + ","; + } + if (warningInfo.length() > 0){ + warningInfo = warningInfo.substring(0,warningInfo.length() -1); + } + //2024年10月23日 14:45:00 K066+666 菏泽方向 边坡监测GNSS位移计(JM)上报了一起黄色预警事件(水平位移:20.944475mm) + String remark = timeStr + " " + jsonObject.getString("stakeMark") + " " + direction + + "边坡监测" + jsonObject.getString("seneorLabel") + "上报了一起" + warningLevel +"事件(" + + warningInfo + ")"; + dcWarning.setRemark(remark); + dcWarningService.insertDcWarning(dcWarning); + + } + + } + + public void deliveryComplete(IMqttDeliveryToken token) { + logger.info("deliveryComplete---------" + token.isComplete()); + } + + }); + + // 建立连接 + try { + client.connect(options); + + logger.info("边坡监测mqtt连接成功."); + + client.subscribe(topicName, UniversalEnum.ONE.getNumber()); + logger.info("边坡监测mqtt开始监听" + topicName); + + } catch (MqttException e) { + logger.warn("边坡监测mqtt连接异常"); + mqttSubscription(); } + } catch (Exception e) { + e.printStackTrace(); } - // 将StringBuilder转换为字符串 - String jsonString = output.toString(); -// 将字符串转换为JSONObject - JSONArray jsonObject = JSONArray.parseArray(jsonString); - return jsonObject; + return AjaxResult.success(); } } - - -