Added disconnect to TwythonStreamer, more tests, update example
* Stream and Twython core tests * Import TwythonStreamError from twython See more in 2.10.1 section of HISTORY.rst
This commit is contained in:
parent
815393cc33
commit
c8b1202880
9 changed files with 122 additions and 55 deletions
|
|
@ -1,7 +1,11 @@
|
|||
import unittest
|
||||
import os
|
||||
from twython import(
|
||||
Twython, TwythonStreamer, TwythonError,
|
||||
TwythonAuthError, TwythonStreamError
|
||||
)
|
||||
|
||||
from twython import Twython, TwythonError, TwythonAuthError
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
|
||||
app_key = os.environ.get('APP_KEY')
|
||||
app_secret = os.environ.get('APP_SECRET')
|
||||
|
|
@ -94,10 +98,17 @@ class TwythonAPITestCase(unittest.TestCase):
|
|||
|
||||
def test_search_gen(self):
|
||||
'''Test looping through the generator results works, at least once that is'''
|
||||
search = self.api.search_gen('python')
|
||||
for result in search:
|
||||
if result:
|
||||
break
|
||||
search = self.api.search_gen('twitter', count=1)
|
||||
counter = 0
|
||||
while counter < 2:
|
||||
counter += 1
|
||||
result = search.next()
|
||||
new_id_str = int(result['id_str'])
|
||||
if counter == 1:
|
||||
prev_id_str = new_id_str
|
||||
time.sleep(1) # Give time for another tweet to come into search
|
||||
if counter == 2:
|
||||
self.assertTrue(new_id_str > prev_id_str)
|
||||
|
||||
def test_encode(self):
|
||||
'''Test encoding UTF-8 works'''
|
||||
|
|
@ -480,5 +491,47 @@ class TwythonAPITestCase(unittest.TestCase):
|
|||
self.api.get_closest_trends(lat='37', long='-122')
|
||||
|
||||
|
||||
class TwythonStreamTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
class MyStreamer(TwythonStreamer):
|
||||
def on_success(self, data):
|
||||
self.disconnect()
|
||||
|
||||
def on_error(self, status_code, data):
|
||||
raise TwythonStreamError(data)
|
||||
|
||||
def on_delete(self, data):
|
||||
return
|
||||
|
||||
def on_limit(self, data):
|
||||
return
|
||||
|
||||
def on_disconnect(self, data):
|
||||
return
|
||||
|
||||
def on_timeout(self, data):
|
||||
return
|
||||
|
||||
self.api = MyStreamer(app_key, app_secret,
|
||||
oauth_token, oauth_token_secret)
|
||||
|
||||
def test_stream_status_filter(self):
|
||||
self.api.statuses.filter(track='twitter')
|
||||
|
||||
def test_stream_status_sample(self):
|
||||
self.api.statuses.sample()
|
||||
|
||||
def test_stream_status_firehose(self):
|
||||
self.assertRaises(TwythonStreamError, self.api.statuses.firehose,
|
||||
track='twitter')
|
||||
|
||||
def test_stream_site(self):
|
||||
self.assertRaises(TwythonStreamError, self.api.site,
|
||||
follow='twitter')
|
||||
|
||||
def test_stream_user(self):
|
||||
self.api.user(track='twitter')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue