Compare commits

...

2 Commits

  1. 38
      zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java
  2. 29
      zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java
  3. 29
      zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java
  4. 29
      zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java
  5. 29
      zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java
  6. 59
      zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java

38
zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java

@ -68,4 +68,42 @@ public class RedisStreamConstants {
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 门架交易流水
*/
public static class TrafficGantryTransactions {
public final static String KEY = "traffic:gantry:transactions";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 门架牌识
*/
public static class TrafficGantryPlates {
public final static String KEY = "traffic:gantry:plates";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 收费入口数据
*/
public static class TrafficTollStationEntrance {
public final static String KEY = "traffic:tollStation:entrance";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
/**
* 收费出口数据
*/
public static class TrafficTollStationExit {
public final static String KEY = "traffic:tollStation:exit";
public final static String GROUP = "group1";
public final static String CONSUMER = "consumer1";
}
}

29
zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java

@ -0,0 +1,29 @@
package com.zc.business.message.device.listener;
import com.zc.common.core.redis.stream.RedisStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 门架牌识
*/
@Component
public class TrafficGantryPlatesListener implements StreamListener<String, ObjectRecord<String, String>>
{
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

29
zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java

@ -0,0 +1,29 @@
package com.zc.business.message.device.listener;
import com.zc.common.core.redis.stream.RedisStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 门架交易流水消息监听
*/
@Component
public class TrafficGantryTransactionsListener implements StreamListener<String, ObjectRecord<String, String>>
{
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

29
zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java

@ -0,0 +1,29 @@
package com.zc.business.message.device.listener;
import com.zc.common.core.redis.stream.RedisStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 收费入口数据
*/
@Component
public class TrafficTollStationEntranceListener implements StreamListener<String, ObjectRecord<String, String>>
{
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

29
zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java

@ -0,0 +1,29 @@
package com.zc.business.message.device.listener;
import com.zc.common.core.redis.stream.RedisStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* 收费出口数据
*/
@Component
public class TrafficTollStationExitListener implements StreamListener<String, ObjectRecord<String, String>>
{
@Autowired
@Qualifier("iotRedisStream")
private RedisStream redisStream;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String streamKay = message.getStream();
RecordId recordId = message.getId();
// 消费完后直接删除消息
redisStream.del(streamKay, String.valueOf(recordId));
}
}

59
zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java

@ -23,6 +23,10 @@ public class MessageSubscription implements CommandLineRunner {
@Autowired
@Qualifier("iotRedisStream")
private RedisStream iotRedisStream;
@Autowired
@Qualifier("redisStream")
private RedisStream redisStream;
@Resource
@ -45,57 +49,98 @@ public class MessageSubscription implements CommandLineRunner {
@Resource
private DeviceFunctionReplyListener deviceFunctionReplyListener;
@Resource
private TrafficGantryTransactionsListener trafficGantryTransactionsListener;
@Resource
private TrafficGantryPlatesListener trafficGantryPlatesListener;
@Resource
private TrafficTollStationEntranceListener trafficTollStationEntranceListener;
@Resource
private TrafficTollStationExitListener trafficTollStationExitListener;
@Override
public void run(String... args) {
// 设备在线消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceOnline.KEY,
RedisStreamConstants.DeviceOnline.GROUP,
RedisStreamConstants.DeviceOnline.CONSUMER,
onlineMessageListener);
// 离线消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceOffline.KEY,
RedisStreamConstants.DeviceOffline.GROUP,
RedisStreamConstants.DeviceOffline.CONSUMER,
offlineMessageListener);
// 设备事件订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceEvent.KEY,
RedisStreamConstants.DeviceEvent.GROUP,
RedisStreamConstants.DeviceEvent.CONSUMER,
deviceEventListener);
// 设备属性上报消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyReport.KEY,
RedisStreamConstants.DevicePropertyReport.GROUP,
RedisStreamConstants.DevicePropertyReport.CONSUMER,
devicePropertyReportListener);
// 读取属性回复消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyReadReply.KEY,
RedisStreamConstants.DevicePropertyReadReply.GROUP,
RedisStreamConstants.DevicePropertyReadReply.CONSUMER,
devicePropertyReadReplyListener);
// 写属性回复消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DevicePropertyWriteReply.KEY,
RedisStreamConstants.DevicePropertyWriteReply.GROUP,
RedisStreamConstants.DevicePropertyWriteReply.CONSUMER,
devicePropertyWriteReplyListener);
// 调用功能回复消息订阅
redisStream.subscriptionAutoAck(
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.DeviceFunctionReply.KEY,
RedisStreamConstants.DeviceFunctionReply.GROUP,
RedisStreamConstants.DeviceFunctionReply.CONSUMER,
deviceFunctionReplyListener);
// 门架交易流水
redisStream.subscriptionAutoAck(
RedisStreamConstants.TrafficGantryTransactions.KEY,
RedisStreamConstants.TrafficGantryTransactions.GROUP,
RedisStreamConstants.TrafficGantryTransactions.CONSUMER,
trafficGantryTransactionsListener);
// 门架牌识
iotRedisStream.subscriptionAutoAck(
RedisStreamConstants.TrafficGantryPlates.KEY,
RedisStreamConstants.TrafficGantryPlates.GROUP,
RedisStreamConstants.TrafficGantryPlates.CONSUMER,
trafficGantryPlatesListener);
// 收费入口数据
redisStream.subscriptionAutoAck(
RedisStreamConstants.TrafficTollStationEntrance.KEY,
RedisStreamConstants.TrafficTollStationEntrance.GROUP,
RedisStreamConstants.TrafficTollStationEntrance.CONSUMER,
trafficTollStationEntranceListener);
// 收费出口数据
redisStream.subscriptionAutoAck(
RedisStreamConstants.TrafficTollStationExit.KEY,
RedisStreamConstants.TrafficTollStationExit.GROUP,
RedisStreamConstants.TrafficTollStationExit.CONSUMER,
trafficTollStationExitListener);
}
}

Loading…
Cancel
Save