Twython 1.3, OAuth support is now finally included and working. Ships with an example Django application to get people started with OAuth, entire library is refactored to not be a royal clusterfsck. Enjoy.

This commit is contained in:
Ryan McGrath 2010-10-16 23:37:47 -04:00
parent 7ccf8a2baf
commit eb5541e433
22 changed files with 2400 additions and 4492 deletions

View file

@ -1 +1 @@
import core, twyauth, streaming
from twython import Twython

File diff suppressed because it is too large Load diff

View file

@ -1,524 +0,0 @@
import cgi
import urllib
import time
import random
import urlparse
import hmac
import binascii
VERSION = '1.0' # Hi Blaine!
HTTP_METHOD = 'GET'
SIGNATURE_METHOD = 'PLAINTEXT'
# Generic exception class
class OAuthError(RuntimeError):
def __init__(self, message='OAuth error occured.'):
self.message = message
# optional WWW-Authenticate header (401 error)
def build_authenticate_header(realm=''):
return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
# url escape
def escape(s):
# escape '/' too
return urllib.quote(s, safe='~')
# util function: current timestamp
# seconds since epoch (UTC)
def generate_timestamp():
return int(time.time())
# util function: nonce
# pseudorandom number
def generate_nonce(length=8):
return ''.join([str(random.randint(0, 9)) for i in range(length)])
# OAuthConsumer is a data type that represents the identity of the Consumer
# via its shared secret with the Service Provider.
class OAuthConsumer(object):
key = None
secret = None
def __init__(self, key, secret):
self.key = key
self.secret = secret
# OAuthToken is a data type that represents an End User via either an access
# or request token.
class OAuthToken(object):
# access tokens and request tokens
key = None
secret = None
'''
key = the token
secret = the token secret
'''
def __init__(self, key, secret):
self.key = key
self.secret = secret
def to_string(self):
return urllib.urlencode({'oauth_token': self.key, 'oauth_token_secret': self.secret})
# return a token from something like:
# oauth_token_secret=digg&oauth_token=digg
def from_string(s):
params = cgi.parse_qs(s, keep_blank_values=False)
key = params['oauth_token'][0]
secret = params['oauth_token_secret'][0]
return OAuthToken(key, secret)
from_string = staticmethod(from_string)
def __str__(self):
return self.to_string()
# OAuthRequest represents the request and can be serialized
class OAuthRequest(object):
'''
OAuth parameters:
- oauth_consumer_key
- oauth_token
- oauth_signature_method
- oauth_signature
- oauth_timestamp
- oauth_nonce
- oauth_version
... any additional parameters, as defined by the Service Provider.
'''
parameters = None # oauth parameters
http_method = HTTP_METHOD
http_url = None
version = VERSION
def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):
self.http_method = http_method
self.http_url = http_url
self.parameters = parameters or {}
def set_parameter(self, parameter, value):
self.parameters[parameter] = value
def get_parameter(self, parameter):
try:
return self.parameters[parameter]
except:
raise OAuthError('Parameter not found: %s' % parameter)
def _get_timestamp_nonce(self):
return self.get_parameter('oauth_timestamp'), self.get_parameter('oauth_nonce')
# get any non-oauth parameters
def get_nonoauth_parameters(self):
parameters = {}
for k, v in self.parameters.iteritems():
# ignore oauth parameters
if k.find('oauth_') < 0:
parameters[k] = v
return parameters
# serialize as a header for an HTTPAuth request
def to_header(self, realm=''):
auth_header = 'OAuth realm="%s"' % realm
# add the oauth parameters
if self.parameters:
for k, v in self.parameters.iteritems():
if k[:6] == 'oauth_':
auth_header += ', %s="%s"' % (k, escape(str(v)))
return {'Authorization': auth_header}
# serialize as post data for a POST request
def to_postdata(self):
return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) for k, v in self.parameters.iteritems()])
# serialize as a url for a GET request
def to_url(self):
return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())
# return a string that consists of all the parameters that need to be signed
def get_normalized_parameters(self):
params = self.parameters
try:
# exclude the signature if it exists
del params['oauth_signature']
except:
pass
key_values = params.items()
# sort lexicographically, first after key, then after value
key_values.sort()
# combine key value pairs in string and escape
return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) for k, v in key_values])
# just uppercases the http method
def get_normalized_http_method(self):
return self.http_method.upper()
# parses the url and rebuilds it to be scheme://host/path
def get_normalized_http_url(self):
parts = urlparse.urlparse(self.http_url)
url_string = '%s://%s%s' % (parts[0], parts[1], parts[2]) # scheme, netloc, path
return url_string
# set the signature parameter to the result of build_signature
def sign_request(self, signature_method, consumer, token):
# set the signature method
self.set_parameter('oauth_signature_method', signature_method.get_name())
# set the signature
self.set_parameter('oauth_signature', self.build_signature(signature_method, consumer, token))
def build_signature(self, signature_method, consumer, token):
# call the build signature method within the signature method
return signature_method.build_signature(self, consumer, token)
def from_request(http_method, http_url, headers=None, parameters=None, query_string=None):
# combine multiple parameter sources
if parameters is None:
parameters = {}
# headers
if headers and 'Authorization' in headers:
auth_header = headers['Authorization']
# check that the authorization header is OAuth
if auth_header.index('OAuth') > -1:
try:
# get the parameters from the header
header_params = OAuthRequest._split_header(auth_header)
parameters.update(header_params)
except:
raise OAuthError('Unable to parse OAuth parameters from Authorization header.')
# GET or POST query string
if query_string:
query_params = OAuthRequest._split_url_string(query_string)
parameters.update(query_params)
# URL parameters
param_str = urlparse.urlparse(http_url)[4] # query
url_params = OAuthRequest._split_url_string(param_str)
parameters.update(url_params)
if parameters:
return OAuthRequest(http_method, http_url, parameters)
return None
from_request = staticmethod(from_request)
def from_consumer_and_token(oauth_consumer, token=None, http_method=HTTP_METHOD, http_url=None, parameters=None):
if not parameters:
parameters = {}
defaults = {
'oauth_consumer_key': oauth_consumer.key,
'oauth_timestamp': generate_timestamp(),
'oauth_nonce': generate_nonce(),
'oauth_version': OAuthRequest.version,
}
defaults.update(parameters)
parameters = defaults
if token:
parameters['oauth_token'] = token.key
return OAuthRequest(http_method, http_url, parameters)
from_consumer_and_token = staticmethod(from_consumer_and_token)
def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD, http_url=None, parameters=None):
if not parameters:
parameters = {}
parameters['oauth_token'] = token.key
if callback:
parameters['oauth_callback'] = callback
return OAuthRequest(http_method, http_url, parameters)
from_token_and_callback = staticmethod(from_token_and_callback)
# util function: turn Authorization: header into parameters, has to do some unescaping
def _split_header(header):
params = {}
parts = header.split(',')
for param in parts:
# ignore realm parameter
if param.find('OAuth realm') > -1:
continue
# remove whitespace
param = param.strip()
# split key-value
param_parts = param.split('=', 1)
# remove quotes and unescape the value
params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
return params
_split_header = staticmethod(_split_header)
# util function: turn url string into parameters, has to do some unescaping
def _split_url_string(param_str):
parameters = cgi.parse_qs(param_str, keep_blank_values=False)
for k, v in parameters.iteritems():
parameters[k] = urllib.unquote(v[0])
return parameters
_split_url_string = staticmethod(_split_url_string)
# OAuthServer is a worker to check a requests validity against a data store
class OAuthServer(object):
timestamp_threshold = 300 # in seconds, five minutes
version = VERSION
signature_methods = None
data_store = None
def __init__(self, data_store=None, signature_methods=None):
self.data_store = data_store
self.signature_methods = signature_methods or {}
def set_data_store(self, oauth_data_store):
self.data_store = data_store
def get_data_store(self):
return self.data_store
def add_signature_method(self, signature_method):
self.signature_methods[signature_method.get_name()] = signature_method
return self.signature_methods
# process a request_token request
# returns the request token on success
def fetch_request_token(self, oauth_request):
try:
# get the request token for authorization
token = self._get_token(oauth_request, 'request')
except OAuthError:
# no token required for the initial token request
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
self._check_signature(oauth_request, consumer, None)
# fetch a new token
token = self.data_store.fetch_request_token(consumer)
return token
# process an access_token request
# returns the access token on success
def fetch_access_token(self, oauth_request):
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
# get the request token
token = self._get_token(oauth_request, 'request')
self._check_signature(oauth_request, consumer, token)
new_token = self.data_store.fetch_access_token(consumer, token)
return new_token
# verify an api call, checks all the parameters
def verify_request(self, oauth_request):
# -> consumer and token
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
# get the access token
token = self._get_token(oauth_request, 'access')
self._check_signature(oauth_request, consumer, token)
parameters = oauth_request.get_nonoauth_parameters()
return consumer, token, parameters
# authorize a request token
def authorize_token(self, token, user):
return self.data_store.authorize_request_token(token, user)
# get the callback url
def get_callback(self, oauth_request):
return oauth_request.get_parameter('oauth_callback')
# optional support for the authenticate header
def build_authenticate_header(self, realm=''):
return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
# verify the correct version request for this server
def _get_version(self, oauth_request):
try:
version = oauth_request.get_parameter('oauth_version')
except:
version = VERSION
if version and version != self.version:
raise OAuthError('OAuth version %s not supported.' % str(version))
return version
# figure out the signature with some defaults
def _get_signature_method(self, oauth_request):
try:
signature_method = oauth_request.get_parameter('oauth_signature_method')
except:
signature_method = SIGNATURE_METHOD
try:
# get the signature method object
signature_method = self.signature_methods[signature_method]
except:
signature_method_names = ', '.join(self.signature_methods.keys())
raise OAuthError('Signature method %s not supported try one of the following: %s' % (signature_method, signature_method_names))
return signature_method
def _get_consumer(self, oauth_request):
consumer_key = oauth_request.get_parameter('oauth_consumer_key')
if not consumer_key:
raise OAuthError('Invalid consumer key.')
consumer = self.data_store.lookup_consumer(consumer_key)
if not consumer:
raise OAuthError('Invalid consumer.')
return consumer
# try to find the token for the provided request token key
def _get_token(self, oauth_request, token_type='access'):
token_field = oauth_request.get_parameter('oauth_token')
token = self.data_store.lookup_token(token_type, token_field)
if not token:
raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
return token
def _check_signature(self, oauth_request, consumer, token):
timestamp, nonce = oauth_request._get_timestamp_nonce()
self._check_timestamp(timestamp)
self._check_nonce(consumer, token, nonce)
signature_method = self._get_signature_method(oauth_request)
try:
signature = oauth_request.get_parameter('oauth_signature')
except:
raise OAuthError('Missing signature.')
# validate the signature
valid_sig = signature_method.check_signature(oauth_request, consumer, token, signature)
if not valid_sig:
key, base = signature_method.build_signature_base_string(oauth_request, consumer, token)
raise OAuthError('Invalid signature. Expected signature base string: %s' % base)
built = signature_method.build_signature(oauth_request, consumer, token)
def _check_timestamp(self, timestamp):
# verify that timestamp is recentish
timestamp = int(timestamp)
now = int(time.time())
lapsed = now - timestamp
if lapsed > self.timestamp_threshold:
raise OAuthError('Expired timestamp: given %d and now %s has a greater difference than threshold %d' % (timestamp, now, self.timestamp_threshold))
def _check_nonce(self, consumer, token, nonce):
# verify that the nonce is uniqueish
nonce = self.data_store.lookup_nonce(consumer, token, nonce)
if nonce:
raise OAuthError('Nonce already used: %s' % str(nonce))
# OAuthClient is a worker to attempt to execute a request
class OAuthClient(object):
consumer = None
token = None
def __init__(self, oauth_consumer, oauth_token):
self.consumer = oauth_consumer
self.token = oauth_token
def get_consumer(self):
return self.consumer
def get_token(self):
return self.token
def fetch_request_token(self, oauth_request):
# -> OAuthToken
raise NotImplementedError
def fetch_access_token(self, oauth_request):
# -> OAuthToken
raise NotImplementedError
def access_resource(self, oauth_request):
# -> some protected resource
raise NotImplementedError
# OAuthDataStore is a database abstraction used to lookup consumers and tokens
class OAuthDataStore(object):
def lookup_consumer(self, key):
# -> OAuthConsumer
raise NotImplementedError
def lookup_token(self, oauth_consumer, token_type, token_token):
# -> OAuthToken
raise NotImplementedError
def lookup_nonce(self, oauth_consumer, oauth_token, nonce, timestamp):
# -> OAuthToken
raise NotImplementedError
def fetch_request_token(self, oauth_consumer):
# -> OAuthToken
raise NotImplementedError
def fetch_access_token(self, oauth_consumer, oauth_token):
# -> OAuthToken
raise NotImplementedError
def authorize_request_token(self, oauth_token, user):
# -> OAuthToken
raise NotImplementedError
# OAuthSignatureMethod is a strategy class that implements a signature method
class OAuthSignatureMethod(object):
def get_name(self):
# -> str
raise NotImplementedError
def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):
# -> str key, str raw
raise NotImplementedError
def build_signature(self, oauth_request, oauth_consumer, oauth_token):
# -> str
raise NotImplementedError
def check_signature(self, oauth_request, consumer, token, signature):
built = self.build_signature(oauth_request, consumer, token)
return built == signature
class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod):
def get_name(self):
return 'HMAC-SHA1'
def build_signature_base_string(self, oauth_request, consumer, token):
sig = (
escape(oauth_request.get_normalized_http_method()),
escape(oauth_request.get_normalized_http_url()),
escape(oauth_request.get_normalized_parameters()),
)
key = '%s&' % escape(consumer.secret)
if token:
key += escape(token.secret)
raw = '&'.join(sig)
return key, raw
def build_signature(self, oauth_request, consumer, token):
# build the base signature string
key, raw = self.build_signature_base_string(oauth_request, consumer, token)
# hmac object
try:
import hashlib # 2.5
hashed = hmac.new(key, raw, hashlib.sha1)
except:
import sha # deprecated
hashed = hmac.new(key, raw, sha)
# calculate the digest base 64
return binascii.b2a_base64(hashed.digest())[:-1]
class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod):
def get_name(self):
return 'PLAINTEXT'
def build_signature_base_string(self, oauth_request, consumer, token):
# concatenate the consumer key and secret
sig = escape(consumer.secret) + '&'
if token:
sig = sig + escape(token.secret)
return sig
def build_signature(self, oauth_request, consumer, token):
return self.build_signature_base_string(oauth_request, consumer, token)

