初体验Java9的WebSocket 客户端api
最近开始学习高版本的JDK。花了点儿时间学习了jdk9中http和websocket的api。
虽然这俩是jdk9的东西,但是我本地学习环境实际上是JDK13,可能高版本的API有一些小的变化
以前如果需要java实现的websocket客户端。要么自己写(我没那么本事)实现协议,要么用第三方的框架。例如优秀牛逼的: Netty
谢天谢地的是jdk提供了这一系列的api。
主要的类库
WebSocket
ws的客户端接口,主要职责是消息发送,连接关闭,设置监听器
Builder
用于创建 WebSocket
实例的Builder接口
Listener
用于监听连接事件,消息事件,异常事件,关闭事件的监听器接口
Builder & Listener 都是以内部接口的形式,定义在 WebSocket 类中的
源码 & 注释
package java.net.http;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public interface WebSocket {
// 正常关闭的消息code
int NORMAL_CLOSURE = 1000;
// WebSocket的builder
interface Builder {
// 设置header
Builder header(String name, String value);
// 设置链接超时时间
Builder connectTimeout(Duration timeout);
// 设置子协议
Builder subprotocols(String mostPreferred, String... lesserPreferred);
// 异步的创建链接,并且设置监听器
CompletableFuture<WebSocket> buildAsync(URI uri, Listener listener);
}
// 消息监听器
interface Listener {
// 监听连接就绪事件
default void onOpen(WebSocket webSocket) { webSocket.request(1); }
// 监听文本消息事件
default CompletionStage<?> onText(WebSocket webSocket,
CharSequence data,
boolean last) {
webSocket.request(1);
return null;
}
// 监听二进制消息事件
default CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer data,
boolean last) {
webSocket.request(1);
return null;
}
// 监听心跳事件
default CompletionStage<?> onPing(WebSocket webSocket,
ByteBuffer message) {
webSocket.request(1);
return null;
}
default CompletionStage<?> onPong(WebSocket webSocket,
ByteBuffer message) {
webSocket.request(1);
return null;
}
// 监听关闭事件
default CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
return null;
}
// 监听异常事件
default void onError(WebSocket webSocket, Throwable error) { }
}
// 发送文本,二进制,心跳,关闭消息。都是异步的
CompletableFuture<WebSocket> sendText(CharSequence data, boolean last);
CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last);
CompletableFuture<WebSocket> sendPing(ByteBuffer message);
CompletableFuture<WebSocket> sendPong(ByteBuffer message);
CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
// 统计消息数量
void request(long n);
String getSubprotocol();
boolean isOutputClosed();
boolean isInputClosed();
void abort();
}
看完了,是不是觉得贼简单?
通过 HttpClient 创建 WebSocket 的 Builder
HttpClient,可以理解为一个浏览器
WebSocket.Builder builder = HttpClient.newBuilder()
.build() // 先通过 Builder 创建 HttpClient 对象
.newWebSocketBuilder() // 调用对象的 newWebSocketBuilder 方法创建 WebSocket 的Builder
实例
使用 Netty 提供一个 WebSocket 的服务,客户端连接上后,循环发送消息。服务端收到消息后,转换到大写返回给客户端。
当服务端收到超过10条消息后,就给客户端返回 "bye"消息,由客户端主动来断开与服务端的链接。
客户端
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.net.http.WebSocket.Listener;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
/**
*
* @author KevinBlandy
*
*/
public class Client implements Test{
public static void main(String[] args) throws Exception {
// 执行线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建监听器
WebSocket.Listener listener = new Listener() {
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
String message = data.toString();
System.out.println("收到服务器消息:" + message + ",是否是最后一帧:" + last);
if (last) {
// 完整的一条消息,才纳入消息数量统计
webSocket.request(1);
}
if (message.equalsIgnoreCase("bye")) {
// 收到服务端的 "byte", 客户端主动断开链接
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "byte").whenComplete((websocket, throable) -> {
// 资源释放
executor.shutdown();
System.out.println("已经关闭和服务端的channel");
});
}
return null;
}
};
// 创建websocket的builder
HttpClient.newBuilder()
.executor(executor)
.build()
.newWebSocketBuilder()
.buildAsync(new URI("ws://localhost:1024/channel"), listener).whenComplete((webSocket, throable) -> {
while (true) {
webSocket.sendText("Hello: " + System.currentTimeMillis(), true);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static void json(Object object) {
System.out.println(JSON.toJSONString(object, SerializerFeature.PrettyFormat));
}
}
服务端
import java.net.InetSocketAddress;
import java.util.Locale;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
/**
*
* websocket server
*
* @author KevinBlandy
*
*/
public class Server {
static final String ENDPOINT = "/channel";
public static void main(String[] args) throws InterruptedException {
AttributeKey<Integer> count = AttributeKey.valueOf("count");
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.localAddress(new InetSocketAddress("0.0.0.0", 1024));
serverBootstrap.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
String uri = fullHttpRequest.uri();
if (!uri.equals(ENDPOINT)) {
ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
.addListener(ChannelFutureListener.CLOSE);
return ;
}
}
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(ENDPOINT, null, true));
pipeline.addLast(new SimpleChannelInboundHandler<WebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
// 仅仅出力文本消息处理
if (msg instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) msg).text();
System.out.println("服收到消息:" + request);
// 转换为大写后,写回
ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.CHINA)));
}
Attribute<Integer> attribute = ctx.channel().attr(count);
Integer val = attribute.get();
if (val == null) {
attribute.set(1);
} else {
if (val >= 10) {
// 发送 bye 消息到客户端,由客户端主动关闭channel
ctx.channel().writeAndFlush(new TextWebSocketFrame("bye"));
} else {
attribute.compareAndSet(val, val + 1);
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("已经断开与客户端的channel");
super.channelInactive(ctx);
};
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof HandshakeComplete) {
HandshakeComplete handshakeComplete = (HandshakeComplete) evt;
String uri = handshakeComplete.requestUri();
HttpHeaders httpHeaders = handshakeComplete.requestHeaders();
String selectedSubprotocol = handshakeComplete.selectedSubprotocol();
System.out.println("ws握手:uri=" + uri + ",headers=" + httpHeaders + ",subPortocol=" + selectedSubprotocol);
}
super.userEventTriggered(ctx, evt);
}
});
}
});
Channel channel = serverBootstrap.bind(1024).sync().channel();
channel.closeFuture().sync();
} finally {
bossEventLoopGroup.shutdownGracefully();
workerEventLoopGroup.shutdownGracefully();
}
}
}
结果
客户端输出日志
收到服务器消息:HELLO: 1584259504015,是否是最后一帧:true
收到服务器消息:HELLO: 1584259505018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259506018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259507018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259508018,是否是最后一帧:true
收到服务器消息:HELLO: 1584259509019,是否是最后一帧:true
收到服务器消息:HELLO: 1584259510020,是否是最后一帧:true
收到服务器消息:HELLO: 1584259511021,是否是最后一帧:true
收到服务器消息:HELLO: 1584259512022,是否是最后一帧:true
收到服务器消息:HELLO: 1584259513022,是否是最后一帧:true
收到服务器消息:HELLO: 1584259514023,是否是最后一帧:true
收到服务器消息:bye,是否是最后一帧:true
已经关闭和服务端的channel
服务端输出日志
ws握手:uri=/channel,headers=DefaultHttpHeaders[Connection: Upgrade, Content-Length: 0, Host: localhost:1024, Upgrade: websocket, User-Agent: Java-http-client/13.0.2, Sec-WebSocket-Key: HE3IjKqJTR72L2CpgyiVIg==, Sec-WebSocket-Version: 13],subPortocol=null
服收到消息:Hello: 1584259504015
服收到消息:Hello: 1584259505018
服收到消息:Hello: 1584259506018
服收到消息:Hello: 1584259507018
服收到消息:Hello: 1584259508018
服收到消息:Hello: 1584259509019
服收到消息:Hello: 1584259510020
服收到消息:Hello: 1584259511021
服收到消息:Hello: 1584259512022
服收到消息:Hello: 1584259513022
服收到消息:Hello: 1584259514023
已经断开与客户端的channel