java 实现 websocket的五种方式

分类: bet3365.com 2025-07-10 17:34:24 作者: admin

转载自

1. 前言

本文主要讲述在Java技术领域实现websocket服务的五种方式.

2. 第一种使用Java原生代码实现websocket

使用Java原生代码实现websocket服务的方法, 此方法需要引入一个第三方库java-websocket.jar. 截至目前2023/01/01最新版本为1.5.3.

项目源代码位于: https://github.com/TooTallNate/Java-WebSocket

示例代码位于: https://github.com/TooTallNate/Java-WebSocket/tree/master/src/main/example

2.1. 首先在项目中引入依赖

如果你的项目使用gradle作为管理工具, 可以添加以下gradle依赖

implementation group: 'org.java-websocket', name: 'Java-WebSocket', version: '1.5.3'

如果你的项目使用maven进行管理, 可以添加以下maven依赖

mven依赖

org.java-websocket

Java-WebSocket

1.5.3

2.2. 创建WebsocketServer类

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.InetSocketAddress;

import java.net.UnknownHostException;

import org.java_websocket.WebSocket;

import org.java_websocket.handshake.ClientHandshake;

import org.java_websocket.server.WebSocketServer;

public class SocketServer extends WebSocketServer {

public SocketServer(int port) throws UnknownHostException {

super(new InetSocketAddress(port));

}

public SocketServer(InetSocketAddress address) {

super(address);

}

@Override

public void onOpen(WebSocket conn, ClientHandshake handshake) {

conn.send("Welcome to the server!"); // This method sends a message to the new client

broadcast("new connection: " + handshake

.getResourceDescriptor()); // This method sends a message to all clients connected

System.out.println(

conn.getRemoteSocketAddress().getAddress().getHostAddress() + " entered the room!");

}

@Override

public void onClose(WebSocket conn, int code, String reason, boolean remote) {

broadcast(conn + " has left the room!");

System.out.println(conn + " has left the room!");

}

@Override

public void onMessage(WebSocket conn, String message) {

broadcast(message);

System.out.println(conn + ": " + message);

}

@Override

public void onError(WebSocket conn, Exception ex) {

ex.printStackTrace();

if (conn != null) {

// some errors like port binding failed may not be assignable to a specific

// websocket

}

}

@Override

public void onStart() {

System.out.println("Server started!");

setConnectionLostTimeout(0);

setConnectionLostTimeout(100);

}

}

2.3. 启动SocketServer

我们以及创建好了SocketServer, 这个时候我们可以启动它了, 启动代码如下.

public static void main(String[] args) throws InterruptedException, IOException {

int port = 8887; // 843 flash policy port

SocketServer s = new SocketServer(port);

s.start();

System.out.println("ChatServer started on port: " + s.getPort());

BufferedReader sysin = new BufferedReader(new InputStreamReader(System.in));

while (true) {

String in = sysin.readLine();

s.broadcast(in);

if (in.equals("exit")) {

s.stop(1000);

break;

}

}

}

写好main方法后, 我们可以启动它, 当控制台输出ChatServer started on port: 8887表示启动成功.

2.4. 测试web socket server

此时web socket server已经监听在了localhost:8887上. 我们可以使用websocket在线调试工具对其进行测试.

该工具主要是利用html5 的websocket去连接服务端的websocket,因此,无论你是内网还是外网都可使用!

打开工具在输入框中输入 ws://localhost:8887点击连接, 既可以看到服务器端的反馈, 同时web socket server的控制台也会输出日志信息.

3. 使用Java原生+SpringBoot混合

在此种方式中, SocketServer依然使用原生的java代码编写, 但是SocketServer实例化过程由spring来管理.

此时我们需要引入spring-boot-starter-websocket, 上一节中的依赖包Java-WebSocket已经不需要了. 两种方式采用了不同的机制.

3.1. 引入依赖

plugins {

id 'org.springframework.boot' version '2.7.7'

id 'io.spring.dependency-management' version '1.0.15.RELEASE'

}