View file

@ -0,0 +1,299 @@
"""
A huge map of every Twitter API endpoint to a function definition in Twython.
Parameters that need to be embedded in the URL are treated with mustaches, e.g:
{{version}}, etc
When creating new endpoint definitions, keep in mind that the name of the mustache
will be replaced with the keyword that gets passed in to the function at call time.
i.e, in this case, if I pass version = 47 to any function, {{version}} will be replaced
with 47, instead of defaulting to 1 (said defaulting takes place at conversion time).
"""
# Base Twitter API url, no need to repeat this junk...
base_url = 'http://api.twitter.com/{{version}}'
api_table = {
'getRateLimitStatus': {
'url': '/account/rate_limit_status.json',
'method': 'GET',
},
# Timeline methods
'getPublicTimeline': {
'url': '/statuses/public_timeline.json',
'method': 'GET',
},
'getHomeTimeline': {
'url': '/statuses/home_timeline.json',
'method': 'GET',
},
'getUserTimeline': {
'url': '/statuses/user_timeline.json',
'method': 'GET',
},
'getFriendsTimeline': {
'url': '/statuses/friends_timeline.json',
'method': 'GET',
},
# Interfacing with friends/followers
'getUserMentions': {
'url': '/statuses/mentions.json',
'method': 'GET',
},
'getFriendsStatus': {
'url': '/statuses/friends.json',
'method': 'GET',
},
'getFollowersStatus': {
'url': '/statuses/followers.json',
'method': 'GET',
},
'createFriendship': {
'url': '/friendships/create.json',
'method': 'POST',
},
'destroyFriendship': {
'url': '/friendships/destroy.json',
'method': 'POST',
},
'getFriendsIDs': {
'url': '/friends/ids.json',
'method': 'GET',
},
'getFollowersIDs': {
'url': '/followers/ids.json',
'method': 'GET',
},
# Retweets
'reTweet': {
'url': '/statuses/retweet/{{id}}.json',
'method': 'POST',
},
'getRetweets': {
'url': '/statuses/retweets/{{id}}.json',
'method': 'GET',
},
'retweetedOfMe': {
'url': '/statuses/retweets_of_me.json',
'method': 'GET',
},
'retweetedByMe': {
'url': '/statuses/retweeted_by_me.json',
'method': 'GET',
},
'retweetedToMe': {
'url': '/statuses/retweeted_to_me.json',
'method': 'GET',
},
# User methods
'showUser': {
'url': '/users/show.json',
'method': 'GET',
},
'searchUsers': {
'url': '/users/search.json',
'method': 'GET',
},
# Status methods - showing, updating, destroying, etc.
'showStatus': {
'url': '/statuses/show/{{id}}.json',
'method': 'GET',
},
'updateStatus': {
'url': '/statuses/update.json',
'method': 'POST',
},
'destroyStatus': {
'url': '/statuses/destroy/{{id}}.json',
'method': 'POST',
},
# Direct Messages - getting, sending, effing, etc.
'getDirectMessages': {
'url': '/direct_messages.json',
'method': 'GET',
},
'getSentMessages': {
'url': '/direct_messages/sent.json',
'method': 'GET',
},
'sendDirectMessage': {
'url': '/direct_messages/new.json',
'method': 'POST',
},
'destroyDirectMessage': {
'url': '/direct_messages/destroy/{{id}}.json',
'method': 'POST',
},
# Friendship methods
'checkIfFriendshipExists': {
'url': '/friendships/exists.json',
'method': 'GET',
},
'showFriendship': {
'url': '/friendships/show.json',
'method': 'GET',
},
# Profile methods
'updateProfile': {
'url': '/account/update_profile.json',
'method': 'POST',
},
'updateProfileColors': {
'url': '/account/update_profile_colors.json',
'method': 'POST',
},
# Favorites methods
'getFavorites': {
'url': '/favorites.json',
'method': 'GET',
},
'createFavorite': {
'url': '/favorites/create/{{id}}.json',
'method': 'POST',
},
'destroyFavorite': {
'url': '/favorites/destroy/{{id}}.json',
'method': 'POST',
},
# Blocking methods
'createBlock': {
'url': '/blocks/create/{{id}}.json',
'method': 'POST',
},
'destroyBlock': {
'url': '/blocks/destroy/{{id}}.json',
'method': 'POST',
},
'getBlocking': {
'url': '/blocks/blocking.json',
'method': 'GET',
},
'getBlockedIDs': {
'url': '/blocks/blocking/ids.json',
'method': 'GET',
},
'checkIfBlockExists': {
'url': '/blocks/exists.json',
'method': 'GET',
},
# Trending methods
'getCurrentTrends': {
'url': '/trends/current.json',
'method': 'GET',
},
'getDailyTrends': {
'url': '/trends/daily.json',
'method': 'GET',
},
'getWeeklyTrends': {
'url': '/trends/weekly.json',
'method': 'GET',
},
'availableTrends': {
'url': '/trends/available.json',
'method': 'GET',
},
'trendsByLocation': {
'url': '/trends/{{woeid}}.json',
'method': 'GET',
},
# Saved Searches
'getSavedSearches': {
'url': '/saved_searches.json',
'method': 'GET',
},
'showSavedSearch': {
'url': '/saved_searches/show/{{id}}.json',
'method': 'GET',
},
'createSavedSearch': {
'url': '/saved_searches/create.json',
'method': 'GET',
},
'destroySavedSearch': {
'url': '/saved_searches/destroy/{{id}}.json',
'method': 'GET',
},
# List API methods/endpoints. Fairly exhaustive and annoying in general. ;P
'createList': {
'url': '/{{username}}/lists.json',
'method': 'POST',
},
'updateList': {
'url': '/{{username}}/lists/{{list_id}}.json',
'method': 'POST',
},
'showLists': {
'url': '/{{username}}/lists.json',
'method': 'GET',
},
'getListMemberships': {
'url': '/{{username}}/lists/followers.json',
'method': 'GET',
},
'deleteList': {
'url': '/{{username}}/lists/{{list_id}}.json',
'method': 'DELETE',
},
'getListTimeline': {
'url': '/{{username}}/lists/{{list_id}}/statuses.json',
'method': 'GET',
},
'getSpecificList': {
'url': '/{{username}}/lists/{{list_id}}/statuses.json',
'method': 'GET',
},
'addListMember': {
'url': '/{{username}}/{{list_id}}/members.json',
'method': 'POST',
},
'getListMembers': {
'url': '/{{username}}/{{list_id}}/members.json',
'method': 'GET',
},
'deleteListMember': {
'url': '/{{username}}/{{list_id}}/members.json',
'method': 'DELETE',
},
'subscribeToList': {
'url': '/{{username}}/{{list_id}}/following.json',
'method': 'POST',
},
'unsubscribeFromList': {
'url': '/{{username}}/{{list_id}}/following.json',
'method': 'DELETE',
},
# The one-offs
'notificationFollow': {
'url': '/notifications/follow/follow.json',
'method': 'POST',
},
'notificationLeave': {
'url': '/notifications/leave/leave.json',
'method': 'POST',
},
'updateDeliveryService': {
'url': '/account/update_delivery_device.json',
'method': 'POST',
},
'reportSpam': {
'url': '/report_spam.json',
'method': 'POST',
},
}

