5分钟学会Java9的WebSocket 客户端api

初体验Java9的WebSocket 客户端api

最近开始学习高版本的JDK。花了点儿时间学习了jdk9中http和websocket的api。

虽然这俩是jdk9的东西,但是我本地学习环境实际上是JDK13,可能高版本的API有一些小的变化

以前如果需要java实现的websocket客户端。要么自己写(我没那么本事)实现协议,要么用第三方的框架。例如优秀牛逼的: Netty

谢天谢地的是jdk提供了这一系列的api。

主要的类库

WebSocket
ws的客户端接口,主要职责是消息发送,连接关闭,设置监听器

Builder
用于创建 WebSocket实例的Builder接口

Listener
用于监听连接事件,消息事件,异常事件,关闭事件的监听器接口

Builder & Listener 都是以内部接口的形式,定义在 WebSocket 类中的

源码 & 注释

package java.net.http;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public interface WebSocket {
    // 正常关闭的消息code
    int NORMAL_CLOSURE = 1000;

	// WebSocket的builder
    interface Builder {

		// 设置header
        Builder header(String name, String value);

		// 设置链接超时时间
        Builder connectTimeout(Duration timeout);
		
		// 设置子协议
        Builder subprotocols(String mostPreferred, String... lesserPreferred);

		// 异步的创建链接,并且设置监听器
        CompletableFuture<WebSocket> buildAsync(URI uri, Listener listener);
    }

	// 消息监听器
    interface Listener {
		
		// 监听连接就绪事件
        default void onOpen(WebSocket webSocket) { webSocket.request(1); }
		// 监听文本消息事件
        default CompletionStage<?> onText(WebSocket webSocket,
                                          CharSequence data,
                                          boolean last) {
            webSocket.request(1);
            return null;
        }
		// 监听二进制消息事件
        default CompletionStage<?> onBinary(WebSocket webSocket,
                                            ByteBuffer data,
                                            boolean last) {
            webSocket.request(1);
            return null;
        }
		// 监听心跳事件
        default CompletionStage<?> onPing(WebSocket webSocket,
                                          ByteBuffer message) {
            webSocket.request(1);
            return null;
        }
        default CompletionStage<?> onPong(WebSocket webSocket,
                                          ByteBuffer message) {
            webSocket.request(1);
            return null;
        }
		// 监听关闭事件
        default CompletionStage<?> onClose(WebSocket webSocket,
                                           int statusCode,
                                           String reason) {
            return null;
        }
		// 监听异常事件
        default void onError(WebSocket webSocket, Throwable error) { }
    }

	// 发送文本,二进制,心跳,关闭消息。都是异步的
    CompletableFuture<WebSocket> sendText(CharSequence data, boolean last);
    CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last);
    CompletableFuture<WebSocket> sendPing(ByteBuffer message);
    CompletableFuture<WebSocket> sendPong(ByteBuffer message);
    CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
	
	// 统计消息数量
    void request(long n);

    String getSubprotocol();
    boolean isOutputClosed();
    boolean isInputClosed();
    void abort();
}

看完了,是不是觉得贼简单? :grinning:

通过 HttpClient 创建 WebSocket 的 Builder

HttpClient,可以理解为一个浏览器

WebSocket.Builder builder = HttpClient.newBuilder()
			.build()  // 先通过 Builder 创建 HttpClient 对象
			.newWebSocketBuilder() // 调用对象的 newWebSocketBuilder 方法创建 WebSocket 的Builder

实例

使用 Netty 提供一个 WebSocket 的服务,客户端连接上后,循环发送消息。服务端收到消息后,转换到大写返回给客户端。

当服务端收到超过10条消息后,就给客户端返回 "bye"消息,由客户端主动来断开与服务端的链接。

客户端

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.net.http.WebSocket.Listener;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

/**
 * 
 * @author KevinBlandy
 *
 */
public class Client implements Test{
	
