Subscribed unsubscribe Subscribe Subscribe

PHP だけで非同期 DNS レゾルバを書く

PHP DNS asynchronous concurrency tick Net_DNS

fiber について調べていて、そういえば PHP には tick という仕組みがあったなあと思い出し、思いついてから 30 分ほどで書いたのがこれ。きれいに書けていないのは Net_DNS が汚いのと、PHP の stream_select() の API のせいです。なお、実行には Net_DNS が必要です。

<?php
require_once 'Net/DNS.php';

/**
 * Minimal tick-driven I/O multiplexer.
 *
 * Sockets are registered together with a handler object. Every call to
 * __tick() polls the registered sockets with a zero-timeout stream_select()
 * and invokes whichever of these optional methods the handler defines:
 *
 *   - onReadReady($sock)   the socket is readable
 *   - onWriteReady($sock)  the socket is writable
 *   - onTimeout($sock)     the registration is older than its timeout
 *
 * Registrations are one-shot: a socket is dropped from every table right
 * before its callback runs, so a callback has to call add() again if it
 * wants to keep listening.
 */
class AsyncIOHandler
{
    const READ   = 1;
    const WRITE  = 2;

    /** Registration records, keyed by the integer id of the socket resource. */
    private $hdlrs = array();
    /** Sockets polled for readability, keyed by socket id. */
    private $readees = array();
    /** Sockets polled for writability, keyed by socket id. */
    private $writees = array();
    /** True while add() is mutating the tables; makes __tick() back off. */
    private $in_crit_section = false;

    /**
     * Registers a socket for one-shot readiness notification.
     *
     * A socket that is already registered is left untouched.
     *
     * @param resource $sock    stream to poll
     * @param int      $event   bit mask of self::READ and/or self::WRITE
     * @param object   $hdlr    object providing any of the callbacks above
     * @param float    $timeout seconds until onTimeout fires; <= 0 disables it
     */
    function add($sock, $event, $hdlr, $timeout)
    {
        $sock_no = (int)$sock;

        // isset() instead of a bare array read, which raised an
        // undefined-index notice for every new socket.
        if (isset($this->hdlrs[$sock_no])) {
            return;
        }

        $this->in_crit_section = true;

        $this->hdlrs[$sock_no] = array(
            'hdlr'    => $hdlr,
            'ts'      => microtime(true),
            'event'   => $event,
            'timeout' => $timeout,
            'sock'    => $sock,
        );

        // Keying the poll sets by socket id makes removal a plain unset();
        // the former array_search() based removal deleted key 0 whenever
        // the search came back false.
        if ($event & self::READ) {
            $this->readees[$sock_no] = $sock;
        }

        if ($event & self::WRITE) {
            $this->writees[$sock_no] = $sock;
        }

        $this->in_crit_section = false;
    }

    /**
     * Polls all registered sockets once without blocking and dispatches
     * timeout, read-ready and write-ready callbacks (in that order).
     *
     * Does nothing while add() is in the middle of updating the tables.
     */
    function __tick()
    {
        if ($this->in_crit_section) {
            return;
        }

        // Work on snapshots: callbacks may call add() and change the tables.
        $readees = $this->readees;
        $writees = $this->writees;
        $hdlrs = $this->hdlrs;
        $now = microtime(true);

        foreach (array_keys($hdlrs) as $sock_no) {
            $hdlr = $hdlrs[$sock_no];

            if ($hdlr['timeout'] > 0 &&
                    $now - $hdlr['ts'] > $hdlr['timeout']) {
                $this->remove($sock_no);
                $this->notify($hdlrs, $hdlr['sock'], 'onTimeout');
                // Also drop it from the snapshots so an expired socket is
                // neither polled nor handed a ready callback afterwards.
                unset($hdlrs[$sock_no], $readees[$sock_no], $writees[$sock_no]);
            }
        }

        // stream_select() rejects a call without any stream to watch.
        if (empty($readees) && empty($writees)) {
            return;
        }

        // The third argument is taken by reference, so it must be a variable.
        $except = null;
        if (stream_select($readees, $writees, $except, 0, 0) === false) {
            // On failure the arrays are left unmodified; dispatching them
            // would report every socket as ready.
            return;
        }

        // Deregister everything that became ready before any callback runs,
        // so that a callback re-adding its socket is not undone below.
        foreach ($readees as $sock) {
            $this->remove((int)$sock);
        }
        foreach ($writees as $sock) {
            $this->remove((int)$sock);
        }

        foreach ($readees as $sock) {
            $this->notify($hdlrs, $sock, 'onReadReady');
        }

        foreach ($writees as $sock) {
            $this->notify($hdlrs, $sock, 'onWriteReady');
        }
    }

