アメーバなうクライアントを Python で書いてみる
タイムライン取得とかつくらんとなー。つけた。
# encoding: utf-8 import urllib2 import cookielib import re import xml.dom as dom import xml.dom.minidom as minidom from BeautifulSoup import BeautifulSoup, Tag, Comment, NavigableString __all__ = ( 'AmebaNowClientException', 'UnsupportedContentTypeError', 'AuthenticationError', 'PostError', 'UnexpectedResponseError', 'AmebaNowClient', ) class AmebaNowClientException(Exception): pass class UnsupportedContentTypeError(AmebaNowClientException): pass class AuthenticationError(AmebaNowClientException): pass class UnexpectedResponseError(AmebaNowClientException): pass class PostError(AmebaNowClientException): pass class AmebaNowClient(object): LOGIN_URL = 'http://www.ameba.jp/login.do' FORM_URL = 'http://now.ameba.jp/' POST_URL = 'http://ucsnow.ameba.jp/post' API_MYTIMELINE = 'http://now.ameba.jp/api/timeline' API_TIMELINE = 'http://now.ameba.jp/api/entryList' API_ENCODING = 'utf-8' def __init__(self, credentials): self.credentials = credentials self.cookiejar = cookielib.CookieJar() self.opener = urllib2.build_opener( urllib2.HTTPCookieProcessor(self.cookiejar)) @staticmethod def _textify(nodelist): retval = '' for n in nodelist: if isinstance(n, Comment): pass elif isinstance(n, Tag): if n.name == 'br': retval += "\n" else: retval += AmebaNowClient._textify(n) elif isinstance(n, NavigableString): retval += unicode(n) return retval def _urlread(self, url, data=None, default_charset='utf-8'): r = self.opener.open(url, data) headers = r.info() resp = r.read() if headers.gettype() == 'text/html': encoding = headers.getparam('charset') if encoding == None: encoding = default_charset match = re.search(r'<meta\s+http-equiv=(["\']?)Content-Type\1\s+content=("[^"]*"|\'[^\']*\'|[^"\'\s]*)[^>]*>', resp, re.IGNORECASE) value = match.group(2) if value[0] == '"' or value[0] == "'": value = value[1:-1] # XXX: should take care of quoted values tmp = re.split(r'\s*;\s*', value) for k, v in (t.split('=') for t in tmp[1:]): if k.lower() == 'charset': encoding = v elif headers.gettype() == 'text/xml': encoding = headers.getparam('charset') if encoding == None: encoding = default_charset match = re.match(r'''<?xml\s+[^?>]*encoding=("[^"]*"|'[^']*'|[^"'?>]*)[^?>]*?>''', resp, re.IGNORECASE) value = match.group(1) if value[0] == '"' or value[0] == "'": value = value[1:-1] encoding = v else: raise UnsupportedContentTypeError(headers.gettype()) r.close() return r, encoding, resp.decode(encoding) def login(self): _, _, resp = self._urlread(self.LOGIN_URL, 'password=%s&amebaId=%s' % ( urllib2.quote(self.credentials['password']), urllib2.quote(self.credentials['ameba_id']))) error_node = BeautifulSoup(resp).find('div', 'errorId', recursive=True) if error_node: raise AuthenticationError( AmebaNowClient._textify(error_node).strip()) @staticmethod def _buildquery(params, encoding): data = [] for k, v in params.iteritems(): if isinstance(k, unicode): k = k.encode(encoding) if isinstance(v, unicode): v = v.encode(encoding) k = str(k) v = str(v) data.append('%s=%s' % (urllib2.quote(k), urllib2.quote(v))) return '&'.join(data) def post(self, text, reply_to=None): _, encoding, resp = self._urlread(self.FORM_URL) form = BeautifulSoup(resp).find('form', id='inputForm') params = {} for n in form.findAll('input', type='hidden'): params[n['name']] = n['value'] text_area_name = form.find('textarea')['name'] params[text_area_name] = text if reply_to is not None: params['replyEntryId'] = reply_to['id'] params['replyAmebaId'] = reply_to['ameba_id'] r, encoding, resp = self._urlread(self.POST_URL, AmebaNowClient._buildquery(params, encoding)) if r.url != self.FORM_URL: if r.url == self.POST_URL: error_node = BeautifulSoup(resp).find('p', id='errorArea', recursive=True) if error_node: raise PostError(AmebaNowClient._textify(error_node).strip()) raise PostError() @staticmethod def _selectonenode(parent, name): nodes = parent.getElementsByTagName(name) if len(nodes) == 0: raise UnexpectedResponseError('No <%s> element' % name) elif len(nodes) > 1: raise UnexpectedResponseError('More than one <%s> elements found' % name) return nodes[0] @staticmethod def _getnodevalue(node, name): n = AmebaNowClient._selectonenode(node, name) if len(n.childNodes) > 0: return ''.join(i.nodeValue if i.nodeValue is not None else '' for i in n.childNodes) else: return None @staticmethod def _parseentrylist(entry_list_node): result = [] getnodevalue = AmebaNowClient._getnodevalue for i in entry_list_node.childNodes: if i.nodeType == dom.Node.ELEMENT_NODE: result.append({ 'id': getnodevalue(i, 'entryId'), 'ameba_id': getnodevalue(i, 'amebaId'), 'text': getnodevalue(i, 'entryText'), 'reply_to': { 'ameba_id': getnodevalue(i, 'replyAmebaId'), 'id': getnodevalue(i, 'replyEntryId'), }, 'nickname': getnodevalue(i, 'thumbnailNickname'), 'image': { 'url': getnodevalue(i, 'thumbnailImagePath'), 'width': getnodevalue(i, 'thumbnailImageWidth'), 'height': getnodevalue(i, 'thumbnailImageHeight'), }, 'reply_allowed': \ not bool(int(getnodevalue(i, 'denyReplyFlag'))), 'mine': bool(int(getnodevalue(i, 'isMyEntry'))), }) return result def getmytimeline(self, offset=0, limit=20): doc = minidom.parse(self.opener.open( self.API_MYTIMELINE + '?' \ + AmebaNowClient._buildquery(dict(offset=offset), self.API_ENCODING))) if doc.documentElement.nodeName != u'response': raise UnexpectedResponseError(doc.documentElement.nodeName) entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList') offset = entry_list_node.getAttribute('offset') return int(offset), AmebaNowClient._parseentrylist(entry_list_node) def gettimeline(self, ameba_id, offset=0, limit=20): doc = minidom.parse(self.opener.open( self.API_TIMELINE + '/%s' % urllib2.quote(ameba_id) + '?' \ + AmebaNowClient._buildquery(dict(offset=offset), self.API_ENCODING))) doc = minidom.parseString(resp) if doc.documentElement.nodeName != u'response': raise UnexpectedResponseError(doc.documentElement.nodeName) entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList') offset = entry_list_node.getAttribute('offset') return int(offset), AmebaNowClient._parseentrylist(entry_list_node) if __name__ == '__main__': import os c = AmebaNowClient(dict(ameba_id=os.environ['AMEBA_ID'], password=os.environ['AMEBA_PASSWORD'])) c.login() offset, entries = c.getmytimeline() for entry in entries: print entry['id'], entry['nickname'], entry['text'] c.post(u'ちんこ')