connvoiのブログ

前だけ向いていけばいい。

storm-starterでstormの動作を知る。

リアルタイム分散処理で有名なstormを触ります。 本家にあるtutorialのstrom-starterをやってみる.

インストールから実行

leiningenを入れて、 あとはstorm-starterのコマンド通りにやるだけ。

$brew install leiningen
$git clone git://github.com/nathanmarz/storm-starter.git
$cd storm-starter
$lein deps
$lein compile
#このとき、実行されるのは、src/jvm/storm/starter/Exclamationtopology.java
$java -cp $(lein classpath) storm.starter.ExclamationTopology

実行結果の一部としてこんな表示がでればOK

INFO  backtype.storm.daemon.worker  - Worker e5a06064-17b6-4892-bff2-0e27b72a3e02 for storm test-1-1368162622 on d52ea4b6-b116-4a0f-9761-bb5ae251287c:4 has finished loading
INFO  backtype.storm.daemon.task  - Emitting: word default [jackson]
INFO  backtype.storm.daemon.executor  - Processing received message source: word:5, stream: default, id: {}, [jackson]
INFO  backtype.storm.daemon.task  - Emitting: exclaim1 default [jackson!!!]
INFO  backtype.storm.daemon.executor  - Processing received message source: exclaim1:2, stream: default, id: {}, [jackson!!!]
INFO  backtype.storm.daemon.task  - Emitting: exclaim2 default [jackson!!!!!!]

Exclamationtopology.javaはsploutから生成されたワードに!マークを追加していく物で 1bolt毎に「!!!」を追加していきます。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word", new TestWordSpout(), 1);
builder.setBolt("exclaim1", new ExclamationBolt(), 1)
          .shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
          .shuffleGrouping("exclaim1");

Config conf = new Config();
conf.setDebug(true);

WordCountTopology

storm-starterの中に入っているWordCountTopologyを試します。 動作的にはRandomSentenceSpout.javaで発行される文章をsplitして、 ワードカウントをしてきます。

$ java -cp $(lein classpath) storm.starter.WordCountTopology
...
INFO  backtype.storm.daemon.task  - Emitting: split default ["seven"]
INFO  backtype.storm.daemon.task  - Emitting: count default [four, 1]
INFO  backtype.storm.daemon.executor  - Processing received message source: split:3, stream: default, id: {}, ["score"]
INFO  backtype.storm.daemon.task  - Emitting: count default [score, 1]
INFO  backtype.storm.daemon.task  - Emitting: split default ["years"]
INFO  backtype.storm.daemon.executor  - Processing received message source: split:3, stream: default, id: {}, ["and"]
INFO  backtype.storm.daemon.task  - Emitting: count default [and, 1]
INFO  backtype.storm.daemon.executor  - Processing received message source: split:3, stream: default, id: {}, ["seven"]
INFO  backtype.storm.daemon.task  - Emitting: count default [seven, 1]
...

で、ちょっと変更を加えていきます。 RandomSentenceSpoutで発行される文章を適当な物に変更します。 sleepを1000に変更して、 "1000ms毎にhogehoge fofofo か hogehoge fugafuga のどちらかをランダムに発行する" Spoutにします。

@Override                                                              
public void nextTuple() {                                              
    Utils.sleep(1000);
    String[] sentences = new String[] {                                
        "hogehoge fofofo",
        "hogehoge fugafuga"                                            
    };  
    String sentence = sentences[_rand.nextInt(sentences.length)];      
    _collector.emit(new Values(sentence));
} 

WordCountTopology.javaのSetSpout,setBoltのparallelismを1に設定。 これで"RandomSentenceSpout -> SplitSentence -> WordCount" の流れをもったtopologyができると。

public static void main(String[] args) throws Exception {

   TopologyBuilder builder = new TopologyBuilder();
   builder.setSpout("spout", new RandomSentenceSpout(), 1);
   builder.setBolt("split", new SplitSentence(), 1)
          .shuffleGrouping("spout");
   builder.setBolt("count", new WordCount(), 1)
            .fieldsGrouping("split", new Fields("word"));

   Config conf = new Config();
   conf.setDebug(true);

これを実行して、 最終的にhogehogeの回数がfugafugaとfofofoを足した数になってれば意図した通りになってます。

.task  - Emitting: spout default [hogehoge fugafuga]
.executor  - Processing received message source: spout:4, stream: default, id: {}, [hogehoge fugafuga]
.task  - Emitting: split default ["hogehoge"]
.executor  - Processing received message source: split:3, stream: default, id: {}, ["hogehoge"]
.task  - Emitting: count default [hogehoge, 7]
.task  - Emitting: split default ["fugafuga"]
.executor  - Processing received message source: split:3, stream: default, id: {}, ["fugafuga"]
.task  - Emitting: count default [fugafuga, 6]
.task  - Emitting: spout default [hogehoge fofofo]
.executor  - Processing received message source: spout:4, stream: default, id: {}, [hogehoge fofofo]
.task  - Emitting: split default ["hogehoge"]
.executor  - Processing received message source: split:3, stream: default, id: {}, ["hogehoge"]
.task  - Emitting: count default [hogehoge, 8]
.task  - Emitting: split default ["fofofo"]
.executor  - Processing received message source: split:3, stream: default, id: {}, ["fofofo"]
.task  - Emitting: count default [fofofo, 2]

もうちょっと細かくみていく。

SplitSentenceのコンストラクタで書かれてるこれ。 Boltでpythonを読んでます。

public SplitSentence() {
    super("python", "splitsentence.py");
}

githubのstorm-starterのmultilang/resourcesにあるsplitsentence.pyを参照してます。 中身はスペースでsplitしてる物です。
STDINとOUTでやり取りするらしい。 パフォーマンスに影響しないのかな。 https://github.com/nathanmarz/storm/wiki/Multilang-protocol

次にWordCountBoltの受け取り。 受け取ったタプルのワードをカウントして、カウント数とワードをemit

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if(count==null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

WordCountする手前のところで、fieldsgroupingを使っていて、 word毎に流れるタスクを分けてます。

//WordCountTopology.java
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

これによって、wordAとwordBがそれぞれ同じgroup通過するようになります。 starterをlocalmodeで動かしてる場合は特に気にしなくても大丈夫。
https://github.com/nathanmarz/storm/wiki/Concepts

まとめ

基本的にはTopologyに書いたSpout,Bolt,Boltの形で処理が実行される。 書き方によってはSpout,Boltの分岐も可能。

parallelism,grouping,worker,taskと分散系のワードが結構合って、
それぞれの意味をちゃんと整理して理解しないと混乱しそう。

https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology つぎはもう少しドキュメントを読んでまとめよう。