Browse Source

Merge remote-tracking branch 'origin/develop' into develop

develop
王兴琳 11 months ago
parent
commit
ce12c7d550
  1. 14
      Dockerfile
  2. 53
      Jenkinsfile
  3. 6
      ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java
  4. 4
      ruoyi-admin/src/main/resources/application-druid.yml
  5. 30
      ruoyi-admin/src/main/resources/application.yml
  6. 2
      ruoyi-admin/src/main/resources/logback.xml
  7. 3
      ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java
  8. 16
      ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java
  9. 26
      ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java
  10. 2
      ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java
  11. 55
      ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java
  12. 98
      ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java
  13. 20
      ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java
  14. 7
      ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java
  15. 18
      ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java
  16. 71
      zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java
  17. 31
      zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java
  18. 31
      zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java
  19. 31
      zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java
  20. 31
      zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java
  21. 31
      zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java
  22. 100
      zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java
  23. 31
      zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java
  24. 31
      zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java
  25. 2
      zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java
  26. 3
      zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java

14
Dockerfile

@ -1,14 +0,0 @@
# FROM java:8
FROM anapsix/alpine-java:8_server-jre_unlimited
# 将当前目录下的jar包复制到docker容器的/目录下
COPY *.jar /app.jar
# 运行过程中创建一个xx.jar文件
RUN touch /app.jar
ENV TZ=Asia/Shanghai JAVA_OPTS="-Xms128m -Xmx256m -Djava.security.egd=file:/dev/./urandom"
ENV PARAMS="--spring.profiles.active=druid"
# 声明服务运行在8080端口
EXPOSE 8088
# 指定docker容器启动时运行jar包
ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -jar /app.jar $PARAMS" ]

53
Jenkinsfile

@ -1,53 +0,0 @@
pipeline{
agent any
environment {
IMAGE_NAME = "ruoyi-admin"
WS = "${WORKSPACE}"
}
//定义流水线的加工流程
stages {
//流水线的所有阶段
stage('1.环境检查'){
steps {
sh 'pwd && ls -alh'
sh 'printenv'
sh 'docker version'
sh 'java -version'
sh 'git --version'
}
}
stage('2.编译'){
agent {
docker {
image 'maven:3-alpine'
args '-v maven-repository:/root/.m2'
}
}
steps {
sh 'pwd && ls -alh'
sh 'mvn -v'
sh 'cd ${WS} && mvn clean package -s "/var/jenkins_home/appconfig/maven/settings.xml" -Dmaven.test.skip=true'
}
}
stage('3.打包'){
steps {
sh 'pwd && ls -alh'
sh 'echo ${WS}'
// sh 'mv ${WS}/${IMAGE_NAME}/target/*.jar ${WS}/${IMAGE_NAME}.jar && pwd && ls -alh && docker build -t ${IMAGE_NAME} .'
sh 'docker build -t ${IMAGE_NAME} -f Dockerfile ${WS}/${IMAGE_NAME}/target/'
}
}
stage('4.部署'){
// 删除容器和虚悬镜像
steps {
sh 'pwd && ls -alh'
sh 'docker rm -f ${IMAGE_NAME} || true && docker rmi $(docker images -q -f dangling=true) || true'
sh 'docker run -d -p 8888:8080 --name ${IMAGE_NAME} -v /mydata/logs/${IMAGE_NAME}:/logs/${IMAGE_NAME} ${IMAGE_NAME}'
}
}
}
}

6
ruoyi-admin/src/main/java/com/ruoyi/web/controller/monitor/CacheController.java

@ -3,6 +3,7 @@ package com.ruoyi.web.controller.monitor;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
@ -14,14 +15,15 @@ import java.util.*;
/**
* 缓存监控
*
*
*/
@RestController
@RequestMapping("/monitor/cache")
public class CacheController
{
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Qualifier("redisTemplate")
private RedisTemplate<Object, Object> redisTemplate;
@PreAuthorize("@ss.hasPermi('monitor:cache:list')")
@GetMapping()

4
ruoyi-admin/src/main/resources/application-druid.yml

@ -8,10 +8,10 @@ spring:
master:
# 公司数据库地址
# url: jdbc:mysql://10.168.3.169:3306/athena?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8
url: jdbc:mysql://39.106.31.193:3306/jihe-dc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
url: jdbc:mysql://127.0.0.1:3306/jihe-hs?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
username: root
# password: Platform123!@#
password: 123456
password: root
# 从库数据源
slave:
# 从数据源开关/默认关闭

30
ruoyi-admin/src/main/resources/application.yml

@ -9,7 +9,7 @@ ruoyi:
# 实例演示开关
demoEnabled: true
# 文件路径 示例( Windows配置D:/ruoyi/uploadPath,Linux配置 /home/ruoyi/uploadPath)
profile: /home/ruoyi/uploadPath
profile: /home/jhdc/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数组计算 char 字符验证 behavioral 行为验证码(拖动式与点触式)
@ -18,7 +18,7 @@ ruoyi:
# 开发环境配置
server:
# 服务器的HTTP端口,默认为8080
port: 8088
port: 8080
servlet:
# 应用的访问路径
context-path: /
@ -69,7 +69,7 @@ spring:
# 数据库索引
database: 0
# 密码
password: 123456
password: Redis123!@#
# 连接超时时间
timeout: 10s
lettuce:
@ -154,7 +154,29 @@ aj:
interference-options: 2
# 物联平台ip地址
iot:
# 物联平台地址
address: http://127.0.0.1:8080
# redis 配置
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 10
# 密码
password:
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms

2
ruoyi-admin/src/main/resources/logback.xml

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 日志存放路径 -->
<property name="log.path" value="/home/ruoyi/logs" />
<property name="log.path" value="/home/jhdc/logs" />
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />

3
ruoyi-common/src/main/java/com/zc/common/core/redis/pubsub/RedisPubSub.java

@ -1,7 +1,7 @@
package com.zc.common.core.redis.pubsub;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@ -21,6 +21,7 @@ public class RedisPubSub
@Autowired
private MessageListenerAdapter messageListenerAdapter;
@Autowired
@Qualifier("stringRedisTemplate")
private StringRedisTemplate redisTemplate;
/**

16
ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStream.java

@ -1,11 +1,9 @@
package com.zc.common.core.redis.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
@ -16,15 +14,17 @@ import java.util.Objects;
* 对redis stream命令的一些实现可单独使用
* @author xiepufeng
*/
@Component
public class RedisStream {
@Autowired
private StringRedisTemplate redisTemplate;
private final StringRedisTemplate redisTemplate;
private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer;
public RedisStream(StringRedisTemplate redisTemplate, StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) {
this.redisTemplate = redisTemplate;
this.listenerContainer = listenerContainer;
}
@Autowired
private StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer;
/**
* 订阅消息
* @param key 队列

26
ruoyi-common/src/main/java/com/zc/common/core/redis/stream/RedisStreamBeans.java

@ -0,0 +1,26 @@
package com.zc.common.core.redis.stream;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class RedisStreamBeans {
@Bean(name = "redisStream")
public RedisStream redisStream(
@Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate,
@Qualifier("streamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) {
return new RedisStream(redisTemplate, listenerContainer);
}
@Bean(name = "iotRedisStream")
public RedisStream iotRedisStream(
@Qualifier("iotStringRedisTemplate") StringRedisTemplate redisTemplate,
@Qualifier("iotStreamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) {
return new RedisStream(redisTemplate, listenerContainer);
}
}

2
ruoyi-common/src/main/java/com/zc/common/core/websocket/WebSocketService.java

@ -25,7 +25,7 @@ public class WebSocketService
private static final String EVENT = "event";
private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class);
private static final RedisStream redisStream = SpringUtils.getBean("redisStream");
/**
* 给指定客户端发送消息

55
ruoyi-framework/src/main/java/com/ruoyi/framework/config/RedisConfig.java

@ -1,11 +1,21 @@
package com.ruoyi.framework.config;
import io.lettuce.core.ClientOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@ -14,17 +24,51 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import java.time.Duration;
/**
* redis配置
*
*
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{
@Bean
@Primary
@Bean(name = "redisConnectionFactory")
public RedisConnectionFactory connectionFactory(RedisProperties redisProperties){
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
standaloneConfig.setHostName(redisProperties.getHost());
standaloneConfig.setPort(redisProperties.getPort());
standaloneConfig.setDatabase(redisProperties.getDatabase());
standaloneConfig.setPassword(redisProperties.getPassword());
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
genericObjectPoolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
genericObjectPoolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());
genericObjectPoolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis());
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(genericObjectPoolConfig)
.commandTimeout(redisProperties.getTimeout())
.clientOptions(ClientOptions.builder()
.autoReconnect(true)
.build())
.build();
return new LettuceConnectionFactory(standaloneConfig, clientConfig);
}
@Primary
@Bean(name = "redisTemplate")
@SuppressWarnings(value = { "unchecked", "rawtypes" })
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
public RedisTemplate<Object, Object> redisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory)
{
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
@ -48,6 +92,11 @@ public class RedisConfig extends CachingConfigurerSupport
return template;
}
@Bean(name = "stringRedisTemplate")
StringRedisTemplate stringRedisTemplate(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public DefaultRedisScript<Long> limitScript()
{

98
ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfig.java

@ -0,0 +1,98 @@
package com.zc.framework.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.ruoyi.framework.config.FastJson2JsonRedisSerializer;
import io.lettuce.core.ClientOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis配置
*
*/
@Configuration
@EnableCaching
public class IotRedisConfig
{
private final IotRedisConfigProperties redisConfigProperties;
public IotRedisConfig(IotRedisConfigProperties redisConfigProperties) {
this.redisConfigProperties = redisConfigProperties;
}
@Bean(name = "iotRedisConnectionFactory")
public RedisConnectionFactory iotRedisConnectionFactory() {
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
standaloneConfig.setHostName(redisConfigProperties.getHost());
standaloneConfig.setPort(redisConfigProperties.getPort());
standaloneConfig.setDatabase(redisConfigProperties.getDatabase());
standaloneConfig.setPassword(redisConfigProperties.getPassword());
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(redisConfigProperties.getLettuce().getPool().getMaxIdle());
genericObjectPoolConfig.setMinIdle(redisConfigProperties.getLettuce().getPool().getMinIdle());
genericObjectPoolConfig.setMaxTotal(redisConfigProperties.getLettuce().getPool().getMaxActive());
genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperties.getLettuce().getPool().getMaxWait().toMillis());
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(genericObjectPoolConfig)
.commandTimeout(redisConfigProperties.getTimeout())
.clientOptions(ClientOptions.builder()
.autoReconnect(true)
.build())
.build();
return new LettuceConnectionFactory(standaloneConfig, clientConfig);
}
// 创建对应于从Redis数据源的RedisTemplate
@Bean(name = "iotRedisTemplate")
public RedisTemplate<Object, Object> iotRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
serializer.setObjectMapper(mapper);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
@Bean(name = "iotStringRedisTemplate")
public StringRedisTemplate stringRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(connectionFactory);
// 可以根据需要进行更多的配置,例如设置序列化器、初始化参数等
return template;
}
}

