Storm DRPC【分布式RPC】概述

一、什么是RPC?

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

二、Storm DRPC

在了解了什么是RPC之后,所谓DRPC就是分布式RPC,Storm是一个分布式实时流处理框架,因此我们它是建立在Strom之上并行计算的RPC架构。Storm DRPC就是客户端发送请求,发送给Strom Topology,经过Storm处理后将客户端所请求的结果返回给客户端,这里我们可以理解为一般的RPC调用。

如下图是官网的例子:
这里写图片描述

当客户发送请求到DRPC Server时,DRPC Server会将请求的参数args,request-id(唯一标识)等信息发送给Topology,经过Topology处理之后,就会返回result给客户端。

如何编写DRPC API呢?很简单,只需要在原来Storm的程序上将TopologyBuilder改为LinearDRPCTopologyBuilder即可,下面通过实例来讲解。

Storm DRPC有两种模式,本地模式和远程模式。

本地模式

package cn.just.shinelon.RPC;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;public class StormLocalDRPC {public static class DRPCBolt extends BaseRichBolt{private OutputCollector collector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {Object requestId = input.getValue(0);String value = input.getString(1);String result = "add user "+value;System.out.println(result);collector.emit(new Values(requestId,result));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("requestId","result"));}}public static void main(String[] args) {//参数为方法名LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");builder.addBolt(new DRPCBolt());LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("StormLocalDRPC",new Config(),builder.createLocalTopology(drpc));//第一个参数为调用的方法名,第二个参数为方法参数String result=drpc.execute("addUser","shinelon");System.err.println("From Client "+result);cluster.shutdown();drpc.shutdown();}
}

远程模式

package cn.just.shinelon.RPC;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 远程DRPC测试*/
public class StormRemoteDRPC {public static class DRPCBolt extends BaseRichBolt{private OutputCollector collector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {Object requestId = input.getValue(0);String value = input.getString(1);String result = "add user "+value;System.out.println(result);collector.emit(new Values(requestId,result));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("requestId","result"));}}public static void main(String[] args) throws Exception{//参数为方法名LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("addUser");builder.addBolt(new DRPCBolt());StormSubmitter.submitTopology("StormRemoteDRPC",new Config(),builder.createRemoteTopology());}
}

服务端的代码如上所示。然后将项目打包,首先启动drpc服务,然后运行jar包,最后运行客户端。

#后台启动drpc服务
nohup sh bin/storm drpc &
# 运行服务端
bin/storm jar /opt/storm_project/jars/Storm-1.0-SNAPSHOT.jar cn.just.shinelon.RPC.StormRemoteDRPC

客户端代码如下所示:

package cn.just.shinelon.RPC;import org.apache.storm.Config;
import org.apache.storm.utils.DRPCClient;public class StormRemoteDRPCClient {public static void main(String[] args) throws Exception{Config config = new Config();config.put("storm.thrift.transport","org.apache.storm.security.auth.SimpleTransportPlugin");config.put(Config.STORM_NIMBUS_RETRY_TIMES,3);config.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10);config.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,20);config.put(Config.DRPC_MAX_BUFFER_SIZE,104576);//drpc服务端口为3772DRPCClient client = new DRPCClient(config,"hadoop-senior.shinelon.com",3772);String result = client.execute("addUser","shinelon");System.out.println("From Server "+result);}
}

最后运行结果如下:
这里写图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部