【Netty】四、Netty服务端推送消息到客户端实现消息个性化推送

Netty服务端推送消息到客户端

  • 一、 Netty服务端推送消息到客户端
    • 需求:
    • 应用场景
  • 二、服务端代码
    • PushServer
    • PushServerHandler
    • PushAckHandler
  • 三、服务端代码
    • PushClient
    • PushClientHandler
  • 四、测试
    • 客户端发送消息
    • 服务端接受消息

一、 Netty服务端推送消息到客户端

需求:

1、服务端从redis或数据库等存储层获取到要推送的消息;
2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);
3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;
4、客户端接收到消息后给服务端一个应答;
5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;

应用场景

1、个性化推送,可以千人千面,女性用户可以推送化妆品优惠活动消息,男性用户可以推送电子科技类产品消息;
2、每个客户端(比如手机App),他们首先会登录,登录后有用户的id等唯一业务参数值,用户登录成功后,可以建立一个与netty服务端的长连接并向服务端写出一条信息,信息里面带上用户id等唯一业务参数值;
3、服务端收到客户端的用户id后,可以用一个全局的Map存放起来:
Map
Key: channelId, value: (key: userId, value: Channel)
4、然后服务端启动netty的定时任务,每隔多久从redis或数据库等存储层查询出业务数据,并把数据推送给对应的客户端;
5、如果连接channel断开了,把全局Map中的channel移除;

二、服务端代码

PushServer

