|
|
@ -1,5 +1,8 @@ |
|
|
|
package com.zc.business.message.device; |
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.zc.business.domain.DcDevice; |
|
|
|
import com.zc.business.service.IDcDeviceService; |
|
|
|
import com.zc.common.core.redis.stream.RedisStream; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
@ -7,8 +10,13 @@ import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|
|
|
import org.springframework.data.redis.connection.stream.RecordId; |
|
|
|
import org.springframework.data.redis.stream.StreamListener; |
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.util.List; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
/** |
|
|
|
* 在线消息监听 |
|
|
|
*/ |
|
|
@ -20,12 +28,40 @@ public class OnlineMessageListener implements StreamListener<String, ObjectRecor |
|
|
|
@Autowired |
|
|
|
private RedisStream redisStream; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private IDcDeviceService dcDeviceService; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onMessage(ObjectRecord<String, String> message) { |
|
|
|
String streamKay = message.getStream(); |
|
|
|
RecordId recordId = message.getId(); |
|
|
|
|
|
|
|
threadPoolTaskExecutor.execute(() -> { |
|
|
|
List<String> list = JSON.parseArray(message.getValue(), String.class); |
|
|
|
this.handle(list); |
|
|
|
}); |
|
|
|
|
|
|
|
// 消费完后直接删除消息
|
|
|
|
redisStream.del(streamKay, String.valueOf(recordId)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 处理在线消息 |
|
|
|
* @param msg 在线消息 |
|
|
|
*/ |
|
|
|
private void handle(List<String> msg) { |
|
|
|
List<DcDevice> dcDevices = msg.stream().map(iotDeviceId -> { |
|
|
|
DcDevice dcDevice = new DcDevice(); |
|
|
|
dcDevice.setDeviceState(DcDevice.ONLINE); |
|
|
|
dcDevice.setIotDeviceId(iotDeviceId); |
|
|
|
return dcDevice; |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
|
|
|
|
// 批量更新设备状态
|
|
|
|
dcDeviceService.batchUpdate(dcDevices); |
|
|
|
} |
|
|
|
} |
|
|
|