spring websocket stomp,springboot websocket redis
墨初 知识笔记 99阅读
引入maven依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
WebScoket配置处理器

import org.springframework.boot.web.servlet.ServletContextInitializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.servlet.ServletContext;/** * WebScoket配置处理器 */Configurationpublic class WebSocketConfig implements ServletContextInitializer { /** * ServerEndpointExporter 作用 * * 这个Bean会自动注册使用ServerEndpoint注解声明的websocket endpoint * * return */Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } //设置websocket发送内容长度 Override public void onStartup(ServletContext servletContext) { servletContext.setInitParameter(org.apache.tomcat.websocket.textBufferSize,22428800); }}
webScoket消息对象
import com.alibaba.fastjson.annotation.JSONField;import lombok.Data;import java.util.Date;/*** author ws* date 20223/10/26 15:59* Description WebSocketMessage*/Datapublic class WebSocketMessage {/*** 用户ID*/private String fromId;/*** 对方ID*/private String toOtherId;//消息内容private String message;//发送时间JSONField(formatyyyy-MM-dd HH:mm:ss)public Date date;}
WebSocket操作类

import cn.hutool.core.collection.ListUtil;import com.alibaba.fastjson.JSON;import com.ws.wxyinghang.entity.WebSocketMessage;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.Iterator;import java.util.List;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArraySet;/** * author ws * date 20223/10/26 15:59 * Description WebSocket操作类 */ServerEndpoint(/websocket/{userId})ComponentSlf4jpublic class WebSocketSever { // 与某个客户端的连接会话需要通过它来给客户端发送数据 private Session session; private String userId; // session集合,存放对应的session private static ConcurrentHashMap<String, Session> sessionPool new ConcurrentHashMap<>(); // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 private static CopyOnWriteArraySet<WebSocketSever> webSocketSet new CopyOnWriteArraySet<>(); // 用于存放离线消息 private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap new ConcurrentHashMap(); /** * 建立WebSocket连接 * * param session * param userId 用户ID */ OnOpen public void onOpen(Session session, PathParam(value userId) String userId) { log.info(WebSocket建立连接中,连接用户ID{},ip{}, userId,ip); try { Session historySession sessionPool.get(userId); // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象 if (historySession ! null) { webSocketSet.remove(historySession); historySession.close(); } } catch (IOException e) { log.error(重复登录异常,错误信息 e.getMessage(), e); } // 建立连接 this.session session; this.userId userId; webSocketSet.add(this); sessionPool.put(userId, session); //从离线消息队列里面获取消息 if (offlineMessageMap.containsKey(userId)) { List<WebSocketMessage> list offlineMessageMap.get(userId); Iterator it list.iterator(); while (it.hasNext()) { Object x it.next(); //离线消息接收成功后删除消息 Boolean bb sendOfflineMessageByUser(JSON.toJSONString(x)); if (bb) { System.out.println(从队列中删除离线消息 x); it.remove(); } } offlineMessageMap.remove(userId); } log.info(建立连接完成,当前在线人数为{}, webSocketSet.size()); } /** * 发生错误 * * param throwable e */ OnError public void onError(Throwable throwable) { throwable.printStackTrace(); } /** * 连接关闭 */ OnClose public void onClose() { webSocketSet.remove(this); sessionPool.remove(this.userId); log.info(连接断开,当前在线人数为{}, webSocketSet.size()); } /** * 接收客户端消息 * * param message 接收的消息 */ OnMessage public void onMessage(String message) { log.info(收到客户端发来的消息{}, message); sendMessageByUser(message); } /** * 推送消息到指定用户 * * param message 发送的消息 */ public static Boolean sendMessageByUser(String message) { WebSocketMessage msg JSON.parseObject(message, WebSocketMessage.class); log.info(用户ID msg.getToOtherId() ,推送内容 message); Session session sessionPool.get(msg.getToOtherId()); //判断session是否正常 if (session null || !session.isOpen()) { log.info(用户ID msg.getToOtherId() ,离线放入离线消息队列中); if (offlineMessageMap.containsKey(msg.getToOtherId())) { List<WebSocketMessage> list offlineMessageMap.get(msg.getToOtherId()); list.add(msg); offlineMessageMap.put(msg.getToOtherId(), list); } else { offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg)); } }//发送消息 else { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error(推送消息到指定用户发生错误 e.getMessage(), e); return false; } } return true; } //发送离线消息 public static Boolean sendOfflineMessageByUser(String message) { WebSocketMessage msg JSON.parseObject(message, WebSocketMessage.class); log.info(用户ID msg.getToOtherId() ,推送内容 message); Session session sessionPool.get(msg.getToOtherId()); try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error(推送消息到指定用户发生错误 e.getMessage(), e); return false; } return true; } /** * 群发消息 * * param message 发送的消息 */ public static void sendAllMessage(String message) { log.info(发送消息{}, message); for (WebSocketSever webSocket : webSocketSet) { try { webSocket.session.getBasicRemote().sendText(message); } catch (IOException e) { log.error(群发消息发生错误 e.getMessage(), e); } } }}
启动项目使用apiFox测试新建webScoket接口
新建websocket1连接后发送消息
新建webScoket2 可以看到连接后接收到了消息
如果webScoket2断开连接后 webScoket1继续发送消息等webScoket2连接后就会收到离线的消息。
标签: