diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 056e0e2e..00000000 --- a/Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -# FROM java:8 -FROM anapsix/alpine-java:8_server-jre_unlimited -# 将当前目录下的jar包复制到docker容器的/目录下 -COPY *.jar /app.jar -# 运行过程中创建一个xx.jar文件 -RUN touch /app.jar - -ENV TZ=Asia/Shanghai JAVA_OPTS="-Xms128m -Xmx256m -Djava.security.egd=file:/dev/./urandom" -ENV PARAMS="--spring.profiles.active=druid" - -# 声明服务运行在8080端口 -EXPOSE 8088 -# 指定docker容器启动时运行jar包 -ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -jar /app.jar $PARAMS" ] \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile deleted file mode 100644 index cd2b9ad5..00000000 --- a/Jenkinsfile +++ /dev/null @@ -1,53 +0,0 @@ -pipeline{ - agent any - environment { - IMAGE_NAME = "ruoyi-admin" - WS = "${WORKSPACE}" - } - - //定义流水线的加工流程 - stages { - //流水线的所有阶段 - stage('1.环境检查'){ - steps { - sh 'pwd && ls -alh' - sh 'printenv' - sh 'docker version' - sh 'java -version' - sh 'git --version' - } - } - - stage('2.编译'){ - agent { - docker { - image 'maven:3-alpine' - args '-v maven-repository:/root/.m2' - } - } - steps { - sh 'pwd && ls -alh' - sh 'mvn -v' - sh 'cd ${WS} && mvn clean package -s "/var/jenkins_home/appconfig/maven/settings.xml" -Dmaven.test.skip=true' - } - } - - stage('3.打包'){ - steps { - sh 'pwd && ls -alh' - sh 'echo ${WS}' - // sh 'mv ${WS}/${IMAGE_NAME}/target/*.jar ${WS}/${IMAGE_NAME}.jar && pwd && ls -alh && docker build -t ${IMAGE_NAME} .' - sh 'docker build -t ${IMAGE_NAME} -f Dockerfile ${WS}/${IMAGE_NAME}/target/' - } - } - - stage('4.部署'){ - // 删除容器和虚悬镜像 - steps { - sh 'pwd && ls -alh' - sh 'docker rm -f ${IMAGE_NAME} || true && docker rmi $(docker images -q -f dangling=true) || true' - sh 'docker run -d -p 8888:8080 --name ${IMAGE_NAME} -v /mydata/logs/${IMAGE_NAME}:/logs/${IMAGE_NAME} ${IMAGE_NAME}' - } - } - } -} \ No newline at end of file diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java index 5205d4ca..c248adf3 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java @@ -3,6 +3,7 @@ package com.ruoyi.web.controller.monitor; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.security.access.prepost.PreAuthorize; @@ -14,14 +15,15 @@ import java.util.*; /** * 缓存监控 - * + * */ @RestController @RequestMapping("/monitor/cache") public class CacheController { @Autowired - private RedisTemplate redisTemplate; + @Qualifier("redisTemplate") + private RedisTemplate redisTemplate; @PreAuthorize("@ss.hasPermi('monitor:cache:list')") @GetMapping() diff --git a/ruoyi-admin/src/main/resources/application-druid.yml b/ruoyi-admin/src/main/resources/application-druid.yml index 86e6f7da..ea56f034 100644 --- a/ruoyi-admin/src/main/resources/application-druid.yml +++ b/ruoyi-admin/src/main/resources/application-druid.yml @@ -8,10 +8,10 @@ spring: master: # 公司数据库地址 # url: jdbc:mysql://10.168.3.169:3306/athena?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8 - url: jdbc:mysql://39.106.31.193:3306/jihe-dc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true + url: jdbc:mysql://127.0.0.1:3306/jihe-hs?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true username: root # password: Platform123!@# - password: 123456 + password: root # 从库数据源 slave: # 从数据源开关/默认关闭 diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 3f1b7d15..0c5d4452 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -9,7 +9,7 @@ ruoyi: # 实例演示开关 demoEnabled: true # 文件路径 示例( Windows配置D:/ruoyi/uploadPath,Linux配置 /home/ruoyi/uploadPath) - profile: /home/ruoyi/uploadPath + profile: /home/jhdc/uploadPath # 获取ip地址开关 addressEnabled: false # 验证码类型 math 数组计算 char 字符验证 behavioral 行为验证码(拖动式与点触式) @@ -18,7 +18,7 @@ ruoyi: # 开发环境配置 server: # 服务器的HTTP端口,默认为8080 - port: 8088 + port: 8080 servlet: # 应用的访问路径 context-path: / @@ -69,7 +69,7 @@ spring: # 数据库索引 database: 0 # 密码 - password: 123456 + password: Redis123!@# # 连接超时时间 timeout: 10s lettuce: @@ -154,7 +154,29 @@ aj: interference-options: 2 -# 物联平台ip地址 iot: + # 物联平台地址 address: http://127.0.0.1:8080 + # redis 配置 + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 10 + # 密码 + password: + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms diff --git a/ruoyi-admin/src/main/resources/logback.xml b/ruoyi-admin/src/main/resources/logback.xml index a360583f..50f0f7d9 100644 --- a/ruoyi-admin/src/main/resources/logback.xml +++ b/ruoyi-admin/src/main/resources/logback.xml @@ -1,7 +1,7 @@ - + diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java index 6223325a..7bafc887 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java @@ -1,7 +1,7 @@ package com.zc.common.core.redis.pubsub; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @@ -21,6 +21,7 @@ public class RedisPubSub @Autowired private MessageListenerAdapter messageListenerAdapter; @Autowired + @Qualifier("stringRedisTemplate") private StringRedisTemplate redisTemplate; /** diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java index 660a50c7..36c1a7cd 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java @@ -1,11 +1,9 @@ package com.zc.common.core.redis.stream; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.List; @@ -16,15 +14,17 @@ import java.util.Objects; * 对redis stream命令的一些实现,可单独使用 * @author xiepufeng */ -@Component public class RedisStream { - @Autowired - private StringRedisTemplate redisTemplate; + private final StringRedisTemplate redisTemplate; + + private final StreamMessageListenerContainer> listenerContainer; + + public RedisStream(StringRedisTemplate redisTemplate, StreamMessageListenerContainer> listenerContainer) { + this.redisTemplate = redisTemplate; + this.listenerContainer = listenerContainer; + } - @Autowired - private StreamMessageListenerContainer> listenerContainer; - /** * 订阅消息 * @param key 队列 diff --git a/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java new file mode 100644 index 00000000..98c81c0a --- /dev/null +++ b/ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java @@ -0,0 +1,26 @@ +package com.zc.common.core.redis.stream; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.stereotype.Component; + +@Component +public class RedisStreamBeans { + + @Bean(name = "redisStream") + public RedisStream redisStream( + @Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate, + @Qualifier("streamMessageListenerContainer") StreamMessageListenerContainer> listenerContainer) { + return new RedisStream(redisTemplate, listenerContainer); + } + + @Bean(name = "iotRedisStream") + public RedisStream iotRedisStream( + @Qualifier("iotStringRedisTemplate") StringRedisTemplate redisTemplate, + @Qualifier("iotStreamMessageListenerContainer") StreamMessageListenerContainer> listenerContainer) { + return new RedisStream(redisTemplate, listenerContainer); + } +} diff --git a/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java b/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java index 096b6252..d2912915 100644 --- a/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java +++ b/ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java @@ -25,7 +25,7 @@ public class WebSocketService private static final String EVENT = "event"; - private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private static final RedisStream redisStream = SpringUtils.getBean("redisStream"); /** * 给指定客户端发送消息 diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java index 251c7e6d..31db2e7e 100644 --- a/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java @@ -1,11 +1,21 @@ package com.ruoyi.framework.config; +import io.lettuce.core.ClientOptions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -14,17 +24,51 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import java.time.Duration; + /** * redis配置 - * + * */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { - @Bean + + @Primary + @Bean(name = "redisConnectionFactory") + public RedisConnectionFactory connectionFactory(RedisProperties redisProperties){ + + + RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); + + standaloneConfig.setHostName(redisProperties.getHost()); + standaloneConfig.setPort(redisProperties.getPort()); + standaloneConfig.setDatabase(redisProperties.getDatabase()); + standaloneConfig.setPassword(redisProperties.getPassword()); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle()); + genericObjectPoolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle()); + genericObjectPoolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive()); + genericObjectPoolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis()); + + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .poolConfig(genericObjectPoolConfig) + .commandTimeout(redisProperties.getTimeout()) + .clientOptions(ClientOptions.builder() + .autoReconnect(true) + .build()) + .build(); + + return new LettuceConnectionFactory(standaloneConfig, clientConfig); + } + + @Primary + @Bean(name = "redisTemplate") @SuppressWarnings(value = { "unchecked", "rawtypes" }) - public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) + public RedisTemplate redisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); @@ -48,6 +92,11 @@ public class RedisConfig extends CachingConfigurerSupport return template; } + @Bean(name = "stringRedisTemplate") + StringRedisTemplate stringRedisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { + return new StringRedisTemplate(connectionFactory); + } + @Bean public DefaultRedisScript limitScript() { diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java new file mode 100644 index 00000000..1a492ca6 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java @@ -0,0 +1,98 @@ +package com.zc.framework.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.ruoyi.framework.config.FastJson2JsonRedisSerializer; +import io.lettuce.core.ClientOptions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * redis配置 + * + */ +@Configuration +@EnableCaching +public class IotRedisConfig +{ + private final IotRedisConfigProperties redisConfigProperties; + + public IotRedisConfig(IotRedisConfigProperties redisConfigProperties) { + this.redisConfigProperties = redisConfigProperties; + } + + @Bean(name = "iotRedisConnectionFactory") + public RedisConnectionFactory iotRedisConnectionFactory() { + + RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); + + standaloneConfig.setHostName(redisConfigProperties.getHost()); + standaloneConfig.setPort(redisConfigProperties.getPort()); + standaloneConfig.setDatabase(redisConfigProperties.getDatabase()); + standaloneConfig.setPassword(redisConfigProperties.getPassword()); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(redisConfigProperties.getLettuce().getPool().getMaxIdle()); + genericObjectPoolConfig.setMinIdle(redisConfigProperties.getLettuce().getPool().getMinIdle()); + genericObjectPoolConfig.setMaxTotal(redisConfigProperties.getLettuce().getPool().getMaxActive()); + genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperties.getLettuce().getPool().getMaxWait().toMillis()); + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .poolConfig(genericObjectPoolConfig) + .commandTimeout(redisConfigProperties.getTimeout()) + .clientOptions(ClientOptions.builder() + .autoReconnect(true) + .build()) + .build(); + + return new LettuceConnectionFactory(standaloneConfig, clientConfig); + } + + // 创建对应于从Redis数据源的RedisTemplate + @Bean(name = "iotRedisTemplate") + public RedisTemplate iotRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(connectionFactory); + + FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); + + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); + serializer.setObjectMapper(mapper); + + // 使用StringRedisSerializer来序列化和反序列化redis的key值 + template.setKeySerializer(new StringRedisSerializer()); + template.setValueSerializer(serializer); + + // Hash的key也采用StringRedisSerializer的序列化方式 + template.setHashKeySerializer(new StringRedisSerializer()); + template.setHashValueSerializer(serializer); + + template.afterPropertiesSet(); + return template; + } + + @Bean(name = "iotStringRedisTemplate") + public StringRedisTemplate stringRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { + StringRedisTemplate template = new StringRedisTemplate(); + template.setConnectionFactory(connectionFactory); + // 可以根据需要进行更多的配置,例如设置序列化器、初始化参数等 + return template; + } + +} diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java new file mode 100644 index 00000000..2ae248e0 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java @@ -0,0 +1,20 @@ +package com.zc.framework.config; + +import lombok.Data; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; + +@Configuration +@ConfigurationProperties(prefix = "iot.redis") +@Data +public class IotRedisConfigProperties { + private String host; + private int port; + private int database; + private String password; + private Duration timeout; + private RedisProperties.Lettuce lettuce; +} diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java index 77425531..f87142b0 100644 --- a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java @@ -1,6 +1,7 @@ package com.zc.framework.config; import com.zc.common.core.redis.pubsub.RedisReceiver; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -21,7 +22,7 @@ public class RedisPubSubConfig { * @return */ @Bean - RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); @@ -38,8 +39,4 @@ public class RedisPubSubConfig { return new MessageListenerAdapter(receiver, "onMessage"); } - @Bean - StringRedisTemplate template(RedisConnectionFactory connectionFactory) { - return new StringRedisTemplate(connectionFactory); - } } diff --git a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java index b690d846..98925d81 100644 --- a/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java +++ b/ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java @@ -1,5 +1,6 @@ package com.zc.framework.config; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -32,8 +33,21 @@ public class RedisStreamConfig { * 创建Stream消息监听容器 * @return */ - @Bean - public StreamMessageListenerContainer> streamMessageListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer + @Bean(name = "streamMessageListenerContainer") + public StreamMessageListenerContainer> streamMessageListenerContainer(@Qualifier("redisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer + .StreamMessageListenerContainerOptions> options) { + // 创建Stream消息监听容器 + return StreamMessageListenerContainer.create(factory, options); + } + + /** + * 创建Stream消息监听容器(iot) + * + * @return + */ + @Bean(name = "iotStreamMessageListenerContainer") + public StreamMessageListenerContainer> iotStreamMessageListenerContainer( + @Qualifier("iotRedisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer .StreamMessageListenerContainerOptions> options) { // 创建Stream消息监听容器 return StreamMessageListenerContainer.create(factory, options); diff --git a/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java new file mode 100644 index 00000000..a6245b0d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java @@ -0,0 +1,71 @@ +package com.zc.business.constant; + +/** + * redis Stream 消息队列、组、消费者定义 + */ +public class RedisStreamConstants { + + /** + * 设备上线 队列、组、消费者定义 + */ + public static class DeviceOnline { + public final static String KEY = "device:online"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备离线 队列、组、消费者定义 + */ + public static class DeviceOffline { + public final static String KEY = "device:offline"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备事件 队列、组、消费者定义 + */ + public static class DeviceEvent { + public final static String KEY = "device:event"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 设备属性上报 队列、组、消费者定义 + */ + public static class DevicePropertyReport { + public final static String KEY = "device:property:report"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 读取属性回复 队列、组、消费者定义 + */ + public static class DevicePropertyReadReply { + public final static String KEY = "device:property:read:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + + /** + * 写属性回复 队列、组、消费者定义 + */ + public static class DevicePropertyWriteReply { + public final static String KEY = "device:property:write:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 调用功能回复 队列、组、消费者定义 + */ + public static class DeviceFunctionReply { + public final static String KEY = "device:function:reply"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java b/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java new file mode 100644 index 00000000..7ed643d8 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 设备消息监听 + */ +@Component +public class DeviceEventListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java new file mode 100644 index 00000000..a4234d5d --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 调用功能回复消息监听 + */ +@Component +public class DeviceFunctionReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java new file mode 100644 index 00000000..54a656d6 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 读取属性回复消息监听 + */ +@Component +public class DevicePropertyReadReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java new file mode 100644 index 00000000..019ec402 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 设备属性上报消息监听 + */ +@Component +public class DevicePropertyReportListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java new file mode 100644 index 00000000..0d8157cc --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 写属性回复消息监听 + */ +@Component +public class DevicePropertyWriteReplyListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java b/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java new file mode 100644 index 00000000..2ad5c0b0 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java @@ -0,0 +1,100 @@ +package com.zc.business.message.device; + +import com.zc.business.constant.RedisStreamConstants; +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 初始化器 + * 设备消息订阅 + * 1.在线消息 + * 2.离线消息 + * @author xiepufeng + */ +@Component +public class MessageSubscription implements CommandLineRunner { + + + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Resource + private OnlineMessageListener onlineMessageListener; + @Resource + private OfflineMessageListener offlineMessageListener; + + @Resource + private DeviceEventListener deviceEventListener; + + @Resource + private DevicePropertyReportListener devicePropertyReportListener; + + @Resource + private DevicePropertyReadReplyListener devicePropertyReadReplyListener; + + @Resource + private DevicePropertyWriteReplyListener devicePropertyWriteReplyListener; + + @Resource + private DeviceFunctionReplyListener deviceFunctionReplyListener; + + @Override + public void run(String... args) { + + // 设备在线消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceOnline.KEY, + RedisStreamConstants.DeviceOnline.GROUP, + RedisStreamConstants.DeviceOnline.CONSUMER, + onlineMessageListener); + + // 离线消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceOffline.KEY, + RedisStreamConstants.DeviceOffline.GROUP, + RedisStreamConstants.DeviceOffline.CONSUMER, + offlineMessageListener); + + // 设备事件订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceEvent.KEY, + RedisStreamConstants.DeviceEvent.GROUP, + RedisStreamConstants.DeviceEvent.CONSUMER, + deviceEventListener); + + // 设备属性上报消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyReport.KEY, + RedisStreamConstants.DevicePropertyReport.GROUP, + RedisStreamConstants.DevicePropertyReport.CONSUMER, + devicePropertyReportListener); + + // 读取属性回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyReadReply.KEY, + RedisStreamConstants.DevicePropertyReadReply.GROUP, + RedisStreamConstants.DevicePropertyReadReply.CONSUMER, + devicePropertyReadReplyListener); + + // 写属性回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DevicePropertyWriteReply.KEY, + RedisStreamConstants.DevicePropertyWriteReply.GROUP, + RedisStreamConstants.DevicePropertyWriteReply.CONSUMER, + devicePropertyWriteReplyListener); + + // 调用功能回复消息订阅 + redisStream.subscriptionAutoAck( + RedisStreamConstants.DeviceFunctionReply.KEY, + RedisStreamConstants.DeviceFunctionReply.GROUP, + RedisStreamConstants.DeviceFunctionReply.CONSUMER, + deviceFunctionReplyListener); + + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java new file mode 100644 index 00000000..a773a5ce --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 离线消息监听 + */ +@Component +public class OfflineMessageListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java new file mode 100644 index 00000000..73e897e6 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java @@ -0,0 +1,31 @@ +package com.zc.business.message.device; + +import com.zc.common.core.redis.stream.RedisStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 在线消息监听 + */ +@Component +public class OnlineMessageListener implements StreamListener> +{ + private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); + + @Autowired + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java b/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java index 4756f165..73611cf5 100644 --- a/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java +++ b/zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java @@ -31,7 +31,7 @@ public class WebSocketService private static final Logger log = Logger.getLogger(WebSocketService.class.getName()); - private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private static final RedisStream redisStream = SpringUtils.getBean("redisStream"); /** * 给指定客户端发送消息 diff --git a/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java b/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java index b7d81f63..a00cce5f 100644 --- a/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java +++ b/zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java @@ -19,7 +19,6 @@ import com.zc.websocket.dto.result.JsonMsg; import com.zc.websocket.enums.ChannelStatus; import com.zc.websocket.enums.ErrorCodeEnum; import com.zc.websocket.handler.CmdHandler; -import org.apache.poi.ss.formula.functions.T; import java.lang.reflect.Type; import java.util.logging.Logger; @@ -33,7 +32,7 @@ public class EventCmdHandler implements CmdHandler private static final Logger log = Logger.getLogger(EventCmdHandler.class.getName()); - private final RedisStream redisStream = SpringUtils.getBean(RedisStream.class); + private final RedisStream redisStream = SpringUtils.getBean("redisStream"); /* * 消息通道处理器