Browse Source

添加产品类型枚举

develop
xiepufeng 10 months ago
parent
commit
9e90d400f0
  1. 50
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  2. 15
      zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java

50
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@ -1,5 +1,7 @@
package com.zc.business.message.device.handler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zc.business.domain.DcDevice;
import com.zc.business.service.IDcDeviceService;
import org.springframework.stereotype.Service;
@ -14,11 +16,42 @@ import java.util.stream.Collectors;
@Service
public class DeviceMessageHandler {
enum ProductType {
// 摄像头检测事件
CAMERA_DETECTION_EVENT("video-event"),
// 情报板
VARIABLE_MESSAGE_SIGN("7877"),
// 行车诱导
TRAFFIC_GUIDANCE("8866"),
// 气象检测器
WEATHER_DETECTOR("zc-meteorological"),
// 设备箱
ZC_SHE_BEI_XIANG("zc-shebeixiang-1883"),
// 一站式情况调查产品
STANDARD_JTT("zc-yzsqkdc-3131")
;
private final String value;
ProductType(String value) {
this.value = value;
}
public String value() {
return this.value;
}
}
@Resource
private IDcDeviceService dcDeviceService;
/**
* 更新设备状态
*
* @param msg 物联设备id集合
* @param state 设备状态 0 异常 1 正常
*/
@ -34,4 +67,21 @@ public class DeviceMessageHandler {
// 批量更新设备状态
dcDeviceService.batchUpdate(dcDevices);
}
/**
* 消息处理
* 1-事件消息
* 2-属性读取回复
*/
public void handle(String msg) {
JSONObject data = JSON.parseObject(msg, JSONObject.class);
// 产品id
String productId = data.getJSONObject("headers").getString("productId");
// 摄像头检测事件
if (productId.equals(ProductType.CAMERA_DETECTION_EVENT.value)) {
return;
}
}
}

15
zc-business/src/main/java/com/zc/business/message/device/listener/DeviceEventListener.java

@ -2,6 +2,7 @@ package com.zc.business.message.device.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zc.business.message.device.handler.DeviceMessageHandler;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,21 +29,15 @@ public class DeviceEventListener implements StreamListener<String, ObjectRecord<
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private DeviceMessageHandler deviceMessageHandler;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
threadPoolTaskExecutor.execute(() -> {
JSONObject data = JSON.parseObject(message.getValue(), JSONObject.class);
this.handle(data);
});
threadPoolTaskExecutor.execute(() -> deviceMessageHandler.handle(message.getValue()));
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
private void handle(JSONObject data) {
System.out.println(data);
}
}

Loading…
Cancel
Save