结合redis订阅和发布消息,解决websocket多节点问题
结合redis订阅和发布消息,解决websocket多节点问题
(最近发现项目多节点下redis消息会丢失,建议改成用mq广播模式推送,保证数据)
单节点和多节点下,websocket会出现什么问题呢?看如下两个对比图:


这时候你会发现有部分连接在ws node2节点上用户收不到消息推送。而且websocket的session是无法共享的,加上session是有序无法存入到redis缓存中。
了解大概内容后,直接进入到代码模块
1.相关依赖
@Configuration
public class RedisConfig{/*** 使CacheComponent的redisTemplate组件的key使用StringRedisSerializer而非默认的JdkSerializationRedisSerializer* 避免key出现字节码的情况* @param factory redis链接* @return RedisTemplate*/@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory factory) {RedisTemplate redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(factory);RedisSerializer redisSerializer = new StringRedisSerializer();Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);//key使用StringRedisSerializer序列化redisTemplate.setKeySerializer(redisSerializer);//value使用jackson2JsonRedisSerializer序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);return redisTemplate;}/*** 创建Redis消息监听者容器* @param factory* @return*/@Bean("redisMessageListenerContainer")public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory){RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);return container;}
}
3.新建消息订阅监听类
import com.opp.util.CommonUtil;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import javax.websocket.Session;
import java.io.IOException;/*** @author: huangnenghuan* @create: 2019/12/03* @description: 创建消息订阅监听者类**/
@Component
public class RedisMessageListener implements MessageListener {private org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RedisMessageListener.class);/*** websocket客户端连接会话对象*/private Session session;public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}@Overridepublic void onMessage(Message message, byte[] bytes) {String msg = new String(message.getBody()).replace("\"", "");if(CommonUtil.isNotNull(session) && session.isOpen()){try {session.getBasicRemote().sendText(msg);}catch (IOException e){logger.error("RedisSubListener消息订阅监听异常:" + e.getMessage());}}}
}
4.对websocket进行修改
cacheComponent.sendMessage("liveBroadcast", "Barrage-" + String.valueOf(obj));
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