dependencies {

implementation 'org.springframework.boot:spring-boot-starter-websocket'

}

此处我们需要在gradle配置文件的plugins闭包内添加两个plugins, 一个复制控制spring boot的版本, 一个负责管理依赖.

对于maven, 需要如下配置

org.springframework.boot

spring-boot-starter-parent

2.7.7

org.springframework.boot

spring-boot-starter-websocket

3.2. 创建ServerEndpoint

创建ServerEndpoint代码如下:

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;

import javax.websocket.OnMessage;

import javax.websocket.OnOpen;

import javax.websocket.Session;

import javax.websocket.server.ServerEndpoint;

import java.io.IOException;

@ServerEndpoint("/myWs")

@Component

public class WsServerEndpoint {

/**

* 连接成功

*

* @param session

*/

@OnOpen

public void onOpen(Session session) {

System.out.println("连接成功");

}

/**

* 连接关闭

*

* @param session

*/

@OnClose

public void onClose(Session session) {

System.out.println("连接关闭");

}

/**

* 接收到消息

*

* @param text

*/

@OnMessage

public String onMsg(String text) throws IOException {

return "servet 发送:" + text;

}

}

说明 这里有几个注解需要注意一下,首先是他们的包都在 javax.websocket下。并不是 spring 提供的,而 jdk 自带的,下面是他们的具体作用。

@ServerEndpoint 通过这个 spring boot 就可以知道你暴露出去的 ws 应用的路径,有点类似我们经常用的@RequestMapping。比如你的启动端口是 8080,而这个注解的值是 ws,那我们就可以通过 ws://127.0.0.1:8080/ws 来连接你的应用

@OnOpen 当 websocket 建立连接成功后会触发这个注解修饰的方法,注意它有一个 Session 参数

@OnClose 当 websocket 建立的连接断开后会触发这个注解修饰的方法,注意它有一个 Session 参数

@OnMessage 当客户端发送消息到服务端时,会触发这个注解修改的方法,它有一个 String 入参表明客户端传入的值

@OnError 当 websocket 建立连接时出现异常会触发这个注解修饰的方法,注意它有一个 Session 参数

服务器主动发送消息 当服务器端要主动给客户端发送, 需要获取掉相应客户端与服务器端的session, 通过 session.getBasicRemote().sendText(), 将消息发送到前端. 因此最好在onOpen方法中将session对象保存起来, 这样下次主动连接客户端时能找到相应的session对象.

3.3. 添加Spring配置

有了WsServerEndpoint后我们还要配置ServerEndpointExporter, 将Endpoint暴露出去让客户端来建立连接.

而配置ServerEndpointExporter的方式非常简单, 只需要创建一个ServerEndpointExporter bean即可, 它会去获取Spring上下文中所有的Endpoint示例, 完成endpoint的注册过程, 并监听在application.properties 的server.port 属性所指定的端口.

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration

@EnableWebSocket

public class WebsocketConfig {

@Bean

public ServerEndpointExporter serverEndpoint() {

return new ServerEndpointExporter();

}

}

3.4. 启动应用程序并测试

我们只需要向一般的Spring boot应用一样启动它即可.

@SpringBootApplication

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

}

测试, 我们依然使用websocket在线调试工具来测试, 详情可参考上一节中的介绍

与上一节稍微差异的地方是, 我们可以url中指定endpoint了 ws://127.0.0.1:8080/myWs

4. 使用SpringBoot实现websocket

4.1. 引入依赖

implementation 'org.springframework.boot:spring-boot-starter-websocket'

4.2. 实现类

HttpAuthHandler.java

import org.springframework.stereotype.Component;

import org.springframework.web.socket.CloseStatus;

import org.springframework.web.socket.TextMessage;

import org.springframework.web.socket.WebSocketSession;

import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.time.LocalDateTime;

@Component

