This repository has been archived by the owner on Dec 16, 2017. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 184
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #54 from technoskald/exceptions
No, use local grequests, sigh.
- Loading branch information
Showing
1 changed file
with
149 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
grequests | ||
~~~~~~~~~ | ||
This module contains an asynchronous replica of ``requests.api``, powered | ||
by gevent. All API methods return a ``Request`` instance (as opposed to | ||
``Response``). A list of requests can be sent with ``map()``. | ||
""" | ||
from functools import partial | ||
|
||
try: | ||
import gevent | ||
from gevent import monkey as curious_george | ||
from gevent.pool import Pool | ||
except ImportError: | ||
raise RuntimeError('Gevent is required for grequests.') | ||
|
||
# Monkey-patch. | ||
curious_george.patch_all(thread=False, select=False) | ||
|
||
from requests import Session | ||
|
||
|
||
__all__ = ( | ||
'map', 'imap', | ||
'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request' | ||
) | ||
|
||
|
||
class AsyncRequest(object): | ||
""" Asynchronous request. | ||
Accept same parameters as ``Session.request`` and some additional: | ||
:param session: Session which will do request | ||
:param callback: Callback called on response. | ||
Same as passing ``hooks={'response': callback}`` | ||
""" | ||
def __init__(self, method, url, **kwargs): | ||
#: Request method | ||
self.method = method | ||
#: URL to request | ||
self.url = url | ||
#: Associated ``Session`` | ||
self.session = kwargs.pop('session', None) | ||
if self.session is None: | ||
self.session = Session() | ||
|
||
callback = kwargs.pop('callback', None) | ||
if callback: | ||
kwargs['hooks'] = {'response': callback} | ||
|
||
#: The rest arguments for ``Session.request`` | ||
self.kwargs = kwargs | ||
#: Resulting ``Response`` | ||
self.response = None | ||
|
||
def send(self, **kwargs): | ||
""" | ||
Prepares request based on parameter passed to constructor and optional ``kwargs```. | ||
Then sends request and saves response to :attr:`response` | ||
:returns: ``Response`` | ||
""" | ||
merged_kwargs = {} | ||
merged_kwargs.update(self.kwargs) | ||
merged_kwargs.update(kwargs) | ||
try: | ||
self.response = self.session.request(self.method, | ||
self.url, **merged_kwargs) | ||
except Exception as e: | ||
self.exception = e | ||
return self | ||
|
||
|
||
def send(r, pool=None, stream=False): | ||
"""Sends the request object using the specified pool. If a pool isn't | ||
specified this method blocks. Pools are useful because you can specify size | ||
and can hence limit concurrency.""" | ||
if pool != None: | ||
return pool.spawn(r.send, stream=stream) | ||
|
||
return gevent.spawn(r.send, stream=stream) | ||
|
||
|
||
# Shortcuts for creating AsyncRequest with appropriate HTTP method | ||
get = partial(AsyncRequest, 'GET') | ||
options = partial(AsyncRequest, 'OPTIONS') | ||
head = partial(AsyncRequest, 'HEAD') | ||
post = partial(AsyncRequest, 'POST') | ||
put = partial(AsyncRequest, 'PUT') | ||
patch = partial(AsyncRequest, 'PATCH') | ||
delete = partial(AsyncRequest, 'DELETE') | ||
|
||
# synonym | ||
def request(method, url, **kwargs): | ||
return AsyncRequest(method, url, **kwargs) | ||
|
||
|
||
def map(requests, stream=False, size=None, exception_handler=None): | ||
"""Concurrently converts a list of Requests to Responses. | ||
:param requests: a collection of Request objects. | ||
:param stream: If True, the content will not be downloaded immediately. | ||
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs. | ||
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception | ||
""" | ||
|
||
requests = list(requests) | ||
|
||
pool = Pool(size) if size else None | ||
jobs = [send(r, pool, stream=stream) for r in requests] | ||
gevent.joinall(jobs) | ||
|
||
ret = [] | ||
|
||
for request in requests: | ||
if request.response: | ||
ret.append(request.response) | ||
elif exception_handler: | ||
exception_handler(request, request.exception) | ||
|
||
return ret | ||
|
||
|
||
def imap(requests, stream=False, size=2, exception_handler=None): | ||
"""Concurrently converts a generator object of Requests to | ||
a generator of Responses. | ||
:param requests: a generator of Request objects. | ||
:param stream: If True, the content will not be downloaded immediately. | ||
:param size: Specifies the number of requests to make at a time. default is 2 | ||
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception | ||
""" | ||
|
||
pool = Pool(size) | ||
|
||
def send(r): | ||
return r.send(stream=stream) | ||
|
||
for request in pool.imap_unordered(send, requests): | ||
if request.response: | ||
yield request.response | ||
elif exception_handler: | ||
exception_handler(request, request.exception) | ||
|
||
pool.join() |