View file

@ -1,85 +0,0 @@
#!/usr/bin/python
"""
Twython-oauth (twyauth) is a separate library to handle OAuth routines with Twython. This currently doesn't work, as I never get the time to finish it.
Feel free to help out.
Questions, comments? ryan@venodesigns.net
"""
import httplib, urllib, urllib2, mimetypes, mimetools
from urlparse import urlparse
from urllib2 import HTTPError
try:
import oauth
except ImportError:
pass
class oauth:
def __init__(self, username, consumer_key, consumer_secret, signature_method = None, headers = None, version = 1):
"""oauth(username = None, consumer_secret = None, consumer_key = None, headers = None)
Instantiates an instance of Twython with OAuth. Takes optional parameters for authentication and such (see below).
Parameters:
username - Your Twitter username, if you want Basic (HTTP) Authentication.
consumer_secret - Consumer secret, given to you when you register your App with Twitter.
consumer_key - Consumer key (see situation with consumer_secret).
signature_method - Method for signing OAuth requests; defaults to oauth.OAuthSignatureMethod_HMAC_SHA1()
headers - User agent header.
version (number) - Twitter supports a "versioned" API as of Oct. 16th, 2009 - this defaults to 1, but can be overridden on a class and function-based basis.
"""
# OAuth specific variables below
self.request_token_url = 'https://api.twitter.com/%s/oauth/request_token' % version
self.access_token_url = 'https://api.twitter.com/%s/oauth/access_token' % version
self.authorization_url = 'http://api.twitter.com/%s/oauth/authorize' % version
self.signin_url = 'http://api.twitter.com/%s/oauth/authenticate' % version
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.request_token = None
self.access_token = None
self.consumer = None
self.connection = None
self.signature_method = None
self.consumer = oauth.OAuthConsumer(self.consumer_key, self.consumer_secret)
self.connection = httplib.HTTPSConnection("http://api.twitter.com")
def getOAuthResource(self, url, access_token, params, http_method="GET"):
"""getOAuthResource(self, url, access_token, params, http_method="GET")
Returns a signed OAuth object for use in requests.
"""
newRequest = oauth.OAuthRequest.from_consumer_and_token(consumer, token=self.access_token, http_method=http_method, http_url=url, parameters=parameters)
oauth_request.sign_request(self.signature_method, consumer, access_token)
return oauth_request
def getResponse(self, oauth_request, connection):
"""getResponse(self, oauth_request, connection)
Returns a JSON-ified list of results.
"""
url = oauth_request.to_url()
connection.request(oauth_request.http_method, url)
response = connection.getresponse()
return simplejson.load(response.read())
def getUnauthorisedRequestToken(self, consumer, connection, signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()):
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, consumer, http_url=self.request_token_url)
oauth_request.sign_request(signature_method, consumer, None)
resp = fetch_response(oauth_request, connection)
return oauth.OAuthToken.from_string(resp)
def getAuthorizationURL(self, consumer, token, signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()):
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token=token, http_url=self.authorization_url)
oauth_request.sign_request(signature_method, consumer, token)
return oauth_request.to_url()
def exchangeRequestTokenForAccessToken(self, consumer, connection, request_token, signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()):
# May not be needed...
self.request_token = request_token
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token = request_token, http_url=self.access_token_url)
oauth_request.sign_request(signature_method, consumer, request_token)
resp = fetch_response(oauth_request, connection)
return oauth.OAuthToken.from_string(resp)

