From 0b860edd0b27b6fa5580ce8cd3b0e30163305e76 Mon Sep 17 00:00:00 2001 From: xiepufeng <1072271977@qq.com> Date: Fri, 12 Jan 2024 18:21:20 +0800 Subject: [PATCH] =?UTF-8?q?1.redis=E5=A4=9A=E6=95=B0=E6=8D=AE=E6=BA=90?= =?UTF-8?q?=E6=94=AF=E6=8C=81=202.=E8=AE=A2=E9=98=85=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/monitor/CacheController.java | 6 +- .../src/main/resources/application.yml | 24 ++++- .../common/core/redis/pubsub/RedisPubSub.java | 3 +- .../common/core/redis/stream/RedisStream.java | 16 +-- .../core/redis/stream/RedisStreamBeans.java | 26 +++++ .../core/websocket/WebSocketService.java | 2 +- .../ruoyi/framework/config/RedisConfig.java | 55 +++++++++- .../zc/framework/config/IotRedisConfig.java | 98 +++++++++++++++++ .../config/IotRedisConfigProperties.java | 20 ++++ .../framework/config/RedisPubSubConfig.java | 7 +- .../framework/config/RedisStreamConfig.java | 18 +++- .../constant/RedisStreamConstants.java | 71 +++++++++++++ .../message/device/DeviceEventListener.java | 31 ++++++ .../device/DeviceFunctionReplyListener.java | 31 ++++++ .../DevicePropertyReadReplyListener.java | 31 ++++++ .../device/DevicePropertyReportListener.java | 31 ++++++ .../DevicePropertyWriteReplyListener.java | 31 ++++++ .../message/device/MessageSubscription.java | 100 ++++++++++++++++++ .../device/OfflineMessageListener.java | 31 ++++++ .../message/device/OnlineMessageListener.java | 31 ++++++ .../websocket/handler/WebSocketService.java | 2 +- .../handler/event/EventCmdHandler.java | 3 +- 22 files changed, 642 insertions(+), 26 deletions(-) create mode 100644 ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java create mode 100644 ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java create mode 100644 ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java create mode 100644 zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java index 5205d4ca..c248adf3 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java @@ -3,6 +3,7 @@ package com.ruoyi.web.controller.monitor; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.security.access.prepost.PreAuthorize; @@ -14,14 +15,15 @@ import java.util.*; /** * 缓存监控 - * + * */ @RestController @RequestMapping("/monitor/cache") public class CacheController { @Autowired - private RedisTemplate redisTemplate; + @Qualifier("redisTemplate") + private RedisTemplate redisTemplate; @PreAuthorize("@ss.hasPermi('monitor:cache:list')") @GetMapping() diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 593e5a78..a89bbdc6 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -154,7 +154,29 @@ aj: interference-options: 2 -# 物联平台ip地址 iot: + # 物联平台地址 address: 127.0.0.1:8080 + # redis 配置 + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 10 + # 密码 + password: + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java index 6223325a..7bafc887 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java @@ -1,7 +1,7 @@ package com.zc.common.core.redis.pubsub; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @@ -21,6 +21,7 @@ public class RedisPubSub @Autowired private MessageListenerAdapter messageListenerAdapter; @Autowired + @Qualifier("stringRedisTemplate") private StringRedisTemplate redisTemplate; /** diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java index 660a50c7..36c1a7cd 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java @@ -1,11 +1,9 @@ package com.zc.common.core.redis.stream; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.List; @@ -16,15 +14,17 @@ import java.util.Objects; * 对redis stream命令的一些实现,可单独使用 * @author xiepufeng */ -@Component public class RedisStream { - @Autowired - private StringRedisTemplate redisTemplate; + private final StringRedisTemplate redisTemplate; + + private final StreamMessageListenerContainer> listenerContainer; + + public RedisStream(StringRedisTemplate redisTemplate, StreamMessageListenerContainer> listenerContainer) { + this.redisTemplate = redisTemplate; + this.listenerContainer = listenerContainer; + } - @Autowired - private StreamMessageListenerContainer> listenerContainer; - /** * 订阅消息 * @param key 队列 diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java new file mode 100644 index 00000000..98c81c0a --- /dev/null +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java @@ -0,0 +1,26 @@ +package com.zc.common.core.redis.stream; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.stereotype.Component; + +@Component +public class RedisStreamBeans { + + @Bean(name = "redisStream") + public RedisStream redisStream( + @Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate, + @Qualifier("streamMessageListenerContainer") StreamMessageListenerContainer> listenerContainer) { + return new RedisStream(redisTemplate, listenerContainer); + } + + @Bean(name = "iotRedisStream") + public RedisStream iotRedisStream( + @Qualifier("iotStringRedisTemplate") StringRedisTemplate redisTemplate, + @Qualifier("iotStreamMessageListenerContainer") StreamMessageListenerContainer> listenerContainer) { + return new RedisStream(redisTemplate, listenerContainer); + } +} diff --git a/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java b/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java index 096b6252..d2912915 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java @@ -25,7 +25,7 @@ public class WebSocketService private static final String EVENT = "event"; - private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private static final RedisStream redisStream = SpringUtils.getBean("redisStream"); /** * 给指定客户端发送消息 diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java index 251c7e6d..31db2e7e 100644 --- a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java @@ -1,11 +1,21 @@ package com.ruoyi.framework.config; +import io.lettuce.core.ClientOptions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -14,17 +24,51 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import java.time.Duration; + /** * redis配置 - * + * */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { - @Bean + + @Primary + @Bean(name = "redisConnectionFactory") + public RedisConnectionFactory connectionFactory(RedisProperties redisProperties){ + + + RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); + + standaloneConfig.setHostName(redisProperties.getHost()); + standaloneConfig.setPort(redisProperties.getPort()); + standaloneConfig.setDatabase(redisProperties.getDatabase()); + standaloneConfig.setPassword(redisProperties.getPassword()); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle()); + genericObjectPoolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle()); + genericObjectPoolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive()); + genericObjectPoolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis()); + + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .poolConfig(genericObjectPoolConfig) + .commandTimeout(redisProperties.getTimeout()) + .clientOptions(ClientOptions.builder() + .autoReconnect(true) + .build()) + .build(); + + return new LettuceConnectionFactory(standaloneConfig, clientConfig); + } + + @Primary + @Bean(name = "redisTemplate") @SuppressWarnings(value = { "unchecked", "rawtypes" }) - public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) + public RedisTemplate redisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); @@ -48,6 +92,11 @@ public class RedisConfig extends CachingConfigurerSupport return template; } + @Bean(name = "stringRedisTemplate") + StringRedisTemplate stringRedisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { + return new StringRedisTemplate(connectionFactory); + } + @Bean public DefaultRedisScript limitScript() { diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java new file mode 100644 index 00000000..1a492ca6 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java @@ -0,0 +1,98 @@ +package com.zc.framework.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.ruoyi.framework.config.FastJson2JsonRedisSerializer; +import io.lettuce.core.ClientOptions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * redis配置 + * + */ +@Configuration +@EnableCaching +public class IotRedisConfig +{ + private final IotRedisConfigProperties redisConfigProperties; + + public IotRedisConfig(IotRedisConfigProperties redisConfigProperties) { + this.redisConfigProperties = redisConfigProperties; + } + + @Bean(name = "iotRedisConnectionFactory") + public RedisConnectionFactory iotRedisConnectionFactory() { + + RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); + + standaloneConfig.setHostName(redisConfigProperties.getHost()); + standaloneConfig.setPort(redisConfigProperties.getPort()); + standaloneConfig.setDatabase(redisConfigProperties.getDatabase()); + standaloneConfig.setPassword(redisConfigProperties.getPassword()); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(redisConfigProperties.getLettuce().getPool().getMaxIdle()); + genericObjectPoolConfig.setMinIdle(redisConfigProperties.getLettuce().getPool().getMinIdle()); + genericObjectPoolConfig.setMaxTotal(redisConfigProperties.getLettuce().getPool().getMaxActive()); + genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperties.getLettuce().getPool().getMaxWait().toMillis()); + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .poolConfig(genericObjectPoolConfig) + .commandTimeout(redisConfigProperties.getTimeout()) + .clientOptions(ClientOptions.builder() + .autoReconnect(true) + .build()) + .build(); + + return new LettuceConnectionFactory(standaloneConfig, clientConfig); + } + + // 创建对应于从Redis数据源的RedisTemplate + @Bean(name = "iotRedisTemplate") + public RedisTemplate iotRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(connectionFactory); + + FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); + + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); + serializer.setObjectMapper(mapper); + + // 使用StringRedisSerializer来序列化和反序列化redis的key值 + template.setKeySerializer(new StringRedisSerializer()); + template.setValueSerializer(serializer); + + // Hash的key也采用StringRedisSerializer的序列化方式 + template.setHashKeySerializer(new StringRedisSerializer()); + template.setHashValueSerializer(serializer); + + template.afterPropertiesSet(); + return template; + } + + @Bean(name = "iotStringRedisTemplate") + public StringRedisTemplate stringRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { + StringRedisTemplate template = new StringRedisTemplate(); + template.setConnectionFactory(connectionFactory); + // 可以根据需要进行更多的配置,例如设置序列化器、初始化参数等 + return template; + } + +} diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java new file mode 100644 index 00000000..2ae248e0 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java @@ -0,0 +1,20 @@ +package com.zc.framework.config; + +import lombok.Data; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; + +@Configuration +@ConfigurationProperties(prefix = "iot.redis") +@Data +public class IotRedisConfigProperties { + private String host; + private int port; + private int database; + private String password; + private Duration timeout; + private RedisProperties.Lettuce lettuce; +} diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java index 77425531..f87142b0 100644 --- a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java @@ -1,6 +1,7 @@ package com.zc.framework.config; import com.zc.common.core.redis.pubsub.RedisReceiver; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -21,7 +22,7 @@ public class RedisPubSubConfig { * @return */ @Bean - RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); @@ -38,8 +39,4 @@ public class RedisPubSubConfig { return new MessageListenerAdapter(receiver, "onMessage"); } - @Bean - StringRedisTemplate template(RedisConnectionFactory connectionFactory) { - return new StringRedisTemplate(connectionFactory); - } } diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java index b690d846..98925d81 100644 --- a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java @@ -1,5 +1,6 @@ package com.zc.framework.config; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -32,8 +33,21 @@ public class RedisStreamConfig { * 创建Stream消息监听容器 * @return */ - @Bean - public StreamMessageListenerContainer> streamMessageListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer + @Bean(name = "streamMessageListenerContainer") + public StreamMessageListenerContainer> streamMessageListenerContainer(@Qualifier("redisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer + .StreamMessageListenerContainerOptions> options) { + // 创建Stream消息监听容器 + return StreamMessageListenerContainer.create(factory, options); + } + + /** + * 创建Stream消息监听容器(iot) + * + * @return + */ + @Bean(name = "iotStreamMessageListenerContainer") + public StreamMessageListenerContainer> iotStreamMessageListenerContainer( + @Qualifier("iotRedisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer .StreamMessageListenerContainerOptions> options) { // 创建Stream消息监听容器 return StreamMessageListenerContainer.create(factory, options); diff --git a/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java new file mode 100644 index 00000000..a6245b0d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java @@ -0,0 +1,71 @@ +package com.zc.business.constant; + +/** + * redis Stream 消息队列、组、消费者定义 + */ +public class RedisStreamConstants { + + /** + * 设备上线 队列、组、消费者定义 + */ + public static class DeviceOnline { + public final static String KEY = "device:online"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备离线 队列、组、消费者定义 + */ + public static class DeviceOffline { + public final static String KEY = "device:offline"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备事件 队列、组、消费者定义 + */ + public static class DeviceEvent { + public final static String KEY = "device:event"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备属性上报 队列、组、消费者定义 + */ + public static class DevicePropertyReport { + public final static String KEY = "device:property:report"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 读取属性回复 队列、组、消费者定义 + */ + public static class DevicePropertyReadReply { + public final static String KEY = "device:property:read:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + + /** + * 写属性回复 队列、组、消费者定义 + */ + public static class DevicePropertyWriteReply { + public final static String KEY = "device:property:write:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 调用功能回复 队列、组、消费者定义 + */ + public static class DeviceFunctionReply { + public final static String KEY = "device:function:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java b/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java new file mode 100644 index 00000000..7ed643d8 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 设备消息监听 + */ +@Component +public class DeviceEventListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java new file mode 100644 index 00000000..a4234d5d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 调用功能回复消息监听 + */ +@Component +public class DeviceFunctionReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java new file mode 100644 index 00000000..54a656d6 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 读取属性回复消息监听 + */ +@Component +public class DevicePropertyReadReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java new file mode 100644 index 00000000..019ec402 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 设备属性上报消息监听 + */ +@Component +public class DevicePropertyReportListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java new file mode 100644 index 00000000..0d8157cc --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 写属性回复消息监听 + */ +@Component +public class DevicePropertyWriteReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java b/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java new file mode 100644 index 00000000..2ad5c0b0 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java @@ -0,0 +1,100 @@ +package com.zc.business.message.device; + +import com.zc.business.constant.RedisStreamConstants; +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 初始化器 + * 设备消息订阅 + * 1.在线消息 + * 2.离线消息 + * @author xiepufeng + */ +@Component +public class MessageSubscription implements CommandLineRunner { + + + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Resource + private OnlineMessageListener onlineMessageListener; + @Resource + private OfflineMessageListener offlineMessageListener; + + @Resource + private DeviceEventListener deviceEventListener; + + @Resource + private DevicePropertyReportListener devicePropertyReportListener; + + @Resource + private DevicePropertyReadReplyListener devicePropertyReadReplyListener; + + @Resource + private DevicePropertyWriteReplyListener devicePropertyWriteReplyListener; + + @Resource + private DeviceFunctionReplyListener deviceFunctionReplyListener; + + @Override + public void run(String... args) { + + // 设备在线消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceOnline.KEY, + RedisStreamConstants.DeviceOnline.GROUP, + RedisStreamConstants.DeviceOnline.CONSUMER, + onlineMessageListener); + + // 离线消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceOffline.KEY, + RedisStreamConstants.DeviceOffline.GROUP, + RedisStreamConstants.DeviceOffline.CONSUMER, + offlineMessageListener); + + // 设备事件订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceEvent.KEY, + RedisStreamConstants.DeviceEvent.GROUP, + RedisStreamConstants.DeviceEvent.CONSUMER, + deviceEventListener); + + // 设备属性上报消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyReport.KEY, + RedisStreamConstants.DevicePropertyReport.GROUP, + RedisStreamConstants.DevicePropertyReport.CONSUMER, + devicePropertyReportListener); + + // 读取属性回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyReadReply.KEY, + RedisStreamConstants.DevicePropertyReadReply.GROUP, + RedisStreamConstants.DevicePropertyReadReply.CONSUMER, + devicePropertyReadReplyListener); + + // 写属性回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyWriteReply.KEY, + RedisStreamConstants.DevicePropertyWriteReply.GROUP, + RedisStreamConstants.DevicePropertyWriteReply.CONSUMER, + devicePropertyWriteReplyListener); + + // 调用功能回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceFunctionReply.KEY, + RedisStreamConstants.DeviceFunctionReply.GROUP, + RedisStreamConstants.DeviceFunctionReply.CONSUMER, + deviceFunctionReplyListener); + + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java new file mode 100644 index 00000000..a773a5ce --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 离线消息监听 + */ +@Component +public class OfflineMessageListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java new file mode 100644 index 00000000..73e897e6 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 在线消息监听 + */ +@Component +public class OnlineMessageListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java b/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java index 4756f165..73611cf5 100644 --- a/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java +++ b/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java @@ -31,7 +31,7 @@ public class WebSocketService private static final Logger log = Logger.getLogger(WebSocketService.class.getName()); - private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private static final RedisStream redisStream = SpringUtils.getBean("redisStream"); /** * 给指定客户端发送消息 diff --git a/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java b/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java index b7d81f63..a00cce5f 100644 --- a/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java +++ b/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java @@ -19,7 +19,6 @@ import com.zc.websocket.dto.result.JsonMsg; import com.zc.websocket.enums.ChannelStatus; import com.zc.websocket.enums.ErrorCodeEnum; import com.zc.websocket.handler.CmdHandler; -import org.apache.poi.ss.formula.functions.T; import java.lang.reflect.Type; import java.util.logging.Logger; @@ -33,7 +32,7 @@ public class EventCmdHandler implements CmdHandler private static final Logger log = Logger.getLogger(EventCmdHandler.class.getName()); - private final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private final RedisStream redisStream = SpringUtils.getBean("redisStream"); /* * 消息通道处理器