public class HttpAuthHandler extends TextWebSocketHandler {

/**

* socket 建立成功事件

*

* @param session

* @throws Exception

*/

@Override

public void afterConnectionEstablished(WebSocketSession session) throws Exception {

Object sessionId = session.getAttributes().get("session_id");

if (sessionId != null) {

// 用户连接成功,放入在线用户缓存

WsSessionManager.add(sessionId.toString(), session);

} else {

throw new RuntimeException("用户登录已经失效!");

}

}

/**

* 接收消息事件

*

* @param session

* @param message

* @throws Exception

*/

@Override

protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {

// 获得客户端传来的消息

String payload = message.getPayload();

Object sessionId = session.getAttributes().get("session_id");

System.out.println("server 接收到 " + sessionId + " 发送的 " + payload);

session.sendMessage(new TextMessage("server 发送给 " + sessionId + " 消息 " + payload + " " + LocalDateTime.now().toString()));

}

/**

* socket 断开连接时

*

* @param session

* @param status

* @throws Exception

*/

@Override

public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {

Object sessionId = session.getAttributes().get("session_id");

if (sessionId != null) {

// 用户退出,移除缓存

WsSessionManager.remove(sessionId.toString());

}

}

}

WsSessionManager.java

import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;

import java.util.concurrent.ConcurrentHashMap;

public class WsSessionManager {

/**

* 保存连接 session 的地方

*/

private static ConcurrentHashMap SESSION_POOL = new ConcurrentHashMap<>();

/**

* 添加 session

*

* @param key

*/

public static void add(String key, WebSocketSession session) {

// 添加 session

SESSION_POOL.put(key, session);

}

/**

* 删除 session,会返回删除的 session

*

* @param key

* @return

*/

public static WebSocketSession remove(String key) {

// 删除 session

return SESSION_POOL.remove(key);

}

/**

* 删除并同步关闭连接

*

* @param key

*/

public static void removeAndClose(String key) {

WebSocketSession session = remove(key);

if (session != null) {

try {

// 关闭连接

session.close();

} catch (IOException e) {

// todo: 关闭出现异常处理

e.printStackTrace();

}

}

}

/**

* 获得 session

*

* @param key

* @return

*/

public static WebSocketSession get(String key) {

// 获得 session

return SESSION_POOL.get(key);

}

}

import java.util.Map;

import org.apache.logging.log4j.util.Strings;

import org.springframework.http.server.ServerHttpRequest;

import org.springframework.http.server.ServerHttpResponse;

import org.springframework.stereotype.Component;

import org.springframework.web.socket.WebSocketHandler;

import org.springframework.web.socket.server.HandshakeInterceptor;

@Component

public class MyInterceptor implements HandshakeInterceptor {

/**

* 握手前

*/

@Override

public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception {

System.out.println("握手开始");

String hostName = request.getRemoteAddress().getHostName();

String sessionId = hostName+String.valueOf((int)(Math.random()*1000));

if (Strings.isNotBlank(sessionId)) {

// 放入属性域

attributes.put("session_id", sessionId);

System.out.println("用户 session_id " + sessionId + " 握手成功!");

return true;

}

System.out.println("用户登录已失效");

return false;

}

/**

* 握手后

*/

@Override

public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

System.out.println("握手完成");

}

}

4.3. Spring 配置

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.config.annotation.WebSocketConfigurer;

import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import DataLoader.ws.HttpAuthHandler;

import DataLoader.ws.MyInterceptor;

@Configuration

@EnableWebSocket

public class WebsocketConfig implements WebSocketConfigurer {

@Autowired

private HttpAuthHandler httpAuthHandler;

@Autowired

private MyInterceptor myInterceptor;

@Override

public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

registry

.addHandler(httpAuthHandler, "myWS")

.addInterceptors(myInterceptor)

.setAllowedOrigins("*");

}

}

4.4. 启动与测试

启动代码如下:

import org.springframework.boot.CommandLineRunner;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ApplicationContext;

import org.springframework.context.annotation.Bean;

@SpringBootApplication

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

@Bean

public CommandLineRunner commandLineRunner(ApplicationContext ctx) {

return args -> {

System.out.println("application started");

};

}

}

