netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》
前言介绍
Netty的性能非常好,在一些小型用户体量的socket服务内,仅部署单台机器就可以满足业务需求。但当遇到一些中大型用户体量的服务时,就需要考虑讲Netty按照集群方式部署,以更好的满足业务诉求。但Netty部署集群后都会遇到跨服务端怎么通信,也就是有集群服务X和Y,用户A链接服务X,用户B链接服务Y,那么他们都不在一个服务内怎么通信?本章节将介绍一种实现方式案例,以满足跨服务之间的用户通信。但实际场景中需要进行一些扩展性改造,案例仅将核心主干思路做以实现,只是一种思路指导,并不能直接使用于业务开发。
本章知识点
-
跨服务之间案例采用redis的发布和订阅进行传递消息,如果你是大型服务可以使用zookeeper
-
用户A在发送消息给用户B时候,需要传递B的channeId,以用于服务端进行查找channeId所属是否自己的服务内
-
单台机器也可以启动多个Netty服务,程序内会自动寻找可用端口
环境准备
1、jdk1.8【jdk1.7以下只能部分支持netty】
2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】
3、NetAssist 网络调试助手,可以从网上下载也可以联系我,微信公众号:bugstack虫洞栈 | 关注回复你的邮箱
4、redis服务端,案例采用windows版本,按需从官网下载即可
代码示例
itstack-demo-rpc-2-09
└── src└── main│ ├── java│ │ └── org.itstack.demo.netty│ │ ├── domain│ │ │ ├── EasyResult.java│ │ │ ├── MsgAgreement.java│ │ │ ├── ServerInfo.java│ │ │ └── UserChannelInfo.java│ │ ├── redis│ │ │ ├── config│ │ │ │ ├── PublisherConfig.java│ │ │ │ └── ReceiverConfig.java│ │ │ ├── AbstractReceiver.java│ │ │ ├── MsgAgreementReceiver.java│ │ │ ├── Publisher.java│ │ │ └── RedisUtil.java│ │ ├── server│ │ │ ├── MyChannelInitializer.java│ │ │ ├── MyServerHandler.java│ │ │ └── NettyServer.java│ │ ├── service│ │ │ └── ExtServerService.java│ │ ├── util│ │ │ ├── CacheUtil.java│ │ │ ├── MsgUtil.java│ │ │ └── NetUtil.java│ │ ├── web│ │ │ └── NettyController.java│ │ └── Application.java│ ├── resources│ │ └── application.yml│ └── webapp│ ├── res│ └── WEB-INF│ └── index.jsp└── test└── java└── org.itstack.demo.test└── ApiTest.java
演示讲解部分重点代码块,完整代码下载,关注公众号;bugstack虫洞栈 | 回复:netty案例源码
domain/MsgAgreement.java | 定义信息传输协议,这个看似简单但非常重要,每一个通信的根本就是定义传输协议信息
/*** 消息协议* 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {关注获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on 2019*/
public class MsgAgreement {private String toChannelId; //发送给某人,某人channelIdprivate String content; //消息内容public MsgAgreement() {}public MsgAgreement(String toChannelId, String content) {this.toChannelId = toChannelId;this.content = content;}public String getToChannelId() {return toChannelId;}public void setToChannelId(String toChannelId) {this.toChannelId = toChannelId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}
redis/config/PublisherConfig.java | redis消息发布者,集成与SpringBoot的配置方式
/*** 发布者* 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on @2019*/
@Configuration
public class PublisherConfig {@Beanpublic RedisTemplate redisMessageTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));return template;}}
redis/config/ReceiverConfig.java | redis消息的订阅者,集成与SpringBoot的配置方式。可以订阅多个主题,本章节只订阅一个。
/*** 订阅者* 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on @2019*/
@Configuration
public class ReceiverConfig {@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter msgAgreementListenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(msgAgreementListenerAdapter, new PatternTopic("itstack-demo-netty-push-msgAgreement"));return container;}@Beanpublic MessageListenerAdapter msgAgreementListenerAdapter(MsgAgreementReceiver receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}}
redis/MsgAgreementReceiver.java | 实现抽象类用于接收订阅到的消息,接收消息后进行业务处理
/*** 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on @2019*/
@Service
public class MsgAgreementReceiver extends AbstractReceiver {private Logger logger = LoggerFactory.getLogger(MsgAgreementReceiver.class);@Overridepublic void receiveMessage(Object message) {logger.info("接收到PUSH消息:{}", message);MsgAgreement msgAgreement = JSON.parseObject(message.toString(), MsgAgreement.class);String toChannelId = msgAgreement.getToChannelId();Channel channel = CacheUtil.cacheChannel.get(toChannelId);if (null == channel) return;channel.writeAndFlush(MsgUtil.obj2Json(msgAgreement));}}
redis/RedisUtil.java | redis操作工具类,帮助存储数据。以下是将链接到服务的用户信息存放到redis方便可以在每个服务端都能看到这份用户链接数据。
/*** 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {关注获取学习源码}* Create by fuzhengwei on 2019*/
@Service("redisUtil")
public class RedisUtil {@Autowiredprivate StringRedisTemplate redisTemplate;public void pushObj(UserChannelInfo userChannelInfo) {redisTemplate.opsForHash().put("itstack-demo-netty-2-09-user", userChannelInfo.getChannelId(), JSON.toJSONString(userChannelInfo));}public List popList() {List
server/MyServerHandler.java | 处理接收到的信息,尤其在channelRead中,将接受者不是本服务端的用户,进行全局push
/*** 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {关注获取学习源码}* Create by fuzhengwei on 2019*/
public class MyServerHandler extends ChannelInboundHandlerAdapter {private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);private ExtServerService extServerService;public MyServerHandler(ExtServerService extServerService) {this.extServerService = extServerService;}/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");//保存用户信息UserChannelInfo userChannelInfo = new UserChannelInfo(channel.localAddress().getHostString(), channel.localAddress().getPort(), channel.id().toString(), new Date());extServerService.getRedisUtil().pushObj(userChannelInfo);CacheUtil.cacheChannel.put(channel.id().toString(), channel);//通知客户端链接建立成功String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());extServerService.getRedisUtil().remove(ctx.channel().id().toString());CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object objMsgJsonStr) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + objMsgJsonStr);MsgAgreement msgAgreement = MsgUtil.json2Obj(objMsgJsonStr.toString());String toChannelId = msgAgreement.getToChannelId();//判断接收消息用户是否在本服务端Channel channel = CacheUtil.cacheChannel.get(toChannelId);if (null != channel) {channel.writeAndFlush(MsgUtil.obj2Json(msgAgreement));return;}//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局logger.info("接收消息的用户不在本服务端,PUSH!");extServerService.push(msgAgreement);}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();extServerService.getRedisUtil().remove(ctx.channel().id().toString());CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());System.out.println("异常信息:\r\n" + cause.getMessage());}}
util/CacheUtil.java | 缓存必要信息,用于业务流程处理
/*** 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {关注获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on 2019*/
public class CacheUtil {// 缓存channelpublic static Map cacheChannel = Collections.synchronizedMap(new HashMap());// 缓存服务信息public static Map serverInfoMap = Collections.synchronizedMap(new HashMap());// 缓存服务端public static Map serverMap = Collections.synchronizedMap(new HashMap());}
web/NettyController.java | 接口处理控制类,方便我们操作服务端方法,包括启动Netty服务、获取用户信息等
/*** 虫洞栈:https://bugstack.cn* 公众号:bugstack虫洞栈 {获取学习源码}* 虫洞群:①群5398358 ②群5360692* Create by fuzhengwei on @2019*/
@Controller
public class NettyController {private Logger logger = LoggerFactory.getLogger(NettyController.class);//默认线程池private static ExecutorService executorService = Executors.newFixedThreadPool(2);@Value("${server.port}")private int serverPort;@Autowiredprivate ExtServerService extServerService;@Resourceprivate RedisUtil redisUtil;//Netty服务端private NettyServer nettyServer;@RequestMapping("/index")public String index(Model model) {model.addAttribute("serverPort", serverPort);return "index";}@RequestMapping("/openNettyServer")@ResponseBodypublic EasyResult openNettyServer() {try {int port = NetUtil.getPort();logger.info("启动Netty服务,获取可用端口:{}", port);nettyServer = new NettyServer(new InetSocketAddress(port), extServerService);Future future = executorService.submit(nettyServer);Channel channel = future.get();if (null == channel) {throw new RuntimeException("netty server open error channel is null");}while (!channel.isActive()) {logger.info("启动Netty服务,循环等待启动...");Thread.sleep(500);}CacheUtil.serverInfoMap.put(port, new ServerInfo(NetUtil.getHost(), port, new Date()));CacheUtil.serverMap.put(port, nettyServer);logger.info("启动Netty服务,完成:{}", channel.localAddress());return EasyResult.buildSuccessResult();} catch (Exception e) {logger.error("启动Netty服务失败", e);return EasyResult.buildErrResult(e);}}@RequestMapping("/closeNettyServer")@ResponseBodypublic EasyResult closeNettyServer(int port) {try {logger.info("关闭Netty服务开始,端口:{}", port);NettyServer nettyServer = CacheUtil.serverMap.get(port);if (null == nettyServer) {CacheUtil.serverMap.remove(port);return EasyResult.buildSuccessResult();}nettyServer.destroy();CacheUtil.serverMap.remove(port);CacheUtil.serverInfoMap.remove(port);logger.info("关闭Netty服务完成,端口:{}", port);return EasyResult.buildSuccessResult();} catch (Exception e) {logger.error("关闭Netty服务失败,端口:{}", port, e);return EasyResult.buildErrResult(e);}}@RequestMapping("/queryNettyServerList")@ResponseBodypublic Collection queryNettyServerList() {try {Collection serverInfos = CacheUtil.serverInfoMap.values();logger.info("查询服务端列表。{}", JSON.toJSONString(serverInfos));return serverInfos;} catch (Exception e) {logger.info("查询服务端列表失败。", e);return null;}}@RequestMapping("/queryUserChannelInfoList")@ResponseBodypublic List queryUserChannelInfoList() {try {logger.info("查询用户列表信息开始");List userChannelInfoList = redisUtil.popList();logger.info("查询用户列表信息完成。list:{}", JSON.toJSONString(userChannelInfoList));return userChannelInfoList;} catch (Exception e) {logger.error("查询用户列表信息失败", e);return null;}}}
resources/application.yml | 基础配置,在我们启动服务端的时候,如果只有一台机器模拟,那么需要改变server.port端口{8080、8081}
server:port: 8080spring:mvc:view:prefix: /WEB-INF/suffix: .jspredis:host: 127.0.0.1port: 6379
index.jap | 页面操作,控制和展示的一些内容
<%--虫洞栈:https://bugstack.cn公众号:bugstack虫洞栈 {获取学习源码}Create by fuzhengwei on 2019
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
关注公众号:bugstack虫洞栈 | 专题案例开发,关注取源码 | bugstack.cn 付政委
IP 端口 启动时间
IP 端口 用户ID 链接时间