xiepufeng
11 months ago
22 changed files with 642 additions and 26 deletions
@ -0,0 +1,26 @@ |
|||
package com.zc.common.core.redis.stream; |
|||
|
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.data.redis.stream.StreamMessageListenerContainer; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component |
|||
public class RedisStreamBeans { |
|||
|
|||
@Bean(name = "redisStream") |
|||
public RedisStream redisStream( |
|||
@Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate, |
|||
@Qualifier("streamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) { |
|||
return new RedisStream(redisTemplate, listenerContainer); |
|||
} |
|||
|
|||
@Bean(name = "iotRedisStream") |
|||
public RedisStream iotRedisStream( |
|||
@Qualifier("iotStringRedisTemplate") StringRedisTemplate redisTemplate, |
|||
@Qualifier("iotStreamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) { |
|||
return new RedisStream(redisTemplate, listenerContainer); |
|||
} |
|||
} |
@ -0,0 +1,98 @@ |
|||
package com.zc.framework.config; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonAutoDetect; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import com.fasterxml.jackson.annotation.PropertyAccessor; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; |
|||
import com.ruoyi.framework.config.FastJson2JsonRedisSerializer; |
|||
import io.lettuce.core.ClientOptions; |
|||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.cache.annotation.EnableCaching; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration; |
|||
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; |
|||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; |
|||
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.data.redis.serializer.StringRedisSerializer; |
|||
|
|||
/** |
|||
* redis配置 |
|||
* |
|||
*/ |
|||
@Configuration |
|||
@EnableCaching |
|||
public class IotRedisConfig |
|||
{ |
|||
private final IotRedisConfigProperties redisConfigProperties; |
|||
|
|||
public IotRedisConfig(IotRedisConfigProperties redisConfigProperties) { |
|||
this.redisConfigProperties = redisConfigProperties; |
|||
} |
|||
|
|||
@Bean(name = "iotRedisConnectionFactory") |
|||
public RedisConnectionFactory iotRedisConnectionFactory() { |
|||
|
|||
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); |
|||
|
|||
standaloneConfig.setHostName(redisConfigProperties.getHost()); |
|||
standaloneConfig.setPort(redisConfigProperties.getPort()); |
|||
standaloneConfig.setDatabase(redisConfigProperties.getDatabase()); |
|||
standaloneConfig.setPassword(redisConfigProperties.getPassword()); |
|||
|
|||
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); |
|||
genericObjectPoolConfig.setMaxIdle(redisConfigProperties.getLettuce().getPool().getMaxIdle()); |
|||
genericObjectPoolConfig.setMinIdle(redisConfigProperties.getLettuce().getPool().getMinIdle()); |
|||
genericObjectPoolConfig.setMaxTotal(redisConfigProperties.getLettuce().getPool().getMaxActive()); |
|||
genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperties.getLettuce().getPool().getMaxWait().toMillis()); |
|||
|
|||
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() |
|||
.poolConfig(genericObjectPoolConfig) |
|||
.commandTimeout(redisConfigProperties.getTimeout()) |
|||
.clientOptions(ClientOptions.builder() |
|||
.autoReconnect(true) |
|||
.build()) |
|||
.build(); |
|||
|
|||
return new LettuceConnectionFactory(standaloneConfig, clientConfig); |
|||
} |
|||
|
|||
// 创建对应于从Redis数据源的RedisTemplate
|
|||
@Bean(name = "iotRedisTemplate") |
|||
public RedisTemplate<Object, Object> iotRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { |
|||
RedisTemplate<Object, Object> template = new RedisTemplate<>(); |
|||
template.setConnectionFactory(connectionFactory); |
|||
|
|||
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); |
|||
|
|||
ObjectMapper mapper = new ObjectMapper(); |
|||
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); |
|||
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); |
|||
serializer.setObjectMapper(mapper); |
|||
|
|||
// 使用StringRedisSerializer来序列化和反序列化redis的key值
|
|||
template.setKeySerializer(new StringRedisSerializer()); |
|||
template.setValueSerializer(serializer); |
|||
|
|||
// Hash的key也采用StringRedisSerializer的序列化方式
|
|||
template.setHashKeySerializer(new StringRedisSerializer()); |
|||
template.setHashValueSerializer(serializer); |
|||
|
|||
template.afterPropertiesSet(); |
|||
return template; |
|||
} |
|||
|
|||
@Bean(name = "iotStringRedisTemplate") |
|||
public StringRedisTemplate stringRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { |
|||
StringRedisTemplate template = new StringRedisTemplate(); |
|||
template.setConnectionFactory(connectionFactory); |
|||
// 可以根据需要进行更多的配置,例如设置序列化器、初始化参数等
|
|||
return template; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,20 @@ |
|||
package com.zc.framework.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.autoconfigure.data.redis.RedisProperties; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.time.Duration; |
|||
|
|||
@Configuration |
|||
@ConfigurationProperties(prefix = "iot.redis") |
|||
@Data |
|||
public class IotRedisConfigProperties { |
|||
private String host; |
|||
private int port; |
|||
private int database; |
|||
private String password; |
|||
private Duration timeout; |
|||
private RedisProperties.Lettuce lettuce; |
|||
} |
@ -0,0 +1,71 @@ |
|||
package com.zc.business.constant; |
|||
|
|||
/** |
|||
* redis Stream 消息队列、组、消费者定义 |
|||
*/ |
|||
public class RedisStreamConstants { |
|||
|
|||
/** |
|||
* 设备上线 队列、组、消费者定义 |
|||
*/ |
|||
public static class DeviceOnline { |
|||
public final static String KEY = "device:online"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
/** |
|||
* 设备离线 队列、组、消费者定义 |
|||
*/ |
|||
public static class DeviceOffline { |
|||
public final static String KEY = "device:offline"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
/** |
|||
* 设备事件 队列、组、消费者定义 |
|||
*/ |
|||
public static class DeviceEvent { |
|||
public final static String KEY = "device:event"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
/** |
|||
* 设备属性上报 队列、组、消费者定义 |
|||
*/ |
|||
public static class DevicePropertyReport { |
|||
public final static String KEY = "device:property:report"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
/** |
|||
* 读取属性回复 队列、组、消费者定义 |
|||
*/ |
|||
public static class DevicePropertyReadReply { |
|||
public final static String KEY = "device:property:read:reply"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 写属性回复 队列、组、消费者定义 |
|||
*/ |
|||
public static class DevicePropertyWriteReply { |
|||
public final static String KEY = "device:property:write:reply"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
|
|||
/** |
|||
* 调用功能回复 队列、组、消费者定义 |
|||
*/ |
|||
public static class DeviceFunctionReply { |
|||
public final static String KEY = "device:function:reply"; |
|||
public final static String GROUP = "group1"; |
|||
public final static String CONSUMER = "consumer1"; |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 设备消息监听 |
|||
*/ |
|||
@Component |
|||
public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 调用功能回复消息监听 |
|||
*/ |
|||
@Component |
|||
public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 读取属性回复消息监听 |
|||
*/ |
|||
@Component |
|||
public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 设备属性上报消息监听 |
|||
*/ |
|||
@Component |
|||
public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 写属性回复消息监听 |
|||
*/ |
|||
@Component |
|||
public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,100 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.business.constant.RedisStreamConstants; |
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.boot.CommandLineRunner; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* 初始化器 |
|||
* 设备消息订阅 |
|||
* 1.在线消息 |
|||
* 2.离线消息 |
|||
* @author xiepufeng |
|||
*/ |
|||
@Component |
|||
public class MessageSubscription implements CommandLineRunner { |
|||
|
|||
|
|||
@Autowired |
|||
@Qualifier("iotRedisStream") |
|||
private RedisStream redisStream; |
|||
|
|||
@Resource |
|||
private OnlineMessageListener onlineMessageListener; |
|||
@Resource |
|||
private OfflineMessageListener offlineMessageListener; |
|||
|
|||
@Resource |
|||
private DeviceEventListener deviceEventListener; |
|||
|
|||
@Resource |
|||
private DevicePropertyReportListener devicePropertyReportListener; |
|||
|
|||
@Resource |
|||
private DevicePropertyReadReplyListener devicePropertyReadReplyListener; |
|||
|
|||
@Resource |
|||
private DevicePropertyWriteReplyListener devicePropertyWriteReplyListener; |
|||
|
|||
@Resource |
|||
private DeviceFunctionReplyListener deviceFunctionReplyListener; |
|||
|
|||
@Override |
|||
public void run(String... args) { |
|||
|
|||
// 设备在线消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DeviceOnline.KEY, |
|||
RedisStreamConstants.DeviceOnline.GROUP, |
|||
RedisStreamConstants.DeviceOnline.CONSUMER, |
|||
onlineMessageListener); |
|||
|
|||
// 离线消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DeviceOffline.KEY, |
|||
RedisStreamConstants.DeviceOffline.GROUP, |
|||
RedisStreamConstants.DeviceOffline.CONSUMER, |
|||
offlineMessageListener); |
|||
|
|||
// 设备事件订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DeviceEvent.KEY, |
|||
RedisStreamConstants.DeviceEvent.GROUP, |
|||
RedisStreamConstants.DeviceEvent.CONSUMER, |
|||
deviceEventListener); |
|||
|
|||
// 设备属性上报消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DevicePropertyReport.KEY, |
|||
RedisStreamConstants.DevicePropertyReport.GROUP, |
|||
RedisStreamConstants.DevicePropertyReport.CONSUMER, |
|||
devicePropertyReportListener); |
|||
|
|||
// 读取属性回复消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DevicePropertyReadReply.KEY, |
|||
RedisStreamConstants.DevicePropertyReadReply.GROUP, |
|||
RedisStreamConstants.DevicePropertyReadReply.CONSUMER, |
|||
devicePropertyReadReplyListener); |
|||
|
|||
// 写属性回复消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DevicePropertyWriteReply.KEY, |
|||
RedisStreamConstants.DevicePropertyWriteReply.GROUP, |
|||
RedisStreamConstants.DevicePropertyWriteReply.CONSUMER, |
|||
devicePropertyWriteReplyListener); |
|||
|
|||
// 调用功能回复消息订阅
|
|||
redisStream.subscriptionAutoAck( |
|||
RedisStreamConstants.DeviceFunctionReply.KEY, |
|||
RedisStreamConstants.DeviceFunctionReply.GROUP, |
|||
RedisStreamConstants.DeviceFunctionReply.CONSUMER, |
|||
deviceFunctionReplyListener); |
|||
|
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 离线消息监听 |
|||
*/ |
|||
@Component |
|||
public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.zc.business.message.device; |
|||
|
|||
import com.zc.common.core.redis.stream.RedisStream; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|||
import org.springframework.data.redis.connection.stream.RecordId; |
|||
import org.springframework.data.redis.stream.StreamListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 在线消息监听 |
|||
*/ |
|||
@Component |
|||
public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>> |
|||
{ |
|||
private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); |
|||
|
|||
@Autowired |
|||
private RedisStream redisStream; |
|||
|
|||
@Override |
|||
public void onMessage(ObjectRecord<String, String> message) { |
|||
String streamKay = message.getStream(); |
|||
RecordId recordId = message.getId(); |
|||
|
|||
// 消费完后直接删除消息
|
|||
redisStream.del(streamKay, String.valueOf(recordId)); |
|||
} |
|||
} |
Loading…
Reference in new issue