WebSocket
一、WebSocket介绍
1、简介
WebSocket协议通过在客户端和服务端之间提供全双工通信来进行Web和服务器的交互功能。在WebSocket应用程序中,服务器发布WebSocket端点,客户端使用url连接到服务器。建立连接后,服务器和客户端就可以互相发送消息。客户端通常连接到一台服务器,服务器接受多个客户端的连接。
2、优势
HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
如果我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)
3、服务端注解
@ServerEndpoint(“/websocket/{uid}”)
申明这是一个websocket服务;
需要指定访问该服务的地址,在地址中可以指定参数,需要通过{}进行占位;
@OnOpen
用法:public void onOpen(Session session, @PathParam(“uid”) String uid) throws IOException{}
该方法将在建立连接后执行,会传入session对象,就是客户端与服务端建立的长连接通道,通过@PathParam获取url中声明的参数;
@OnClose
用法:public void onClose() {}
该方法是在连接关闭后执行;
@OnMessage
用法:public void onMessage(String message, Session session) throws IOException {}
该方法用于接收客户端发送的消息;
message:发来的消息数据;
session:会话对象(也是长连接通道);
发送消息到客户端;
用法:session.getBasicRemote().sendText(“hello,websocket.”);
通过session进行消息发送;
二、springboot整合
1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2、配置
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configurationpublic class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
3、业务代码
>>群聊
import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;@Component@Slf4j@ServerEndpoint(value = "/api/pushMessageMulti/{roomId}/{userId}",encoders = { ServerEncoder.class })public class WebSocketServerMulti { private static int onlineCount = 0; private static Map<String, WebSocketServerMulti> userMap = new ConcurrentHashMap<>(); //存放房间对象 private static Map<String, Set<WebSocketServerMulti>> roomMap = new ConcurrentHashMap<>(); private Session session; private String userId = ""; //存出当前群聊在线的人数(使用原因:roomMap中无法获取到当前用户id) private static Map<String, List<String>> multiUser = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session,@PathParam("roomId") String roomId , @PathParam("userId") String userId) throws IOException, EncodeException { synchronized (session){ try { this.session = session; this.userId=userId; userMap.put(userId,this); if (!roomMap.containsKey(roomId)) { Set<WebSocketServerMulti> set = new HashSet<>(); set.add(userMap.get(userId)); roomMap.put(roomId,set); List<String> dd = new LinkedList<>(); dd.add(userId); multiUser.put(roomId,dd); } else { if(multiUser.get(roomId).contains(userId)){ log.info("存在:房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); }else{ multiUser.get(roomId).add(userId); roomMap.get(roomId).add(this); } } System.out.println(multiUser.get(roomId).size()); log.info("房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); Map<String,Object> map = new HashMap<>(); map.put("online_num",multiUser.get(roomId).size());//在线人数 map.put("online_list",roomMap.get(roomId));//人员列表 map.put("roomId",roomId);//群id map.put("message","用户"+***+"加入群聊");//消息//自定义发送消息 sendMessageObject(map,roomId); }catch (Exception e){ e.printStackTrace(); } } } @OnClose public void onClose( @PathParam("roomId") String roomId,@PathParam("userId") String userId) { try { if (roomMap.containsKey(roomId)) { Set<WebSocketServerMulti> set = roomMap.get(roomId); Iterator<WebSocketServerMulti> it = set.iterator(); while (it.hasNext()) { if (it.next().userId.equals(userId)) { it.remove(); } } multiUser.get(roomId).remove(userId); log.info("房间号:"+roomId+"用户退出:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); Map<String,Object> map = new HashMap<>(); map.put("online_num",multiUser.get(roomId).size());//在线人数 map.put("online_list",roomMap.get(roomId));//人员列表 map.put("roomId",roomId);//群id map.put("message","用户"+***+"加入群聊");//消息//自定义发送消息 sendMessageObject(map,roomId); } }catch (Exception e){ e.printStackTrace(); } } @OnMessage public void onMessage(String message) { //注意,要给session加上同步锁,否则会出现多个线程同时往同一个session写数据,导致报错的情况。 synchronized (session){ //可以群发消息 if(StringUtils.isNotBlank(message)){ try { //解析发送的报文 JSONObject jsonObject = JSONObject.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); int chatType=jsonObject.getInteger("chatType"); String myUserId=jsonObject.getString("myUserId"); String toRoomId=jsonObject.getString("toRoomId"); log.info("房间号:"+toRoomId+"用户消息:"+userId+",报文:"+message); sendMessageTo(message,toRoomId); }catch (Exception e){ e.printStackTrace(); } } } } public void sendMessageTo(String message , String roomId) throws IOException { if (roomMap.containsKey(roomId)) { for (WebSocketServerMulti item : roomMap.get(roomId)) { item.session.getAsyncRemote().sendText(message); } } } @OnError public SystemResult onError(Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); ChatError chatError = new ChatError(); chatError.setUserId(Integer.valueOf(this.userId)); chatError.setDetails(error.getMessage()); chatError.setAddTime(new Date()); chatErrorMapper.insert(chatError); SystemResult systemResult = new SystemResult(); return systemResult.error(error.getMessage()); } public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } public void sendMessageObject(Object message,String roomId) { try { if (roomMap.containsKey(roomId)) { for (WebSocketServerMulti item : roomMap.get(roomId)) { item.session.getBasicRemote().sendObject(message); } } } catch (Exception e) { e.printStackTrace(); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServerMulti.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServerMulti.onlineCount--; }}
>>单人聊天
import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;@Component@Slf4j@ServerEndpoint("/api/pushMessageSolo/{userId}")public class WebSocketServerSolo { private static int onlineCount = 0; private static ConcurrentHashMap<String, WebSocketServerSolo> webSocketMap = new ConcurrentHashMap<>(); private Session session; private String userId = ""; @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId=userId; if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //加入set中 webSocketMap.put(userId,this); }else{ //加入set中 webSocketMap.put(userId,this); //在线数加1 addOnlineCount(); } log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount()); sendMessage("连接成功"); } @OnClose public void onClose() { if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount()); } @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:"+userId+",报文:"+message); //可以群发消息 //消息保存到数据库、redis if(StringUtils.isNotBlank(message)){ try { //解析发送的报文 JSONObject jsonObject = JSONObject.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); String toUserId=jsonObject.getString("toUserId"); String myUserId=jsonObject.getString("myUserId"); //传送给对应toUserId用户的websocket if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){ webSocketMap.get(toUserId).sendMessage(message); }else{ //否则不在这个服务器上,发送到mysql或者redis log.error("请求的userId:"+toUserId+"不在该服务器上"); } if(StringUtils.isNotBlank(myUserId)&&webSocketMap.containsKey(myUserId)){ webSocketMap.get(myUserId).sendMessage(message); }else{ //否则不在这个服务器上,发送到mysql或者redis log.error("请求的userId:"+myUserId+"不在该服务器上"); } }catch (Exception e){ e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); } public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } public static void sendInfo(String message, String userId) { log.info("发送消息到:"+userId+",报文:"+message); if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ log.error("用户"+userId+",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServerSolo.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServerSolo.onlineCount--; }}
三、部署websocket项目问题
1、webSocket功能失效
本地开发的时候都可以正常使用,但是在部署到nginx代理服务器的时候发现报了错误,连不上,报错:Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
发现是nginx服务器默认是不打开webSocket的功能的,这需要我们在nginx服务器上配置:
location /test/ { proxy_pass http://test.com; proxy_redirect default; proxy_set_header Upgrade $http_upgrade; # allow websockets proxy_set_header Connection "upgrade"; proxy_http_version 1.1; }
2、断线重连
如果nginx没有设置读取超时时间,websocket会一直断线重连,大约一分钟重连一次
可以设置长时间得超时时间,避免一直断线重连,避免消耗内存
location /test{ proxy_pass http://test.com; proxy_set_header Upgrade $http_upgrade; # allow websockets proxy_set_header Connection "upgrade"; proxy_http_version 1.1; proxy_connect_timeout 60s;#l连接超时时间,不能设置太长会浪费连接资源 proxy_read_timeout 500s;#读超时时间 proxy_send_timeout 500s;#写超时时间 index index.html index.htm; }
来源地址:https://blog.csdn.net/xujx321/article/details/131833738