欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

spring websocket stomp,springboot websocket redis

墨初 知识笔记 99阅读

引入maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

 WebScoket配置处理器

import org.springframework.boot.web.servlet.ServletContextInitializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.servlet.ServletContext;/** * WebScoket配置处理器 */Configurationpublic class WebSocketConfig implements ServletContextInitializer { /**     * ServerEndpointExporter 作用     *     * 这个Bean会自动注册使用ServerEndpoint注解声明的websocket endpoint     *     * return     */Bean    public ServerEndpointExporter serverEndpointExporter() {        return new ServerEndpointExporter();    }    //设置websocket发送内容长度    Override    public void onStartup(ServletContext servletContext)  {        servletContext.setInitParameter(org.apache.tomcat.websocket.textBufferSize,22428800);    }}

webScoket消息对象

import com.alibaba.fastjson.annotation.JSONField;import lombok.Data;import java.util.Date;/*** author ws* date 20223/10/26 15:59* Description WebSocketMessage*/Datapublic class WebSocketMessage {/*** 用户ID*/private String fromId;/*** 对方ID*/private String toOtherId;//消息内容private String message;//发送时间JSONField(formatyyyy-MM-dd HH:mm:ss)public Date date;}

WebSocket操作类

import cn.hutool.core.collection.ListUtil;import com.alibaba.fastjson.JSON;import com.ws.wxyinghang.entity.WebSocketMessage;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.Iterator;import java.util.List;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArraySet;/** * author ws * date 20223/10/26 15:59 * Description WebSocket操作类 */ServerEndpoint(/websocket/{userId})ComponentSlf4jpublic class WebSocketSever {    // 与某个客户端的连接会话需要通过它来给客户端发送数据    private Session session;    private String userId;    // session集合,存放对应的session    private static ConcurrentHashMap<String, Session> sessionPool  new ConcurrentHashMap<>();    // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。    private static CopyOnWriteArraySet<WebSocketSever> webSocketSet  new CopyOnWriteArraySet<>();    // 用于存放离线消息    private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap  new ConcurrentHashMap();    /**     * 建立WebSocket连接     *     * param session     * param userId  用户ID     */    OnOpen    public void onOpen(Session session, PathParam(value  userId) String userId) {        log.info(WebSocket建立连接中,连接用户ID{},ip{}, userId,ip);        try {            Session historySession  sessionPool.get(userId);            // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象            if (historySession ! null) {                webSocketSet.remove(historySession);                historySession.close();            }        } catch (IOException e) {            log.error(重复登录异常,错误信息  e.getMessage(), e);        }        // 建立连接        this.session  session;        this.userId  userId;        webSocketSet.add(this);        sessionPool.put(userId, session);        //从离线消息队列里面获取消息        if (offlineMessageMap.containsKey(userId)) {            List<WebSocketMessage> list  offlineMessageMap.get(userId);            Iterator it  list.iterator();            while (it.hasNext()) {                Object x  it.next();                //离线消息接收成功后删除消息                Boolean bb  sendOfflineMessageByUser(JSON.toJSONString(x));                if (bb) {                    System.out.println(从队列中删除离线消息  x);                    it.remove();                }            }            offlineMessageMap.remove(userId);        }        log.info(建立连接完成,当前在线人数为{}, webSocketSet.size());    }    /**     * 发生错误     *     * param throwable e     */    OnError    public void onError(Throwable throwable) {        throwable.printStackTrace();    }    /**     * 连接关闭     */    OnClose    public void onClose() {        webSocketSet.remove(this);        sessionPool.remove(this.userId);        log.info(连接断开,当前在线人数为{}, webSocketSet.size());    }    /**     * 接收客户端消息     *     * param message 接收的消息     */    OnMessage    public void onMessage(String message) {        log.info(收到客户端发来的消息{}, message);        sendMessageByUser(message);    }    /**     * 推送消息到指定用户     *     * param message 发送的消息     */    public static Boolean sendMessageByUser(String message) {        WebSocketMessage msg  JSON.parseObject(message, WebSocketMessage.class);        log.info(用户ID  msg.getToOtherId()  ,推送内容  message);        Session session  sessionPool.get(msg.getToOtherId());        //判断session是否正常        if (session  null || !session.isOpen()) {            log.info(用户ID  msg.getToOtherId()  ,离线放入离线消息队列中);            if (offlineMessageMap.containsKey(msg.getToOtherId())) {                List<WebSocketMessage> list  offlineMessageMap.get(msg.getToOtherId());                list.add(msg);                offlineMessageMap.put(msg.getToOtherId(), list);            } else {                offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));            }        }//发送消息        else {            try {                session.getBasicRemote().sendText(message);            } catch (IOException e) {                log.error(推送消息到指定用户发生错误  e.getMessage(), e);                return false;            }        }        return true;    }    //发送离线消息    public static Boolean sendOfflineMessageByUser(String message) {        WebSocketMessage msg  JSON.parseObject(message, WebSocketMessage.class);        log.info(用户ID  msg.getToOtherId()  ,推送内容  message);        Session session  sessionPool.get(msg.getToOtherId());        try {            session.getBasicRemote().sendText(message);        } catch (IOException e) {            log.error(推送消息到指定用户发生错误  e.getMessage(), e);            return false;        }        return true;    }    /**     * 群发消息     *     * param message 发送的消息     */    public static void sendAllMessage(String message) {        log.info(发送消息{}, message);        for (WebSocketSever webSocket : webSocketSet) {            try {                webSocket.session.getBasicRemote().sendText(message);            } catch (IOException e) {                log.error(群发消息发生错误  e.getMessage(), e);            }        }    }}

启动项目使用apiFox测试新建webScoket接口

新建websocket1连接后发送消息 

 

新建webScoket2 可以看到连接后接收到了消息 

 

如果webScoket2断开连接后 webScoket1继续发送消息等webScoket2连接后就会收到离线的消息。

标签:
声明:无特别说明,转载请标明本文来源!