soul网关系列(十四):divide插件源码解读
目录
- 一、divide插件概述
- 二、整体的处理流程
- 三、ip端口探活
- 四、负载均衡流程
- 五、小结
一、divide插件概述
divide插件定位是一个http代理插件,当请求头的rpcType为http的时候,并且插件开启的时候,它根据请求参数匹配到规则,然后进行响应式的代理调用。
divide插件是进行http正向代理,所有的http请求都由该插件进行负载均衡调用。具体的负载均衡策略在规则中指定。
二、整体的处理流程
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {String pluginName = named();//获取缓存final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);if (pluginData != null && pluginData.getEnabled()) {// 省略return doExecute(exchange, chain, selectorData, rule);}return chain.execute(exchange);}
根据昨天的分析,AbstractSoulPlugin插件责任链再获取selector和rule之后,进入具体插件的处理逻辑
在divide的doExecute打上断点
分析流程,比较清晰,
- 获取配置信息
- 探活
- 负载均衡选择一个服务器
- 获取最终的url
@Overrideprotected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;// 获取rule具体规则的信息,负载均衡,重试策略,超时参数final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);// 根据选择器id查服务列表,用作负载均衡(UpstreamCacheManager里做了ip端口探活)final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());if (CollectionUtils.isEmpty(upstreamList)) {// 可用服务列表为空log.error("divide upstream configuration error: {}", rule.toString());Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 用负载均衡实现类选择当前具体的服务器信息final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);//如果负载均衡实现类计算出来的节点为空,则也报错if (Objects.isNull(divideUpstream)) {log.error("divide has no upstream");Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}//设置一下 http urlString domain = buildDomain(divideUpstream);String realURL = buildRealURL(domain, soulContext, exchange);exchange.getAttributes().put(Constants.HTTP_URL, realURL);//设置下超时时间exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());return chain.execute(exchange);}
具体的selector和rule


三、ip端口探活
divide的处理里有两个关键点,一个是ip端口探活,一个是负载均衡,探活这块的入口代码如下
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {log.error("divide upstream configuration error: {}", rule.toString());Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);
}
- 进入findUpstreamListBySelectorId方法可以看到,这里是从UPSTREAM_MAP的缓存里取值。
UPSTREAM_MAP的数据写入一共有两个地方
/*** Submit.** @param selectorData the selector data*/public void submit(final SelectorData selectorData) {// DivideUpstream包含ip 端口 url 权重信息final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);if (null != upstreamList && upstreamList.size() > 0) {UPSTREAM_MAP.put(selectorData.getId(), upstreamList);} else {UPSTREAM_MAP.remove(selectorData.getId());}}
- submit()具体调用是来自于DividePluginDataHandler的handlerSelector,这个在之前的分析里知道,这个主要是接收到soul-admin的同步数据后进行的数据操作(主要来源于服务注册的selector数据,就是之前springmvc/dubbo/sofa服务注册的方法),这里主要是被动更新,和主动探活关系不大。
private void scheduled() {if (UPSTREAM_MAP.size() > 0) {UPSTREAM_MAP.forEach((k, v) -> {// check方法是去根据url去检测注册服务端List<DivideUpstream> result = check(v);if (result.size() > 0) {UPSTREAM_MAP.put(k, result);} else {UPSTREAM_MAP.remove(k);}});}}
-
scheduled是个定时方法,调用方是UpstreamCacheManager构造方法,UpstreamCacheManager是个饿汉式的单例模式。
-
ScheduledThreadPoolExecutor定时任务是创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+给定的间隔时间。这里默认30秒执行一次。
public final class UpstreamCacheManager {private static final UpstreamCacheManager INSTANCE = new UpstreamCacheManager();private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = Maps.newConcurrentMap();private UpstreamCacheManager() {boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));if (check) {new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false)).scheduleWithFixedDelay(this::scheduled,30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);}}
- 主要是轮询的去check,然后更新缓存的值
- 调用check()方法
- check方法里循环去checkurl()的服务注册方
public static boolean checkUrl(final String url) {if (StringUtils.isBlank(url)) {return false;}// 检测ipif (checkIP(url)) {String[] hostPort;if (url.startsWith(HTTP)) {final String[] http = StringUtils.split(url, "\\/\\/");hostPort = StringUtils.split(http[1], Constants.COLONS);} else {hostPort = StringUtils.split(url, Constants.COLONS);}//是ip的话 根据ip和端口socket通信探活return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));} else {//是域名的话,使用java自带的方法去探活InetAddress.getByName(host).isReachable(1000);return isHostReachable(url);}}
整体看下来,这边divide插件去拿到缓存里的服务器信息,服务器信息来源于admin端的同步或者主动去探活更新。
四、负载均衡流程
负载均衡的起点
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(divideUpstream)) {log.error("divide has no upstream");Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);
}

可以看出负载均衡的策略为random
这里来源于内置的配置属性
public static final LoadBalanceEnum DEFAULT_LOAD_BALANCE = LoadBalanceEnum.RANDOM;
soul网关里默认支持三种负载均衡策略
- HASH(需要计算,可能存在不均衡的情况)
- RANDOM(最简单最快,大量请求下几乎平均)
- ROUND_ROBIN(需要记录状态,有一定的影响,大数据量下随机和轮询并无太大结果上的差异)
random的随机算法计算如下
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {// 总个数int length = upstreamList.size();// 总权重int totalWeight = 0;// 权重是否都一样boolean sameWeight = true;for (int i = 0; i < length; i++) {int weight = upstreamList.get(i).getWeight();// 累计总权重totalWeight += weight;if (sameWeight && i > 0&& weight != upstreamList.get(i - 1).getWeight()) {// 计算所有权重是否一样sameWeight = false;}}if (totalWeight > 0 && !sameWeight) {// 如果权重不相同且权重大于0则按总权重数随机int offset = RANDOM.nextInt(totalWeight);// 并确定随机值落在哪个片断上for (DivideUpstream divideUpstream : upstreamList) {offset -= divideUpstream.getWeight();if (offset < 0) {return divideUpstream;}}}// 如果权重相同或权重为0则均等随机return upstreamList.get(RANDOM.nextInt(length));}
五、小结
divide插件源码的整体流程
- 探活
- 去获取可用服务信息列表
- 服务信息列表来源于soul-admin同步
- 服务信息列表会根据每30秒的检测结果进行更新
- 负载均衡
- 得到服务信息列表
- 选择默认的负载均衡策略
- 具体执行random的负责均衡策略
- 返回一个最终选择的服务信息
- 拼装最终的的url信息
- 请求服务信息进行,得到结果后返回
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
