原创

Storm DRPC实战应用分析

        DRPC ,Distributed Remote Procedure Call RPC本身是个成熟和古老的概念, Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算 DRPC, 只是storm应用的一个场景, 并且storm提供相应的编程框架 ...


DRPC ,Distributed Remote Procedure Call      
RPC本身是个成熟和古老的概念, Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算

  
DRPC, 只是storm应用的一个场景, 并且storm提供相应的编程框架, 以方便程序员

  
提供DRPC server的实现, 并提供对DRPC场景经行封装的Topology  
对于storm, Topology内部其实是没有任何区别的, 主要就是实现和封装Topology和DRPC Server之间交互的spout和bolt  
具体交互过程如下图,

08173349-300dff1eb01945db9d2ecd91d0a75825.png

LinearDRPCTopologyBuilder

  
Storm封装了这样一个Topology实现LinearDRPCTopologyBuilder的topology builder

  
会使用DRPCSpout从DRPC服务器接收函数调用流, 每个函数调用被DRPC服务器标记了一个唯一的id.       
这个topology然后计算结果, 在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器, 并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识).       
DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它.

   ExclaimBolt  IBasicBolt {
      prepare(Map conf, TopologyContext context) {
    }
 
      execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit( Values(tuple.getValue(0), input + ""));
    }
 
      cleanup() {
    }
 
      declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare( Fields("", ""));
    }
 
}
 
   main(String[] args)  Exception {
    LinearDRPCTopologyBuilder builder
        =  LinearDRPCTopologyBuilder("");
    builder.addBolt( ExclaimBolt(), 3);
    
}


使用LinearDRPCTopologyBuilder

首先, 使用builder创建topology的时候, topology name要等于function name(PRC call)     
其次, 这里不需要指定spout, 因为已经做了封装, 会自动从DRPC server读取数据     
再者, 所有bolt的输出的第一个field必须是request-id, 最后一般bolt的输出一定是(request-id, result)     
最后, builder会自动将结果发给DRPC server


A more complex example, ReachTopology

前面的例子, 无法说明为什么要使用storm来实现RPC, 那个操作直接在RPC server上就可以完成     
当只有对并行计算和数据量有一定要求时, 才能体现出价值...     
ReachTopology, 每个人发送的URL都会被所有follower收到, 所以要计算某URL的reach, 只需要如下几步,    
找出所有发送该URL的tweeter, 取出他们的follower, 去重复, 计数

Let's look at a more complex example which really needs the parallelism a Storm cluster provides for computing the DRPC function.   
The example we'll look at is computing the reach of a URL on Twitter.

The reach of a URL is the number of unique people exposed to a URL on Twitter. To compute reach, you need to:

Get all the people who tweeted the URL 
Get all the followers of all those people 
Unique the set of followers 
Count the unique set of followers 

LinearDRPCTopologyBuilder builder =  LinearDRPCTopologyBuilder("");
builder.addBolt( GetTweeters(), 3);
builder.addBolt( GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt( PartialUniquer(), 6)
        .fieldsGrouping( Fields("", ""));
builder.addBolt( CountAggregator(), 2)
        .fieldsGrouping( Fields(""));



首先对于LinearDRPCTopologyBuilder中, 没有各个spout和bolt的id, 可能因为是线性的, 所以没有必要显式指明关系, 按加入顺序就可以  
GetTweeters(), 输入(rpc-id, url), 输出(rpc-id, tweeter)  
GetFollowers(), 输入(rpc-id, tweeter), 输出(rpc-id, follower)

对于PartialUniquer仔细分析一下,   
首先是基于id和follower的grouping, fieldsGrouping(new Fields("id", "follower")), 因为可能由多个RPC同时执行, 所以只需要保证同一个RPC的followers被发到同一个task即可.  
接着继承BaseBatchBolt, 可以在finishBatch中emit结果  
使用HashSet来存放follower, 保证不重复

   PartialUniquer  BaseBatchBolt {
        BatchOutputCollector _collector;
        Object _id;
        Set<String> _followers =  HashSet<String>();
        
        @Override
          prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        @Override
          execute(Tuple tuple) {
            _followers.add(tuple.getString(1));
        }
        
        @Override
          finishBatch() {
            _collector.emit( Values(_id, _followers.size()));
        }

        @Override
          declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare( Fields("", ""));
        }
    }


Non-linear DRPC topologies

当前LinearDRPCTopologyBuilder只能handle线性的bolt workflow    

LinearDRPCTopologyBuilder only handles "linear" DRPC topologies, where the computation is expressed as a sequence of steps (like reach). It's not hard to imagine functions that would require a more complicated topology with branching and merging of the bolts. For now, to do this you'll need to drop down into using CoordinatedBolt directly. Be sure to talk about your use case for non-linear DRPC topologies on the mailing list to inform the construction of more general abstractions for DRPC topologies.

来源:GAMELOOK

关注下方微信公众号“Java精选”(w_z90110),回复关键字领取资料:如HadoopDubboCAS源码等等,免费领取资料视频和项目。 

涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis缓存、Spring源码、各大主流框架、Web开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL非关系型数据库、运维等。

评论

分享:

支付宝

微信