|
|
@ -1,7 +1,8 @@ |
|
|
|
package com.zc.business.message.device; |
|
|
|
package com.zc.business.message.device.listener; |
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.zc.business.domain.DcDevice; |
|
|
|
import com.zc.business.message.device.handler.DeviceMessageHandler; |
|
|
|
import com.zc.business.service.IDcDeviceService; |
|
|
|
import com.zc.common.core.redis.stream.RedisStream; |
|
|
|
import org.slf4j.Logger; |
|
|
@ -15,7 +16,6 @@ import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.util.List; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
/** |
|
|
|
* 在线消息监听 |
|
|
@ -29,7 +29,7 @@ public class OnlineMessageListener implements StreamListener<String, ObjectRecor |
|
|
|
private RedisStream redisStream; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private IDcDeviceService dcDeviceService; |
|
|
|
private DeviceMessageHandler deviceMessageHandler; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor; |
|
|
@ -40,28 +40,11 @@ public class OnlineMessageListener implements StreamListener<String, ObjectRecor |
|
|
|
RecordId recordId = message.getId(); |
|
|
|
|
|
|
|
threadPoolTaskExecutor.execute(() -> { |
|
|
|
List<String> list = JSON.parseArray(message.getValue(), String.class); |
|
|
|
this.handle(list); |
|
|
|
List<String> data = JSON.parseArray(message.getValue(), String.class); |
|
|
|
deviceMessageHandler.updateDeviceState(data, DcDevice.ONLINE); |
|
|
|
}); |
|
|
|
|
|
|
|
// 消费完后直接删除消息
|
|
|
|
redisStream.del(streamKay, String.valueOf(recordId)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 处理在线消息 |
|
|
|
* @param msg 在线消息 |
|
|
|
*/ |
|
|
|
private void handle(List<String> msg) { |
|
|
|
List<DcDevice> dcDevices = msg.stream().map(iotDeviceId -> { |
|
|
|
DcDevice dcDevice = new DcDevice(); |
|
|
|
dcDevice.setDeviceState(DcDevice.ONLINE); |
|
|
|
dcDevice.setIotDeviceId(iotDeviceId); |
|
|
|
return dcDevice; |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
|
|
|
|
// 批量更新设备状态
|
|
|
|
dcDeviceService.batchUpdate(dcDevices); |
|
|
|
} |
|
|
|
} |