Fixing streaming

This commit is contained in:
Mike Helmick 2013-05-02 15:12:36 -04:00
parent e18bff97d3
commit c3e84bc8ee
10 changed files with 290 additions and 81 deletions

2
.gitignore vendored
View file

@ -35,3 +35,5 @@ nosetests.xml
.project .project
.pydevproject .pydevproject
twython/.DS_Store twython/.DS_Store
test.py

View file

@ -1,6 +1,11 @@
History History
------- -------
2.9.0 (2013-05-xx)
++++++++++++++++++
- Fixed streaming issue #144, added ``TwythonStreamer`` and ``TwythonStreamHandler`` to aid users in a friendly streaming experience
2.8.0 (2013-04-29) 2.8.0 (2013-04-29)
++++++++++++++++++ ++++++++++++++++++

View file

@ -103,23 +103,26 @@ except TwythonAuthError as e:
``` ```
##### Streaming API ##### Streaming API
*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access)
streams.*
```python ```python
from twython import Twython from twython import TwythonStreamer, TwythonStreamHandler
def on_results(results):
"""A callback to handle passed results. Wheeee.
"""
print results class MyHandler(TwythonStreamHandler):
def on_success(self, data):
print data
Twython.stream({ def on_error(self, status_code, data):
'username': 'your_username', print status_code, data
'password': 'your_password',
'track': 'python' handler = MyHandler()
}, on_results)
# Requires Authentication as of Twitter API v1.1
stream = TwythonStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
handler)
stream.statuses.filter(track='twitter')
``` ```
Notes Notes

View file

@ -111,24 +111,27 @@ Catching exceptions
Streaming API Streaming API
~~~~~~~~~~~~~ ~~~~~~~~~~~~~
*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access)
streams.*
:: ::
from twython import Twython from twython import TwythonStreamer, TwythonStreamHandler
def on_results(results):
"""A callback to handle passed results. Wheeee.
"""
print results
Twython.stream({ class MyHandler(TwythonStreamHandler):
'username': 'your_username', def on_success(self, data):
'password': 'your_password', print data
'track': 'python'
}, on_results) def on_error(self, status_code, data):
print status_code, data
handler = MyHandler()
# Requires Authentication as of Twitter API v1.1
stream = TwythonStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
handler)
stream.statuses.filter(track='twitter')
Notes Notes

20
examples/stream.py Normal file
View file

@ -0,0 +1,20 @@
from twython import TwythonStreamer, TwythonStreamHandler
class MyHandler(TwythonStreamHandler):
def on_success(self, data):
print data
def on_error(self, status_code, data):
print status_code, data
handler = MyHandler()
# Requires Authentication as of Twitter API v1.1
stream = TwythonStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
handler)
stream.statuses.filter(track='twitter')
#stream.user(track='twitter')
#stream.site(follow='twitter')

View file