import com.mytest.push.handler.PushAckHandler;
import com.mytest.push.handler.PushServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** 服务端向客户端推送消息* * 需求:* 1、服务端从redis或数据库等存储层获取到要推送的消息;* 2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);* 3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;* 4、客户端接收到消息后给服务端一个应答;* 5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;*/
public class PushServer {private static final int PORT = 6868;public static void main(String[] args) {PushServer pushServer = new PushServer();pushServer.start();}private void start() {ServerBootstrap serverBootstrap = new ServerBootstrap();NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);NioEventLoopGroup workGroup = new NioEventLoopGroup();serverBootstrap.group(boosGroup, workGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {//ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new ObjectEncoder());ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));ch.pipeline().addLast(PushServerHandler.INSTANCE);ch.pipeline().addLast(PushAckHandler.INSTANCE);}});ChannelFuture channelFuture = serverBootstrap.bind(PORT).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {System.out.println("Netty server start success!");} else {System.out.println("Netty server start fail!");}});try {channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}
}

PushServerHandler

import com.mytest.codec.LoginMessage;
import com.mytest.codec.PushAckMessage;
import com.mytest.codec.PushMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;@ChannelHandler.Sharable
public class PushServerHandler extends ChannelInboundHandlerAdapter {public static final PushServerHandler INSTANCE = new PushServerHandler();//key = channelID, value =(key:uid, value:channel)private static final Map<String, Map<String, Channel>> channelMap = new ConcurrentHashMap<String, Map<String, Channel>>();@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();String channelId = channel.id().asLongText();Map<String, Channel> uidMap = new ConcurrentHashMap<String, Channel>();//如果是 LoginMessageif (msg instanceof LoginMessage) {LoginMessage loginMessage = (LoginMessage) msg;//当前客户端登录的 用户id --> 当前的channeluidMap.put(loginMessage.getUid(), channel);channelMap.put(channelId, uidMap);// 每一条新连接,都是5秒之后发消息ctx.executor().scheduleAtFixedRate(() -> {String uid = loginMessage.getUid();//这样就是每个客户端发送每个客户端的消息Channel ch = channelMap.get(channelId).get(uid);if (ch != null) {//TODO 可以根据uid查询业务数据,然后把业务数据封装成消息推送给客户端 (业务查询省略)PushMessage pushMessage = new PushMessage();pushMessage.setMessageId(UUID.randomUUID().toString());pushMessage.setContent("尊敬的"+channelId+"童鞋,鉴于你2020年第二季度的卓越表现,公司奖励你1万元,于下月发放,期望再接再厉");pushMessage.setTimestamp(System.currentTimeMillis());pushMessage.setExt(String.valueOf(uid));ch.writeAndFlush(pushMessage);}System.out.println(new Date() + " 服务端推送消息ok,currentUid=" + uid + ", channelMap" + channelMap);}, 5, 5, TimeUnit.SECONDS);}//如果是 LoginMessageif (msg instanceof PushAckMessage) {PushAckMessage pushAck = (PushAckMessage) msg;System.out.println("服务端接收到客户端的确认消息:" + pushAck.getMessageId());}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//空闲状态的事件if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.READER_IDLE)) {String channelId = ctx.channel().id().asLongText();channelMap.remove(channelId);// 心跳包丢失,10秒没有收到客户端心跳 (断开连接)ctx.channel().close().sync();System.out.println("已与 "+ctx.channel().remoteAddress()+" 断开连接");}}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asLongText();channelMap.remove(channelId);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.err.println(cause.getMessage());}
}

PushAckHandler

import com.mytest.codec.PushAckMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;@ChannelHandler.Sharable
public class PushAckHandler extends SimpleChannelInboundHandler<PushAckMessage> {public static final PushAckHandler INSTANCE = new PushAckHandler();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, PushAckMessage pushAck) {System.out.println("服务端接收到客户端的确认消息:" + pushAck.getMessageId());//TODO 更新消息状态 (业务处理,暂时省略)}
}

三、服务端代码

PushClient

import com.mytest.codec.LoginMessage;
import com.mytest.push.handler.PushClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil;import java.util.Scanner;
import java.util.concurrent.TimeUnit;/*** 客户端接收服务端推送的消息* 需求:* 1、服务端从redis或数据库等存储层获取到要推送的消息;* 2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);* 3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;* 4、客户端接收到消息后给服务端一个应答;* 5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;*/
public class PushClient {private static final String HOST = "127.0.0.1";private static final int PORT = 6868;public static void main(String[] args) throws Exception {//假设用户已经登录,登录消息LoginMessage loginMessage = new LoginMessage();loginMessage.setUid("1000");PushClient pushClient = new PushClient();pushClient.start(loginMessage);}private void start(LoginMessage loginMessage) throws Exception {Bootstrap bootstrap = new Bootstrap();NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {//ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new ObjectEncoder());ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));ch.pipeline().addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS));ch.pipeline().addLast(PushClientHandler.INSTANCE);}});//连netty服务端ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();if (channelFuture.isSuccess()) {System.out.println("Connect netty server 成功, 请输入用户ID:");//写出登录信息Scanner scanner = new Scanner(System.in);for (;;) {String uid = scanner.nextLine();if (!StringUtil.isNullOrEmpty(uid)) {loginMessage.setUid(uid); //业务参数//向服务端写出loginMessage(或者理解成向服务端注册当前app客户端)channelFuture.channel().writeAndFlush(loginMessage);;break;}}}channelFuture.channel().closeFuture().sync();}
}

PushClientHandler

import com.mytest.codec.PushAckMessage;
import com.mytest.codec.PushMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;import java.util.Date;@ChannelHandler.Sharable
public class PushClientHandler extends SimpleChannelInboundHandler<PushMessage> {public static final PushClientHandler INSTANCE = new PushClientHandler();@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.channel().attr(AttributeKey.newInstance("uid")).set(100);//TODO 重连}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, PushMessage msg) {PushMessage message = (PushMessage) msg;System.out.println(new Date() + " 接收到的消息:" + message);//发送消息确认PushAckMessage pushAck = new PushAckMessage();pushAck.setMessageId(msg.getMessageId());pushAck.setTimestamp(System.currentTimeMillis());ctx.channel().writeAndFlush(pushAck);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("ping");}}}
}

四、测试

客户端发送消息

在这里插入图片描述

服务端接受消息

在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部