From 8b065298957be28a42b5a2cd44fca086b746ebd5 Mon Sep 17 00:00:00 2001 From: xiepufeng <1072271977@qq.com> Date: Wed, 31 Jan 2024 15:02:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E8=AE=BE=E5=A4=87=E7=8A=B6?= =?UTF-8?q?=E6=80=81=EF=BC=88=E6=95=B0=E6=8D=AE=E6=9D=A5=E6=BA=90=E4=BA=8E?= =?UTF-8?q?=E7=89=A9=E8=81=94=E7=BD=91=E5=B9=B3=E5=8F=B0=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zc/business/domain/DcDevice.java | 6 ++++ .../device/OfflineMessageListener.java | 18 +++++++++- .../message/device/OnlineMessageListener.java | 36 +++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java index 3843bfed..54e75d83 100644 --- a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java +++ b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java @@ -13,6 +13,12 @@ public class DcDevice { public static final Integer UNUSEDSTATE = 0; public static final Integer USEOFSTATE = 1; + + // 离线状态 + public static final String OFFLINE = "0"; + // 在线状态 + public static final String ONLINE = "1"; + @ApiModelProperty("ID") private Long id; @ApiModelProperty("物联设备ID") diff --git a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java index ebd7bded..4104bd6f 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java @@ -1,6 +1,8 @@ package com.zc.business.message.device; import com.alibaba.fastjson.JSON; +import com.zc.business.domain.DcDevice; +import com.zc.business.service.IDcDeviceService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,6 +14,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; +import java.util.stream.Collectors; /** * 离线消息监听 @@ -24,6 +27,9 @@ public class OfflineMessageListener implements StreamListener msg) { - System.out.println(msg); + + List dcDevices = msg.stream().map(iotDeviceId -> { + DcDevice dcDevice = new DcDevice(); + dcDevice.setDeviceState(DcDevice.OFFLINE); + dcDevice.setIotDeviceId(iotDeviceId); + return dcDevice; + }).collect(Collectors.toList()); + + // 批量更新设备状态 + dcDeviceService.batchUpdate(dcDevices); } } diff --git a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java index 73e897e6..e2a90c2d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java @@ -1,5 +1,8 @@ package com.zc.business.message.device; +import com.alibaba.fastjson.JSON; +import com.zc.business.domain.DcDevice; +import com.zc.business.service.IDcDeviceService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,8 +10,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; + /** * 在线消息监听 */ @@ -20,12 +28,40 @@ public class OnlineMessageListener implements StreamListener message) { String streamKay = message.getStream(); RecordId recordId = message.getId(); + threadPoolTaskExecutor.execute(() -> { + List list = JSON.parseArray(message.getValue(), String.class); + this.handle(list); + }); + // 消费完后直接删除消息 redisStream.del(streamKay, String.valueOf(recordId)); } + + + /** + * 处理在线消息 + * @param msg 在线消息 + */ + private void handle(List msg) { + List dcDevices = msg.stream().map(iotDeviceId -> { + DcDevice dcDevice = new DcDevice(); + dcDevice.setDeviceState(DcDevice.ONLINE); + dcDevice.setIotDeviceId(iotDeviceId); + return dcDevice; + }).collect(Collectors.toList()); + + // 批量更新设备状态 + dcDeviceService.batchUpdate(dcDevices); + } }