仿淘宝开放平台之消息服务——客户端处理链条设计与实现
使用netty框架实现websocket客户端。
启动框架
相比服务端标准模式而言,客户端启动类有比较多需要注意的地方,关键的逻辑有两个:
一是客户端需要处理自动重连,这里实际是两种情况,一种是客户端刚启动的时候,尝试去连接服务端,如不成功,则休眠5秒后再次重试;另外一种是出现异常时,包括原先建立连接、正常通信的情况下因为各种原因导致通道失效、心跳异常、服务端退出等,都会自动尝试重连,这样可以确保出现问题时无需系统管理员手工干预,自动重连来恢复运行。
二是连接成功后,要发起WebSocket的握手操作,将http协议升级为websocket协议,关键在于自实现的WebSocketClientHandshakerHandler处理器。
@Slf4j
@Component
public class MessageClient {@Autowiredprivate MessageClientGlobalHolder config;@Autowiredprivate MessageClientChannelInitializer messageClientChannelInitializer;/*** 启动客户端方法*/public void start() {EventLoopGroup workerGroup = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(messageClientChannelInitializer);//客户端与服务端连接的通道,final修饰表示只会有一个ChannelFuture channelFuture = bootstrap.connect(config.getHost(), config.getPort());channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {//未成功log.error("连接失败", future.cause());//执行重连reconnect();} else {log.info("连接成功");Channel channel = future.channel();//将channel保存到全局变量config.setChannel(channel);//发起握手WebSocketClientHandshakerHandler handler = (WebSocketClientHandshakerHandler) channel.pipeline().get("hookedHandler");handler.handshake(config.getChannel());}}});//等待服务器端关闭channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("消息客户端启动失败:{}" + e.getMessage(), e);//执行重连reconnect();} finally {workerGroup.shutdownGracefully();}}/*** 重连*/public void reconnect() {try {Thread.sleep(5000);//执行重连log.info("消息客户端进行重连");start();} catch (InterruptedException e) {log.error("消息客户端重连过程中线程休眠失败", e);}}}
处理器配置
消息处理器的装配与实现关键实现
/*** 初始化通道** @author wqliu* @date 2021-2-5 15:12*/
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {@Autowiredprivate MessageClientGlobalHolder config;@Autowiredprivate Environment environment;/*** 生产运行模式*/private final String PRD_MODE="prd";/*** 初始化channel*/@Overridepublic void initChannel(SocketChannel socketChannel) throws Exception {//获取通道链路ChannelPipeline pipeline = socketChannel.pipeline();//仅在生产模式下加载ssl过滤器String mode=environment.getProperty("spring.profiles.active");if(PRD_MODE.equals(mode)){//sslSSLContext sslContext = createSslContext();SSLEngine engine = sslContext.createSSLEngine();engine.setNeedClientAuth(false);engine.setUseClientMode(false);pipeline.addLast(new SslHandler(engine));}//HTTP 编解码pipeline.addLast(new HttpClientCodec());// 聚合为单个 FullHttpRequest 或者 FullHttpResponsepipeline.addLast(new HttpObjectAggregator(64 * 1024));/*** 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息* 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,* 则会产生读空闲,导致心跳超时失效。*/// 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,0, TimeUnit.SECONDS));//心跳超时处理pipeline.addLast(new HeartbeatTimeoutHandler());//处理web socket协议与握手pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());//心跳发送pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));//将文本按消息类型转换为请求消息或响应消息pipeline.addLast(new MessageTypeDecodeHandler());//请求消息业务逻辑处理器pipeline.addLast(new RequestMessageBusinessHandler());//响应消息业务逻辑处理器pipeline.addLast(new ResponseMessageBusinessHandler());//编码为TextWebSocketFramepipeline.addLast(new TextWebSocketFrameEncodeHandler());//json序列化pipeline.addLast(new JsonEncodeHandler());}/*** 创建ssl上下文对象* @param type* @param path* @param password* @return* @throws Exception*/public SSLContext createSslContext() throws Exception {//读取配置信息String path=environment.getProperty("server.ssl.key-store");log.info("证书地址:{}",path);String password=environment.getProperty("server.ssl.key-store-password");String type=environment.getProperty("server.ssl.key-store-type");//构建证书上下文对象KeyStore ks = KeyStore.getInstance(type);path=path.replace("classpath:","");log.info("处理后的证书地址:{}",path);ClassPathResource resource = new ClassPathResource(path);InputStream ksInputStream = resource.getInputStream();ks.load(ksInputStream, password.toCharArray());//KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());kmf.init(ks, password.toCharArray());//SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。SSLContext sslContext = SSLContext.getInstance("TLS");sslContext.init(kmf.getKeyManagers(), null, null);return sslContext;}}
处理器清单
一共涉及到12个消息处理器,其中4个是netty内置的,只是进行了参数配置,其他8个是自己实现的,用于处理逻辑和数据的,依次如下:
| 序号 | 处理器类型 | 职责 | 实现 | 说明 |
|---|---|---|---|---|
| 1 | SslHandler | 处理可靠安全连接 | 内置 | 仅在生产环境,需要进行ssl加解密 |
| 2 | HttpClientCodec | HTTP 编解码 | 内置 | 对Http请求进行解码与编码 |
| 3 | HttpObjectAggregator | 聚合HTTP 请求或响应 | 内置 | 将http请求或响应聚合为一个完整对象 |
| 4 | IdleStateHandler | 空闲监测 | 内置 | 监测空闲状态,触发后续超时处理 |
| 5 | HeartbeatTimeoutHandler | 心跳超时处理 | 自定义 | 心跳超时执行关闭连接,触发重连 |
| 6 | WebSocketClientHandshakerHandler | WebSocket专用处理 | 自定义 | 处理WebSocket的握手以及Ping、Pong、Close消息 |
| 7 | HeartbeatRequestHandler | 发送心跳请求 | 自定义 | 客户端向服务端定时发送心跳 |
| 8 | MessageTypeDecodeHandler | 文本反序列化成消息对象 | 自定义 | 将文本按消息类型转换为请求消息或响应消息 |
| 9 | RequestMessageBusinessHandler | 处理请求消息 | 自定义 | 请求消息业务逻辑处理器 |
| 10 | ResponseMessageBusinessHandler | 处理响应消息 | 自定义 | 响应消息业务逻辑处理器 |
| 11 | TextWebSocketFrameEncodeHandler | JSON格式转文本帧 | 自定义 | 将json格式字符串编码为TextWebSocketFrame |
| 12 | JsonEncodeHandler | 对象序列化为JSON字符串 | 自定义 | 将对象序列化为json格式字符串 |
自定义的8个处理器中,最后两个11和12是出站处理器,注意实际执行顺序是先12后11,也就是,业务逻辑处理器9或10的处理结果是一个对象,先由出站处理器12将其序列化为json字符串,然后再由出站处理器11将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。
自定义处理器
心跳超时处理
这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作,这里是主动关闭连接。
客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。
/*** 心跳超时处理器* @author wqliu* @date 2021-10-2 14:25**/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.READER_IDLE)) {log.info("读空闲");//关闭连接ctx.channel().close();}} else {//非空闲事件,传递到下个处理器super.userEventTriggered(ctx, evt);}}}
WebSocket专用处理
这是很关键的一个处理器,自身也比较复杂。
主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。
同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。
/*** 处理web socket握手** @author wqliu* @date 2021-9-28 16:33**/
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {/*** 握手*/private WebSocketClientHandshaker handshaker;/*** 握手 异步处理*/private ChannelPromise handshakeFuture;public WebSocketClientHandshakerHandler() {//初始化握手处理者MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);URI webSocketUri = null;try {webSocketUri = new URI(config.getWebSocketUrl());} catch (URISyntaxException e) {log.error("解析远程服务器地址出错", e);}WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());this.setHandshaker(handshaker);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// log.info("收到消息:{}",msg.toString());Channel ch = ctx.channel();FullHttpResponse response;//进行握手操作if (!this.handshaker.isHandshakeComplete()) {try {response = (FullHttpResponse) msg;//握手协议返回,设置结束握手this.handshaker.finishHandshake(ch, response);//设置成功this.handshakeFuture.setSuccess();} catch (WebSocketHandshakeException var7) {//已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常FullHttpResponse res = (FullHttpResponse) msg;String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));this.handshakeFuture.setFailure(new Exception(errorMsg));}} else if (msg instanceof FullHttpResponse) {response = (FullHttpResponse) msg;throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');} else if (msg instanceof CloseWebSocketFrame) {log.info("收到关闭信息");} else if (msg instanceof TextWebSocketFrame) {log.info("收到文本帧,往下传递");ReferenceCountUtil.retain(msg);ctx.fireChannelRead(msg);}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {//设置握手成功后,发起登录请求this.handshakeFuture = ctx.newPromise();ChannelFuture handshakeFuture = this.handshakeFuture;handshakeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {//发送登录请求log.info("握手成功");Login login = new Login();login.sendMessage();} else {//握手失败log.error("握手失败", future.cause());}}});}/*** 发起握手*/public void handshake(Channel channel) {this.getHandshaker().handshake(channel);}}
发送心跳请求
心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame
/*** 心跳请求处理器* @author wqliu* @date 2021-10-2 13:24**/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {/*** 心跳发送间隔,单位秒*/private int heartbeatInterval=5;public HeartbeatRequestHandler(int heartbeatInterval){this.heartbeatInterval=heartbeatInterval;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {EventLoop eventLoop = ctx.channel().eventLoop();eventLoop.scheduleWithFixedDelay(new Runnable() {private Channel channel;@Overridepublic void run() {// log.info("发送心跳");PingWebSocketFrame frame=new PingWebSocketFrame();ChannelFuture channelFuture = channel.writeAndFlush(frame);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// log.error(future.isSuccess()+"",future.cause());}});}public Runnable setChannel(Channel channel){this.channel=channel;return this;}}.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);//不调用父类方法,则其他处理器的channelActive事件不再触发super.channelActive(ctx);}
}
文本反序列化成消息对象
我将消息设计为两类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/*** 消息类型解码* @author wqliu* @date 2021-10-6 11:23**/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {@Overrideprotected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {String message=msg.text();//消息解析JSONObject jsonObject = JSONObject.parseObject(message);String messageType = jsonObject.getString("messageType");if (messageType.equals(MessageType.REQUEST.name())) {RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);out.add(requestMessage);}else if (messageType.equals(MessageType.RESPONSE.name())) {ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);out.add(responseMessage);}}}
请求/响应消息处理器
上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。
JSON格式转文本帧
这个其实也没什么好说的,就是把JSON格式字符串放到文本帧中,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/*** 将json格式字符串编码为TextWebSocketFrame* @author wqliu* @date 2021-10-6 11:23**/public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {TextWebSocketFrame frame=new TextWebSocketFrame(msg);out.add(frame);}}
对象序列化为JSON字符串
这个其实也没什么好说的,就是把对象转换为JSON格式字符串,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/*** 将对象序列化为json格式字符串* @author wqliu* @date 2021-10-6 11:23**/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {if(msg instanceof BaseMessage) {out.add(JSONObject.toJSONString(msg));}else{out.add(msg);}}}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
