diff --git a/.gitignore b/.gitignore index 4382a8d..7146bfd 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ nosetests.xml .project .pydevproject twython/.DS_Store + +test.py diff --git a/HISTORY.rst b/HISTORY.rst index 76b4c86..3a2853c 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,6 +1,11 @@ History ------- +2.9.0 (2013-05-xx) +++++++++++++++++++ + +- Fixed streaming issue #144, added ``TwythonStreamer`` and ``TwythonStreamHandler`` to aid users in a friendly streaming experience + 2.8.0 (2013-04-29) ++++++++++++++++++ diff --git a/README.md b/README.md index 38205f9..6d8bdc2 100644 --- a/README.md +++ b/README.md @@ -103,23 +103,26 @@ except TwythonAuthError as e: ``` ##### Streaming API -*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access) -streams.* ```python -from twython import Twython +from twython import TwythonStreamer, TwythonStreamHandler -def on_results(results): - """A callback to handle passed results. Wheeee. - """ - print results +class MyHandler(TwythonStreamHandler): + def on_success(self, data): + print data -Twython.stream({ - 'username': 'your_username', - 'password': 'your_password', - 'track': 'python' -}, on_results) + def on_error(self, status_code, data): + print status_code, data + +handler = MyHandler() + +# Requires Authentication as of Twitter API v1.1 +stream = TwythonStreamer(APP_KEY, APP_SECRET, + OAUTH_TOKEN, OAUTH_TOKEN_SECRET, + handler) + +stream.statuses.filter(track='twitter') ``` Notes diff --git a/README.rst b/README.rst index 3b0117f..9c5f58e 100644 --- a/README.rst +++ b/README.rst @@ -111,24 +111,27 @@ Catching exceptions Streaming API ~~~~~~~~~~~~~ -*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access) -streams.* :: - from twython import Twython - - def on_results(results): - """A callback to handle passed results. Wheeee. - """ + from twython import TwythonStreamer, TwythonStreamHandler - print results - Twython.stream({ - 'username': 'your_username', - 'password': 'your_password', - 'track': 'python' - }, on_results) + class MyHandler(TwythonStreamHandler): + def on_success(self, data): + print data + + def on_error(self, status_code, data): + print status_code, data + + handler = MyHandler() + + # Requires Authentication as of Twitter API v1.1 + stream = TwythonStreamer(APP_KEY, APP_SECRET, + OAUTH_TOKEN, OAUTH_TOKEN_SECRET, + handler) + + stream.statuses.filter(track='twitter') Notes diff --git a/examples/stream.py b/examples/stream.py new file mode 100644 index 0000000..0fee30c --- /dev/null +++ b/examples/stream.py @@ -0,0 +1,20 @@ +from twython import TwythonStreamer, TwythonStreamHandler + + +class MyHandler(TwythonStreamHandler): + def on_success(self, data): + print data + + def on_error(self, status_code, data): + print status_code, data + +handler = MyHandler() + +# Requires Authentication as of Twitter API v1.1 +stream = TwythonStreamer(APP_KEY, APP_SECRET, + OAUTH_TOKEN, OAUTH_TOKEN_SECRET, + handler) + +stream.statuses.filter(track='twitter') +#stream.user(track='twitter') +#stream.site(follow='twitter') diff --git a/setup.py b/setup.py index 8b7702b..14c5439 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import sys from setuptools import setup __author__ = 'Ryan McGrath ' -__version__ = '2.8.0' +__version__ = '2.9.0' packages = [ 'twython' diff --git a/twython/__init__.py b/twython/__init__.py index 76ce26c..481750e 100644 --- a/twython/__init__.py +++ b/twython/__init__.py @@ -18,7 +18,8 @@ Questions, comments? ryan@venodesigns.net """ __author__ = 'Ryan McGrath ' -__version__ = '2.8.0' +__version__ = '2.9.0' from .twython import Twython +from .streaming import TwythonStreamer, TwythonStreamHandler from .exceptions import TwythonError, TwythonRateLimitError, TwythonAuthError diff --git a/twython/exceptions.py b/twython/exceptions.py index 17736e4..265356a 100644 --- a/twython/exceptions.py +++ b/twython/exceptions.py @@ -46,3 +46,8 @@ class TwythonRateLimitError(TwythonError): if isinstance(retry_after, int): msg = '%s (Retry after %d seconds)' % (msg, retry_after) TwythonError.__init__(self, msg, error_code=error_code) + + +class TwythonStreamError(TwythonError): + """Test""" + pass diff --git a/twython/streaming.py b/twython/streaming.py new file mode 100644 index 0000000..0666367 --- /dev/null +++ b/twython/streaming.py @@ -0,0 +1,224 @@ +from . import __version__ +from .compat import json +from .exceptions import TwythonStreamError + +import requests +from requests_oauthlib import OAuth1 + +import time + + +class TwythonStreamHandler(object): + def on_success(self, data): + """Called when data has been successfull received from the stream + + Feel free to override this in your own handler. + See https://dev.twitter.com/docs/streaming-apis/messages for messages + sent along in stream responses. + + :param data: dict of data recieved from the stream + """ + + if 'delete' in data: + self.on_delete(data.get('delete')) + elif 'limit' in data: + self.on_limit(data.get('limit')) + elif 'disconnect' in data: + self.on_disconnect(data.get('disconnect')) + + def on_error(self, status_code, data): + """Called when stream returns non-200 status code + + :param status_code: Non-200 status code sent from stream + :param data: Error message sent from stream + """ + return + + def on_delete(self, data): + """Called when a deletion notice is received + + Twitter docs for deletion notices: http://spen.se/8qujd + + :param data: dict of data from the 'delete' key recieved from + the stream + """ + return data + + def on_limit(self, data): + """Called when a limit notice is received + + Twitter docs for limit notices: http://spen.se/hzt0b + + :param data: dict of data from the 'limit' key recieved from + the stream + """ + return data + + def on_disconnect(self, data): + """Called when a disconnect notice is received + + Twitter docs for disconnect notices: http://spen.se/xb6mm + + :param data: dict of data from the 'disconnect' key recieved from + the stream + """ + return data + + def on_timeout(self): + return + + +class TwythonStreamStatuses(object): + """Class for different statuses endpoints + + Available so TwythonStreamer.statuses.filter() is available. + Just a bit cleaner than TwythonStreamer.statuses_filter(), + statuses_sample(), etc. all being single methods in TwythonStreamer + """ + def __init__(self, streamer): + self.streamer = streamer + + def filter(self, **params): + """Stream statuses/filter + + Accepted params found at: + https://dev.twitter.com/docs/api/1.1/post/statuses/filter + """ + url = 'https://stream.twitter.com/%s/statuses/filter.json' \ + % self.streamer.api_version + self.streamer._request(url, 'POST', params=params) + + def sample(self, **params): + """Stream statuses/sample + + Accepted params found at: + https://dev.twitter.com/docs/api/1.1/get/statuses/sample + """ + url = 'https://stream.twitter.com/%s/statuses/sample.json' \ + % self.streamer.api_version + self.streamer._request(url, params=params) + + def firehose(self, **params): + """Stream statuses/filter + + Accepted params found at: + https://dev.twitter.com/docs/api/1.1/get/statuses/firehose + """ + url = 'https://stream.twitter.com/%s/statuses/firehose.json' \ + % self.streamer.api_version + self.streamer._request(url, params=params) + + +class TwythonStreamTypes(object): + """Class for different stream endpoints + + Not all streaming endpoints have nested endpoints. + User Streams and Site Streams are single streams with no nested endpoints + Status Streams include filter, sample and firehose endpoints + """ + def __init__(self, streamer): + self.streamer = streamer + self.statuses = TwythonStreamStatuses(streamer) + + def user(self, **params): + """Stream user + + Accepted params found at: + https://dev.twitter.com/docs/api/1.1/get/user + """ + url = 'https://userstream.twitter.com/%s/user.json' \ + % self.streamer.api_version + self.streamer._request(url, params=params) + + def site(self, **params): + """Stream site + + Accepted params found at: + https://dev.twitter.com/docs/api/1.1/get/site + """ + url = 'https://sitestream.twitter.com/%s/site.json' \ + % self.streamer.api_version + self.streamer._request(url, params=params) + + +class TwythonStreamer(object): + def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, + handler, timeout=300, retry_count=None, retry_in=10, + headers=None): + """Streaming class for a friendly streaming user experience + + :param app_key: (required) Your applications key + :param app_secret: (required) Your applications secret key + :param oauth_token: (required) Used with oauth_token_secret to make + authenticated calls + :param oauth_token_secret: (required) Used with oauth_token to make + authenticated calls + :param handler: (required) Instance of TwythonStreamHandler to handle + stream responses + :param headers: (optional) Custom headers to send along with the + request + """ + + self.auth = OAuth1(app_key, app_secret, + oauth_token, oauth_token_secret) + + self.headers = {'User-Agent': 'Twython Streaming v' + __version__} + if headers: + self.headers.update(headers) + + self.client = requests.Session() + self.client.auth = self.auth + self.client.headers = self.headers + self.client.stream = True + + self.timeout = timeout + + self.api_version = '1.1' + + self.handler = handler + + self.retry_in = retry_in + self.retry_count = retry_count + + # Set up type methods + StreamTypes = TwythonStreamTypes(self) + self.statuses = StreamTypes.statuses + self.__dict__['user'] = StreamTypes.user + self.__dict__['site'] = StreamTypes.site + + def _request(self, url, method='GET', params=None): + """Internal stream request handling""" + retry_counter = 0 + + method = method.lower() + func = getattr(self.client, method) + + def _send(retry_counter): + try: + if method == 'get': + response = func(url, params=params, timeout=self.timeout) + else: + response = func(url, data=params, timeout=self.timeout) + except requests.exceptions.Timeout: + self.handler.on_timeout() + else: + if response.status_code != 200: + self.handler.on_error(response.status_code, + response.content) + + if self.retry_count and (self.retry_count - retry_counter) > 0: + time.sleep(self.retry_in) + retry_counter += 1 + _send(retry_counter) + + return response + + response = _send(retry_counter) + + for line in response.iter_lines(): + if line: + try: + self.handler.on_success(json.loads(line)) + except ValueError: + raise TwythonStreamError('Response was not valid JSON, \ + unable to decode.') diff --git a/twython/twython.py b/twython/twython.py index 1297436..7e1cbf1 100644 --- a/twython/twython.py +++ b/twython/twython.py @@ -405,60 +405,6 @@ class Twython(object): ########################################################################### - @staticmethod - def stream(data, callback): - """A Streaming API endpoint, because requests (by Kenneth Reitz) - makes this not stupidly annoying to implement. - - In reality, Twython does absolutely *nothing special* here, - but people new to programming expect this type of function to - exist for this library, so we provide it for convenience. - - Seriously, this is nothing special. :) - - For the basic stream you're probably accessing, you'll want to - pass the following as data dictionary keys. If you need to use - OAuth (newer streams), passing secrets/etc - as keys SHOULD work... - - This is all done over SSL (https://), so you're not left - totally vulnerable by passing your password. - - :param username: (required) Username, self explanatory. - :param password: (required) The Streaming API doesn't use OAuth, - so we do this the old school way. - :param callback: (required) Callback function to be fired when - tweets come in (this is an event-based-ish API). - :param endpoint: (optional) Override the endpoint you're using - with the Twitter Streaming API. This is defaulted - to the one that everyone has access to, but if - Twitter <3's you feel free to set this to your - wildest desires. - """ - endpoint = 'https://stream.twitter.com/1/statuses/filter.json' - if 'endpoint' in data: - endpoint = data.pop('endpoint') - - needs_basic_auth = False - if 'username' in data and 'password' in data: - needs_basic_auth = True - username = data.pop('username') - password = data.pop('password') - - if needs_basic_auth: - stream = requests.post(endpoint, - data=data, - auth=(username, password)) - else: - stream = requests.post(endpoint, data=data) - - for line in stream.iter_lines(): - if line: - try: - callback(json.loads(line)) - except ValueError: - raise TwythonError('Response was not valid JSON, unable to decode.') - @staticmethod def unicode2utf8(text): try: