diff --git a/twython/streaming/api.py b/twython/streaming/api.py index 21b3a7f..9545935 100644 --- a/twython/streaming/api.py +++ b/twython/streaming/api.py @@ -20,7 +20,7 @@ import time class TwythonStreamer(object): def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, - timeout=300, retry_count=None, retry_in=10, headers=None): + timeout=300, retry_count=None, retry_in=10, headers=None, handlers=None): """Streaming class for a friendly streaming user experience Authentication IS required to use the Twitter Streaming API @@ -38,6 +38,8 @@ class TwythonStreamer(object): API call should be tried again :param headers: (optional) Custom headers to send along with the request + :param handlers: (optional) Array of message types for which + corresponding handlers will be called """ self.auth = OAuth1(app_key, app_secret, @@ -67,6 +69,8 @@ class TwythonStreamer(object): self.connected = False + self.handlers = handlers if handlers else ['delete', 'limit', 'disconnect'] + def _request(self, url, method='GET', params=None): """Internal stream request handling""" self.connected = True @@ -103,18 +107,24 @@ class TwythonStreamer(object): break if line: try: - if not is_py3: - self.on_success(json.loads(line)) - else: + if is_py3: line = line.decode('utf-8') - self.on_success(json.loads(line)) + data = json.loads(line) + if self.on_success(data): + for message_type in self.handlers: + if message_type in data: + handler = getattr(self, 'on_' + message_type, None) + if handler and callable(handler): + if not handler(data.get(message_type)): + break except ValueError: - self.on_error(response.status_code, 'Unable to decode response, not vaild JSON.') + self.on_error(response.status_code, 'Unable to decode response, not valid JSON.') response.close() def on_success(self, data): # pragma: no cover - """Called when data has been successfull received from the stream + """Called when data has been successfully received from the stream. + Returns True if other handlers for this message should be invoked. Feel free to override this to handle your streaming data how you want it handled. @@ -124,13 +134,7 @@ class TwythonStreamer(object): :param data: data recieved from the stream :type data: dict """ - - if 'delete' in data: - self.on_delete(data.get('delete')) - elif 'limit' in data: - self.on_limit(data.get('limit')) - elif 'disconnect' in data: - self.on_disconnect(data.get('disconnect')) + return True def on_error(self, status_code, data): # pragma: no cover """Called when stream returns non-200 status code @@ -146,45 +150,6 @@ class TwythonStreamer(object): """ return - def on_delete(self, data): # pragma: no cover - """Called when a deletion notice is received - - Feel free to override this to handle your streaming data how you - want it handled. - - Twitter docs for deletion notices: http://spen.se/8qujd - - :param data: data from the 'delete' key recieved from the stream - :type data: dict - """ - return - - def on_limit(self, data): # pragma: no cover - """Called when a limit notice is received - - Feel free to override this to handle your streaming data how you - want it handled. - - Twitter docs for limit notices: http://spen.se/hzt0b - - :param data: data from the 'limit' key recieved from the stream - :type data: dict - """ - return - - def on_disconnect(self, data): # pragma: no cover - """Called when a disconnect notice is received - - Feel free to override this to handle your streaming data how you - want it handled. - - Twitter docs for disconnect notices: http://spen.se/xb6mm - - :param data: data from the 'disconnect' key recieved from the stream - :type data: dict - """ - return - def on_timeout(self): # pragma: no cover """ Called when the request has timed out """ return