You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
140 lines
3.6 KiB
140 lines
3.6 KiB
package com.zc.websocket.handler;
|
|
|
|
import com.alibaba.fastjson.JSONValidator;
|
|
import com.google.gson.Gson;
|
|
import com.google.gson.JsonObject;
|
|
import com.google.gson.reflect.TypeToken;
|
|
import com.ruoyi.common.constant.Constants;
|
|
import com.ruoyi.common.core.domain.model.LoginUser;
|
|
import com.ruoyi.common.core.redis.RedisCache;
|
|
import com.ruoyi.common.utils.spring.SpringUtils;
|
|
import com.zc.common.constant.RedisStreamConstants;
|
|
import com.zc.common.core.redis.stream.RedisStream;
|
|
import com.zc.common.core.websocket.pojo.CmdMsg;
|
|
import com.zc.websocket.dto.param.EventParam;
|
|
import com.zc.websocket.constant.WebSocketMsgEvent;
|
|
import io.netty.channel.Channel;
|
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
|
import com.zc.websocket.util.MsgUtil;
|
|
|
|
import java.lang.reflect.Type;
|
|
import java.util.logging.Logger;
|
|
|
|
/**
|
|
* 给客户端发送消息
|
|
* @author Athena-xiepufeng
|
|
*/
|
|
public class WebSocketService
|
|
{
|
|
|
|
private static final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
|
private static final Logger log = Logger.getLogger(WebSocketService.class.getName());
|
|
|
|
private static final RedisStream redisStream = SpringUtils.getBean(RedisStream.class);
|
|
|
|
/**
|
|
* 给指定客户端发送消息
|
|
* @param subscriber
|
|
* @param content
|
|
* @return
|
|
*/
|
|
public static boolean postEvent(String subscriber, String content)
|
|
{
|
|
|
|
if (subscriber == null || subscriber.isEmpty())
|
|
{
|
|
return false;
|
|
}
|
|
|
|
Channel channel = MsgUtil.getChannelByTokenSN(subscriber);
|
|
|
|
if (channel == null)
|
|
{
|
|
log.warning("发布事件失败, 因为事件订阅者 channel:" + subscriber + " 不存在");
|
|
cancelSubscription(subscriber, content);
|
|
return false;
|
|
}
|
|
|
|
channel.writeAndFlush(new TextWebSocketFrame(content));
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
/**
|
|
* 群发消息
|
|
* @param content
|
|
* @return
|
|
*/
|
|
public static boolean postEvent(String content)
|
|
{
|
|
if (content == null || content.isEmpty())
|
|
{
|
|
return false;
|
|
}
|
|
|
|
MsgUtil.channels.writeAndFlush(new TextWebSocketFrame(content));
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
/**
|
|
* 取消存在的消息订阅
|
|
* @param subscriber
|
|
* @param content
|
|
*/
|
|
private static void cancelSubscription(String subscriber, String content) {
|
|
if (subscriber == null || content == null) {
|
|
return;
|
|
}
|
|
|
|
if (!JSONValidator.from(content).validate()) {
|
|
return;
|
|
}
|
|
|
|
Type type = new TypeToken<CmdMsg<EventParam<String>>>()
|
|
{
|
|
}.getType();
|
|
|
|
Gson gson = new Gson();
|
|
|
|
CmdMsg<EventParam<String>> cmdMsg;
|
|
|
|
try {
|
|
cmdMsg = gson.fromJson(content, type);
|
|
}catch (Exception e) {
|
|
return;
|
|
}
|
|
|
|
EventParam<String> eventParam = cmdMsg.getParams();
|
|
|
|
if (eventParam == null) {
|
|
return;
|
|
}
|
|
|
|
if (!WebSocketMsgEvent.SUB.equals(eventParam.getSubEvent())) {
|
|
return;
|
|
}
|
|
|
|
LoginUser user = redisCache.getCacheObject(Constants.LOGIN_TOKEN_KEY + subscriber);
|
|
|
|
if (user != null) {
|
|
return;
|
|
}
|
|
|
|
EventParam<JsonObject> param = new EventParam<>();
|
|
|
|
param.setSubEvent(WebSocketMsgEvent.UNSUB);
|
|
param.setTokenSN(subscriber);
|
|
param.setContent(new JsonObject());
|
|
|
|
String msg = gson.toJson(param, new TypeToken<EventParam<JsonObject>>()
|
|
{
|
|
}.getType());
|
|
|
|
redisStream.add(RedisStreamConstants.WebSocketStreamEvent.KEY, "", msg);
|
|
}
|
|
|
|
}
|
|
|