20
ruoyi-framework/src/main/java/com/zc/framework/config/IotRedisConfigProperties.java

@ -0,0 +1,20 @@
package com.zc.framework.config;
import lombok.Data;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@ConfigurationProperties(prefix = "iot.redis")
@Data
public class IotRedisConfigProperties {
private String host;
private int port;
private int database;
private String password;
private Duration timeout;
private RedisProperties.Lettuce lettuce;
}

7
ruoyi-framework/src/main/java/com/zc/framework/config/RedisPubSubConfig.java

@ -1,6 +1,7 @@
package com.zc.framework.config;
import com.zc.common.core.redis.pubsub.RedisReceiver;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -21,7 +22,7 @@ public class RedisPubSubConfig {
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
@ -38,8 +39,4 @@ public class RedisPubSubConfig {
return new MessageListenerAdapter(receiver, "onMessage");
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}

18
ruoyi-framework/src/main/java/com/zc/framework/config/RedisStreamConfig.java

@ -1,5 +1,6 @@
package com.zc.framework.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -32,8 +33,21 @@ public class RedisStreamConfig {
* 创建Stream消息监听容器
* @return
*/
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer
@Bean(name = "streamMessageListenerContainer")
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(@Qualifier("redisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer
.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options) {
// 创建Stream消息监听容器
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 创建Stream消息监听容器iot
*
* @return
*/
@Bean(name = "iotStreamMessageListenerContainer")
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> iotStreamMessageListenerContainer(
@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory factory, StreamMessageListenerContainer
.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options) {
// 创建Stream消息监听容器
return StreamMessageListenerContainer.create(factory, options);

71
zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java

@ -0,0 +1,71 @@
package com.zc.business.constant;
/**
* redis Stream 消息队列消费者定义
*/
public class RedisStreamConstants {
/**
* 设备上线 队列消费者定义
*/
public static class DeviceOnline {
public final static String KEY = "device:online";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 设备离线 队列消费者定义
*/
public static class DeviceOffline {
public final static String KEY = "device:offline";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 设备事件 队列消费者定义
*/
public static class DeviceEvent {
public final static String KEY = "device:event";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 设备属性上报 队列消费者定义
*/
public static class DevicePropertyReport {
public final static String KEY = "device:property:report";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 读取属性回复 队列消费者定义
*/
public static class DevicePropertyReadReply {
public final static String KEY = "device:property:read:reply";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 写属性回复 队列消费者定义
*/
public static class DevicePropertyWriteReply {
public final static String KEY = "device:property:write:reply";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 调用功能回复 队列消费者定义
*/
public static class DeviceFunctionReply {
public final static String KEY = "device:function:reply";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
}

31
zc-business/src/main/java/com/zc/business/message/device/DeviceEventListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 设备消息监听
*/
@Component
public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

31
zc-business/src/main/java/com/zc/business/message/device/DeviceFunctionReplyListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 调用功能回复消息监听
*/
@Component
public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

31
zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReadReplyListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 读取属性回复消息监听
*/
@Component
public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

31
zc-business/src/main/java/com/zc/business/message/device/DevicePropertyReportListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 设备属性上报消息监听
*/
@Component
public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

31
zc-business/src/main/java/com/zc/business/message/device/DevicePropertyWriteReplyListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 写属性回复消息监听
*/
@Component
public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

100
zc-business/src/main/java/com/zc/business/message/device/MessageSubscription.java

@ -0,0 +1,100 @@
package com.zc.business.message.device;
import com.zc.business.constant.RedisStreamConstants;
import com.zc.common.core.redis.stream.RedisStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 初始化器
* 设备消息订阅
* 1.在线消息
* 2.离线消息
* @author xiepufeng
*/
@Component
public class MessageSubscription implements CommandLineRunner {
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Resource
private OnlineMessageListener onlineMessageListener;
@Resource
private OfflineMessageListener offlineMessageListener;
@Resource
private DeviceEventListener deviceEventListener;
@Resource
private DevicePropertyReportListener devicePropertyReportListener;
@Resource
private DevicePropertyReadReplyListener devicePropertyReadReplyListener;
@Resource
private DevicePropertyWriteReplyListener devicePropertyWriteReplyListener;
@Resource
private DeviceFunctionReplyListener deviceFunctionReplyListener;
@Override
public void run(String... args) {
// 设备在线消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceOnline.KEY,
RedisStreamConstants.DeviceOnline.GROUP,
RedisStreamConstants.DeviceOnline.CONSUMER,
onlineMessageListener);
// 离线消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceOffline.KEY,
RedisStreamConstants.DeviceOffline.GROUP,
RedisStreamConstants.DeviceOffline.CONSUMER,
offlineMessageListener);
// 设备事件订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceEvent.KEY,
RedisStreamConstants.DeviceEvent.GROUP,
RedisStreamConstants.DeviceEvent.CONSUMER,
deviceEventListener);
// 设备属性上报消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyReport.KEY,
RedisStreamConstants.DevicePropertyReport.GROUP,
RedisStreamConstants.DevicePropertyReport.CONSUMER,
devicePropertyReportListener);
// 读取属性回复消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyReadReply.KEY,
RedisStreamConstants.DevicePropertyReadReply.GROUP,
RedisStreamConstants.DevicePropertyReadReply.CONSUMER,
devicePropertyReadReplyListener);
// 写属性回复消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyWriteReply.KEY,
RedisStreamConstants.DevicePropertyWriteReply.GROUP,
RedisStreamConstants.DevicePropertyWriteReply.CONSUMER,
devicePropertyWriteReplyListener);
// 调用功能回复消息订阅
redisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceFunctionReply.KEY,
RedisStreamConstants.DeviceFunctionReply.GROUP,
RedisStreamConstants.DeviceFunctionReply.CONSUMER,
deviceFunctionReplyListener);
}
}

31
zc-business/src/main/java/com/zc/business/message/device/OfflineMessageListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 离线消息监听
*/
@Component
public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

31
zc-business/src/main/java/com/zc/business/message/device/OnlineMessageListener.java

@ -0,0 +1,31 @@
package com.zc.business.message.device;
import com.zc.common.core.redis.stream.RedisStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 在线消息监听
*/
@Component
public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>>
{
private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class);
@Autowired
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

2
zc-websocket/src/main/java/com/zc/websocket/handler/WebSocketService.java

@ -31,7 +31,7 @@ public class WebSocketService
private static final Logger log = Logger.getLogger(WebSocketService.class.getName());
private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class);
private static final RedisStream redisStream = SpringUtils.getBean("redisStream");
/**
* 给指定客户端发送消息

3
zc-websocket/src/main/java/com/zc/websocket/handler/event/EventCmdHandler.java

@ -19,7 +19,6 @@ import com.zc.websocket.dto.result.JsonMsg;
import com.zc.websocket.enums.ChannelStatus;
import com.zc.websocket.enums.ErrorCodeEnum;
import com.zc.websocket.handler.CmdHandler;
import org.apache.poi.ss.formula.functions.T;
import java.lang.reflect.Type;
import java.util.logging.Logger;
@ -33,7 +32,7 @@ public class EventCmdHandler implements CmdHandler
private static final Logger log = Logger.getLogger(EventCmdHandler.class.getName());
private final RedisStream redisStream = SpringUtils.getBean(RedisStream.class);
private final RedisStream redisStream = SpringUtils.getBean("redisStream");
/*
* 消息通道处理器

Loading…
Cancel
Save