diff --git a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java index 3843bfed..54e75d83 100644 --- a/zc-business/src/main/java/com/zc/business/domain/DcDevice.java +++ b/zc-business/src/main/java/com/zc/business/domain/DcDevice.java @@ -13,6 +13,12 @@ public class DcDevice { public static final Integer UNUSEDSTATE = 0; public static final Integer USEOFSTATE = 1; + + // 离线状态 + public static final String OFFLINE = "0"; + // 在线状态 + public static final String ONLINE = "1"; + @ApiModelProperty("ID") private Long id; @ApiModelProperty("物联设备ID") diff --git a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java index ebd7bded..4104bd6f 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java @@ -1,6 +1,8 @@ package com.zc.business.message.device; import com.alibaba.fastjson.JSON; +import com.zc.business.domain.DcDevice; +import com.zc.business.service.IDcDeviceService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,6 +14,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; +import java.util.stream.Collectors; /** * 离线消息监听 @@ -24,6 +27,9 @@ public class OfflineMessageListener implements StreamListener msg) { - System.out.println(msg); + + List dcDevices = msg.stream().map(iotDeviceId -> { + DcDevice dcDevice = new DcDevice(); + dcDevice.setDeviceState(DcDevice.OFFLINE); + dcDevice.setIotDeviceId(iotDeviceId); + return dcDevice; + }).collect(Collectors.toList()); + + // 批量更新设备状态 + dcDeviceService.batchUpdate(dcDevices); } } diff --git a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java index 73e897e6..e2a90c2d 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java +++ b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java @@ -1,5 +1,8 @@ package com.zc.business.message.device; +import com.alibaba.fastjson.JSON; +import com.zc.business.domain.DcDevice; +import com.zc.business.service.IDcDeviceService; import com.zc.common.core.redis.stream.RedisStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,8 +10,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; + /** * 在线消息监听 */ @@ -20,12 +28,40 @@ public class OnlineMessageListener implements StreamListener message) { String streamKay = message.getStream(); RecordId recordId = message.getId(); + threadPoolTaskExecutor.execute(() -> { + List list = JSON.parseArray(message.getValue(), String.class); + this.handle(list); + }); + // 消费完后直接删除消息 redisStream.del(streamKay, String.valueOf(recordId)); } + + + /** + * 处理在线消息 + * @param msg 在线消息 + */ + private void handle(List msg) { + List dcDevices = msg.stream().map(iotDeviceId -> { + DcDevice dcDevice = new DcDevice(); + dcDevice.setDeviceState(DcDevice.ONLINE); + dcDevice.setIotDeviceId(iotDeviceId); + return dcDevice; + }).collect(Collectors.toList()); + + // 批量更新设备状态 + dcDeviceService.batchUpdate(dcDevices); + } }