    /** Drops a socket id from the registration table and both poll sets. */
    private function remove($sock_no)
    {
        unset($this->hdlrs[$sock_no],
                $this->readees[$sock_no],
                $this->writees[$sock_no]);
    }

    /** Calls $method on the handler recorded for $sock, if it defines one. */
    private function notify($hdlrs, $sock, $method)
    {
        $sock_no = (int)$sock;
        if (!isset($hdlrs[$sock_no])) {
            return;
        }

        $target = $hdlrs[$sock_no]['hdlr'];
        if (is_object($target) && method_exists($target, $method)) {
            $target->$method($sock);
        }
    }
}

/**
 * Read-side handler for one outstanding DNS query on one socket.
 *
 * When the socket becomes readable it reads a single datagram, parses it
 * with Net_DNS_Packet and, if it is a response whose id matches the
 * question, hands it to the resolver via _notifyReception().
 */
class AsyncDNSResponseHandler
{
    private $resolver;
    private $question;
    public $host;
    public $port;

    /**
     * @param object $resolver resolver to notify; its `debug` property is
     *                         passed to Net_DNS_Packet
     * @param string $host     name server the query was sent to
     * @param int    $port     port the query was sent to
     * @param object $question the query packet; only header->id is read here
     */
    function __construct($resolver, $host, $port, $question)
    {
        $this->question = $question;
        $this->port = $port;
        $this->host = $host;
        $this->resolver = $resolver;
    }

    /**
     * Reads up to 512 bytes from $s and forwards a matching answer.
     *
     * Silently does nothing when the read fails or is empty, the data does
     * not parse, the packet is not a response (qr != 1), or its id differs
     * from the question's id.
     */
    function onReadReady($s)
    {
        $payload = fread($s, 512);
        if ($payload === false || strlen($payload) == 0) {
            return;
        }

        $reply = new Net_DNS_Packet($this->resolver->debug);

        // Short-circuit evaluation keeps the checks in their original order.
        $acceptable = $reply->parse($payload)
            && $reply->header->qr == 1
            && $reply->header->id == $this->question->header->id;

        if ($acceptable) {
            $this->resolver->_notifyReception($this, $reply);
        }
    }
}

/**
 * Net_DNS_Resolver variant whose UDP transport does not wait for the reply.
 *
 * send_udp() only transmits the query; answers are collected later by
 * AsyncDNSResponseHandler instances driven by the supplied I/O handler and
 * are delivered to listeners through answerReceived($ans).
 *
 * NOTE(review): $this->nameservers, $this->port, $this->retrans and
 * $this->errorstring are not declared here and are presumably inherited
 * from Net_DNS_Resolver -- confirm against the library.
 */
class AsyncDNSResolver extends Net_DNS_Resolver
{
    /** Objects whose answerReceived() is called for every accepted answer. */
    private $lsnrs = array();

    /** I/O handler that the response sockets are registered with. */
    private $async;

    /**
     * @param object $async    I/O handler exposing add($sock, $event, $hdlr, $timeout)
     * @param array  $defaults passed through unchanged to the parent constructor
     */
    public function __construct($async, $defaults = array())
    {
        parent::__construct($defaults);
        $this->async = $async;
    }

    /**
     * Adds a listener; it must provide answerReceived($ans).
     */
    public function addListener($lsnr)
    {
        $this->lsnrs[] = $lsnr;
    }

    /**
     * Forwards an accepted answer packet to every listener.
     *
     * @param object $hdlr response handler that received the packet (unused)
     * @param object $ans  parsed answer packet
     */
    public function _notifyReception($hdlr, $ans)
    {
        foreach ($this->lsnrs as $lsnr) {
            $lsnr->answerReceived($ans);
        }
    }

