文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

java整合WebSocket

2023-10-03 19:58

关注

WebSocket

一、WebSocket介绍

1、简介

WebSocket协议通过在客户端和服务端之间提供全双工通信来进行Web和服务器的交互功能。在WebSocket应用程序中,服务器发布WebSocket端点,客户端使用url连接到服务器。建立连接后,服务器和客户端就可以互相发送消息。客户端通常连接到一台服务器,服务器接受多个客户端的连接。

2、优势

HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
如果我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)

3、服务端注解

@ServerEndpoint(“/websocket/{uid}”)
申明这是一个websocket服务;
需要指定访问该服务的地址,在地址中可以指定参数,需要通过{}进行占位;

@OnOpen
用法:public void onOpen(Session session, @PathParam(“uid”) String uid) throws IOException{}
该方法将在建立连接后执行,会传入session对象,就是客户端与服务端建立的长连接通道,通过@PathParam获取url中声明的参数;

@OnClose
用法:public void onClose() {}
该方法是在连接关闭后执行;

@OnMessage
用法:public void onMessage(String message, Session session) throws IOException {}
该方法用于接收客户端发送的消息;
message:发来的消息数据;
session:会话对象(也是长连接通道);
发送消息到客户端;
用法:session.getBasicRemote().sendText(“hello,websocket.”);
通过session进行消息发送;

二、springboot整合

1、引入依赖

<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-websocket</artifactId>        </dependency>

2、配置

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configurationpublic class WebSocketConfig {    @Bean    public ServerEndpointExporter serverEndpointExporter() {        return new ServerEndpointExporter();    }}

3、业务代码

>>群聊

