mengff
11 months ago
23 changed files with 644 additions and 28 deletions
@ -0,0 +1,26 @@ |
|||||
|
package com.zc.common.core.redis.stream; |
||||
|
|
||||
|
import org.springframework.beans.factory.annotation.Qualifier; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.core.StringRedisTemplate; |
||||
|
import org.springframework.data.redis.stream.StreamMessageListenerContainer; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
@Component |
||||
|
public class RedisStreamBeans { |
||||
|
|
||||
|
@Bean(name = "redisStream") |
||||
|
public RedisStream redisStream( |
||||
|
@Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate, |
||||
|
@Qualifier("streamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) { |
||||
|
return new RedisStream(redisTemplate, listenerContainer); |
||||
|
} |
||||
|
|
||||
|
@Bean(name = "iotRedisStream") |
||||
|
public RedisStream iotRedisStream( |
||||
|
@Qualifier("iotStringRedisTemplate") StringRedisTemplate redisTemplate, |
||||
|
@Qualifier("iotStreamMessageListenerContainer") StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer) { |
||||
|
return new RedisStream(redisTemplate, listenerContainer); |
||||
|
} |
||||
|
} |
@ -0,0 +1,98 @@ |
|||||
|
package com.zc.framework.config; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonAutoDetect; |
||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
||||
|
import com.fasterxml.jackson.annotation.PropertyAccessor; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; |
||||
|
import com.ruoyi.framework.config.FastJson2JsonRedisSerializer; |
||||
|
import io.lettuce.core.ClientOptions; |
||||
|
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; |
||||
|
import org.springframework.beans.factory.annotation.Qualifier; |
||||
|
import org.springframework.cache.annotation.EnableCaching; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.data.redis.connection.RedisConnectionFactory; |
||||
|
import org.springframework.data.redis.connection.RedisStandaloneConfiguration; |
||||
|
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; |
||||
|
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; |
||||
|
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.data.redis.core.StringRedisTemplate; |
||||
|
import org.springframework.data.redis.serializer.StringRedisSerializer; |
||||
|
|
||||
|
/** |
||||
|
* redis配置 |
||||
|
* |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@EnableCaching |
||||
|
public class IotRedisConfig |
||||
|
{ |
||||
|
private final IotRedisConfigProperties redisConfigProperties; |
||||
|
|
||||
|
public IotRedisConfig(IotRedisConfigProperties redisConfigProperties) { |
||||
|
this.redisConfigProperties = redisConfigProperties; |
||||
|
} |
||||
|
|
||||
|
@Bean(name = "iotRedisConnectionFactory") |
||||
|
public RedisConnectionFactory iotRedisConnectionFactory() { |
||||
|
|
||||
|
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); |
||||
|
|
||||
|
standaloneConfig.setHostName(redisConfigProperties.getHost()); |
||||
|
standaloneConfig.setPort(redisConfigProperties.getPort()); |
||||
|
standaloneConfig.setDatabase(redisConfigProperties.getDatabase()); |
||||
|
standaloneConfig.setPassword(redisConfigProperties.getPassword()); |
||||
|
|
||||
|
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); |
||||
|
genericObjectPoolConfig.setMaxIdle(redisConfigProperties.getLettuce().getPool().getMaxIdle()); |
||||
|
genericObjectPoolConfig.setMinIdle(redisConfigProperties.getLettuce().getPool().getMinIdle()); |
||||
|
genericObjectPoolConfig.setMaxTotal(redisConfigProperties.getLettuce().getPool().getMaxActive()); |
||||
|
genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperties.getLettuce().getPool().getMaxWait().toMillis()); |
||||
|
|
||||
|
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() |
||||
|
.poolConfig(genericObjectPoolConfig) |
||||
|
.commandTimeout(redisConfigProperties.getTimeout()) |
||||
|
.clientOptions(ClientOptions.builder() |
||||
|
.autoReconnect(true) |
||||
|
.build()) |
||||
|
.build(); |
||||
|
|
||||
|
return new LettuceConnectionFactory(standaloneConfig, clientConfig); |
||||
|
} |
||||
|
|
||||
|
// 创建对应于从Redis数据源的RedisTemplate
|
||||
|
@Bean(name = "iotRedisTemplate") |
||||
|
public RedisTemplate<Object, Object> iotRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { |
||||
|
RedisTemplate<Object, Object> template = new RedisTemplate<>(); |
||||
|
template.setConnectionFactory(connectionFactory); |
||||
|
|
||||
|
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); |
||||
|
|
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); |
||||
|
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); |
||||
|
serializer.setObjectMapper(mapper); |
||||
|
|
||||
|
// 使用StringRedisSerializer来序列化和反序列化redis的key值
|
||||
|
template.setKeySerializer(new StringRedisSerializer()); |
||||
|
template.setValueSerializer(serializer); |
||||
|
|
||||
|
// Hash的key也采用StringRedisSerializer的序列化方式
|
||||
|
template.setHashKeySerializer(new StringRedisSerializer()); |
||||
|
template.setHashValueSerializer(serializer); |
||||
|
|
||||
|
template.afterPropertiesSet(); |
||||
|
return template; |
||||
|
} |
||||
|
|
||||
|
@Bean(name = "iotStringRedisTemplate") |
||||
|
public StringRedisTemplate stringRedisTemplate(@Qualifier("iotRedisConnectionFactory") RedisConnectionFactory connectionFactory) { |
||||
|
StringRedisTemplate template = new StringRedisTemplate(); |
||||
|
template.setConnectionFactory(connectionFactory); |
||||
|
// 可以根据需要进行更多的配置,例如设置序列化器、初始化参数等
|
||||
|
return template; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,20 @@ |
|||||
|
package com.zc.framework.config; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.boot.autoconfigure.data.redis.RedisProperties; |
||||
|
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
import java.time.Duration; |
||||
|
|
||||
|
@Configuration |
||||
|
@ConfigurationProperties(prefix = "iot.redis") |
||||
|
@Data |
||||
|
public class IotRedisConfigProperties { |
||||
|
private String host; |
||||
|
private int port; |
||||
|
private int database; |
||||
|
private String password; |
||||
|
private Duration timeout; |
||||
|
private RedisProperties.Lettuce lettuce; |
||||
|
} |
@ -0,0 +1,71 @@ |
|||||
|
package com.zc.business.constant; |
||||
|
|
||||
|
/** |
||||
|
* redis Stream 消息队列、组、消费者定义 |
||||
|
*/ |
||||
|
public class RedisStreamConstants { |
||||
|
|
||||
|
/** |
||||
|
* 设备上线 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DeviceOnline { |
||||
|
public final static String KEY = "device:online"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 设备离线 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DeviceOffline { |
||||
|
public final static String KEY = "device:offline"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 设备事件 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DeviceEvent { |
||||
|
public final static String KEY = "device:event"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 设备属性上报 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DevicePropertyReport { |
||||
|
public final static String KEY = "device:property:report"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 读取属性回复 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DevicePropertyReadReply { |
||||
|
public final static String KEY = "device:property:read:reply"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 写属性回复 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DevicePropertyWriteReply { |
||||
|
public final static String KEY = "device:property:write:reply"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 调用功能回复 队列、组、消费者定义 |
||||
|
*/ |
||||
|
public static class DeviceFunctionReply { |
||||
|
public final static String KEY = "device:function:reply"; |
||||
|
public final static String GROUP = "group1"; |
||||
|
public final static String CONSUMER = "consumer1"; |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 设备消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class DeviceEventListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(DeviceEventListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 调用功能回复消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class DeviceFunctionReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(DeviceFunctionReplyListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 读取属性回复消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class DevicePropertyReadReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReadReplyListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 设备属性上报消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class DevicePropertyReportListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(DevicePropertyReportListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 写属性回复消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class DevicePropertyWriteReplyListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(DevicePropertyWriteReplyListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,100 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.business.constant.RedisStreamConstants; |
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.beans.factory.annotation.Qualifier; |
||||
|
import org.springframework.boot.CommandLineRunner; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.annotation.Resource; |
||||
|
|
||||
|
/** |
||||
|
* 初始化器 |
||||
|
* 设备消息订阅 |
||||
|
* 1.在线消息 |
||||
|
* 2.离线消息 |
||||
|
* @author xiepufeng |
||||
|
*/ |
||||
|
@Component |
||||
|
public class MessageSubscription implements CommandLineRunner { |
||||
|
|
||||
|
|
||||
|
@Autowired |
||||
|
@Qualifier("iotRedisStream") |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Resource |
||||
|
private OnlineMessageListener onlineMessageListener; |
||||
|
@Resource |
||||
|
private OfflineMessageListener offlineMessageListener; |
||||
|
|
||||
|
@Resource |
||||
|
private DeviceEventListener deviceEventListener; |
||||
|
|
||||
|
@Resource |
||||
|
private DevicePropertyReportListener devicePropertyReportListener; |
||||
|
|
||||
|
@Resource |
||||
|
private DevicePropertyReadReplyListener devicePropertyReadReplyListener; |
||||
|
|
||||
|
@Resource |
||||
|
private DevicePropertyWriteReplyListener devicePropertyWriteReplyListener; |
||||
|
|
||||
|
@Resource |
||||
|
private DeviceFunctionReplyListener deviceFunctionReplyListener; |
||||
|
|
||||
|
@Override |
||||
|
public void run(String... args) { |
||||
|
|
||||
|
// 设备在线消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DeviceOnline.KEY, |
||||
|
RedisStreamConstants.DeviceOnline.GROUP, |
||||
|
RedisStreamConstants.DeviceOnline.CONSUMER, |
||||
|
onlineMessageListener); |
||||
|
|
||||
|
// 离线消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DeviceOffline.KEY, |
||||
|
RedisStreamConstants.DeviceOffline.GROUP, |
||||
|
RedisStreamConstants.DeviceOffline.CONSUMER, |
||||
|
offlineMessageListener); |
||||
|
|
||||
|
// 设备事件订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DeviceEvent.KEY, |
||||
|
RedisStreamConstants.DeviceEvent.GROUP, |
||||
|
RedisStreamConstants.DeviceEvent.CONSUMER, |
||||
|
deviceEventListener); |
||||
|
|
||||
|
// 设备属性上报消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DevicePropertyReport.KEY, |
||||
|
RedisStreamConstants.DevicePropertyReport.GROUP, |
||||
|
RedisStreamConstants.DevicePropertyReport.CONSUMER, |
||||
|
devicePropertyReportListener); |
||||
|
|
||||
|
// 读取属性回复消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DevicePropertyReadReply.KEY, |
||||
|
RedisStreamConstants.DevicePropertyReadReply.GROUP, |
||||
|
RedisStreamConstants.DevicePropertyReadReply.CONSUMER, |
||||
|
devicePropertyReadReplyListener); |
||||
|
|
||||
|
// 写属性回复消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DevicePropertyWriteReply.KEY, |
||||
|
RedisStreamConstants.DevicePropertyWriteReply.GROUP, |
||||
|
RedisStreamConstants.DevicePropertyWriteReply.CONSUMER, |
||||
|
devicePropertyWriteReplyListener); |
||||
|
|
||||
|
// 调用功能回复消息订阅
|
||||
|
redisStream.subscriptionAutoAck( |
||||
|
RedisStreamConstants.DeviceFunctionReply.KEY, |
||||
|
RedisStreamConstants.DeviceFunctionReply.GROUP, |
||||
|
RedisStreamConstants.DeviceFunctionReply.CONSUMER, |
||||
|
deviceFunctionReplyListener); |
||||
|
|
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 离线消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class OfflineMessageListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(OfflineMessageListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package com.zc.business.message.device; |
||||
|
|
||||
|
import com.zc.common.core.redis.stream.RedisStream; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
||||
|
import org.springframework.data.redis.connection.stream.RecordId; |
||||
|
import org.springframework.data.redis.stream.StreamListener; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 在线消息监听 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class OnlineMessageListener implements StreamListener<String, ObjectRecord<String, String>> |
||||
|
{ |
||||
|
private static final Logger log = LoggerFactory.getLogger(OnlineMessageListener.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisStream redisStream; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(ObjectRecord<String, String> message) { |
||||
|
String streamKay = message.getStream(); |
||||
|
RecordId recordId = message.getId(); |
||||
|
|
||||
|
// 消费完后直接删除消息
|
||||
|
redisStream.del(streamKay, String.valueOf(recordId)); |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue