Merge pull request #187 from ryanmcgrath/streaming

Fixing streaming, fixes #144
This commit is contained in:
Mike Helmick 2013-05-04 12:05:12 -07:00
commit 44936d5f09
12 changed files with 292 additions and 81 deletions

2
.gitignore vendored
View file

@ -35,3 +35,5 @@ nosetests.xml
.project
.pydevproject
twython/.DS_Store
test.py

View file

@ -1,6 +1,11 @@
History
-------
2.9.0 (2013-05-04)
++++++++++++++++++
- Fixed streaming issue #144, added ``TwythonStreamer`` to aid users in a friendly streaming experience (streaming examples in ``examples`` and README's have been updated as well)
2.8.0 (2013-04-29)
++++++++++++++++++

View file

@ -103,23 +103,23 @@ except TwythonAuthError as e:
```
##### Streaming API
*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access)
streams.*
```python
from twython import Twython
from twython import TwythonStreamer
def on_results(results):
"""A callback to handle passed results. Wheeee.
"""
print results
class MyStreamer(TwythonStreamer):
def on_success(self, data):
print data
Twython.stream({
'username': 'your_username',
'password': 'your_password',
'track': 'python'
}, on_results)
def on_error(self, status_code, data):
print status_code, data
# Requires Authentication as of Twitter API v1.1
stream = MyStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track='twitter')
```
Notes

View file

@ -111,24 +111,24 @@ Catching exceptions
Streaming API
~~~~~~~~~~~~~
*Usage is as follows; it's designed to be open-ended enough that you can adapt it to higher-level (read: Twitter must give you access)
streams.*
::
from twython import Twython
from twython import TwythonStreamer
def on_results(results):
"""A callback to handle passed results. Wheeee.
"""
print results
class MyStreamer(TwythonStreamer):
def on_success(self, data):
print data
Twython.stream({
'username': 'your_username',
'password': 'your_password',
'track': 'python'
}, on_results)
def on_error(self, status_code, data):
print status_code, data
# Requires Authentication as of Twitter API v1.1
stream = MyStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track='twitter')
Notes

17
examples/stream.py Normal file
View file

@ -0,0 +1,17 @@
from twython import TwythonStreamer
class MyStreamer(TwythonStreamer):
def on_success(self, data):
print data
def on_error(self, status_code, data):
print status_code, data
# Requires Authentication as of Twitter API v1.1
stream = MyStreamer(APP_KEY, APP_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track='twitter')
#stream.user(track='twitter')
#stream.site(follow='twitter')

View file

@ -4,7 +4,7 @@ import sys
from setuptools import setup
__author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '2.8.0'
__version__ = '2.9.0'
packages = [
'twython'

View file

@ -18,7 +18,8 @@ Questions, comments? ryan@venodesigns.net
"""
__author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '2.8.0'
__version__ = '2.9.0'
from .twython import Twython
from .streaming import TwythonStreamer
from .exceptions import TwythonError, TwythonRateLimitError, TwythonAuthError

View file

@ -46,3 +46,8 @@ class TwythonRateLimitError(TwythonError):
if isinstance(retry_after, int):
msg = '%s (Retry after %d seconds)' % (msg, retry_after)
TwythonError.__init__(self, msg, error_code=error_code)
class TwythonStreamError(TwythonError):
"""Test"""
pass

View file

@ -0,0 +1 @@
from .api import TwythonStreamer

163
twython/streaming/api.py Normal file
View file

@ -0,0 +1,163 @@
from .. import __version__
from ..compat import json
from ..exceptions import TwythonStreamError
from .types import TwythonStreamerTypes
import requests
from requests_oauthlib import OAuth1
import time
class TwythonStreamer(object):
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
timeout=300, retry_count=None, retry_in=10, headers=None):
"""Streaming class for a friendly streaming user experience
:param app_key: (required) Your applications key
:param app_secret: (required) Your applications secret key
:param oauth_token: (required) Used with oauth_token_secret to make
authenticated calls
:param oauth_token_secret: (required) Used with oauth_token to make
authenticated calls
:param timeout: (optional) How long (in secs) the streamer should wait
for a response from Twitter Streaming API
:param retry_count: (optional) Number of times the API call should be
retired
:param retry_in: (optional) Amount of time (in secs) the previous
API call should be tried again
:param headers: (optional) Custom headers to send along with the
request
"""
self.auth = OAuth1(app_key, app_secret,
oauth_token, oauth_token_secret)
self.headers = {'User-Agent': 'Twython Streaming v' + __version__}
if headers:
self.headers.update(headers)
self.client = requests.Session()
self.client.auth = self.auth
self.client.headers = self.headers
self.client.stream = True
self.timeout = timeout
self.api_version = '1.1'
self.retry_in = retry_in
self.retry_count = retry_count
# Set up type methods
StreamTypes = TwythonStreamerTypes(self)
self.statuses = StreamTypes.statuses
self.user = StreamTypes.user
self.site = StreamTypes.site
def _request(self, url, method='GET', params=None):
"""Internal stream request handling"""
retry_counter = 0
method = method.lower()
func = getattr(self.client, method)
def _send(retry_counter):
try:
if method == 'get':
response = func(url, params=params, timeout=self.timeout)
else:
response = func(url, data=params, timeout=self.timeout)
except requests.exceptions.Timeout:
self.on_timeout()
else:
if response.status_code != 200:
self.on_error(response.status_code, response.content)
if self.retry_count and (self.retry_count - retry_counter) > 0:
time.sleep(self.retry_in)
retry_counter += 1
_send(retry_counter)
return response
response = _send(retry_counter)
for line in response.iter_lines():
if line:
try:
self.on_success(json.loads(line))
except ValueError:
raise TwythonStreamError('Response was not valid JSON, \
unable to decode.')
def on_success(self, data):
"""Called when data has been successfull received from the stream
Feel free to override this to handle your streaming data how you
want it handled.
See https://dev.twitter.com/docs/streaming-apis/messages for messages
sent along in stream responses.
:param data: dict of data recieved from the stream
"""
if 'delete' in data:
self.on_delete(data.get('delete'))
elif 'limit' in data:
self.on_limit(data.get('limit'))
elif 'disconnect' in data:
self.on_disconnect(data.get('disconnect'))
def on_error(self, status_code, data):
"""Called when stream returns non-200 status code
Feel free to override this to handle your streaming data how you
want it handled.
:param status_code: Non-200 status code sent from stream
:param data: Error message sent from stream
"""
return
def on_delete(self, data):
"""Called when a deletion notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for deletion notices: http://spen.se/8qujd
:param data: dict of data from the 'delete' key recieved from
the stream
"""
return
def on_limit(self, data):
"""Called when a limit notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for limit notices: http://spen.se/hzt0b
:param data: dict of data from the 'limit' key recieved from
the stream
"""
return
def on_disconnect(self, data):
"""Called when a disconnect notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for disconnect notices: http://spen.se/xb6mm
:param data: dict of data from the 'disconnect' key recieved from
the stream
"""
return
def on_timeout(self):
return

View file

@ -0,0 +1,71 @@
class TwythonStreamerTypes(object):
"""Class for different stream endpoints
Not all streaming endpoints have nested endpoints.
User Streams and Site Streams are single streams with no nested endpoints
Status Streams include filter, sample and firehose endpoints
"""
def __init__(self, streamer):
self.streamer = streamer
self.statuses = TwythonStreamerTypesStatuses(streamer)
def user(self, **params):
"""Stream user
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/user
"""
url = 'https://userstream.twitter.com/%s/user.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def site(self, **params):
"""Stream site
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/site
"""
url = 'https://sitestream.twitter.com/%s/site.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
class TwythonStreamerTypesStatuses(object):
"""Class for different statuses endpoints
Available so TwythonStreamer.statuses.filter() is available.
Just a bit cleaner than TwythonStreamer.statuses_filter(),
statuses_sample(), etc. all being single methods in TwythonStreamer
"""
def __init__(self, streamer):
self.streamer = streamer
def filter(self, **params):
"""Stream statuses/filter
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/post/statuses/filter
"""
url = 'https://stream.twitter.com/%s/statuses/filter.json' \
% self.streamer.api_version
self.streamer._request(url, 'POST', params=params)
def sample(self, **params):
"""Stream statuses/sample
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/sample
"""
url = 'https://stream.twitter.com/%s/statuses/sample.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def firehose(self, **params):
"""Stream statuses/filter
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/firehose
"""
url = 'https://stream.twitter.com/%s/statuses/firehose.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)

View file

@ -405,60 +405,6 @@ class Twython(object):
###########################################################################
@staticmethod
def stream(data, callback):
"""A Streaming API endpoint, because requests (by Kenneth Reitz)
makes this not stupidly annoying to implement.
In reality, Twython does absolutely *nothing special* here,
but people new to programming expect this type of function to
exist for this library, so we provide it for convenience.
Seriously, this is nothing special. :)
For the basic stream you're probably accessing, you'll want to
pass the following as data dictionary keys. If you need to use
OAuth (newer streams), passing secrets/etc
as keys SHOULD work...
This is all done over SSL (https://), so you're not left
totally vulnerable by passing your password.
:param username: (required) Username, self explanatory.
:param password: (required) The Streaming API doesn't use OAuth,
so we do this the old school way.
:param callback: (required) Callback function to be fired when
tweets come in (this is an event-based-ish API).
:param endpoint: (optional) Override the endpoint you're using
with the Twitter Streaming API. This is defaulted
to the one that everyone has access to, but if
Twitter <3's you feel free to set this to your
wildest desires.
"""
endpoint = 'https://stream.twitter.com/1/statuses/filter.json'
if 'endpoint' in data:
endpoint = data.pop('endpoint')
needs_basic_auth = False
if 'username' in data and 'password' in data:
needs_basic_auth = True
username = data.pop('username')
password = data.pop('password')
if needs_basic_auth:
stream = requests.post(endpoint,
data=data,
auth=(username, password))
else:
stream = requests.post(endpoint, data=data)
for line in stream.iter_lines():
if line:
try:
callback(json.loads(line))
except ValueError:
raise TwythonError('Response was not valid JSON, unable to decode.')
@staticmethod
def unicode2utf8(text):
try: