Browse Source

redis消息订阅气象监测器数据

develop
lau572 1 year ago
parent
commit
7d3e6b7484
  1. 31
      zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java

31
zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java

@ -1,5 +1,10 @@
package com.zc.business.message.device.listener; package com.zc.business.message.device.listener;
import com.alibaba.fastjson.JSON;
import com.google.gson.JsonObject;
import com.zc.business.domain.DcDevice;
import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.service.IDcMeteorologicalDetectorDataService;
import com.zc.common.core.redis.stream.RedisStream; import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -7,8 +12,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* 设备属性上报消息监听 * 设备属性上报消息监听
*/ */
@ -19,12 +30,32 @@ public class DevicePropertyReportListener implements StreamListener<String, Obje
@Autowired @Autowired
private RedisStream redisStream; private RedisStream redisStream;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Override @Override
public void onMessage(ObjectRecord<String, String> message) { public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream(); String streamKay = message.getStream();
RecordId recordId = message.getId(); RecordId recordId = message.getId();
threadPoolTaskExecutor.execute(() -> {
Map data = JSON.parseObject(message.getValue(), HashMap.class);
Map<String,Object> headers = (Map<String, Object>) data.get("headers");
if (headers.get("productId") != null){
String productId = headers.get("productId").toString();
//气象检测器
if ("zc-meteorological".equals(productId)){
DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties");
meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString());
meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData);
} else if ("zc-yzsqkdc-3131".equals(productId)){
//交调
}
}
});
// 消费完后直接删除消息 // 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId)); redisStream.del(streamKay, String.valueOf(recordId));
} }

Loading…
Cancel
Save