	public static void main(String[] args) throws Exception {
		
		// 执行线程池
		ExecutorService executor = Executors.newSingleThreadExecutor();
		
		// 创建监听器
		WebSocket.Listener listener = new Listener() {
			@Override
			public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
				String message = data.toString();
				System.out.println("收到服务器消息:" + message + ",是否是最后一帧:" + last);
				if (last) {
					// 完整的一条消息,才纳入消息数量统计
					webSocket.request(1);
				}
				if (message.equalsIgnoreCase("bye")) {
					// 收到服务端的 "byte", 客户端主动断开链接
					webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "byte").whenComplete((websocket, throable) -> {
						// 资源释放
						executor.shutdown();
						System.out.println("已经关闭和服务端的channel");
					});
				}
	            return null;
			}
		};
		
		// 创建websocket的builder
		HttpClient.newBuilder()
			.executor(executor)
			.build()
			.newWebSocketBuilder()
			.buildAsync(new URI("ws://localhost:1024/channel"), listener).whenComplete((webSocket, throable) -> {
				
				while (true) {
					webSocket.sendText("Hello: " + System.currentTimeMillis(), true);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		
	}
	
	public static void json(Object object) {
		System.out.println(JSON.toJSONString(object, SerializerFeature.PrettyFormat));
	}
}

服务端

import java.net.InetSocketAddress;
import java.util.Locale;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
/**
 * 
 * websocket server
 * 
 * @author KevinBlandy
 *
 */
public class Server {
	
	static final String ENDPOINT = "/channel";
	
	public static void main(String[] args) throws InterruptedException {
		
		AttributeKey<Integer> count = AttributeKey.valueOf("count");
		
		EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
		EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap serverBootstrap = new ServerBootstrap();	
			serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup);
			serverBootstrap.channel(NioServerSocketChannel.class);
			serverBootstrap.localAddress(new InetSocketAddress("0.0.0.0", 1024));
			
			serverBootstrap.childHandler(new ChannelInitializer<>() {
				@Override
				protected void initChannel(Channel channel) throws Exception {
					ChannelPipeline pipeline = channel.pipeline();
					pipeline.addLast(new HttpServerCodec());
					pipeline.addLast(new ChunkedWriteHandler());
					pipeline.addLast(new HttpObjectAggregator(65536));
					pipeline.addLast(new ChannelInboundHandlerAdapter() {
						@Override
						public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
							if(msg instanceof FullHttpRequest) {
								FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
								String uri = fullHttpRequest.uri();
								if (!uri.equals(ENDPOINT)) {
									ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
										.addListener(ChannelFutureListener.CLOSE);
									return ;
								}
							}
							super.channelRead(ctx, msg);
						}
					});
					pipeline.addLast(new WebSocketServerCompressionHandler());
					pipeline.addLast(new WebSocketServerProtocolHandler(ENDPOINT, null, true));
					pipeline.addLast(new SimpleChannelInboundHandler<WebSocketFrame>() {
						@Override
						protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
							// 仅仅出力文本消息处理
							if (msg instanceof TextWebSocketFrame) { 
								
								String request = ((TextWebSocketFrame) msg).text();
								System.out.println("服收到消息:" + request);
								
								// 转换为大写后,写回
								ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.CHINA)));
							} 
							
							Attribute<Integer> attribute = ctx.channel().attr(count);
							Integer val = attribute.get();
							if (val == null) {
								attribute.set(1);
							} else {
								if (val >= 10) {
									// 发送 bye 消息到客户端,由客户端主动关闭channel
									ctx.channel().writeAndFlush(new TextWebSocketFrame("bye"));
								} else {
									attribute.compareAndSet(val, val + 1);
								}
							}
						}
						
						@Override
						public void channelInactive(ChannelHandlerContext ctx) throws Exception {
							System.out.println("已经断开与客户端的channel");
							super.channelInactive(ctx);
						};
						
						@Override
						public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
							if(evt instanceof HandshakeComplete) {
								HandshakeComplete handshakeComplete = (HandshakeComplete) evt;
								String uri = handshakeComplete.requestUri();
								HttpHeaders httpHeaders = handshakeComplete.requestHeaders();
								String selectedSubprotocol = handshakeComplete.selectedSubprotocol();
								System.out.println("ws握手:uri=" + uri + ",headers=" + httpHeaders + ",subPortocol=" + selectedSubprotocol);
							}
							super.userEventTriggered(ctx, evt);
						}
					});
				}
			});
			Channel channel = serverBootstrap.bind(1024).sync().channel();
			channel.closeFuture().sync();
		} finally {
			bossEventLoopGroup.shutdownGracefully();
			workerEventLoopGroup.shutdownGracefully();
		}
	}
}

结果

客户端输出日志

收到服务器消息:HELLO: 1584259504015,是否是最后一帧:true
收到服务器消息:HELLO: 1584259505018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259506018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259507018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259508018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259509019,是否是最后一帧:true
收到服务器消息:HELLO: 1584259510020,是否是最后一帧:true
收到服务器消息:HELLO: 1584259511021,是否是最后一帧:true
收到服务器消息:HELLO: 1584259512022,是否是最后一帧:true
收到服务器消息:HELLO: 1584259513022,是否是最后一帧:true
收到服务器消息:HELLO: 1584259514023,是否是最后一帧:true
收到服务器消息:bye,是否是最后一帧:true
已经关闭和服务端的channel

服务端输出日志

ws握手:uri=/channel,headers=DefaultHttpHeaders[Connection: Upgrade, Content-Length: 0, Host: localhost:1024, Upgrade: websocket, User-Agent: Java-http-client/13.0.2, Sec-WebSocket-Key: HE3IjKqJTR72L2CpgyiVIg==, Sec-WebSocket-Version: 13],subPortocol=null
服收到消息:Hello: 1584259504015
服收到消息:Hello: 1584259505018
服收到消息:Hello: 1584259506018
服收到消息:Hello: 1584259507018
服收到消息:Hello: 1584259508018
服收到消息:Hello: 1584259509019
服收到消息:Hello: 1584259510020
服收到消息:Hello: 1584259511021
服收到消息:Hello: 1584259512022
服收到消息:Hello: 1584259513022
服收到消息:Hello: 1584259514023
已经断开与客户端的channel

2 Likes

学习了

可以,码了