Subscribed unsubscribe Subscribe Subscribe

アメーバなうクライアントを Python で書いてみる

タイムライン取得とかつくらんとなー。つけた。

http://gist.github.com/253169

# encoding: utf-8

import urllib2
import cookielib
import re
import xml.dom as dom
import xml.dom.minidom as minidom

from BeautifulSoup import BeautifulSoup, Tag, Comment, NavigableString

# Names exported by a star-import of this module: the exception hierarchy
# followed by the client class itself.
__all__ = (
    'AmebaNowClientException', 'UnsupportedContentTypeError',
    'AuthenticationError', 'PostError', 'UnexpectedResponseError',
    'AmebaNowClient',
)

class AmebaNowClientException(Exception):
    """Common base class for the errors defined by this module."""

class UnsupportedContentTypeError(AmebaNowClientException):
    """A response had a Content-Type the client does not know how to decode."""

class AuthenticationError(AmebaNowClientException):
    """The login page answered with an error message."""

class UnexpectedResponseError(AmebaNowClientException):
    """A server response did not have the structure the client expects."""

class PostError(AmebaNowClientException):
    """Submitting an entry did not succeed."""

class AmebaNowClient(object):
    """Small client for the Ameba Now service: log in, post, read timelines.

    ``credentials`` is a mapping with the keys ``'ameba_id'`` and
    ``'password'``.  Every request goes through one cookie-aware opener, so
    cookies set by the login response are sent with the later requests.
    """

    LOGIN_URL = 'http://www.ameba.jp/login.do'
    FORM_URL = 'http://now.ameba.jp/'
    POST_URL = 'http://ucsnow.ameba.jp/post'
    API_MYTIMELINE = 'http://now.ameba.jp/api/timeline'
    API_TIMELINE = 'http://now.ameba.jp/api/entryList'
    API_ENCODING = 'utf-8'

    # <meta http-equiv="Content-Type" content="..."> ; group 2 is the
    # (possibly quoted) content attribute value.
    _META_CONTENT_TYPE_RE = re.compile(
        r'<meta\s+http-equiv=(["\']?)Content-Type\1\s+'
        r'content=("[^"]*"|\'[^\']*\'|[^"\'\s]*)[^>]*>',
        re.IGNORECASE)
    # Leading XML declaration; group 1 is the (possibly quoted) encoding.
    # The '?' must be escaped, otherwise it makes the '<' optional and the
    # pattern can never match a real declaration.
    _XML_DECL_ENCODING_RE = re.compile(
        r'''<\?xml\s+[^?>]*?\bencoding\s*=\s*("[^"]*"|'[^']*'|[^"'?>\s]*)''',
        re.IGNORECASE)

    def __init__(self, credentials):
        self.credentials = credentials
        self.cookiejar = cookielib.CookieJar()
        self.opener = urllib2.build_opener(
            urllib2.HTTPCookieProcessor(self.cookiejar))

    @staticmethod
    def _textify(nodelist):
        """Flatten BeautifulSoup nodes into text.

        Comments are dropped, ``<br>`` becomes a newline and every other tag
        is replaced by the text of its children.
        """
        parts = []
        for n in nodelist:
            if isinstance(n, Comment):
                continue
            if isinstance(n, Tag):
                if n.name == 'br':
                    parts.append("\n")
                else:
                    parts.append(AmebaNowClient._textify(n))
            elif isinstance(n, NavigableString):
                parts.append(unicode(n))
        return ''.join(parts)

    @staticmethod
    def _unquote(value):
        """Strip one pair of surrounding quotes from *value*, if present."""
        if value[:1] in ('"', "'"):
            return value[1:-1]
        return value

    @staticmethod
    def _sniffhtmlcharset(body):
        """Return the charset declared in a Content-Type <meta> tag, or None."""
        match = AmebaNowClient._META_CONTENT_TYPE_RE.search(body)
        if match is None:
            return None
        value = AmebaNowClient._unquote(match.group(2))
        # The first segment is the media type; parameters follow after ';'.
        # XXX: quoted parameter values are not handled.
        for param in re.split(r'\s*;\s*', value)[1:]:
            key, sep, val = param.partition('=')
            if sep and key.strip().lower() == 'charset':
                return val.strip() or None
        return None

    @staticmethod
    def _sniffxmlcharset(body):
        """Return the encoding named in a leading XML declaration, or None."""
        match = AmebaNowClient._XML_DECL_ENCODING_RE.match(body)
        if match is None:
            return None
        return AmebaNowClient._unquote(match.group(1)) or None

    def _urlread(self, url, data=None, default_charset='utf-8'):
        """Fetch *url* and decode the body.

        The charset is taken from the Content-Type header, then from the
        document itself (``<meta>`` tag for HTML, XML declaration for XML),
        and finally from *default_charset*.

        Returns a ``(response, encoding, decoded_body)`` tuple; the response
        object is already closed.  Raises UnsupportedContentTypeError for
        anything other than ``text/html`` and ``text/xml``.
        """
        r = self.opener.open(url, data)
        try:
            headers = r.info()
            resp = r.read()
        finally:
            # Close on every path, including read errors.
            r.close()
        content_type = headers.gettype()
        if content_type == 'text/html':
            sniff = AmebaNowClient._sniffhtmlcharset
        elif content_type == 'text/xml':
            sniff = AmebaNowClient._sniffxmlcharset
        else:
            raise UnsupportedContentTypeError(content_type)
        encoding = headers.getparam('charset') or sniff(resp) or default_charset
        return r, encoding, resp.decode(encoding)

    def login(self):
        """Submit the credentials to LOGIN_URL.

        Raises AuthenticationError, carrying the page's message, when the
        response contains an error element.
        """
        _, _, resp = self._urlread(self.LOGIN_URL,
            'password=%s&amebaId=%s' % (
                urllib2.quote(self.credentials['password']),
                urllib2.quote(self.credentials['ameba_id'])))
        error_node = BeautifulSoup(resp).find('div', 'errorId', recursive=True)
        if error_node:
            raise AuthenticationError(
                AmebaNowClient._textify(error_node).strip())

    @staticmethod
    def _buildquery(params, encoding):
        """Serialize the mapping *params* as a URL-encoded query string.

        Unicode keys and values are encoded with *encoding*; everything else
        is converted with ``str()`` before quoting.
        """
        data = []
        for k, v in params.iteritems():
            if isinstance(k, unicode):
                k = k.encode(encoding)
            if isinstance(v, unicode):
                v = v.encode(encoding)
            data.append('%s=%s' % (urllib2.quote(str(k)), urllib2.quote(str(v))))
        return '&'.join(data)

    def post(self, text, reply_to=None):
        """Post *text* as a new entry.

        The input form is fetched first so that its hidden fields can be sent
        back together with the text.  *reply_to*, when given, is a mapping
        with the keys ``'id'`` and ``'ameba_id'`` of the entry being answered.

        Raises UnexpectedResponseError when the form cannot be found and
        PostError when the submission does not end up back on FORM_URL.
        """
        _, encoding, resp = self._urlread(self.FORM_URL)
        form = BeautifulSoup(resp).find('form', id='inputForm')
        if form is None:
            raise UnexpectedResponseError('No <form id="inputForm"> element')
        params = {}
        for n in form.findAll('input', type='hidden'):
            name = n.get('name')
            if name is None:
                # An unnamed control is never submitted by a browser either.
                continue
            params[name] = n.get('value', '')
        text_area = form.find('textarea')
        if text_area is None:
            raise UnexpectedResponseError('No <textarea> element')
        params[text_area['name']] = text
        if reply_to is not None:
            params['replyEntryId'] = reply_to['id']
            params['replyAmebaId'] = reply_to['ameba_id']
        r, _, resp = self._urlread(
            self.POST_URL, AmebaNowClient._buildquery(params, encoding))
        # Anything other than landing on the form page again is a failure.
        if r.url != self.FORM_URL:
            if r.url == self.POST_URL:
                error_node = BeautifulSoup(resp).find('p', id='errorArea', recursive=True)
                if error_node:
                    raise PostError(AmebaNowClient._textify(error_node).strip())
            raise PostError()

    @staticmethod
    def _selectonenode(parent, name):
        """Return the only descendant element called *name*.

        Raises UnexpectedResponseError when there is none or more than one.
        """
        nodes = parent.getElementsByTagName(name)
        if len(nodes) == 0:
            raise UnexpectedResponseError('No <%s> element' % name)
        elif len(nodes) > 1:
            raise UnexpectedResponseError('More than one <%s> elements found' % name)
        return nodes[0]

    @staticmethod
    def _getnodevalue(node, name):
        """Return the concatenated child node values of element *name*.

        Returns None when the element has no children at all.
        """
        n = AmebaNowClient._selectonenode(node, name)
        if len(n.childNodes) > 0:
            return ''.join(i.nodeValue if i.nodeValue is not None else '' for i in n.childNodes)
        else:
            return None

    @staticmethod
    def _getflag(node, name):
        """Read element *name* as an integer flag and return it as a bool.

        Raises UnexpectedResponseError when the element is empty or not an
        integer, instead of leaking a bare TypeError/ValueError.
        """
        value = AmebaNowClient._getnodevalue(node, name)
        try:
            return bool(int(value))
        except (TypeError, ValueError):
            raise UnexpectedResponseError(
                'Invalid <%s> value: %r' % (name, value))

    @staticmethod
    def _parseentrylist(entry_list_node):
        """Convert the element children of an entry list into dicts.

        Each dict has the keys ``id``, ``ameba_id``, ``text``, ``reply_to``
        (``ameba_id``/``id``), ``nickname``, ``image`` (``url``/``width``/
        ``height``), ``reply_allowed`` and ``mine``.  Text fields are strings
        or None; the last two are booleans.
        """
        result = []
        getnodevalue = AmebaNowClient._getnodevalue
        getflag = AmebaNowClient._getflag
        for i in entry_list_node.childNodes:
            if i.nodeType != dom.Node.ELEMENT_NODE:
                continue
            result.append({
                'id':             getnodevalue(i, 'entryId'),
                'ameba_id':       getnodevalue(i, 'amebaId'),
                'text':           getnodevalue(i, 'entryText'),
                'reply_to': {
                    'ameba_id': getnodevalue(i, 'replyAmebaId'),
                    'id':       getnodevalue(i, 'replyEntryId'),
                    },
                'nickname': getnodevalue(i, 'thumbnailNickname'),
                'image': {
                    'url': getnodevalue(i, 'thumbnailImagePath'),
                    'width': getnodevalue(i, 'thumbnailImageWidth'),
                    'height': getnodevalue(i, 'thumbnailImageHeight'),
                    },
                # The service reports "deny reply"; expose the positive form.
                'reply_allowed': not getflag(i, 'denyReplyFlag'),
                'mine': getflag(i, 'isMyEntry'),
                })
        return result

    def _fetchtimeline(self, url, offset):
        """Request *url* with an ``offset`` query and parse the XML answer.

        Returns ``(offset, entries)`` where *offset* is the integer value of
        the ``offset`` attribute of ``<entryList>`` and *entries* is the
        output of ``_parseentrylist``.  Raises UnexpectedResponseError when
        the document does not have the expected structure.
        """
        query = AmebaNowClient._buildquery(dict(offset=offset),
                                           self.API_ENCODING)
        r = self.opener.open(url + '?' + query)
        try:
            doc = minidom.parse(r)
        finally:
            r.close()
        if doc.documentElement.nodeName != u'response':
            raise UnexpectedResponseError(doc.documentElement.nodeName)
        entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList')
        try:
            reported_offset = int(entry_list_node.getAttribute('offset'))
        except ValueError:
            raise UnexpectedResponseError(
                'Invalid offset attribute on <entryList>')
        return reported_offset, AmebaNowClient._parseentrylist(entry_list_node)

    def getmytimeline(self, offset=0, limit=20):
        """Fetch the logged-in user's timeline starting at *offset*.

        Returns ``(offset, entries)``; see ``_fetchtimeline``.
        NOTE(review): *limit* is accepted for compatibility but is not sent
        to the API -- confirm the parameter name before wiring it up.
        """
        return self._fetchtimeline(self.API_MYTIMELINE, offset)

    def gettimeline(self, ameba_id, offset=0, limit=20):
        """Fetch the timeline of user *ameba_id* starting at *offset*.

        Returns ``(offset, entries)``; see ``_fetchtimeline``.
        NOTE(review): *limit* is accepted for compatibility but is not sent
        to the API -- confirm the parameter name before wiring it up.
        """
        return self._fetchtimeline(
            self.API_TIMELINE + '/%s' % urllib2.quote(ameba_id), offset)

if __name__ == '__main__':
    import os
    c = AmebaNowClient(dict(ameba_id=os.environ['AMEBA_ID'], password=os.environ['AMEBA_PASSWORD']))
    c.login()
    offset, entries = c.getmytimeline()
    for entry in entries:
        print entry['id'], entry['nickname'], entry['text']
    c.post(u'ちんこ')