From e8660df23c5f9488bae8b571ac15844a4866d4ec Mon Sep 17 00:00:00 2001 From: Ryan McGrath Date: Thu, 25 Jun 2009 02:12:00 -0400 Subject: [PATCH] An experimental version of Tango for Python 3k --- tango_3k.py | 694 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 694 insertions(+) create mode 100644 tango_3k.py diff --git a/tango_3k.py b/tango_3k.py new file mode 100644 index 0000000..32a1a6d --- /dev/null +++ b/tango_3k.py @@ -0,0 +1,694 @@ +#!/usr/bin/python + +""" + Tango is an up-to-date library for Python that wraps the Twitter API. + Other Python Twitter libraries seem to have fallen a bit behind, and + Twitter's API has evolved a bit. Here's hoping this helps. + + TODO: OAuth, Streaming API? + + Questions, comments? ryan@venodesigns.net +""" + +import http.client, urllib, urllib.request, urllib.error, urllib.parse, mimetypes, mimetools + +from urllib.error import HTTPError + +try: + import simplejson +except ImportError: + print("Tango requires the simplejson library to work. http://www.undefined.org/python/") + +try: + import oauth +except ImportError: + print("Tango requires oauth for authentication purposes. http://oauth.googlecode.com/svn/code/python/oauth/oauth.py") + +class setup: + def __init__(self, authtype = "OAuth", username = None, password = None, oauth_keys = None, debug = False): + self.authtype = authtype + self.authenticated = False + self.username = username + self.password = password + self.oauth_keys = oauth_keys + self.debug = debug + if self.username is not None and self.password is not None: + if self.authtype == "OAuth": + pass + elif self.authtype == "Basic": + self.auth_manager = urllib.request.HTTPPasswordMgrWithDefaultRealm() + self.auth_manager.add_password(None, "http://twitter.com", self.username, self.password) + self.handler = urllib.request.HTTPBasicAuthHandler(self.auth_manager) + self.opener = urllib.request.build_opener(self.handler) + self.authenticated = True + + # OAuth functions; shortcuts for verifying the credentials. + def fetch_response_oauth(self, oauth_request): + pass + + def get_unauthorized_request_token(self): + pass + + def get_authorization_url(self, token): + pass + + def exchange_tokens(self, request_token): + pass + + # URL Shortening function huzzah + def shortenURL(self, url_to_shorten, shortener = "http://is.gd/api.php", query = "longurl"): + try: + return urllib.request.urlopen(shortener + "?" + urllib.urlencode({query: url_to_shorten})).read() + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("shortenURL() failed with a " + str(e.code) + " error code.") + + def constructApiURL(self, base_url, params, **kwargs): + queryURL = base_url + questionMarkUsed = False + if ("questionMarkUsed" in kwargs) is True: + questionMarkUsed = True + for param in params: + if params[param] is not None: + queryURL += (("&" if questionMarkUsed is True else "?") + param + "=" + params[param]) + questionMarkUsed = True + return queryURL + + def getRateLimitStatus(self, rate_for = "requestingIP"): + try: + if rate_for == "requestingIP": + return simplejson.load(urllib.request.urlopen("http://twitter.com/account/rate_limit_status.json")) + else: + return simplejson.load(self.opener.open("http://twitter.com/account/rate_limit_status.json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("It seems that there's something wrong. Twitter gave you a " + str(e.code) + " error code; are you doing something you shouldn't be?") + + def getPublicTimeline(self): + try: + return simplejson.load(urllib.request.urlopen("http://twitter.com/statuses/public_timeline.json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getPublicTimeline() failed with a " + str(e.code) + " error code.") + + def getFriendsTimeline(self, **kwargs): + if self.authenticated is True: + try: + friendsTimelineURL = self.constructApiURL("http://twitter.com/statuses/friends_timeline.json", kwargs) + return simplejson.load(self.opener.open(friendsTimelineURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getFriendsTimeline() failed with a " + str(e.code) + " error code.") + else: + print("getFriendsTimeline() requires you to be authenticated.") + pass + + def getUserTimeline(self, id = None, **kwargs): + if id is not None and ("user_id" in kwargs) is False and ("screen_name" in kwargs) is False: + userTimelineURL = self.constructApiURL("http://twitter.com/statuses/user_timeline/" + id + ".json", kwargs) + elif id is None and ("user_id" in kwargs) is False and ("screen_name" in kwargs) is False and self.authenticated is True: + userTimelineURL = self.constructApiURL("http://twitter.com/statuses/user_timeline/" + self.username + ".json", kwargs) + else: + userTimelineURL = self.constructApiURL("http://twitter.com/statuses/user_timeline.json", kwargs) + try: + # We do our custom opener if we're authenticated, as it helps avoid cases where it's a protected user + if self.authenticated is True: + return simplejson.load(self.opener.open(userTimelineURL)) + else: + return simplejson.load(urllib.request.urlopen(userTimelineURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("Failed with a " + str(e.code) + " error code. Does this user hide/protect their updates? If so, you'll need to authenticate and be their friend to get their timeline.") + + def getUserMentions(self, **kwargs): + if self.authenticated is True: + try: + mentionsFeedURL = self.constructApiURL("http://twitter.com/statuses/mentions.json", kwargs) + return simplejson.load(self.opener.open(mentionsFeedURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getUserMentions() failed with a " + str(e.code) + " error code.") + else: + print("getUserMentions() requires you to be authenticated.") + pass + + def showStatus(self, id): + try: + return simplejson.load(self.opener.open("http://twitter.com/statuses/show/" + id + ".json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("Failed with a " + str(e.code) + " error code. Does this user hide/protect their updates? If so, you'll need to authenticate and be their friend to get their timeline.") + pass + + def updateStatus(self, status, in_reply_to_status_id = None): + if len(list(status)) > 140: + print("This status message is over 140 characters, but we're gonna try it anyway. Might wanna watch this!") + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/statuses/update.json?", urllib.parse.urlencode({"status": status}, {"in_reply_to_status_id": in_reply_to_status_id}))) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateStatus() failed with a " + str(e.code) + " error code.") + else: + print("updateStatus() requires you to be authenticated.") + pass + + def destroyStatus(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/status/destroy/" + id + ".json", "POST")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("destroyStatus() failed with a " + str(e.code) + " error code.") + else: + print("destroyStatus() requires you to be authenticated.") + pass + + def endSession(self): + if self.authenticated is True: + try: + self.opener.open("http://twitter.com/account/end_session.json", "") + self.authenticated = False + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("endSession failed with a " + str(e.code) + " error code.") + else: + print("You can't end a session when you're not authenticated to begin with.") + pass + + def getDirectMessages(self, since_id = None, max_id = None, count = None, page = "1"): + if self.authenticated is True: + apiURL = "http://twitter.com/direct_messages.json?page=" + page + if since_id is not None: + apiURL += "&since_id=" + since_id + if max_id is not None: + apiURL += "&max_id=" + max_id + if count is not None: + apiURL += "&count=" + count + + try: + return simplejson.load(self.opener.open(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getDirectMessages() failed with a " + str(e.code) + " error code.") + else: + print("getDirectMessages() requires you to be authenticated.") + + def getSentMessages(self, since_id = None, max_id = None, count = None, page = "1"): + if self.authenticated is True: + apiURL = "http://twitter.com/direct_messages/sent.json?page=" + page + if since_id is not None: + apiURL += "&since_id=" + since_id + if max_id is not None: + apiURL += "&max_id=" + max_id + if count is not None: + apiURL += "&count=" + count + + try: + return simplejson.load(self.opener.open(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getSentMessages() failed with a " + str(e.code) + " error code.") + else: + print("getSentMessages() requires you to be authenticated.") + + def sendDirectMessage(self, user, text): + if self.authenticated is True: + if len(list(text)) < 140: + try: + return self.opener.open("http://twitter.com/direct_messages/new.json", urllib.parse.urlencode({"user": user, "text": text})) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("sendDirectMessage() failed with a " + str(e.code) + " error code.") + else: + print("Your message must be longer than 140 characters") + else: + print("You must be authenticated to send a new direct message.") + + def destroyDirectMessage(self, id): + if self.authenticated is True: + try: + return self.opener.open("http://twitter.com/direct_messages/destroy/" + id + ".json", "") + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("destroyDirectMessage() failed with a " + str(e.code) + " error code.") + else: + print("You must be authenticated to destroy a direct message.") + + def createFriendship(self, id = None, user_id = None, screen_name = None, follow = "false"): + if self.authenticated is True: + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/friendships/create/" + id + ".json" + "?follow=" + follow + if user_id is not None: + apiURL = "http://twitter.com/friendships/create.json?user_id=" + user_id + "&follow=" + follow + if screen_name is not None: + apiURL = "http://twitter.com/friendships/create.json?screen_name=" + screen_name + "&follow=" + follow + try: + return simplejson.load(self.opener.open(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("createFriendship() failed with a " + str(e.code) + " error code. ") + if e.code == 403: + print("It seems you've hit the update limit for this method. Try again in 24 hours.") + else: + print("createFriendship() requires you to be authenticated.") + + def destroyFriendship(self, id = None, user_id = None, screen_name = None): + if self.authenticated is True: + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/friendships/destroy/" + id + ".json" + if user_id is not None: + apiURL = "http://twitter.com/friendships/destroy.json?user_id=" + user_id + if screen_name is not None: + apiURL = "http://twitter.com/friendships/destroy.json?screen_name=" + screen_name + try: + return simplejson.load(self.opener.open(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("destroyFriendship() failed with a " + str(e.code) + " error code.") + else: + print("destroyFriendship() requires you to be authenticated.") + + def checkIfFriendshipExists(self, user_a, user_b): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/friendships/exists.json", urllib.parse.urlencode({"user_a": user_a, "user_b": user_b}))) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("checkIfFriendshipExists() failed with a " + str(e.code) + " error code.") + else: + print("checkIfFriendshipExists(), oddly, requires that you be authenticated.") + + def updateDeliveryDevice(self, device_name = "none"): + if self.authenticated is True: + try: + return self.opener.open("http://twitter.com/account/update_delivery_device.json?", urllib.parse.urlencode({"device": device_name})) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateDeliveryDevice() failed with a " + str(e.code) + " error code.") + else: + print("updateDeliveryDevice() requires you to be authenticated.") + + def updateProfileColors(self, **kwargs): + if self.authenticated is True: + try: + return self.opener.open(self.constructApiURL("http://twitter.com/account/update_profile_colors.json?", kwargs)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateProfileColors() failed with a " + str(e.code) + " error code.") + else: + print("updateProfileColors() requires you to be authenticated.") + + def updateProfile(self, name = None, email = None, url = None, location = None, description = None): + if self.authenticated is True: + useAmpersands = False + updateProfileQueryString = "" + if name is not None: + if len(list(name)) < 20: + updateProfileQueryString += "name=" + name + useAmpersands = True + else: + print("Twitter has a character limit of 20 for all usernames. Try again.") + if email is not None and "@" in email: + if len(list(email)) < 40: + if useAmpersands is True: + updateProfileQueryString += "&email=" + email + else: + updateProfileQueryString += "email=" + email + useAmpersands = True + else: + print("Twitter has a character limit of 40 for all email addresses, and the email address must be valid. Try again.") + if url is not None: + if len(list(url)) < 100: + if useAmpersands is True: + updateProfileQueryString += "&" + urllib.parse.urlencode({"url": url}) + else: + updateProfileQueryString += urllib.parse.urlencode({"url": url}) + useAmpersands = True + else: + print("Twitter has a character limit of 100 for all urls. Try again.") + if location is not None: + if len(list(location)) < 30: + if useAmpersands is True: + updateProfileQueryString += "&" + urllib.parse.urlencode({"location": location}) + else: + updateProfileQueryString += urllib.parse.urlencode({"location": location}) + useAmpersands = True + else: + print("Twitter has a character limit of 30 for all locations. Try again.") + if description is not None: + if len(list(description)) < 160: + if useAmpersands is True: + updateProfileQueryString += "&" + urllib.parse.urlencode({"description": description}) + else: + updateProfileQueryString += urllib.parse.urlencode({"description": description}) + else: + print("Twitter has a character limit of 160 for all descriptions. Try again.") + + if updateProfileQueryString != "": + try: + return self.opener.open("http://twitter.com/account/update_profile.json?", updateProfileQueryString) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateProfile() failed with a " + e.code + " error code.") + else: + # If they're not authenticated + print("updateProfile() requires you to be authenticated.") + + def getFavorites(self, page = "1"): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/favorites.json?page=" + page)) + except HTTPError as e: + if self.debug: + print(e.headers) + print("getFavorites() failed with a " + str(e.code) + " error code.") + else: + print("getFavorites() requires you to be authenticated.") + + def createFavorite(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/favorites/create/" + id + ".json", "")) + except HTTPError as e: + if self.debug: + print(e.headers) + print("createFavorite() failed with a " + str(e.code) + " error code.") + else: + print("createFavorite() requires you to be authenticated.") + + def destroyFavorite(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/favorites/destroy/" + id + ".json", "")) + except HTTPError as e: + if self.debug: + print(e.headers) + print("destroyFavorite() failed with a " + str(e.code) + " error code.") + else: + print("destroyFavorite() requires you to be authenticated.") + + def notificationFollow(self, id = None, user_id = None, screen_name = None): + if self.authenticated is True: + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/notifications/follow/" + id + ".json" + if user_id is not None: + apiURL = "http://twitter.com/notifications/follow/follow.json?user_id=" + user_id + if screen_name is not None: + apiURL = "http://twitter.com/notifications/follow/follow.json?screen_name=" + screen_name + try: + return simplejson.load(self.opener.open(apiURL, "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("notificationFollow() failed with a " + str(e.code) + " error code.") + else: + print("notificationFollow() requires you to be authenticated.") + + def notificationLeave(self, id = None, user_id = None, screen_name = None): + if self.authenticated is True: + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/notifications/leave/" + id + ".json" + if user_id is not None: + apiURL = "http://twitter.com/notifications/leave/leave.json?user_id=" + user_id + if screen_name is not None: + apiURL = "http://twitter.com/notifications/leave/leave.json?screen_name=" + screen_name + try: + return simplejson.load(self.opener.open(apiURL, "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("notificationLeave() failed with a " + str(e.code) + " error code.") + else: + print("notificationLeave() requires you to be authenticated.") + + def getFriendsIDs(self, id = None, user_id = None, screen_name = None, page = "1"): + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/friends/ids/" + id + ".json" + "?page=" + page + if user_id is not None: + apiURL = "http://twitter.com/friends/ids.json?user_id=" + user_id + "&page=" + page + if screen_name is not None: + apiURL = "http://twitter.com/friends/ids.json?screen_name=" + screen_name + "&page=" + page + try: + return simplejson.load(urllib.request.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getFriendsIDs() failed with a " + str(e.code) + " error code.") + + def getFollowersIDs(self, id = None, user_id = None, screen_name = None, page = "1"): + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/followers/ids/" + id + ".json" + "?page=" + page + if user_id is not None: + apiURL = "http://twitter.com/followers/ids.json?user_id=" + user_id + "&page=" + page + if screen_name is not None: + apiURL = "http://twitter.com/followers/ids.json?screen_name=" + screen_name + "&page=" + page + try: + return simplejson.load(urllib.request.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getFollowersIDs() failed with a " + str(e.code) + " error code.") + + def createBlock(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/blocks/create/" + id + ".json", "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("createBlock() failed with a " + str(e.code) + " error code.") + else: + print("createBlock() requires you to be authenticated.") + + def destroyBlock(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/blocks/destroy/" + id + ".json", "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("destroyBlock() failed with a " + str(e.code) + " error code.") + else: + print("destroyBlock() requires you to be authenticated.") + + def checkIfBlockExists(self, id = None, user_id = None, screen_name = None): + apiURL = "" + if id is not None: + apiURL = "http://twitter.com/blocks/exists/" + id + ".json" + if user_id is not None: + apiURL = "http://twitter.com/blocks/exists.json?user_id=" + user_id + if screen_name is not None: + apiURL = "http://twitter.com/blocks/exists.json?screen_name=" + screen_name + try: + return simplejson.load(urllib.request.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("checkIfBlockExists() failed with a " + str(e.code) + " error code.") + + def getBlocking(self, page = "1"): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/blocks/blocking.json?page=" + page)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getBlocking() failed with a " + str(e.code) + " error code.") + else: + print("getBlocking() requires you to be authenticated") + + def getBlockedIDs(self): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/blocks/blocking/ids.json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getBlockedIDs() failed with a " + str(e.code) + " error code.") + else: + print("getBlockedIDs() requires you to be authenticated.") + + def searchTwitter(self, search_query, **kwargs): + baseURL = "http://search.twitter.com/search.json?" + urllib.parse.urlencode({"q": search_query}) + searchURL = self.constructApiURL(baseURL, kwargs, questionMarkUsed=True) + try: + return simplejson.load(urllib.request.urlopen(searchURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getSearchTimeline() failed with a " + str(e.code) + " error code.") + + def getCurrentTrends(self, excludeHashTags = False): + apiURL = "http://search.twitter.com/trends/current.json" + if excludeHashTags is True: + apiURL += "?exclude=hashtags" + try: + return simplejson.load(urllib.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getCurrentTrends() failed with a " + str(e.code) + " error code.") + + def getDailyTrends(self, date = None, exclude = False): + apiURL = "http://search.twitter.com/trends/daily.json" + questionMarkUsed = False + if date is not None: + apiURL += "?date=" + date + questionMarkUsed = True + if exclude is True: + if questionMarkUsed is True: + apiURL += "&exclude=hashtags" + else: + apiURL += "?exclude=hashtags" + try: + return simplejson.load(urllib.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getDailyTrends() failed with a " + str(e.code) + " error code.") + + def getWeeklyTrends(self, date = None, exclude = False): + apiURL = "http://search.twitter.com/trends/daily.json" + questionMarkUsed = False + if date is not None: + apiURL += "?date=" + date + questionMarkUsed = True + if exclude is True: + if questionMarkUsed is True: + apiURL += "&exclude=hashtags" + else: + apiURL += "?exclude=hashtags" + try: + return simplejson.load(urllib.urlopen(apiURL)) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getWeeklyTrends() failed with a " + str(e.code) + " error code.") + + def getSavedSearches(self): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/saved_searches.json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("getSavedSearches() failed with a " + str(e.code) + " error code.") + else: + print("getSavedSearches() requires you to be authenticated.") + + def showSavedSearch(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/saved_searches/show/" + id + ".json")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("showSavedSearch() failed with a " + str(e.code) + " error code.") + else: + print("showSavedSearch() requires you to be authenticated.") + + def createSavedSearch(self, query): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/saved_searches/create.json?query=" + query, "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("createSavedSearch() failed with a " + str(e.code) + " error code.") + else: + print("createSavedSearch() requires you to be authenticated.") + + def destroySavedSearch(self, id): + if self.authenticated is True: + try: + return simplejson.load(self.opener.open("http://twitter.com/saved_searches/destroy/" + id + ".json", "")) + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("destroySavedSearch() failed with a " + str(e.code) + " error code.") + else: + print("destroySavedSearch() requires you to be authenticated.") + + # The following methods are apart from the other Account methods, because they rely on a whole multipart-data posting function set. + def updateProfileBackgroundImage(self, filename, tile="true"): + if self.authenticated is True: + try: + files = [("image", filename, open(filename).read())] + fields = [] + content_type, body = self.encode_multipart_formdata(fields, files) + headers = {'Content-Type': content_type, 'Content-Length': str(len(body))} + r = urllib.request.Request("http://twitter.com/account/update_profile_background_image.json?tile=" + tile, body, headers) + return self.opener.open(r).read() + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateProfileBackgroundImage() failed with a " + str(e.code) + " error code.") + else: + print("You realize you need to be authenticated to change a background image, right?") + + def updateProfileImage(self, filename): + if self.authenticated is True: + try: + files = [("image", filename, open(filename).read())] + fields = [] + content_type, body = self.encode_multipart_formdata(fields, files) + headers = {'Content-Type': content_type, 'Content-Length': str(len(body))} + r = urllib.request.Request("http://twitter.com/account/update_profile_image.json", body, headers) + return self.opener.open(r).read() + except HTTPError as e: + if self.debug is True: + print(e.headers) + print("updateProfileImage() failed with a " + str(e.code) + " error code.") + else: + print("You realize you need to be authenticated to change a profile image, right?") + + def encode_multipart_formdata(self, fields, files): + BOUNDARY = mimetools.choose_boundary() + CRLF = '\r\n' + L = [] + for (key, value) in fields: + L.append('--' + BOUNDARY) + L.append('Content-Disposition: form-data; name="%s"' % key) + L.append('') + L.append(value) + for (key, filename, value) in files: + L.append('--' + BOUNDARY) + L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) + L.append('Content-Type: %s' % self.get_content_type(filename)) + L.append('') + L.append(value) + L.append('--' + BOUNDARY + '--') + L.append('') + body = CRLF.join(L) + content_type = 'multipart/form-data; boundary=%s' % BOUNDARY + return content_type, body + + def get_content_type(self, filename): + return mimetypes.guess_type(filename)[0] or 'application/octet-stream'