Browse Source

kafka推送扫码报警,设备状态,气象数据

develop
王兴琳 2 weeks ago
parent
commit
e5318b5bfa
  1. 10
      zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java
  2. 24
      zc-business/src/main/java/com/zc/business/enums/KafkEnum.java
  3. 25
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  4. 6
      zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

10
zc-business/src/main/java/com/zc/business/controller/CodeScanningAlarmController.java

@ -2,13 +2,17 @@ package com.zc.business.controller;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.StringUtils; import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zc.business.domain.DcEventProcess; import com.zc.business.domain.DcEventProcess;
import com.zc.business.domain.DcWarning; import com.zc.business.domain.DcWarning;
import com.zc.business.enums.KafkEnum;
import com.zc.business.enums.UniversalEnum; import com.zc.business.enums.UniversalEnum;
import com.zc.business.mapper.DcEventProcessMapper; import com.zc.business.mapper.DcEventProcessMapper;
import com.zc.business.message.device.subscribe.KafkaTopicProducer;
import com.zc.business.service.IDcWarningService; import com.zc.business.service.IDcWarningService;
import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.WebSocketService;
import com.zc.common.core.websocket.constant.WebSocketEvent; import com.zc.common.core.websocket.constant.WebSocketEvent;
@ -98,6 +102,12 @@ public class CodeScanningAlarmController extends BaseController {
dcWarning.setRemark(simpleDateFormat.format(dcWarning.getWarningTime()) + warningTitle); dcWarning.setRemark(simpleDateFormat.format(dcWarning.getWarningTime()) + warningTitle);
dcWarning.setWarningSource(UniversalEnum.FIVE.getNumber()); dcWarning.setWarningSource(UniversalEnum.FIVE.getNumber());
dcWarning.setWarningState(UniversalEnum.ONE.getNumber()); dcWarning.setWarningState(UniversalEnum.ONE.getNumber());
//todo 推送kafak扫码报警事件
Gson gson = new Gson();
String jsonString = gson.toJson(dcWarning);
KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class);
kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.scanAlarm.value());
return AjaxResult.success(dcWarningService.insertDcWarning(dcWarning)); return AjaxResult.success(dcWarningService.insertDcWarning(dcWarning));
} else { } else {
dcWarning.setId(oldData.get("id").toString()); dcWarning.setId(oldData.get("id").toString());

24
zc-business/src/main/java/com/zc/business/enums/KafkEnum.java

@ -0,0 +1,24 @@
package com.zc.business.enums;
/**
*
*/
public enum KafkEnum {
//气象数据
weather("weatherData"),
// 扫码报警
scanAlarm("scanCodeData"),
//设备状态
deviceStatus("deviceState"),
// 视频AI
videoAI("eventAi");
private final String value;
KafkEnum(String value) {
this.value = value;
}
public String value() {
return this.value;
}
}

25
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@ -1,14 +1,11 @@
package com.zc.business.message.device.handler; package com.zc.business.message.device.handler;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.SecurityUtils;
import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.common.utils.spring.SpringUtils;
import com.zc.business.constant.RedisKeyConstants; import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcDevice;
@ -20,7 +17,6 @@ import com.zc.business.message.device.subscribe.KafkaTopicProducer;
import com.zc.business.service.*; import com.zc.business.service.*;
import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.WebSocketService;
import com.zc.common.core.websocket.constant.WebSocketEvent; import com.zc.common.core.websocket.constant.WebSocketEvent;
import org.apache.catalina.security.SecurityUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -68,6 +64,7 @@ public class DeviceMessageHandler {
@Autowired @Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class);
/** /**
@ -96,7 +93,13 @@ public class DeviceMessageHandler {
}); });
WebSocketService.broadcast(WebSocketEvent.DEVICE_STATE, dcDevices); //推送设备状态更新 WebSocketService.broadcast(WebSocketEvent.DEVICE_STATE, dcDevices); //推送设备状态更新
// 批量更新中间库设备状态 //kafka消息推送
//state 设备状态 0 异常 1 正常
Gson gson = new Gson();
String jsonString = gson.toJson(dcDevices);
kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.deviceStatus.value());
// 批量更新中间库设备状态
middleDatabaseService.updateMiddleDatabaseDeviceByList(dcDevices); middleDatabaseService.updateMiddleDatabaseDeviceByList(dcDevices);
} }
@ -361,7 +364,6 @@ public class DeviceMessageHandler {
combinedData.put("address", otherConfig);*/ combinedData.put("address", otherConfig);*/
dcWarning.setOtherConfig(otherConfig.toString()); dcWarning.setOtherConfig(otherConfig.toString());
//异常天气等级 过滤 //异常天气等级 过滤
KafkaTopicProducer kafkaTopicProducer = SpringUtils.getBean(KafkaTopicProducer.class);
if (data.getInteger("warningType") == VISIBILITY_LEVEL) { if (data.getInteger("warningType") == VISIBILITY_LEVEL) {
int WarningLevel = data.getInteger("visibilityLevel"); int WarningLevel = data.getInteger("visibilityLevel");
if (WarningLevel != UniversalEnum.ZERO.getNumber()) { if (WarningLevel != UniversalEnum.ZERO.getNumber()) {
@ -370,7 +372,7 @@ public class DeviceMessageHandler {
// 使用Gson将对象转换为JSON字符串 // 使用Gson将对象转换为JSON字符串
Gson gson = new Gson(); Gson gson = new Gson();
String jsonString = gson.toJson(dcWarning); String jsonString = gson.toJson(dcWarning);
kafkaTopicProducer.KafkaTopicProducer(jsonString); kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.videoAI.value());
} }
} else { } else {
dcWarningService.insertDcWarning(dcWarning); dcWarningService.insertDcWarning(dcWarning);
@ -378,7 +380,7 @@ public class DeviceMessageHandler {
// 使用Gson将对象转换为JSON字符串 // 使用Gson将对象转换为JSON字符串
Gson gson = new Gson(); Gson gson = new Gson();
String jsonString = gson.toJson(dcWarning); String jsonString = gson.toJson(dcWarning);
kafkaTopicProducer.KafkaTopicProducer(jsonString); kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.videoAI.value());
} }
@ -716,6 +718,13 @@ public class DeviceMessageHandler {
meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString()); meteorologicalDetectorData.setIotDeviceId(msg.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
//todo 推送kafak事件消息 气象检测器
//kafka消息推送
// 使用Gson将对象转换为JSON字符串
Gson gson = new Gson();
String jsonString = gson.toJson(meteorologicalDetectorData);
kafkaTopicProducer.KafkaTopicProducer(jsonString, KafkEnum.weather.value());
//设计院中间库 插入设备数据 //设计院中间库 插入设备数据
MdDeviceData mdDeviceData = new MdDeviceData(); MdDeviceData mdDeviceData = new MdDeviceData();
mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId()); mdDeviceData.setDevNo(meteorologicalDetectorData.getIotDeviceId());

6
zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

@ -23,9 +23,9 @@ public class KafkaTopicProducer {
private String keySerializer; private String keySerializer;
@Value("${kafka.producer.value-serializer}") @Value("${kafka.producer.value-serializer}")
private String valueSerializer; private String valueSerializer;
@Value("${kafka.producer.topic}") /* @Value("${kafka.producer.topic}")
private String topic; private String topic;*/
public void KafkaTopicProducer(String event) { public void KafkaTopicProducer(String event,String topic) {
// 配置生产者的属性 // 配置生产者的属性
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", servers); // Kafka broker地址 props.put("bootstrap.servers", servers); // Kafka broker地址

Loading…
Cancel
Save