Refactor message handling in TwythonStreamer #225

Merged
extesy merged 2 commits from master into master 2013-06-21 07:37:48 -07:00
Showing only changes of commit 009779dda0 - Show all commits

View file

@ -20,7 +20,7 @@ import time
class TwythonStreamer(object): class TwythonStreamer(object):
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
timeout=300, retry_count=None, retry_in=10, headers=None): timeout=300, retry_count=None, retry_in=10, headers=None, handlers=None):
"""Streaming class for a friendly streaming user experience """Streaming class for a friendly streaming user experience
Authentication IS required to use the Twitter Streaming API Authentication IS required to use the Twitter Streaming API
@ -38,6 +38,8 @@ class TwythonStreamer(object):
API call should be tried again API call should be tried again
:param headers: (optional) Custom headers to send along with the :param headers: (optional) Custom headers to send along with the
request request
:param handlers: (optional) Array of message types for which
corresponding handlers will be called
""" """
self.auth = OAuth1(app_key, app_secret, self.auth = OAuth1(app_key, app_secret,
@ -67,6 +69,8 @@ class TwythonStreamer(object):
self.connected = False self.connected = False
self.handlers = handlers if handlers else ['delete', 'limit', 'disconnect']
def _request(self, url, method='GET', params=None): def _request(self, url, method='GET', params=None):
"""Internal stream request handling""" """Internal stream request handling"""
self.connected = True self.connected = True
@ -103,18 +107,24 @@ class TwythonStreamer(object):
break break
if line: if line:
try: try:
if not is_py3: if is_py3:
self.on_success(json.loads(line))
else:
line = line.decode('utf-8') line = line.decode('utf-8')
self.on_success(json.loads(line)) data = json.loads(line)
if self.on_success(data):
for message_type in self.handlers:
if message_type in data:
handler = getattr(self, 'on_' + message_type, None)
if handler and callable(handler):
if not handler(data.get(message_type)):
break
except ValueError: except ValueError:
self.on_error(response.status_code, 'Unable to decode response, not vaild JSON.') self.on_error(response.status_code, 'Unable to decode response, not valid JSON.')
response.close() response.close()
def on_success(self, data): # pragma: no cover def on_success(self, data): # pragma: no cover
"""Called when data has been successfull received from the stream """Called when data has been successfully received from the stream.
Returns True if other handlers for this message should be invoked.
Feel free to override this to handle your streaming data how you Feel free to override this to handle your streaming data how you
want it handled. want it handled.
@ -124,13 +134,7 @@ class TwythonStreamer(object):
:param data: data recieved from the stream :param data: data recieved from the stream
:type data: dict :type data: dict
""" """
return True
if 'delete' in data:
self.on_delete(data.get('delete'))
elif 'limit' in data:
self.on_limit(data.get('limit'))
elif 'disconnect' in data:
self.on_disconnect(data.get('disconnect'))
def on_error(self, status_code, data): # pragma: no cover def on_error(self, status_code, data): # pragma: no cover
"""Called when stream returns non-200 status code """Called when stream returns non-200 status code
@ -146,45 +150,6 @@ class TwythonStreamer(object):
""" """
return return
def on_delete(self, data): # pragma: no cover
"""Called when a deletion notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for deletion notices: http://spen.se/8qujd
:param data: data from the 'delete' key recieved from the stream
:type data: dict
"""
return
def on_limit(self, data): # pragma: no cover
"""Called when a limit notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for limit notices: http://spen.se/hzt0b
:param data: data from the 'limit' key recieved from the stream
:type data: dict
"""
return
def on_disconnect(self, data): # pragma: no cover
"""Called when a disconnect notice is received
Feel free to override this to handle your streaming data how you
want it handled.
Twitter docs for disconnect notices: http://spen.se/xb6mm
:param data: data from the 'disconnect' key recieved from the stream
:type data: dict
"""
return
def on_timeout(self): # pragma: no cover def on_timeout(self): # pragma: no cover
""" Called when the request has timed out """ """ Called when the request has timed out """
return return