マルチスレッドで悩んでみる

| 0 Comments | 0 TrackBacks

Xango というクローラーのフレームワークがあるらしいけど、POE って何?状態だし、ドキュメント(Xangoというクロウラーフレームワークを使ってみる)読んでみてもデータベースとの連携方法に見当がつかなかったので、スレッドでなんとかしてみることにしました。

Perlでマルチスレッドperlthrtut - Perlにおけるスレッドのチュートリアル がとても参考になりました。

で、今までスレッドなんかいじったことなくて、これであってるのかいまいち自信が持てないのだけど、わからないなりに役割分担を考えて、3 種類のスレッドを作って動かすことにしました。

  • manager: SQL からクロールすべき情報をとってくるスレッド
  • worker: HTTP 通信を行うスレッド
  • postman: クロールした結果を SQL につっこむスレッド

という感じで、適当に名前を付けました。manager と postman それぞれが SQL 鯖に コネクションを張っています。

で、スレッド間の通信には、Thread::Queue を使いました。 使っている場所は、manager -> worker への受け渡し、worker -> postman への受け渡しです。

困ったことに、データがうまく渡らなかったので、BitTorrent なデータフォーマットに使われている奴 Convert::Bencode でエンコードして文字列として渡してみました。

今のところ、(たぶん postman が)うまく終了してくれないようですが、クローラーとしては、ちゃんと動いてくれています。

宣言とか

スレッドを使うので宣言
use threads;
use Thread::Queue;

スレッド間のデータの受け渡しに使うので宣言
use Convert::Bencode qw(bencode bdecode);

子供の数
my $maxth = 10;

sql からクロールするべき情報を引っ張ってくるスレッド

sub manager {
    my ($qi, $qo, $name) = @_;

    require hoge::DBI;
    foreach my $one (hoge::DBI::sites->search(hoge => 0)) {
        my $p = {
                 id => $one->id,
                 url => $one->url
             };
        # worker が使う url と、postman が使う id を渡す
        $qi->enqueue(bencode($p));
    }

    # sql から全部取りおわったら、worker をコロ助。
    for (my $e = 0; $e < $maxth; $e++) {
        $qi->enqueue(undef);
    }

    return 0;
}

クローラーのスレッド (こいつは maxth 個作られる)

sub worker {
    my ($qi, $qo, $name) = @_;

    while (my $p = $qi->dequeue) {
        my $x = bdecode($p);

        # ここでクロールする

        $qo->enqueue(bencode($x));
    }
    return 0;
}

sql にクロールした結果を突っ込むスレッド

sub postman {
    my ($qi, $qo, $name) = @_;

    require hoge::DBI;
    while (my $p = $qo->dequeue) {
        my $x = bdecode($p);
        my $d = hoge::DBI::sites->retrieve(id => $x->{id});

        # 値を入れて更新
        $d->update;
    }
    return 0;
}

偉大なるママン

sub mother {
    my $qi = new Thread::Queue;
    my $qo = new Thread::Queue;
    my @kids;
    my ($c, $m, $p);

    $m = threads->new(\&manager, $qi, $qo, "hoge");
    $p = threads->new(\&postman, $qi, $qo, "hoge");
    for ($c = 0; $c < $maxth; $c++) {
        $kids[$c] = threads->new(\&worker, $qi, $qo, $c);
    }

    # 終わったらコロ助
    for ($c = 0; $c < $maxth; $c++) {
        $kids[$c]->join;
    }
    $m->join;
    $p->join;

    print "done.\n";
}

&mother;

No TrackBacks

TrackBack URL: http://blog.woremacx.com/MT/mt-tb.cgi/38

Leave a comment