417
twython/twython.py Normal file
View file

@ -0,0 +1,417 @@
#!/usr/bin/python
"""
Twython is a library for Python that wraps the Twitter API.
It aims to abstract away all the API endpoints, so that additions to the library
and/or the Twitter API won't cause any overall problems.
Questions, comments? ryan@venodesigns.net
"""
__author__ = "Ryan McGrath <ryan@venodesigns.net>"
__version__ = "1.3"
import urllib
import urllib2
import urlparse
import httplib
import httplib2
import mimetypes
import mimetools
import re
import oauth2 as oauth
# Twython maps keyword based arguments to Twitter API endpoints. The endpoints
# table is a file with a dictionary of every API endpoint that Twython supports.
from twitter_endpoints import base_url, api_table
from urllib2 import HTTPError
# There are some special setups (like, oh, a Django application) where
# simplejson exists behind the scenes anyway. Past Python 2.6, this should
# never really cause any problems to begin with.
try:
# Python 2.6 and up
import json as simplejson
except ImportError:
try:
# Python 2.6 and below (2.4/2.5, 2.3 is not guranteed to work with this library to begin with)
import simplejson
except ImportError:
try:
# This case gets rarer by the day, but if we need to, we can pull it from Django provided it's there.
from django.utils import simplejson
except:
# Seriously wtf is wrong with you if you get this Exception.
raise Exception("Twython requires the simplejson library (or Python 2.6) to work. http://www.undefined.org/python/")
class TwythonError(Exception):
"""
Generic error class, catch-all for most Twython issues.
Special cases are handled by APILimit and AuthError.
Note: To use these, the syntax has changed as of Twython 1.3. To catch these,
you need to explicitly import them into your code, e.g:
from twython import TwythonError, APILimit, AuthError
"""
def __init__(self, msg, error_code=None):
self.msg = msg
if error_code == 400:
raise APILimit(msg)
def __str__(self):
return repr(self.msg)
class APILimit(TwythonError):
"""
Raised when you've hit an API limit. Try to avoid these, read the API
docs if you're running into issues here, Twython does not concern itself with
this matter beyond telling you that you've done goofed.
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class AuthError(TwythonError):
"""
Raised when you try to access a protected resource and it fails due to some issue with
your authentication.
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class Twython(object):
def __init__(self, twitter_token = None, twitter_secret = None, oauth_token = None, oauth_token_secret = None, headers=None):
"""setup(self, oauth_token = None, headers = None)
Instantiates an instance of Twython. Takes optional parameters for authentication and such (see below).
Parameters:
twitter_token - Given to you when you register your application with Twitter.
twitter_secret - Given to you when you register your application with Twitter.
oauth_token - If you've gone through the authentication process and have a token for this user,
pass it in and it'll be used for all requests going forward.
oauth_token_secret - see oauth_token; it's the other half.
headers - User agent header, dictionary style ala {'User-Agent': 'Bert'}
** Note: versioning is not currently used by search.twitter functions; when Twitter moves their junk, it'll be supported.
"""
# Needed for hitting that there API.
self.request_token_url = 'http://twitter.com/oauth/request_token'
self.access_token_url = 'http://twitter.com/oauth/access_token'
self.authorize_url = 'http://twitter.com/oauth/authorize'
self.authenticate_url = 'http://twitter.com/oauth/authenticate'
self.twitter_token = twitter_token
self.twitter_secret = twitter_secret
self.oauth_token = oauth_token
self.oauth_secret = oauth_token_secret
# If there's headers, set them, otherwise be an embarassing parent for their own good.
self.headers = headers
if self.headers is None:
headers = {'User-agent': 'Twython Python Twitter Library v1.3'}
consumer = None
token = None
if self.twitter_token is not None and self.twitter_secret is not None:
consumer = oauth.Consumer(self.twitter_token, self.twitter_secret)
if self.oauth_token is not None and self.oauth_secret is not None:
token = oauth.Token(oauth_token, oauth_token_secret)
# Filter down through the possibilities here - if they have a token, if they're first stage, etc.
if consumer is not None and token is not None:
self.client = oauth.Client(consumer, token)
elif consumer is not None:
self.client = oauth.Client(consumer)
else:
# If they don't do authentication, but still want to request unprotected resources, we need an opener.
self.client = httplib2.Http()
def __getattr__(self, api_call):
"""
The most magically awesome block of code you'll see in 2010.
Rather than list out 9 million damn methods for this API, we just keep a table (see above) of
every API endpoint and their corresponding function id for this library. This pretty much gives
unlimited flexibility in API support - there's a slight chance of a performance hit here, but if this is
going to be your bottleneck... well, don't use Python. ;P
For those who don't get what's going on here, Python classes have this great feature known as __getattr__().
It's called when an attribute that was called on an object doesn't seem to exist - since it doesn't exist,
we can take over and find the API method in our table. We then return a function that downloads and parses
what we're looking for, based on the keywords passed in.
I'll hate myself for saying this, but this is heavily inspired by Ruby's "method_missing".
"""
def get(self, **kwargs):
# Go through and replace any mustaches that are in our API url.
fn = api_table[api_call]
base = re.sub(
'\{\{(?P<m>[a-zA-Z]+)\}\}',
lambda m: "%s" % kwargs.get(m.group(1), '1'), # The '1' here catches the API version. Slightly hilarious.
base_url + fn['url']
)
# Then open and load that shiiit, yo. TODO: check HTTP method and junk, handle errors/authentication
if fn['method'] == 'POST':
resp, content = self.client.request(base, fn['method'], urllib.urlencode(kwargs))
else:
url = base + "?" + "&".join(["%s=%s" %(key, value) for (key, value) in kwargs.iteritems()])
resp, content = self.client.request(url, fn['method'])
return simplejson.loads(content)
if api_call in api_table:
return get.__get__(self)
else:
raise AttributeError, api_call
def get_authentication_tokens(self):
"""
get_auth_url(self)
Returns an authorization URL for a user to hit.
"""
resp, content = self.client.request(self.request_token_url, "GET")
if resp['status'] != '200':
raise AuthError("Seems something couldn't be verified with your OAuth junk. Error: %s, Message: %s" % (resp['status'], content))
request_tokens = dict(urlparse.parse_qsl(content))
request_tokens['auth_url'] = "%s?oauth_token=%s" % (self.authenticate_url, request_tokens['oauth_token'])
return request_tokens
def get_authorized_tokens(self):
"""
get_authorized_tokens
Returns authorized tokens after they go through the auth_url phase.
"""
resp, content = self.client.request(self.access_token_url, "GET")
return dict(urlparse.parse_qsl(content))
# ------------------------------------------------------------------------------------------------------------------------
# The following methods are all different in some manner or require special attention with regards to the Twitter API.
# Because of this, we keep them separate from all the other endpoint definitions - ideally this should be change-able,
# but it's not high on the priority list at the moment.
# ------------------------------------------------------------------------------------------------------------------------
@staticmethod
def shortenURL(url_to_shorten, shortener = "http://is.gd/api.php", query = "longurl"):
"""shortenURL(url_to_shorten, shortener = "http://is.gd/api.php", query = "longurl")
Shortens url specified by url_to_shorten.
Parameters:
url_to_shorten - URL to shorten.
shortener - In case you want to use a url shortening service other than is.gd.
"""
try:
resp, content = self.client.request(
shortener + "?" + urllib.urlencode({query: Twython.unicode2utf8(url_to_shorten)}),
"GET"
)
return content
except HTTPError, e:
raise TwythonError("shortenURL() failed with a %s error code." % `e.code`)
def bulkUserLookup(self, ids = None, screen_names = None, version = None):
""" bulkUserLookup(self, ids = None, screen_names = None, version = None)
A method to do bulk user lookups against the Twitter API. Arguments (ids (numbers) / screen_names (strings)) should be flat Arrays that
contain their respective data sets.
Statuses for the users in question will be returned inline if they exist. Requires authentication!
"""
apiURL = "http://api.twitter.com/1/users/lookup.json?lol=1"
if ids is not None:
apiURL += "&user_id="
for id in ids:
apiURL += `id` + ","
if screen_names is not None:
apiURL += "&screen_name="
for name in screen_names:
apiURL += name + ","
try:
resp, content = self.client.request(apiURL, "GET")
return simplejson.loads(content)
except HTTPError, e:
raise TwythonError("bulkUserLookup() failed with a %s error code." % `e.code`, e.code)
def searchTwitter(self, **kwargs):
"""searchTwitter(search_query, **kwargs)
Returns tweets that match a specified query.
Parameters:
See the documentation at http://dev.twitter.com/doc/get/search. Pass in the API supported arguments as named parameters.
e.g x.searchTwitter(q="jjndf", page="2")
"""
searchURL = self.constructApiURL("http://search.twitter.com/search.json", kwargs) + "&" + urllib.urlencode({"q": self.unicode2utf8(search_query)})
try:
resp, content = self.client.request(searchURL, "GET")
return simplejson.loads(content)
except HTTPError, e:
raise TwythonError("getSearchTimeline() failed with a %s error code." % `e.code`, e.code)
def searchTwitterGen(self, **kwargs):
"""searchTwitterGen(search_query, **kwargs)
Returns a generator of tweets that match a specified query.
Parameters:
See the documentation at http://dev.twitter.com/doc/get/search. Pass in the API supported arguments as named parameters.
e.g x.searchTwitter(q="jjndf", page="2")
"""
searchURL = self.constructApiURL("http://search.twitter.com/search.json", kwargs) + "&" + urllib.urlencode({"q": self.unicode2utf8(search_query)})
try:
resp, content = self.client.request(searchURL, "GET")
data = simplejson.loads(content)
except HTTPError, e:
raise TwythonError("searchTwitterGen() failed with a %s error code." % `e.code`, e.code)
if not data['results']:
raise StopIteration
for tweet in data['results']:
yield tweet
if 'page' not in kwargs:
kwargs['page'] = 2
else:
kwargs['page'] += 1
for tweet in self.searchTwitterGen(search_query, **kwargs):
yield tweet
def isListMember(self, list_id, id, username, version = 1):
""" isListMember(self, list_id, id, version)
Check if a specified user (id) is a member of the list in question (list_id).
**Note: This method may not work for private/protected lists, unless you're authenticated and have access to those lists.
Parameters:
list_id - Required. The slug of the list to check against.
id - Required. The ID of the user being checked in the list.
username - User who owns the list you're checking against (username)
version (number) - Optional. API version to request. Entire Twython class defaults to 1, but you can override on a function-by-function or class basis - (version=2), etc.
"""
try:
resp, content = self.client.request("http://api.twitter.com/%d/%s/%s/members/%s.json" % (version, username, list_id, `id`))
return simplejson.loads(content)
except HTTPError, e:
raise TwythonError("isListMember() failed with a %d error code." % e.code, e.code)
def isListSubscriber(self, list_id, id, version = 1):
""" isListSubscriber(self, list_id, id, version)
Check if a specified user (id) is a subscriber of the list in question (list_id).
**Note: This method may not work for private/protected lists, unless you're authenticated and have access to those lists.
Parameters:
list_id - Required. The slug of the list to check against.
id - Required. The ID of the user being checked in the list.
username - Required. The username of the owner of the list that you're seeing if someone is subscribed to.
version (number) - Optional. API version to request. Entire Twython class defaults to 1, but you can override on a function-by-function or class basis - (version=2), etc.
"""
try:
resp, content = "http://api.twitter.com/%d/%s/%s/following/%s.json" % (version, username, list_id, `id`)
return simplejson.loads(content)
except HTTPError, e:
raise TwythonError("isListMember() failed with a %d error code." % e.code, e.code)
# The following methods are apart from the other Account methods, because they rely on a whole multipart-data posting function set.
def updateProfileBackgroundImage(self, filename, tile="true", version = 1):
""" updateProfileBackgroundImage(filename, tile="true")
Updates the authenticating user's profile background image.
Parameters:
image - Required. Must be a valid GIF, JPG, or PNG image of less than 800 kilobytes in size. Images with width larger than 2048 pixels will be forceably scaled down.
tile - Optional (defaults to true). If set to true the background image will be displayed tiled. The image will not be tiled otherwise.
** Note: It's sad, but when using this method, pass the tile value as a string, e.g tile="false"
version (number) - Optional. API version to request. Entire Twython class defaults to 1, but you can override on a function-by-function or class basis - (version=2), etc.
"""
try:
files = [("image", filename, open(filename, 'rb').read())]
fields = []
content_type, body = Twython.encode_multipart_formdata(fields, files)
headers = {'Content-Type': content_type, 'Content-Length': str(len(body))}
r = urllib2.Request("http://api.twitter.com/%d/account/update_profile_background_image.json?tile=%s" % (version, tile), body, headers)
return urllib2.urlopen(r).read()
except HTTPError, e:
raise TwythonError("updateProfileBackgroundImage() failed with a %d error code." % e.code, e.code)
def updateProfileImage(self, filename, version = 1):
""" updateProfileImage(filename)
Updates the authenticating user's profile image (avatar).
Parameters:
image - Required. Must be a valid GIF, JPG, or PNG image of less than 700 kilobytes in size. Images with width larger than 500 pixels will be scaled down.
version (number) - Optional. API version to request. Entire Twython class defaults to 1, but you can override on a function-by-function or class basis - (version=2), etc.
"""
try:
files = [("image", filename, open(filename, 'rb').read())]
fields = []
content_type, body = Twython.encode_multipart_formdata(fields, files)
headers = {'Content-Type': content_type, 'Content-Length': str(len(body))}
r = urllib2.Request("http://api.twitter.com/%d/account/update_profile_image.json" % version, body, headers)
return urllib2.urlopen(r).read()
except HTTPError, e:
raise TwythonError("updateProfileImage() failed with a %d error code." % e.code, e.code)
@staticmethod
def encode_multipart_formdata(fields, files):
BOUNDARY = mimetools.choose_boundary()
CRLF = '\r\n'
L = []
for (key, value) in fields:
L.append('--' + BOUNDARY)
L.append('Content-Disposition: form-data; name="%s"' % key)
L.append('')
L.append(value)
for (key, filename, value) in files:
L.append('--' + BOUNDARY)
L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
L.append('Content-Type: %s' % Twython.get_content_type(filename))
L.append('')
L.append(value)
L.append('--' + BOUNDARY + '--')
L.append('')
body = CRLF.join(L)
content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
return content_type, body
@staticmethod
def get_content_type(filename):
""" get_content_type(self, filename)
Exactly what you think it does. :D
"""
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
@staticmethod
def unicode2utf8(text):
try:
if isinstance(text, unicode):
text = text.encode('utf-8')
except:
pass
return text