执行main方法启动应用程序

测试依然使用websocket在线调试工具

5. 使用TIO+SpringBoot实现websocket

以下是关于t-io的一些信息, 如果需要更详细的了解tio可以访问以下这些站点

sitlinkt-io sitehttps://www.t-io.orgt-io on giteehttps://gitee.com/tywo45/t-iot-io on githubhttps://github.com/tywo45/t-io

5.1. 添加相应依赖

gradle:

implementation 'org.t-io:tio-websocket-spring-boot-starter:3.6.0.v20200315-RELEASE'

maven:

org.t-io

tio-websocket-spring-boot-starter

3.6.0.v20200315-RELEASE

5.2. 编写消息处理类

import org.springframework.stereotype.Component;

import org.tio.core.ChannelContext;

import org.tio.http.common.HttpRequest;

import org.tio.http.common.HttpResponse;

import org.tio.websocket.common.WsRequest;

import org.tio.websocket.server.handler.IWsMsgHandler;

@Component

public class MyWebSocketMsgHandler implements IWsMsgHandler {

@Override

public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext)

throws Exception {

return httpResponse;

}

@Override

public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext)

throws Exception {

System.out.println("握手成功");

}

@Override

public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {

System.out.println("接收到bytes消息");

return null;

}

@Override

public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {

return null;

}

@Override

public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {

System.out.println("接收到文本消息:" + s);

return null;

}

}

5.3. 修改配置文件

application.properties

tio.websocket.server.port=9876

tio.websocket.server.heartbeat-timeout=60000

5.4. 启动tio Websocket Server

启动tio Websocket Server 的方式如下, 执行main方法.

@SpringBootApplication

@EnableTioWebSocketServer

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

@Bean

public CommandLineRunner commandLineRunner(ApplicationContext ctx) {

return args -> {

System.out.println("application started");

};

}

}

6. STOMP实现websocket

6.1. 添加相应依赖

gradle:

implementation 'org.springframework.boot:spring-boot-starter-websocket' maven:

org.springframework.boot

spring-boot-starter-websocket

6.2. 实现配置WebSocketMessageBrokerConfigurer接口

WebSocketConfig.java

import org.springframework.context.annotation.Configuration;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override

public void registerStompEndpoints(StompEndpointRegistry registry) {

// 配置客户端尝试连接地址

registry.addEndpoint("/ws").setAllowedOrigins("*");

}

@Override

public void configureMessageBroker(MessageBrokerRegistry registry) {

// 设置广播节点

registry.enableSimpleBroker("/topic", "/queue");

// 客户端向服务端发送消息需有/app 前缀

registry.setApplicationDestinationPrefixes("/app");

// 指定用户发送(一对一)的前缀 /user/

registry.setUserDestinationPrefix("/user/");

}

}

6.3. 实现消息请求处理的Controller

WSController.java

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.messaging.simp.annotation.SendToUser;

import org.springframework.stereotype.Controller;

@Controller

public class WSController {

@Autowired

private SimpMessagingTemplate simpMessagingTemplate;

@MessageMapping("/greeting")

@SendToUser("/queue/serverReply")

public String greating(@Payload String data) {

System.out.println("received greeting: " + data);

String msg = "server replys: " + data;

return msg;

}

@MessageMapping("/shout")

public void userShout(Shout shout) {

//String name = principal.getName();

String message = shout.getMessage();

System.out.println("收到的消息是:" + message);

simpMessagingTemplate.convertAndSend("/queue/notifications", shout);

}

}

domain object

Shout.java

public class Shout {

private String message;

public String getMessage() {

return message;

}

public void setMessage(String message) {

this.message = message;

}

}

6.4. 启动

import org.springframework.boot.CommandLineRunner;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ApplicationContext;

import org.springframework.context.annotation.Bean;

@SpringBootApplication

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

@Bean

public CommandLineRunner commandLineRunner(ApplicationContext ctx) {

return args -> {

System.out.println("application started");

};

}

}

6.5. 实现消息客户端

index.html

Document