#! /usr/bin/python

"""
Twython is a library for Python that wraps the Twitter API.

It aims to abstract away all the API endpoints, so that additions to the
library and/or the Twitter API won't cause any overall problems.

Questions, comments? ryan@venodesigns.net
"""

__author__ = "Ryan McGrath "
__version__ = "1.4.1"

import urllib.request
import urllib.parse
import urllib.error
import http.client
import httplib2
import mimetypes
from email.generator import _make_boundary
import re

import oauth2 as oauth

# Twython maps keyword based arguments to Twitter API endpoints. The endpoints
# table is a file with a dictionary of every API endpoint that Twython supports.
from .twitter_endpoints import base_url, api_table

from urllib.error import HTTPError

import json as simplejson

# Matches "{{name}}" placeholders ("mustaches") inside endpoint URLs.
_MUSTACHE_RE = re.compile(r'\{\{([a-zA-Z_]+)\}\}')


def _to_text(content):
    """Return *content* as ``str``, decoding it as UTF-8 when it is bytes."""
    if isinstance(content, bytes):
        return content.decode('utf-8')
    return content


def _to_bytes(value):
    """Return *value* as ``bytes``; non-bytes values are UTF-8 encoded."""
    if isinstance(value, bytes):
        return value
    return str(value).encode('utf-8')


class TwythonError(AttributeError):
    """
    Generic error class, catch-all for most Twython issues.

    Special cases are handled by APILimit and AuthError.

    Note: To use these, the syntax has changed as of Twython 1.3. To catch
    these, you need to explicitly import them into your code, e.g:

        from twython import TwythonError, APILimit, AuthError

    Constructing a TwythonError with ``error_code == 400`` raises APILimit
    with the same message instead of returning the new instance.
    """
    def __init__(self, msg, error_code=None):
        self.msg = msg
        if error_code == 400:
            raise APILimit(msg)

    def __str__(self):
        return repr(self.msg)


class APILimit(TwythonError):
    """
    Raised when you've hit an API limit.

    Try to avoid these, read the API docs if you're running into issues
    here, Twython does not concern itself with this matter beyond telling
    you that you've done goofed.
    """
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return repr(self.msg)


class AuthError(TwythonError):
    """
    Raised when you try to access a protected resource and it fails due to
    some issue with your authentication.
    """
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return repr(self.msg)


