twython/twython/streaming/api.py
2013-06-21 18:39:45 -04:00

180 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
"""
twython.streaming.api
~~~~~~~~~~~~~~~~~~~~~
This module contains functionality for interfacing with streaming
Twitter API calls.
"""
from .. import __version__
from ..compat import json, is_py3
from .types import TwythonStreamerTypes
import requests
from requests_oauthlib import OAuth1
import time
class TwythonStreamer(object):
    """Client for long-lived connections to the Twitter Streaming API.

    Subclass it and override :meth:`on_success`, :meth:`on_error` and
    :meth:`on_timeout` (plus any ``on_<message_type>`` handlers listed in
    ``handlers``) to react to what the stream delivers.
    """

    def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
                 timeout=300, retry_count=None, retry_in=10, client_args=None,
                 handlers=None):
        """Streaming class for a friendly streaming user experience

        Authentication IS required to use the Twitter Streaming API

        :param app_key: (required) Your applications key
        :param app_secret: (required) Your applications secret key
        :param oauth_token: (required) Used with oauth_token_secret to make
                            authenticated calls
        :param oauth_token_secret: (required) Used with oauth_token to make
                                   authenticated calls
        :param timeout: (optional) How long (in secs) the streamer should wait
                        for a response from Twitter Streaming API
        :param retry_count: (optional) Number of times the API call should be
                            retried after a non-200 response
        :param retry_in: (optional) Amount of time (in secs) to wait before
                         the previous API call is tried again
        :param client_args: (optional) Accepts some requests Session
                            parameters and some requests Request parameters.
                            See
                            http://docs.python-requests.org/en/latest/api/#sessionapi
                            and requests section below it for details.
                            [ex. headers, proxies, verify(SSL verification)]
                            The dict passed in is copied, never modified.
        :param handlers: (optional) Array of message types for which
                         corresponding handlers will be called
        """
        self.auth = OAuth1(app_key, app_secret,
                           oauth_token, oauth_token_secret)

        # Work on a copy: the code below pops keys and injects defaults, and
        # none of that should leak back into the caller's dictionary.
        self.client_args = dict(client_args) if client_args else {}

        default_headers = {'User-Agent': 'Twython Streaming v' + __version__}
        if 'headers' not in self.client_args:
            # If they didn't set any headers, set our defaults for them
            self.client_args['headers'] = default_headers
        else:
            # Copy before touching so the caller's headers stay untouched
            headers = self.client_args['headers'].copy()
            if 'User-Agent' not in headers:
                # They set headers, but didn't include User-Agent
                headers.update(default_headers)
            self.client_args['headers'] = headers

        self.client_args['timeout'] = timeout

        self.client = requests.Session()
        self.client.auth = self.auth
        self.client.stream = True

        # Session-level options are moved onto the session and removed from
        # client_args; what remains is consulted per request in _request().
        for key in ('cert', 'headers', 'hooks', 'max_redirects', 'proxies'):
            if key in self.client_args:
                setattr(self.client, key, self.client_args.pop(key))

        self.api_version = '1.1'

        self.retry_in = retry_in
        self.retry_count = retry_count

        # Set up type methods
        StreamTypes = TwythonStreamerTypes(self)
        self.statuses = StreamTypes.statuses
        self.user = StreamTypes.user
        self.site = StreamTypes.site

        self.connected = False

        self.handlers = handlers if handlers else ['delete', 'limit', 'disconnect']

    def _request(self, url, method='GET', params=None):
        """Internal stream request handling

        Opens the stream at ``url`` and feeds every non-empty line to the
        callbacks until :meth:`disconnect` is called. While still connected,
        the stream is reopened whenever the current response is exhausted.

        :param url: Streaming endpoint to connect to
        :param method: HTTP method name; looked up (lower-cased) on the
                       requests session
        :param params: Sent as query parameters for GET requests and as the
                       request body for any other method
        """
        self.connected = True
        method = method.lower()
        func = getattr(self.client, method)

        # Per-request options are the same for every attempt; build them once
        requests_args = {}
        for k, v in self.client_args.items():
            if k in ('timeout', 'allow_redirects', 'verify'):
                requests_args[k] = v
        if method == 'get':
            requests_args['params'] = params
        else:
            requests_args['data'] = params

        def _send():
            """Open the stream and return the response to consume.

            Returns ``None`` when the streamer is disconnected before any
            response could be obtained (e.g. from ``on_timeout``).
            """
            retries_left = self.retry_count or 0

            while self.connected:
                try:
                    response = func(url, **requests_args)
                except requests.exceptions.Timeout:
                    self.on_timeout()
                    continue

                if response.status_code == 200:
                    return response

                self.on_error(response.status_code, response.content)

                if retries_left <= 0 or not self.connected:
                    # Out of retries (or told to stop): hand back the failed
                    # response so the caller still sees its body.
                    return response

                # Retry in a loop rather than by recursion so the response
                # of the *latest* attempt is the one that gets returned, and
                # release the abandoned connection before trying again.
                retries_left -= 1
                response.close()
                time.sleep(self.retry_in)

            return None

        while self.connected:
            response = _send()
            if response is None:
                # Disconnected before a response was available
                break

            try:
                for line in response.iter_lines():
                    if not self.connected:
                        break
                    if not line:
                        # Skip blank lines in the stream
                        continue
                    try:
                        if is_py3:
                            line = line.decode('utf-8')
                        data = json.loads(line)
                        if self.on_success(data):
                            # Dispatch to on_<message_type> handlers; a
                            # handler returning a falsy value stops further
                            # dispatching for this message.
                            for message_type in self.handlers:
                                if message_type in data:
                                    handler = getattr(self, 'on_' + message_type, None)
                                    if handler and callable(handler) and not handler(data.get(message_type)):
                                        break
                    except ValueError:
                        # NOTE: this also catches a ValueError raised by the
                        # callbacks above, as the original code did.
                        self.on_error(response.status_code, 'Unable to decode response, not valid JSON.')
            finally:
                # Always release the connection, including before reconnecting
                response.close()

    def on_success(self, data):  # pragma: no cover
        """Called when data has been successfully received from the stream.
        Returns True if other handlers for this message should be invoked.

        Feel free to override this to handle your streaming data how you
        want it handled.
        See https://dev.twitter.com/docs/streaming-apis/messages for messages
        sent along in stream responses.

        :param data: data received from the stream
        :type data: dict
        """
        return True

    def on_error(self, status_code, data):  # pragma: no cover
        """Called when stream returns non-200 status code, or when a line
        of the stream could not be decoded as JSON

        Feel free to override this to handle your streaming data how you
        want it handled.

        :param status_code: Status code of the response being processed
        :type status_code: int

        :param data: Raw response content for non-200 responses, or a short
                     error message for undecodable lines
        :type data: bytes or str
        """
        return

    def on_timeout(self):  # pragma: no cover
        """ Called when the request has timed out """
        return

    def disconnect(self):
        """Used to disconnect the streaming client manually"""
        self.connected = False