From 7d3e6b7484651cda7f3d44dc844018494544a630 Mon Sep 17 00:00:00 2001 From: lau572 <1010031226@qq.com> Date: Sun, 4 Feb 2024 11:22:11 +0800 Subject: [PATCH] =?UTF-8?q?redis=E6=B6=88=E6=81=AF=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=B0=94=E8=B1=A1=E7=9B=91=E6=B5=8B=E5=99=A8=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DevicePropertyReportListener.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java index 9f1de3c8..d7370696 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/DevicePropertyReportListener.java @@ -1,5 +1,10 @@ package com.zc.business.message.device.listener; +import com.alibaba.fastjson.JSON; +import com.google.gson.JsonObject; +import com.zc.business.domain.DcDevice; +import com.zc.business.domain.DcMeteorologicalDetectorData; +import com.zc.business.service.IDcMeteorologicalDetectorDataService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,8 +12,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * 设备属性上报消息监听 */ @@ -19,12 +30,32 @@ public class DevicePropertyReportListener implements StreamListener message) { String streamKay = message.getStream(); RecordId recordId = message.getId(); + threadPoolTaskExecutor.execute(() -> { + Map data = JSON.parseObject(message.getValue(), HashMap.class); + Map headers = (Map) data.get("headers"); + if (headers.get("productId") != null){ + String productId = headers.get("productId").toString(); + //气象检测器 + if ("zc-meteorological".equals(productId)){ + DcMeteorologicalDetectorData meteorologicalDetectorData = (DcMeteorologicalDetectorData) data.get("properties"); + meteorologicalDetectorData.setIotDeviceId(data.get("deviceId").toString()); + meteorologicalDetectorDataService.insertDcMeteorologicalDetectorData(meteorologicalDetectorData); + } else if ("zc-yzsqkdc-3131".equals(productId)){ + //交调 + + } + } + }); // 消费完后直接删除消息 redisStream.del(streamKay, String.valueOf(recordId)); }