From 443d5e4be765a4d46f68c2157369eecac4253f2d Mon Sep 17 00:00:00 2001 From: xiepufeng <1072271977@qq.com> Date: Tue, 23 Apr 2024 17:46:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=97=A8=E6=9E=B6=E4=BA=A4?= =?UTF-8?q?=E6=98=93=E6=B5=81=E6=B0=B4=E3=80=81=E9=97=A8=E6=9E=B6=E7=89=8C?= =?UTF-8?q?=E8=AF=86=E3=80=81=E6=94=B6=E8=B4=B9=E5=85=A5=E5=8F=A3=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E3=80=81=E6=94=B6=E8=B4=B9=E5=87=BA=E5=8F=A3=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=9A=84redis=E6=95=B0=E6=8D=AE=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constant/RedisStreamConstants.java | 38 ++++++++++++ .../listener/TrafficGantryPlatesListener.java | 29 +++++++++ .../TrafficGantryTransactionsListener.java | 29 +++++++++ .../TrafficTollStationEntranceListener.java | 29 +++++++++ .../TrafficTollStationExitListener.java | 29 +++++++++ .../device/subscribe/MessageSubscription.java | 59 ++++++++++++++++--- 6 files changed, 206 insertions(+), 7 deletions(-) create mode 100644 zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java create mode 100644 zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java diff --git a/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java index a6245b0d..b3d83098 100644 --- a/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java +++ b/zc-business/src/main/java/com/zc/business/constant/RedisStreamConstants.java @@ -68,4 +68,42 @@ public class RedisStreamConstants { public final static String GROUP = "group1"; public final static String CONSUMER = "consumer1"; } + + /** + * 门架交易流水 + */ + public static class TrafficGantryTransactions { + public final static String KEY = "traffic:gantry:transactions"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 门架牌识 + */ + public static class TrafficGantryPlates { + public final static String KEY = "traffic:gantry:plates"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + /** + * 收费入口数据 + */ + public static class TrafficTollStationEntrance { + public final static String KEY = "traffic:tollStation:entrance"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + + + /** + * 收费出口数据 + */ + public static class TrafficTollStationExit { + public final static String KEY = "traffic:tollStation:exit"; + public final static String GROUP = "group1"; + public final static String CONSUMER = "consumer1"; + } + } diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java new file mode 100644 index 00000000..e0988902 --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryPlatesListener.java @@ -0,0 +1,29 @@ +package com.zc.business.message.device.listener; + +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 门架牌识 + */ +@Component +public class TrafficGantryPlatesListener implements StreamListener> +{ + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java new file mode 100644 index 00000000..8c612c1a --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficGantryTransactionsListener.java @@ -0,0 +1,29 @@ +package com.zc.business.message.device.listener; + +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 门架交易流水消息监听 + */ +@Component +public class TrafficGantryTransactionsListener implements StreamListener> +{ + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java new file mode 100644 index 00000000..2b75166b --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationEntranceListener.java @@ -0,0 +1,29 @@ +package com.zc.business.message.device.listener; + +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 收费入口数据 + */ +@Component +public class TrafficTollStationEntranceListener implements StreamListener> +{ + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java new file mode 100644 index 00000000..0872055e --- /dev/null +++ b/zc-business/src/main/java/com/zc/business/message/device/listener/TrafficTollStationExitListener.java @@ -0,0 +1,29 @@ +package com.zc.business.message.device.listener; + +import com.zc.common.core.redis.stream.RedisStream; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +/** + * 收费出口数据 + */ +@Component +public class TrafficTollStationExitListener implements StreamListener> +{ + @Autowired + @Qualifier("iotRedisStream") + private RedisStream redisStream; + + @Override + public void onMessage(ObjectRecord message) { + String streamKay = message.getStream(); + RecordId recordId = message.getId(); + + // 消费完后直接删除消息 + redisStream.del(streamKay, String.valueOf(recordId)); + } +} diff --git a/zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java b/zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java index bc124c8f..67017de0 100644 --- a/zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java +++ b/zc-business/src/main/java/com/zc/business/message/device/subscribe/MessageSubscription.java @@ -23,6 +23,10 @@ public class MessageSubscription implements CommandLineRunner { @Autowired @Qualifier("iotRedisStream") + private RedisStream iotRedisStream; + + @Autowired + @Qualifier("redisStream") private RedisStream redisStream; @Resource @@ -45,57 +49,98 @@ public class MessageSubscription implements CommandLineRunner { @Resource private DeviceFunctionReplyListener deviceFunctionReplyListener; + @Resource + private TrafficGantryTransactionsListener trafficGantryTransactionsListener; + + @Resource + private TrafficGantryPlatesListener trafficGantryPlatesListener; + + @Resource + private TrafficTollStationEntranceListener trafficTollStationEntranceListener; + + @Resource + private TrafficTollStationExitListener trafficTollStationExitListener; + @Override public void run(String... args) { // 设备在线消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DeviceOnline.KEY, RedisStreamConstants.DeviceOnline.GROUP, RedisStreamConstants.DeviceOnline.CONSUMER, onlineMessageListener); // 离线消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DeviceOffline.KEY, RedisStreamConstants.DeviceOffline.GROUP, RedisStreamConstants.DeviceOffline.CONSUMER, offlineMessageListener); // 设备事件订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DeviceEvent.KEY, RedisStreamConstants.DeviceEvent.GROUP, RedisStreamConstants.DeviceEvent.CONSUMER, deviceEventListener); // 设备属性上报消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DevicePropertyReport.KEY, RedisStreamConstants.DevicePropertyReport.GROUP, RedisStreamConstants.DevicePropertyReport.CONSUMER, devicePropertyReportListener); // 读取属性回复消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DevicePropertyReadReply.KEY, RedisStreamConstants.DevicePropertyReadReply.GROUP, RedisStreamConstants.DevicePropertyReadReply.CONSUMER, devicePropertyReadReplyListener); // 写属性回复消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DevicePropertyWriteReply.KEY, RedisStreamConstants.DevicePropertyWriteReply.GROUP, RedisStreamConstants.DevicePropertyWriteReply.CONSUMER, devicePropertyWriteReplyListener); // 调用功能回复消息订阅 - redisStream.subscriptionAutoAck( + iotRedisStream.subscriptionAutoAck( RedisStreamConstants.DeviceFunctionReply.KEY, RedisStreamConstants.DeviceFunctionReply.GROUP, RedisStreamConstants.DeviceFunctionReply.CONSUMER, deviceFunctionReplyListener); + + // 门架交易流水 + redisStream.subscriptionAutoAck( + RedisStreamConstants.TrafficGantryTransactions.KEY, + RedisStreamConstants.TrafficGantryTransactions.GROUP, + RedisStreamConstants.TrafficGantryTransactions.CONSUMER, + trafficGantryTransactionsListener); + + // 门架牌识 + iotRedisStream.subscriptionAutoAck( + RedisStreamConstants.TrafficGantryPlates.KEY, + RedisStreamConstants.TrafficGantryPlates.GROUP, + RedisStreamConstants.TrafficGantryPlates.CONSUMER, + trafficGantryPlatesListener); + + // 收费入口数据 + redisStream.subscriptionAutoAck( + RedisStreamConstants.TrafficTollStationEntrance.KEY, + RedisStreamConstants.TrafficTollStationEntrance.GROUP, + RedisStreamConstants.TrafficTollStationEntrance.CONSUMER, + trafficTollStationEntranceListener); + + // 收费出口数据 + redisStream.subscriptionAutoAck( + RedisStreamConstants.TrafficTollStationExit.KEY, + RedisStreamConstants.TrafficTollStationExit.GROUP, + RedisStreamConstants.TrafficTollStationExit.CONSUMER, + trafficTollStationExitListener); + } }