@ -4,7 +4,7 @@ import sys
from setuptools import setup from setuptools import setup
__author__ = 'Ryan McGrath <ryan@venodesigns.net>' __author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '2.8.0' __version__ = '2.9.0'
packages = [ packages = [
'twython' 'twython'

View file

@ -18,7 +18,8 @@ Questions, comments? ryan@venodesigns.net
""" """
__author__ = 'Ryan McGrath <ryan@venodesigns.net>' __author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '2.8.0' __version__ = '2.9.0'
from .twython import Twython from .twython import Twython
from .streaming import TwythonStreamer, TwythonStreamHandler
from .exceptions import TwythonError, TwythonRateLimitError, TwythonAuthError from .exceptions import TwythonError, TwythonRateLimitError, TwythonAuthError

View file

@ -46,3 +46,8 @@ class TwythonRateLimitError(TwythonError):
if isinstance(retry_after, int): if isinstance(retry_after, int):
msg = '%s (Retry after %d seconds)' % (msg, retry_after) msg = '%s (Retry after %d seconds)' % (msg, retry_after)
TwythonError.__init__(self, msg, error_code=error_code) TwythonError.__init__(self, msg, error_code=error_code)
class TwythonStreamError(TwythonError):
"""Test"""
pass

224
twython/streaming.py Normal file
View file

@ -0,0 +1,224 @@
from . import __version__
from .compat import json
from .exceptions import TwythonStreamError
import requests
from requests_oauthlib import OAuth1
import time
class TwythonStreamHandler(object):
def on_success(self, data):
"""Called when data has been successfull received from the stream
Feel free to override this in your own handler.
See https://dev.twitter.com/docs/streaming-apis/messages for messages
sent along in stream responses.
:param data: dict of data recieved from the stream
"""
if 'delete' in data:
self.on_delete(data.get('delete'))
elif 'limit' in data:
self.on_limit(data.get('limit'))
elif 'disconnect' in data:
self.on_disconnect(data.get('disconnect'))
def on_error(self, status_code, data):
"""Called when stream returns non-200 status code
:param status_code: Non-200 status code sent from stream
:param data: Error message sent from stream
"""
return
def on_delete(self, data):
"""Called when a deletion notice is received
Twitter docs for deletion notices: http://spen.se/8qujd
:param data: dict of data from the 'delete' key recieved from
the stream
"""
return data
def on_limit(self, data):
"""Called when a limit notice is received
Twitter docs for limit notices: http://spen.se/hzt0b
:param data: dict of data from the 'limit' key recieved from
the stream
"""
return data
def on_disconnect(self, data):
"""Called when a disconnect notice is received
Twitter docs for disconnect notices: http://spen.se/xb6mm
:param data: dict of data from the 'disconnect' key recieved from
the stream
"""
return data
def on_timeout(self):
return
class TwythonStreamStatuses(object):
"""Class for different statuses endpoints
Available so TwythonStreamer.statuses.filter() is available.
Just a bit cleaner than TwythonStreamer.statuses_filter(),
statuses_sample(), etc. all being single methods in TwythonStreamer
"""
def __init__(self, streamer):
self.streamer = streamer
def filter(self, **params):
"""Stream statuses/filter
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/post/statuses/filter
"""
url = 'https://stream.twitter.com/%s/statuses/filter.json' \
% self.streamer.api_version
self.streamer._request(url, 'POST', params=params)
def sample(self, **params):
"""Stream statuses/sample
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/sample
"""
url = 'https://stream.twitter.com/%s/statuses/sample.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def firehose(self, **params):
"""Stream statuses/filter
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/firehose
"""
url = 'https://stream.twitter.com/%s/statuses/firehose.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
class TwythonStreamTypes(object):
"""Class for different stream endpoints
Not all streaming endpoints have nested endpoints.
User Streams and Site Streams are single streams with no nested endpoints
Status Streams include filter, sample and firehose endpoints
"""
def __init__(self, streamer):
self.streamer = streamer
self.statuses = TwythonStreamStatuses(streamer)
def user(self, **params):
"""Stream user
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/user
"""
url = 'https://userstream.twitter.com/%s/user.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def site(self, **params):
"""Stream site
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/site
"""
url = 'https://sitestream.twitter.com/%s/site.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
class TwythonStreamer(object):
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
handler, timeout=300, retry_count=None, retry_in=10,
headers=None):
"""Streaming class for a friendly streaming user experience
:param app_key: (required) Your applications key
:param app_secret: (required) Your applications secret key
:param oauth_token: (required) Used with oauth_token_secret to make
authenticated calls
:param oauth_token_secret: (required) Used with oauth_token to make
authenticated calls
:param handler: (required) Instance of TwythonStreamHandler to handle
stream responses
:param headers: (optional) Custom headers to send along with the
request
"""
self.auth = OAuth1(app_key, app_secret,
oauth_token, oauth_token_secret)
self.headers = {'User-Agent': 'Twython Streaming v' + __version__}
if headers:
self.headers.update(headers)
self.client = requests.Session()
self.client.auth = self.auth
self.client.headers = self.headers
self.client.stream = True
self.timeout = timeout
self.api_version = '1.1'
self.handler = handler
self.retry_in = retry_in
self.retry_count = retry_count
# Set up type methods
StreamTypes = TwythonStreamTypes(self)
self.statuses = StreamTypes.statuses
self.__dict__['user'] = StreamTypes.user
self.__dict__['site'] = StreamTypes.site
def _request(self, url, method='GET', params=None):
"""Internal stream request handling"""
retry_counter = 0
method = method.lower()
func = getattr(self.client, method)
def _send(retry_counter):
try:
if method == 'get':
response = func(url, params=params, timeout=self.timeout)
else:
response = func(url, data=params, timeout=self.timeout)
except requests.exceptions.Timeout:
self.handler.on_timeout()
else:
if response.status_code != 200:
self.handler.on_error(response.status_code,
response.content)
if self.retry_count and (self.retry_count - retry_counter) > 0:
time.sleep(self.retry_in)
retry_counter += 1
_send(retry_counter)
return response
response = _send(retry_counter)
for line in response.iter_lines():
if line:
try:
self.handler.on_success(json.loads(line))
except ValueError:
raise TwythonStreamError('Response was not valid JSON, \
unable to decode.')

View file

@ -405,60 +405,6 @@ class Twython(object):
########################################################################### ###########################################################################
@staticmethod
def stream(data, callback):
"""A Streaming API endpoint, because requests (by Kenneth Reitz)
makes this not stupidly annoying to implement.
In reality, Twython does absolutely *nothing special* here,
but people new to programming expect this type of function to
exist for this library, so we provide it for convenience.
Seriously, this is nothing special. :)
For the basic stream you're probably accessing, you'll want to
pass the following as data dictionary keys. If you need to use
OAuth (newer streams), passing secrets/etc
as keys SHOULD work...
This is all done over SSL (https://), so you're not left
totally vulnerable by passing your password.
:param username: (required) Username, self explanatory.
:param password: (required) The Streaming API doesn't use OAuth,
so we do this the old school way.
:param callback: (required) Callback function to be fired when
tweets come in (this is an event-based-ish API).
:param endpoint: (optional) Override the endpoint you're using
with the Twitter Streaming API. This is defaulted
to the one that everyone has access to, but if
Twitter <3's you feel free to set this to your
wildest desires.
"""
endpoint = 'https://stream.twitter.com/1/statuses/filter.json'
if 'endpoint' in data:
endpoint = data.pop('endpoint')
needs_basic_auth = False
if 'username' in data and 'password' in data:
needs_basic_auth = True
username = data.pop('username')
password = data.pop('password')
if needs_basic_auth:
stream = requests.post(endpoint,
data=data,
auth=(username, password))
else:
stream = requests.post(endpoint, data=data)
for line in stream.iter_lines():
if line:
try:
callback(json.loads(line))
except ValueError:
raise TwythonError('Response was not valid JSON, unable to decode.')
@staticmethod @staticmethod
def unicode2utf8(text): def unicode2utf8(text):
try: try: