diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..624f4a6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +.coverage +MANIFEST +coverage.xml +nosetests.xml +junit-report.xml +pylint.txt +toy.py +violations.pyflakes.txt +cover/ +docs/_build +grequests.egg-info/ +*.py[cx] +*.swp +env/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c4f010d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,27 @@ +dist: xenial +language: python + + +python: + - "2.7" + - "3.6" + - "3.7" + + +install: + - pip install -r requirements.txt + - pip install pytest + +script: + - pytest tests.py + + +deploy: + provider: pypi + distributions: "sdist bdist_wheel" + user: "spyoungtech" + password: + secure: "QtuuH0X/A/iQI23MxvqsnxUy63XD5awJHDkeQNmUDIGGQqIox2DTYKoc6x354I5wpqprtODQRYRqIsA9+2cpRcF49Ft50cvi3cmuoeozkID3ybQyLHCIcJ4CKt6X+h2LFbrgqyyBcny7tKQlYr4/nsjeQegPblnJ6OTljJgJyE0=" + on: + tags: true + python: 3.6 \ No newline at end of file diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 0000000..b93507b --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,12 @@ +GRequests is written and maintained by Kenneth Reitz and +various contributors: + +Development Lead +```````````````` + +- Kenneth Reitz + +Patches and Suggestions +``````````````````````` +- Kracekumar +- Spencer Young diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..92b0996 --- /dev/null +++ b/LICENSE @@ -0,0 +1,8 @@ +Copyright (c) 2012, Kenneth Reitz +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. +Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..3ea3e86 --- /dev/null +++ b/README.rst @@ -0,0 +1,67 @@ +GRequests: Asynchronous Requests +=============================== + +GRequests allows you to use Requests with Gevent to make asynchronous HTTP +Requests easily. + +**Note**: You should probably use `requests-threads <https://github.com/requests/requests-threads>`_ or `requests-futures <https://github.com/ross/requests-futures>`_ instead. + + +Usage +----- + +Usage is simple: + +.. code-block:: python + + import grequests + + urls = [ + 'http://www.heroku.com', + 'http://python-tablib.org', + 'http://httpbin.org', + 'http://python-requests.org', + 'http://fakedomain/', + 'http://kennethreitz.com' + ] + +Create a set of unsent Requests: + +.. code-block:: python + + >>> rs = (grequests.get(u) for u in urls) + +Send them all at the same time: + +.. code-block:: python + + >>> grequests.map(rs) + [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>] + +Optionally, in the event of a timeout or any other exception during the connection of +the request, you can add an exception handler that will be called with the request and +exception inside the main thread: + +.. code-block:: python + + >>> def exception_handler(request, exception): + ... print("Request failed") + + >>> reqs = [ + ... 
class AsyncRequest(object):
    """ Asynchronous request.

    Accept same parameters as ``Session.request`` and some additional:

    :param session: Session which will do request. When omitted, a private
        ``Session`` is created for this request and closed again as soon as
        the request has been sent, so its connection pool is not leaked.
    :param callback: Callback called on response.
                     Same as passing ``hooks={'response': callback}``
                     (it replaces any ``hooks`` passed alongside it).
    """
    def __init__(self, method, url, **kwargs):
        #: Request method
        self.method = method
        #: URL to request
        self.url = url
        #: Associated ``Session``
        self.session = kwargs.pop('session', None)
        #: True only when the session is created here; a session supplied by
        #: the caller is never closed on their behalf.
        self._close = self.session is None
        if self._close:
            self.session = Session()

        callback = kwargs.pop('callback', None)
        if callback:
            kwargs['hooks'] = {'response': callback}

        #: The rest arguments for ``Session.request``
        self.kwargs = kwargs
        #: Resulting ``Response``
        self.response = None

    def send(self, **kwargs):
        """
        Prepares request based on parameter passed to constructor and optional ``kwargs``.
        Then sends request and saves response to :attr:`response`.

        If the request raises, the exception is stored in ``self.exception``
        and the formatted traceback in ``self.traceback`` instead of being
        propagated.

        :param kwargs: extra ``Session.request`` arguments; they override the
            ones given to the constructor.
        :returns: this ``AsyncRequest`` instance
        """
        merged_kwargs = dict(self.kwargs)
        merged_kwargs.update(kwargs)
        try:
            self.response = self.session.request(self.method,
                                                 self.url, **merged_kwargs)
        except Exception as e:
            # These attributes are intentionally created only on failure:
            # map() checks hasattr(request, 'exception') to detect errors.
            self.exception = e
            self.traceback = traceback.format_exc()
        finally:
            if self._close:
                # The session was created only for this request, so nobody
                # else can reuse it; release its pooled connections now.
                self.session.close()
        return self
def imap(requests, stream=False, size=2, exception_handler=None):
    """Lazily send Requests through a gevent pool, yielding results as they finish.

    Results are yielded in completion order, not in the order of ``requests``.

    :param requests: an iterable (typically a generator) of Request objects.
    :param stream: If True, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. default is 2
    :param exception_handler: Callback function, called when exception occurred.
        Params: Request, Exception. A non-``None`` return value is yielded in
        place of the missing response; without a handler, failed requests
        yield nothing.
    """
    workers = Pool(size)

    def dispatch(req):
        return req.send(stream=stream)

    for finished in workers.imap_unordered(dispatch, requests):
        response = finished.response
        if response is not None:
            yield response
            continue

        # No response means the request failed; without a handler it is
        # silently skipped.
        if not exception_handler:
            continue

        substitute = exception_handler(finished, finished.exception)
        if substitute is not None:
            yield substitute

    workers.join()
class GrequestsCase(unittest.TestCase):
    """Integration tests for ``grequests``; they issue real HTTP requests to
    the httpbin service configured through ``HTTPBIN_URL``."""

    def test_map(self):
        reqs = [grequests.get(url) for url in URLS]
        resp = grequests.map(reqs, size=N)
        self.assertEqual([r.url for r in resp], URLS)

    def test_imap(self):
        reqs = (grequests.get(url) for url in URLS)
        i = 0
        for i, r in enumerate(grequests.imap(reqs, size=N)):
            self.assertIn(r.url, URLS)
        self.assertEqual(i, N - 1)

    def test_hooks(self):
        result = {}

        def hook(r, **kwargs):
            result[r.url] = True
            return r

        reqs = [grequests.get(url, hooks={'response': [hook]}) for url in URLS]
        grequests.map(reqs, size=N)
        self.assertEqual(sorted(result.keys()), sorted(URLS))

    def test_callback_kwarg(self):
        result = {'ok': False}

        def callback(r, **kwargs):
            result['ok'] = True
            return r

        self.get(URLS[0], callback=callback)
        self.assertTrue(result['ok'])

    def test_session_and_cookies(self):
        c1 = {'k1': 'v1'}
        r = self.get(httpbin('cookies/set'), params=c1).json()
        self.assertEqual(r['cookies'], c1)
        s = requests.Session()
        r = self.get(httpbin('cookies/set'), session=s, params=c1).json()
        self.assertEqual(dict(s.cookies), c1)

        # ensure all cookies saved
        c2 = {'k2': 'v2'}
        c1.update(c2)
        r = self.get(httpbin('cookies/set'), session=s, params=c2).json()
        self.assertEqual(dict(s.cookies), c1)

        # ensure new session is created
        r = self.get(httpbin('cookies')).json()
        self.assertEqual(r['cookies'], {})

        # cookies as param
        c3 = {'p1': '42'}
        r = self.get(httpbin('cookies'), cookies=c3).json()
        self.assertEqual(r['cookies'], c3)

    def test_calling_request(self):
        reqs = [grequests.request('POST', httpbin('post'), data={'p': i})
                for i in range(N)]
        resp = grequests.map(reqs, size=N)
        self.assertEqual([int(r.json()['form']['p']) for r in resp], list(range(N)))

    def test_stream_enabled(self):
        r = grequests.map([grequests.get(httpbin('stream/10'))],
                          size=2, stream=True)[0]
        self.assertFalse(r._content_consumed)

    def test_concurrency_with_delayed_url(self):
        t = time.time()
        n = 10
        reqs = [grequests.get(httpbin('delay/1')) for _ in range(n)]
        grequests.map(reqs, size=n)
        # n one-second requests run concurrently must take less than n seconds
        self.assertLess((time.time() - t), n)

    def test_map_timeout_no_exception_handler(self):
        """
        compliance with existing 0.2.0 behaviour
        """
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001), grequests.get(httpbin('/'))]
        responses = grequests.map(reqs)
        self.assertIsNone(responses[0])
        self.assertTrue(responses[1].ok)
        self.assertEqual(len(responses), 2)

    def test_map_timeout_exception_handler_no_return(self):
        """
        ensure default behaviour for a handler that returns None
        """
        def exception_handler(request, exception):
            pass
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001), grequests.get(httpbin('/'))]
        responses = grequests.map(reqs, exception_handler=exception_handler)
        self.assertIsNone(responses[0])
        self.assertTrue(responses[1].ok)
        self.assertEqual(len(responses), 2)

    def test_map_timeout_exception_handler_returns_exception(self):
        """
        ensure returned value from exception handler is stuffed in the map result
        """
        def exception_handler(request, exception):
            return exception
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001), grequests.get(httpbin('/'))]
        responses = grequests.map(reqs, exception_handler=exception_handler)
        self.assertIsInstance(responses[0], Timeout)
        self.assertTrue(responses[1].ok)
        self.assertEqual(len(responses), 2)

    def test_imap_timeout_no_exception_handler(self):
        """
        compliance with existing 0.2.0 behaviour
        """
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001)]
        out = []
        try:
            for r in grequests.imap(reqs):
                out.append(r)
        except Timeout:
            pass
        # assertEqual, not the assertEquals alias (removed in Python 3.12)
        self.assertEqual(out, [])

    def test_imap_timeout_exception_handler_no_return(self):
        """
        ensure imap-default behaviour for a handler that returns None
        """
        def exception_handler(request, exception):
            pass
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001)]
        out = []
        for r in grequests.imap(reqs, exception_handler=exception_handler):
            out.append(r)
        self.assertEqual(out, [])

    def test_imap_timeout_exception_handler_returns_value(self):
        """
        ensure behaviour for a handler that returns a value
        """
        def exception_handler(request, exception):
            return 'a value'
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001)]
        out = []
        for r in grequests.imap(reqs, exception_handler=exception_handler):
            out.append(r)
        self.assertEqual(out, ['a value'])

    def test_map_timeout_exception(self):
        class ExceptionHandler:
            def __init__(self):
                self.counter = 0

            def callback(self, request, exception):
                self.counter += 1
        eh = ExceptionHandler()
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001)]
        list(grequests.map(reqs, exception_handler=eh.callback))
        self.assertEqual(eh.counter, 1)

    def test_imap_timeout_exception(self):
        class ExceptionHandler:
            def __init__(self):
                self.counter = 0

            def callback(self, request, exception):
                self.counter += 1
        eh = ExceptionHandler()
        reqs = [grequests.get(httpbin('delay/1'), timeout=0.001)]
        list(grequests.imap(reqs, exception_handler=eh.callback))
        self.assertEqual(eh.counter, 1)

    def get(self, url, **kwargs):
        """Send a single GET through ``grequests.map`` and return its result."""
        return grequests.map([grequests.get(url, **kwargs)])[0]