class Twython(object):
    def __init__(self, twitter_token=None, twitter_secret=None,
                 oauth_token=None, oauth_token_secret=None, headers=None):
        """
        Instantiates an instance of Twython. Takes optional parameters for
        authentication and such (see below).

        Parameters:
            twitter_token - Given to you when you register your application
                with Twitter.
            twitter_secret - Given to you when you register your
                application with Twitter.
            oauth_token - If you've gone through the authentication process
                and have a token for this user, pass it in and it'll be
                used for all requests going forward.
            oauth_token_secret - see oauth_token; it's the other half.
            headers - User agent header, dictionary style aka
                {'User-Agent': 'Bert'}. Stored on ``self.headers``.

        ** Note: versioning is not currently used by search.twitter
        functions; when Twitter moves their junk, it'll be supported.
        """
        # Needed for hitting that there API.
        self.request_token_url = 'http://twitter.com/oauth/request_token'
        self.access_token_url = 'http://twitter.com/oauth/access_token'
        self.authorize_url = 'http://twitter.com/oauth/authorize'
        self.authenticate_url = 'http://twitter.com/oauth/authenticate'
        self.twitter_token = twitter_token
        self.twitter_secret = twitter_secret
        self.oauth_token = oauth_token
        self.oauth_secret = oauth_token_secret

        # If there's headers, set them, otherwise fall back to a default
        # user agent.
        self.headers = headers
        if self.headers is None:
            self.headers = {'User-agent': 'Twython Python Twitter Library v1.3'}

        consumer = None
        token = None

        if self.twitter_token is not None and self.twitter_secret is not None:
            consumer = oauth.Consumer(self.twitter_token, self.twitter_secret)

        if self.oauth_token is not None and self.oauth_secret is not None:
            token = oauth.Token(oauth_token, oauth_token_secret)

        # Filter down through the possibilities here - if they have a token,
        # if they're first stage, etc.
        if consumer is not None and token is not None:
            self.client = oauth.Client(consumer, token)
        elif consumer is not None:
            self.client = oauth.Client(consumer)
        else:
            # If they don't do authentication, but still want to request
            # unprotected resources, we need an opener.
            self.client = httplib2.Http()

    def __getattr__(self, api_call):
        """
        Resolve unknown attributes against the API endpoint table.

        Rather than list out a method per endpoint, we keep a table
        (``api_table``) of every API endpoint and its corresponding function
        id. ``__getattr__`` is only called when normal attribute lookup
        fails, so we take over and, if ``api_call`` names an endpoint,
        return a bound function that requests the endpoint with the keyword
        arguments passed in and returns the parsed JSON response.

        Raises:
            TwythonError - (an AttributeError subclass) when ``api_call`` is
                not in the endpoint table.
        """
        if api_call not in api_table:
            raise TwythonError(api_call)

        def get(self, **kwargs):
            fn = api_table[api_call]
            # Replace any mustaches in the endpoint URL with the matching
            # keyword argument. The '1' default catches the API version.
            base = _MUSTACHE_RE.sub(
                lambda m: "%s" % kwargs.get(m.group(1), '1'),
                base_url + fn['url']
            )

            # TODO: check HTTP method and junk, handle errors/authentication
            # urlencode() escapes and UTF-8 encodes keys/values, and accepts
            # non-string values such as ints.
            encoded = urllib.parse.urlencode(kwargs)
            if fn['method'] == 'POST':
                resp, content = self.client.request(
                    base, fn['method'], encoded)
            else:
                url = base + "?" + encoded
                resp, content = self.client.request(url, fn['method'])

            return simplejson.loads(_to_text(content))

        return get.__get__(self)

    def get_authentication_tokens(self):
        """
        get_authentication_tokens(self)

        Requests a request token and returns it as a dictionary, with an
        extra 'auth_url' key holding the URL the user should visit.

        Raises:
            AuthError - when the response status is not '200'.
        """
        resp, content = self.client.request(self.request_token_url, "GET")
        # The body is decoded so parse_qsl() yields str keys; with bytes
        # input the 'oauth_token' lookup below would raise KeyError.
        content = _to_text(content)

        if resp['status'] != '200':
            raise AuthError(
                "Seems something couldn't be verified with your OAuth junk. "
                "Error: %s, Message: %s" % (resp['status'], content))

        request_tokens = dict(urllib.parse.parse_qsl(content))
        request_tokens['auth_url'] = "%s?oauth_token=%s" % (
            self.authenticate_url, request_tokens['oauth_token'])
        return request_tokens

    def get_authorized_tokens(self):
        """
        get_authorized_tokens(self)

        Returns authorized tokens (as a dictionary with str keys) after the
        user has gone through the auth_url phase.
        """
        resp, content = self.client.request(self.access_token_url, "GET")
        return dict(urllib.parse.parse_qsl(_to_text(content)))

    # ----------------------------------------------------------------------
    # The following methods are all different in some manner or require
    # special attention with regards to the Twitter API.
    # Because of this, we keep them separate from all the other endpoint
    # definitions - ideally this should be change-able, but it's not high on
    # the priority list at the moment.
    # ----------------------------------------------------------------------

    @staticmethod
    def constructApiURL(base_url, params):
        """
        Return ``base_url`` with ``params`` appended as a URL-encoded query.

        Parameters:
            base_url - URL to append the query string to.
            params - Mapping of query parameter names to values.

        The query is joined with '&' when ``base_url`` already contains a
        '?', and with '?' otherwise.
        """
        separator = "&" if "?" in base_url else "?"
        return base_url + separator + urllib.parse.urlencode(params)

    @staticmethod
    def shortenURL(url_to_shorten, shortener="http://is.gd/api.php",
                   query="longurl"):
        """
        shortenURL(url_to_shorten, shortener = "http://is.gd/api.php",
                   query = "longurl")

        Shortens url specified by url_to_shorten and returns the raw
        response body of the shortening service.

        Parameters:
            url_to_shorten - URL to shorten.
            shortener - In case you want to use a url shortening service
                other than is.gd.
            query - Name of the query parameter carrying the long URL.

        Raises:
            TwythonError - when the service answers with an HTTP error.
        """
        request_url = shortener + "?" + urllib.parse.urlencode(
            {query: url_to_shorten})
        try:
            # The context manager closes the connection once it is read.
            with urllib.request.urlopen(request_url) as response:
                return response.read()
        except HTTPError as e:
            raise TwythonError("shortenURL() failed with a %s error code." %
                               repr(e.code)) from e

    def bulkUserLookup(self, ids=None, screen_names=None, version=None):
        """
        bulkUserLookup(self, ids = None, screen_names = None, version = None)

        A method to do bulk user lookups against the Twitter API.
        Arguments (ids (numbers) / screen_names (strings)) should be flat
        Arrays that contain their respective data sets. ``version`` is
        accepted but not used; the request always targets version 1.

        Statuses for the users in question will be returned inline if
        they exist. Requires authentication!

        Raises:
            TwythonError - when the request fails with an HTTP error.
        """
        apiURL = "http://api.twitter.com/1/users/lookup.json?lol=1"
        if ids is not None:
            # str() rather than repr() so string ids are not wrapped in
            # quotes; join() avoids a dangling trailing comma.
            apiURL += "&user_id=" + ",".join(str(user_id) for user_id in ids)
        if screen_names is not None:
            apiURL += "&screen_name=" + ",".join(
                urllib.parse.quote_plus(str(name)) for name in screen_names)
        try:
            resp, content = self.client.request(apiURL, "GET")
            return simplejson.loads(_to_text(content))
        except HTTPError as e:
            raise TwythonError(
                "bulkUserLookup() failed with a %s error code." %
                repr(e.code), e.code) from e

    def search(self, **kwargs):
        """
        search(**kwargs)

        Returns tweets that match a specified query.

        Parameters:
            See the documentation at http://dev.twitter.com/doc/get/search.
            Pass in the API supported arguments as named parameters.

            e.g x.search(q = "python")

        Raises:
            TwythonError - when the request fails with an HTTP error.
        """
        searchURL = Twython.constructApiURL(
            "http://search.twitter.com/search.json", kwargs)
        try:
            resp, content = self.client.request(searchURL, "GET")
            return simplejson.loads(_to_text(content))
        except HTTPError as e:
            raise TwythonError(
                "search() failed with a %s error code." %
                repr(e.code), e.code) from e

    def searchTwitter(self, **kwargs):
        """
        Alias of search(**kwargs).

        e.g x.searchTwitter(q = "python", page = "2")
        """
        return self.search(**kwargs)

    def searchGen(self, search_query, **kwargs):
        """
        searchGen(search_query, **kwargs)

        Returns a generator of tweets that match a specified query. Pages
        are requested one after another until a page has no results.

        Parameters:
            search_query - The query string, sent as the 'q' parameter.
            See the documentation at http://dev.twitter.com/doc/get/search.
            Pass in the API supported arguments as named parameters.

            e.g x.searchGen("python", page="2")

        Raises:
            TwythonError - when a request fails with an HTTP error, or when
                'page' cannot be converted to an integer.
        """
        while True:
            # The query always travels as a regular, URL-encoded parameter.
            params = {'q': search_query}
            params.update(kwargs)
            params['q'] = search_query
            searchURL = Twython.constructApiURL(
                "http://search.twitter.com/search.json", params)
            try:
                resp, content = self.client.request(searchURL, "GET")
                data = simplejson.loads(_to_text(content))
            except HTTPError as e:
                raise TwythonError(
                    "searchGen() failed with a %s error code." %
                    repr(e.code), e.code) from e

            if not data['results']:
                # A plain return ends the generator; raising StopIteration
                # here would surface as RuntimeError (PEP 479).
                return

            for tweet in data['results']:
                yield tweet

            if 'page' not in kwargs:
                kwargs['page'] = '2'
            else:
                try:
                    kwargs['page'] = str(int(kwargs['page']) + 1)
                except (TypeError, ValueError):
                    raise TwythonError(
                        "searchGen() exited because page takes str")

    def isListMember(self, list_id, id, username, version=1):
        """
        isListMember(self, list_id, id, username, version)

        Check if a specified user (id) is a member of the list in question
        (list_id).

        **Note: This method may not work for private/protected lists,
        unless you're authenticated and have access to those lists.

        Parameters:
            list_id - Required. The slug of the list to check against.
            id - Required. The ID of the user being checked in the list.
            username - User who owns the list you're checking against.
            version (number) - Optional. API version to request. Defaults
                to 1.

        Raises:
            TwythonError - when the request fails with an HTTP error.
        """
        url = "http://api.twitter.com/%d/%s/%s/members/%s.json" % (
            version, username, list_id, id)
        try:
            resp, content = self.client.request(url)
            return simplejson.loads(_to_text(content))
        except HTTPError as e:
            raise TwythonError(
                "isListMember() failed with a %s error code." %
                repr(e.code), e.code) from e

    def isListSubscriber(self, username, list_id, id, version=1):
        """
        isListSubscriber(self, username, list_id, id, version)

        Check if a specified user (id) is a subscriber of the list in
        question (list_id).

        **Note: This method may not work for private/protected lists,
        unless you're authenticated and have access to those lists.

        Parameters:
            username - Required. The username of the owner of the list
                that you're seeing if someone is subscribed to.
            list_id - Required. The slug of the list to check against.
            id - Required. The ID of the user being checked in the list.
            version (number) - Optional. API version to request. Defaults
                to 1.

        Raises:
            TwythonError - when the request fails with an HTTP error.
        """
        url = "http://api.twitter.com/%d/%s/%s/following/%s.json" % (
            version, username, list_id, id)
        try:
            resp, content = self.client.request(url)
            return simplejson.loads(_to_text(content))
        except HTTPError as e:
            raise TwythonError(
                "isListSubscriber() failed with a %s error code." %
                repr(e.code), e.code) from e

    # The following methods are apart from the other Account methods,
    # because they rely on a whole multipart-data posting function set.

    @staticmethod
    def _post_image(url, filename, caller):
        """POST the file at *filename* to *url* as multipart form data and
        return the raw response body; *caller* names the public method in
        the error message."""
        try:
            # Read the image inside a context manager so the handle is
            # closed even if the upload fails.
            with open(filename, 'rb') as image:
                files = [("image", filename, image.read())]
            fields = []
            content_type, body = Twython.encode_multipart_formdata(
                fields, files)
            headers = {'Content-Type': content_type,
                       'Content-Length': str(len(body))}
            r = urllib.request.Request(url, body, headers)
            with urllib.request.urlopen(r) as response:
                return response.read()
        except HTTPError as e:
            raise TwythonError(
                "%s() failed with a %s error code." % (caller, repr(e.code)),
                e.code) from e

    def updateProfileBackgroundImage(self, filename, tile="true", version=1):
        """
        updateProfileBackgroundImage(filename, tile="true")

        Updates the authenticating user's profile background image and
        returns the raw response body.

        Parameters:
            filename - Required. Path of the image to upload. Must be a
                valid GIF, JPG, or PNG image of less than 800 kilobytes in
                size. Images with width larger than 2048 pixels will be
                forceably scaled down.
            tile - Optional (defaults to true). If set to true the
                background image will be displayed tiled. The image will
                not be tiled otherwise.
                ** Note: It's sad, but when using this method, pass the
                tile value as a string, e.g tile="false"
            version (number) - Optional. API version to request. Defaults
                to 1.

        Raises:
            TwythonError - when the upload fails with an HTTP error.
        """
        url = ("http://api.twitter.com/%d/account/"
               "update_profile_background_image.json?tile=%s" %
               (version, tile))
        return Twython._post_image(url, filename,
                                   "updateProfileBackgroundImage")

    def updateProfileImage(self, filename, version=1):
        """
        updateProfileImage(filename)

        Updates the authenticating user's profile image (avatar) and
        returns the raw response body.

        Parameters:
            filename - Required. Path of the image to upload. Must be a
                valid GIF, JPG, or PNG image of less than 700 kilobytes in
                size. Images with width larger than 500 pixels will be
                scaled down.
            version (number) - Optional. API version to request. Defaults
                to 1.

        Raises:
            TwythonError - when the upload fails with an HTTP error.
        """
        url = ("http://api.twitter.com/%d/account/update_profile_image.json"
               % version)
        return Twython._post_image(url, filename, "updateProfileImage")

    @staticmethod
    def encode_multipart_formdata(fields, files):
        """
        Encode form fields and files as a multipart/form-data body.

        Parameters:
            fields - Iterable of (name, value) pairs.
            files - Iterable of (name, filename, content) triples.

        Returns:
            (content_type, body) where content_type is the header value
            including the boundary and body is ``bytes``.
        """
        BOUNDARY = _make_boundary()
        lines = []
        for (key, value) in fields:
            lines.append('--' + BOUNDARY)
            lines.append('Content-Disposition: form-data; name="%s"' % key)
            lines.append('')
            lines.append(value)
        for (key, filename, value) in files:
            # Parenthesised so the fallback applies to the guessed type
            # instead of to the already-formatted header line.
            file_type = (mimetypes.guess_type(filename)[0]
                         or 'application/octet-stream')
            lines.append('--' + BOUNDARY)
            lines.append(
                'Content-Disposition: form-data; name="%s"; filename="%s"'
                % (key, filename))
            lines.append('Content-Type: %s' % file_type)
            lines.append('')
            lines.append(value)
        lines.append('--' + BOUNDARY + '--')
        lines.append('')
        # File contents are bytes, so the whole body is assembled as bytes.
        body = b'\r\n'.join(_to_bytes(line) for line in lines)
        content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
        return content_type, body

    @staticmethod
    def unicode2utf8(text):
        """
        Return ``text`` UTF-8 encoded when it is a ``str``.

        Any other value, or a string that cannot be encoded, is returned
        unchanged.
        """
        try:
            if isinstance(text, str):
                text = text.encode('utf-8')
        except UnicodeEncodeError:
            pass
        return text