# poolrequests.py

import requests

from itertools import cycle
from threading import RLock, local
from searx import settings
from time import time


class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
    """HTTPAdapter that forwards extra connection parameters (e.g. source_address)
    to the underlying urllib3 pool manager."""

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        if max_retries == requests.adapters.DEFAULT_RETRIES:
            self.max_retries = requests.adapters.Retry(0, read=False)
        else:
            self.max_retries = requests.adapters.Retry.from_int(max_retries)
        self.config = {}
        self.proxy_manager = {}

        # Skip HTTPAdapter.__init__ (it would call init_poolmanager without
        # conn_params) and initialise the BaseAdapter grandparent instead.
        super(requests.adapters.HTTPAdapter, self).__init__()

        self._pool_connections = pool_connections
        self._pool_maxsize = pool_maxsize
        self._pool_block = pool_block
        self._conn_params = conn_params

        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        # Can't handle this by adding 'proxy_manager' to self.__attrs__, because
        # self.poolmanager uses a lambda function, which isn't pickleable.
        self.proxy_manager = {}
        self.config = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
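

# Illustrative sketch (an assumption, not part of the module): mounting this adapter
# on a plain requests.Session binds outgoing TCP connections to a local source
# address. The IP 192.0.2.1 is a documentation placeholder (TEST-NET-1):
#
#   session = requests.Session()
#   session.mount('http://', HTTPAdapterWithConnParams(pool_connections=10,
#                                                      pool_maxsize=10,
#                                                      source_address=('192.0.2.1', 0)))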


threadLocal = local()
connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor

# One shared adapter cycle per scheme; with several source IPs configured,
# sessions rotate their outgoing connections across those addresses.
if settings['outgoing'].get('source_ips'):
    http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                    source_address=(source_ip, 0))
                          for source_ip in settings['outgoing']['source_ips'])
    https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                     source_address=(source_ip, 0))
                           for source_ip in settings['outgoing']['source_ips'])
else:
    http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
    https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
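

# For reference, a sketch of the 'outgoing' settings this module reads; the keys are
# inferred from the .get() calls above, and all values here are made-up examples:
#
#   outgoing:
#     pool_connections: 100
#     pool_maxsize: 10
#     source_ips:          # optional
#       - 192.0.2.1
#       - 192.0.2.2
#     proxies:             # optional, consumed by request() below
#       http: http://127.0.0.1:8080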


# Shared lock for mounting the global adapters; a fresh RLock() created inside
# __init__ would not serialise anything, so one module-level lock is used instead.
adapters_lock = RLock()


class SessionSinglePool(requests.Session):

    def __init__(self):
        super(SessionSinglePool, self).__init__()

        # reuse the same adapters
        with adapters_lock:
            self.adapters.clear()
            self.mount('https://', next(https_adapters))
            self.mount('http://', next(http_adapters))

    def close(self):
        """Call super, but clear adapters since they are managed globally"""
        self.adapters.clear()
        super(SessionSinglePool, self).close()
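

# Minimal usage sketch (an illustration, not module code): every SessionSinglePool
# shares the global adapters, so short-lived sessions still reuse one connection
# pool per scheme. With a single configured adapter (no source_ips):
#
#   s1 = SessionSinglePool()
#   s2 = SessionSinglePool()
#   assert s1.get_adapter('http://example.com') is s2.get_adapter('http://example.com')
#   s1.close()
#   s2.close()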


def set_timeout_for_thread(timeout, start_time=None):
    """Set the request timeout (and optionally the search start time) for this thread."""
    threadLocal.timeout = timeout
    threadLocal.start_time = start_time


def reset_time_for_thread():
    """Reset this thread's accumulated HTTP time to zero."""
    threadLocal.total_time = 0


def get_time_for_thread():
    """Return the total time this thread has spent inside request()."""
    return threadLocal.total_time
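

# Typical call pattern (a sketch; the actual caller lives elsewhere in searx):
# set a per-thread deadline before issuing requests, then read back the
# accumulated HTTP time afterwards.
#
#   reset_time_for_thread()
#   set_timeout_for_thread(3.0, start_time=time())
#   # ... one or more request(...) calls from this thread ...
#   elapsed = get_time_for_thread()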


def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)"""
    time_before_request = time()

    # session start
    session = SessionSinglePool()

    # proxies
    kwargs['proxies'] = settings['outgoing'].get('proxies') or None

    # timeout: an explicit kwarg wins over the thread-local value
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(threadLocal, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # do request
    response = session.request(method=method, url=url, **kwargs)

    time_after_request = time()

    # session end: close before the timeout check below, so the session is
    # released even when a Timeout is raised
    session.close()

    # is there a timeout for this engine?
    if timeout is not None:
        timeout_overhead = 0.2  # seconds
        # start_time = when the user request started
        start_time = getattr(threadLocal, 'start_time', time_before_request)
        search_duration = time_after_request - start_time
        if search_duration > timeout + timeout_overhead:
            raise requests.exceptions.Timeout(response=response)

    if hasattr(threadLocal, 'total_time'):
        threadLocal.total_time += time_after_request - time_before_request

    return response
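

# Example (sketch): an explicit 'timeout' kwarg overrides the thread-local value,
# and a requests.exceptions.Timeout is raised when the overall search time exceeds
# it by more than the 0.2 s overhead allowance.
#
#   try:
#       response = request('get', 'https://example.com', timeout=2.0)
#   except requests.exceptions.Timeout:
#       pass  # treat the engine as timed out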


def get(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, **kwargs)


def options(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    kwargs.setdefault('allow_redirects', False)
    return request('head', url, **kwargs)


def post(url, data=None, **kwargs):
    return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs):
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs):
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs):
    return request('delete', url, **kwargs)
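

# Usage sketch for the convenience wrappers, mirroring the requests top-level API
# (URLs here are placeholders):
#
#   r = get('https://example.com/search', params={'q': 'test'})
#   r = post('https://example.com/api', data={'q': 'test'})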