    /**
     * Sends the query to every name server and registers a read handler
     * for each socket the datagram was written to in full.
     *
     * @param object $packet      the query packet, used later to match ids
     * @param string $packet_data wire-format bytes of the query
     * @return null always; when no socket could be opened, errorstring is
     *              set to 'no nameservers'
     */
    public function send_udp($packet, $packet_data)
    {
        $entries = array();

        // Create a socket handle for each nameserver
        foreach ($this->nameservers as $nameserver) {
            // Numeric IPv6 addresses must be bracketed in a transport URI.
            $host = $nameserver;
            if (strpos($host, ':') !== false && strpos($host, '[') !== 0) {
                $host = "[$host]";
            }

            if ($s = stream_socket_client("udp://$host:{$this->port}")) {
                stream_set_blocking($s, false);
                $entries[] = array(
                    'peerhost' => $nameserver,
                    'peerport' => $this->port,
                    'sock'     => $s,
                );
            }
        }

        if (empty($entries)) {
            $this->errorstring = 'no nameservers';
            return null;
        }

        foreach ($entries as $entry) {
            if (fwrite($entry['sock'], $packet_data) !== strlen($packet_data)) {
                // Nothing will ever answer on this socket; release it now.
                fclose($entry['sock']);
                continue;
            }

            // Peer details come from $entry; reading them from $hdlr used an
            // undefined variable on the first pass and an object afterwards.
            $hdlr = new AsyncDNSResponseHandler($this,
                    $entry['peerhost'],
                    $entry['peerport'],
                    $packet);
            $this->async->add($entry['sock'], AsyncIOHandler::READ,
                    $hdlr, $this->retrans);
        }
    }
}

/**
 * Demo listener: dumps the first answer it receives and remembers that
 * the waiting loop may stop.
 */
class Listener
{
    /** Becomes true once an answer has been received. */
    private $finished = false;

    /**
     * Prints the textual form of the answer and marks the listener done.
     *
     * @param object $ans answer packet providing string()
     */
    public function answerReceived($ans)
    {
        $rendered = $ans->string();
        var_dump($rendered);

        $this->finished = true;
    }

    /**
     * @return bool true after answerReceived() has been called
     */
    public function toTerminate()
    {
        return $this->finished;
    }
}

// Create the asynchronous I/O handler and register it as a tick handler,
// so that its __tick() method runs while tick-enabled code executes.
$async = new AsyncIOHandler();
register_tick_function(array($async, '__tick'));

// Create the asynchronous DNS resolver and attach a listener to it.
$resolver = new AsyncDNSResolver($async);
$resolver->addListener($lsnr = new Listener());

// Actually issue a query.
// Ticks are enabled only for the statements inside this block; the
// directive asks for one tick every 2 tickable statements.
declare(ticks=2) {
    $resolver->query('d.hatena.ne.jp');
    // Keep doing "other work" until the listener reports an answer.
    // NOTE(review): the flag is only set in Listener::answerReceived(), so
    // this looks like it never ends if no answer arrives -- confirm.
    while (!$lsnr->toTerminate()) {
        echo "hoge\n";
        usleep(1000);
    }
}

// vim: sts=4 sw=4 ts=4 et
?>

実行結果

moriyoshi@roadrunner ~/src/asyncdnstest% php test.php
hoge
hoge
hoge
hoge
hoge
string(516) ";; HEADER SECTION
;; id = 59307
;; qr = 1    opcode = QUERY    aa = 0    tc = 0    rd = 1
;; ra = 1    rcode  = NOERROR
;; qdcount = 1  ancount = 4  nscount = 0  arcount = 0

;; QUESTION SECTION (1 record)
;; d.hatena.ne.jp.      IN      A

;; ANSWER SECTION (4 records)
;; d.hatena.ne.jp.              84886   IN      A       221.186.129.146
;; d.hatena.ne.jp.              84886   IN      A       221.186.146.29
;; d.hatena.ne.jp.              84886   IN      A       61.196.246.67
;; d.hatena.ne.jp.              84886   IN      A       125.206.202.83

;; AUTHORITY SECTION (0 records)

;; ADDITIONAL SECTION (0 records)

ちゃんと「hoge」の出力と並行してパケットの到着を待っていますね。
もしかしたら謎バグが潜んでるかも。実際に使うときは用心してください。