import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;@Component@Slf4j@ServerEndpoint(value = "/api/pushMessageMulti/{roomId}/{userId}",encoders = { ServerEncoder.class })public class WebSocketServerMulti {        private static int onlineCount = 0;        private static Map<String, WebSocketServerMulti> userMap = new ConcurrentHashMap<>();    //存放房间对象    private static Map<String, Set<WebSocketServerMulti>> roomMap = new ConcurrentHashMap<>();        private Session session;        private String userId = "";    //存出当前群聊在线的人数(使用原因:roomMap中无法获取到当前用户id)    private static Map<String, List<String>> multiUser = new ConcurrentHashMap<>();        @OnOpen    public void onOpen(Session session,@PathParam("roomId") String roomId , @PathParam("userId") String userId) throws IOException, EncodeException {        synchronized (session){            try {                this.session = session;                this.userId=userId;                userMap.put(userId,this);                if (!roomMap.containsKey(roomId)) {                    Set<WebSocketServerMulti> set = new HashSet<>();                    set.add(userMap.get(userId));                    roomMap.put(roomId,set);                    List<String> dd = new LinkedList<>();                    dd.add(userId);                    multiUser.put(roomId,dd);                } else {                    if(multiUser.get(roomId).contains(userId)){                        log.info("存在:房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());                    }else{                        multiUser.get(roomId).add(userId);                        roomMap.get(roomId).add(this);                    }                }                System.out.println(multiUser.get(roomId).size());                log.info("房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());                Map<String,Object> map = new HashMap<>();                map.put("online_num",multiUser.get(roomId).size());//在线人数                map.put("online_list",roomMap.get(roomId));//人员列表                map.put("roomId",roomId);//群id                map.put("message","用户"+***+"加入群聊");//消息//自定义发送消息                sendMessageObject(map,roomId);            }catch (Exception e){                e.printStackTrace();            }        }    }        @OnClose    public void onClose( @PathParam("roomId") String roomId,@PathParam("userId") String userId) {        try {            if (roomMap.containsKey(roomId)) {                Set<WebSocketServerMulti> set = roomMap.get(roomId);                Iterator<WebSocketServerMulti> it = set.iterator();                while (it.hasNext()) {                    if (it.next().userId.equals(userId)) {                        it.remove();                    }                }                multiUser.get(roomId).remove(userId);                log.info("房间号:"+roomId+"用户退出:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());                Map<String,Object> map = new HashMap<>();                map.put("online_num",multiUser.get(roomId).size());//在线人数                map.put("online_list",roomMap.get(roomId));//人员列表                map.put("roomId",roomId);//群id                map.put("message","用户"+***+"加入群聊");//消息//自定义发送消息                sendMessageObject(map,roomId);            }        }catch (Exception e){            e.printStackTrace();        }    }        @OnMessage    public void onMessage(String message) {        //注意,要给session加上同步锁,否则会出现多个线程同时往同一个session写数据,导致报错的情况。        synchronized (session){            //可以群发消息            if(StringUtils.isNotBlank(message)){                try {                    //解析发送的报文                    JSONObject jsonObject = JSONObject.parseObject(message);                    //追加发送人(防止串改)                    jsonObject.put("fromUserId",this.userId);                    int chatType=jsonObject.getInteger("chatType");                    String myUserId=jsonObject.getString("myUserId");                    String toRoomId=jsonObject.getString("toRoomId");                    log.info("房间号:"+toRoomId+"用户消息:"+userId+",报文:"+message);                    sendMessageTo(message,toRoomId);                }catch (Exception e){                    e.printStackTrace();                }            }        }    }        public void sendMessageTo(String message , String roomId) throws IOException {        if (roomMap.containsKey(roomId)) {            for (WebSocketServerMulti item : roomMap.get(roomId)) {                    item.session.getAsyncRemote().sendText(message);            }        }    }        @OnError    public SystemResult onError(Throwable error) {        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());        ChatError chatError = new ChatError();        chatError.setUserId(Integer.valueOf(this.userId));        chatError.setDetails(error.getMessage());        chatError.setAddTime(new Date());        chatErrorMapper.insert(chatError);        SystemResult systemResult = new SystemResult();        return systemResult.error(error.getMessage());    }        public void sendMessage(String message) {        try {            this.session.getBasicRemote().sendText(message);        } catch (IOException e) {            e.printStackTrace();        }    }        public void sendMessageObject(Object message,String roomId) {        try {            if (roomMap.containsKey(roomId)) {                for (WebSocketServerMulti item : roomMap.get(roomId)) {                    item.session.getBasicRemote().sendObject(message);                }            }        } catch (Exception e) {            e.printStackTrace();        }    }        public static synchronized int getOnlineCount() {        return onlineCount;    }        public static synchronized void addOnlineCount() {        WebSocketServerMulti.onlineCount++;    }        public static synchronized void subOnlineCount() {        WebSocketServerMulti.onlineCount--;    }}

>>单人聊天

import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;@Component@Slf4j@ServerEndpoint("/api/pushMessageSolo/{userId}")public class WebSocketServerSolo {        private static int onlineCount = 0;        private static ConcurrentHashMap<String, WebSocketServerSolo> webSocketMap = new ConcurrentHashMap<>();        private Session session;        private String userId = "";        @OnOpen    public void onOpen(Session session, @PathParam("userId") String userId) {        this.session = session;        this.userId=userId;        if(webSocketMap.containsKey(userId)){            webSocketMap.remove(userId);            //加入set中            webSocketMap.put(userId,this);        }else{            //加入set中            webSocketMap.put(userId,this);            //在线数加1            addOnlineCount();        }        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());        sendMessage("连接成功");    }        @OnClose    public void onClose() {        if(webSocketMap.containsKey(userId)){            webSocketMap.remove(userId);            //从set中删除            subOnlineCount();        }        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());    }        @OnMessage    public void onMessage(String message, Session session) {        log.info("用户消息:"+userId+",报文:"+message);        //可以群发消息        //消息保存到数据库、redis        if(StringUtils.isNotBlank(message)){            try {                //解析发送的报文                JSONObject jsonObject = JSONObject.parseObject(message);                //追加发送人(防止串改)                jsonObject.put("fromUserId",this.userId);                String toUserId=jsonObject.getString("toUserId");                String myUserId=jsonObject.getString("myUserId");                //传送给对应toUserId用户的websocket                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){                    webSocketMap.get(toUserId).sendMessage(message);                }else{                    //否则不在这个服务器上,发送到mysql或者redis                    log.error("请求的userId:"+toUserId+"不在该服务器上");                }                if(StringUtils.isNotBlank(myUserId)&&webSocketMap.containsKey(myUserId)){                    webSocketMap.get(myUserId).sendMessage(message);                }else{                    //否则不在这个服务器上,发送到mysql或者redis                    log.error("请求的userId:"+myUserId+"不在该服务器上");                }            }catch (Exception e){                e.printStackTrace();            }        }    }        @OnError    public void onError(Session session, Throwable error) {        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());        error.printStackTrace();    }        public void sendMessage(String message) {        try {            this.session.getBasicRemote().sendText(message);        } catch (IOException e) {            e.printStackTrace();        }    }        public static void sendInfo(String message, String userId) {        log.info("发送消息到:"+userId+",报文:"+message);        if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){            webSocketMap.get(userId).sendMessage(message);        }else{            log.error("用户"+userId+",不在线!");        }    }        public static synchronized int getOnlineCount() {        return onlineCount;    }        public static synchronized void addOnlineCount() {        WebSocketServerSolo.onlineCount++;    }        public static synchronized void subOnlineCount() {        WebSocketServerSolo.onlineCount--;    }}

三、部署websocket项目问题

1、webSocket功能失效

本地开发的时候都可以正常使用,但是在部署到nginx代理服务器的时候发现报了错误,连不上,报错:Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
发现是nginx服务器默认是不打开webSocket的功能的,这需要我们在nginx服务器上配置:

 location /test/ {                proxy_pass http://test.com;                proxy_redirect default;                proxy_set_header Upgrade $http_upgrade; # allow websockets                proxy_set_header Connection "upgrade";                proxy_http_version 1.1;                }

2、断线重连

如果nginx没有设置读取超时时间,websocket会一直断线重连,大约一分钟重连一次
可以设置长时间得超时时间,避免一直断线重连,避免消耗内存

location /test{    proxy_pass  http://test.com;    proxy_set_header Upgrade $http_upgrade; # allow websockets    proxy_set_header Connection "upgrade";    proxy_http_version 1.1;        proxy_connect_timeout 60s;#l连接超时时间,不能设置太长会浪费连接资源    proxy_read_timeout 500s;#读超时时间    proxy_send_timeout 500s;#写超时时间            index  index.html index.htm;        }

来源地址:https://blog.csdn.net/xujx321/article/details/131833738

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