Skip to content

Commit

Permalink
Merge pull request locustio#87 from EnTeQuAk/locust_07
Browse files Browse the repository at this point in the history
 Merged gevent/zmq updates, ported to requests >= 1.2
  • Loading branch information
heyman committed Aug 15, 2013
2 parents 08796d9 + 83b246f commit 4aea348
Show file tree
Hide file tree
Showing 20 changed files with 147 additions and 129 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dist/**
*.ipr
.vagrant
build/
.coverage
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ python:
# command to install dependencies
install:
- sudo apt-get install -y libevent-dev
- pip install -r requirements.txt --use-mirrors
- pip install -r locust/test/requirements.txt
# command to run tests
script: python setup.py test
2 changes: 1 addition & 1 deletion locust/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from core import Locust, TaskSet, WebLocust, SubLocust, task
from exception import InterruptTaskSet, ResponseError, RescheduleTaskImmediately

version = "0.6.2"
version = "0.7.0"
74 changes: 36 additions & 38 deletions locust/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from urlparse import urlparse, urlunparse

import requests
from requests import Response
from requests import Response, Request
from requests.packages.urllib3.response import HTTPResponse
from requests.auth import HTTPBasicAuth
from requests.exceptions import (RequestException, ConnectionError, HTTPError,
Expand All @@ -17,6 +17,19 @@
absolute_http_url_regexp = re.compile(r"^https?://", re.I)


def timedelta_to_ms(td):
"python 2.7 has a total_seconds method for timedelta objects. This is here for py<2.7 compat."
return int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**3)


class LocustResponse(Response):

def raise_for_status(self):
if hasattr(self, 'error') and self.error:
raise self.error
Response.raise_for_status(self)


class HttpSession(requests.Session):
"""
Class for performing web requests and holding (session-) cookies between requests (in order
Expand All @@ -41,6 +54,8 @@ class HttpSession(requests.Session):
and then mark it as successful even if the response code was not (i.e 500 or 404).
"""
def __init__(self, base_url, *args, **kwargs):
requests.Session.__init__(self, *args, **kwargs)

self.base_url = base_url

# Check for basic authentication
Expand All @@ -53,19 +68,7 @@ def __init__(self, base_url, *args, **kwargs):
# remove username and password from the base_url
self.base_url = urlunparse((parsed_url.scheme, netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
# configure requests to use basic auth
kwargs["auth"] = HTTPBasicAuth(parsed_url.username, parsed_url.password)

# requests config
config = {
"max_retries": 0,
"keep_alive": False,
"safe_mode": True,
}
if "config" in kwargs:
config.update(kwargs["config"])
kwargs["config"] = config

super(HttpSession, self).__init__(*args, **kwargs)
self.auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)

def _build_url(self, path):
""" prepend url with hostname unless it's already an absolute URL """
Expand Down Expand Up @@ -98,7 +101,7 @@ def request(self, method, url, name=None, catch_response=False, **kwargs):
:param proxies: (optional) Dictionary mapping protocol to the URL of the proxy.
:param return_response: (optional) If False, an un-sent Request object will returned.
:param config: (optional) A configuration dictionary. See ``request.defaults`` for allowed keys and their default values.
:param prefetch: (optional) whether to immediately download the response content. Defaults to ``True``.
:param stream: (optional) whether to immediately download the response content. Defaults to ``False``.
:param verify: (optional) if ``True``, the SSL cert will be verified. A CA_BUNDLE path can also be provided.
:param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.
"""
Expand All @@ -110,34 +113,30 @@ def request(self, method, url, name=None, catch_response=False, **kwargs):
request_meta = {}

# set up pre_request hook for attaching meta data to the request object
def on_pre_request(request):
request_meta["method"] = request.method
request_meta["name"] = name or request.path_url
request_meta["start_time"] = time.time()

kwargs["hooks"] = {"pre_request":on_pre_request}
request_meta["start_time"] = time.time()

# make the request using a wrapper that works around a bug in python-requests causing
# safe_mode to not work when making requests through Session instances
response = self._send_request_safe_mode(method, url, **kwargs)

request_meta["method"] = response.request.method
request_meta["name"] = name or response.request.path_url

# record the consumed time
request_meta["response_time"] = int((time.time() - request_meta["start_time"]) * 1000)
request_meta["response_time"] = timedelta_to_ms(response.elapsed)

# get the length of the content, but if the argument prefetch is set to False, we take
# get the length of the content, but if the argument stream is set to True, we take
# the size from the content-length header, in order to not trigger fetching of the body
if kwargs.get("prefetch", True):
request_meta["content_size"] = len(response.content or "")
else:
if kwargs.get("stream", False):
request_meta["content_size"] = int(response.headers.get("content-length") or 0)
else:
request_meta["content_size"] = len(response.content or "")

if catch_response:
response.locust_request_meta = request_meta
return ResponseContextManager(response)
else:
try:
response.raise_for_status()
except RequestException, e:
except RequestException as e:
events.request_failure.fire(request_meta["method"], request_meta["name"], request_meta["response_time"], e, None)
else:
events.request_success.fire(
Expand All @@ -152,23 +151,22 @@ def _send_request_safe_mode(self, method, url, **kwargs):
"""
Send an HTTP request, and catch any exception that might occur due to connection problems.
This is equivalent of python-requests' safe_mode, which due to a bug, does currently *not*
work together with Sessions. Once the issue is fixed in python-requests, this method should
be removed. See: https://github.com/kennethreitz/requests/issues/888
Safe mode has been removed from requests 1.x.
"""
try:
return super(HttpSession, self).request(method, url, **kwargs)
return requests.Session.request(self, method, url, **kwargs)
except (MissingSchema, InvalidSchema, InvalidURL):
raise
except (RequestException, ConnectionError, HTTPError,
socket.timeout, socket.gaierror) as e:
r = Response()
except RequestException as e:
r = LocustResponse()
r.error = e
r.raw = HTTPResponse() # otherwise, tests fail
r.status_code = 0 # with this status_code, content returns None
r.request = Request(method, url).prepare()
return r

class ResponseContextManager(requests.Response):

class ResponseContextManager(LocustResponse):
"""
A Response class that also acts as a context manager that provides the ability to manually
control if an HTTP request should be marked as successful or a failure in Locust's statistics
Expand Down Expand Up @@ -201,7 +199,7 @@ def __exit__(self, exc, value, traceback):
else:
try:
self.raise_for_status()
except requests.exceptions.RequestException, e:
except requests.exceptions.RequestException as e:
self.failure(e)
else:
self.success()
Expand Down
8 changes: 4 additions & 4 deletions locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def run(self):
self.task_set(self).run()
except StopLocust:
pass
except (RescheduleTask, RescheduleTaskImmediately), e:
except (RescheduleTask, RescheduleTaskImmediately) as e:
raise LocustError, LocustError("A task inside a Locust class' main TaskSet (`%s.task_set` of type `%s`) seems to have called interrupt() or raised an InterruptTaskSet exception. The interrupt() function is used to hand over execution to a parent TaskSet, and should never be called in the main TaskSet which a Locust class' task_set attribute points to." % (type(self).__name__, self.task_set.__name__)), sys.exc_info()[2]


Expand Down Expand Up @@ -253,7 +253,7 @@ def run(self, *args, **kwargs):
self.wait()
else:
self.wait()
except InterruptTaskSet, e:
except InterruptTaskSet as e:
if e.reschedule:
raise RescheduleTaskImmediately, e, sys.exc_info()[2]
else:
Expand All @@ -262,7 +262,7 @@ def run(self, *args, **kwargs):
raise
except GreenletExit:
raise
except Exception, e:
except Exception as e:
events.locust_error.fire(self, e, sys.exc_info()[2])
sys.stderr.write("\n" + traceback.format_exc())
self.wait()
Expand All @@ -273,7 +273,7 @@ def execute_next_task(self):

def execute_task(self, task, *args, **kwargs):
# check if the function is a method bound to the current locust, and if so, don't pass self as first argument
if hasattr(task, "im_self") and task.im_self == self:
if hasattr(task, "im_self") and task.__self__ == self:
# task is a bound method on self
task(*args, **kwargs)
elif hasattr(task, "tasks") and issubclass(task, TaskSet):
Expand Down
2 changes: 1 addition & 1 deletion locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def sig_term_handler():
logger.info("Starting Locust %s" % version)
main_greenlet.join()
shutdown(0)
except KeyboardInterrupt, e:
except KeyboardInterrupt as e:
shutdown(0)

if __name__ == '__main__':
Expand Down
6 changes: 3 additions & 3 deletions locust/rpc/socketrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def _send_obj(sock, msg):
packed = struct.pack('!i', len(data)) + data
try:
sock.sendall(packed)
except Exception, e:
except Exception as e:
try:
sock.close()
except:
Expand All @@ -52,7 +52,7 @@ def handle():
try:
while True:
self.command_queue.put_nowait(_recv_obj(sock))
except Exception, e:
except Exception as e:
try:
sock.close()
except:
Expand Down Expand Up @@ -99,7 +99,7 @@ def handle_slave(sock):
try:
while True:
self.event_queue.put_nowait(_recv_obj(sock))
except Exception, e:
except Exception as e:
logger.info("Slave disconnected")
slaves.remove(sock)
if self.slave_index == len(slaves) and len(slaves) > 0:
Expand Down
2 changes: 1 addition & 1 deletion locust/rpc/zmqrpc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import zmq.green as zmq

from .protocol import Message


Expand All @@ -20,6 +19,7 @@ def recv(self):
data = self.receiver.recv()
return Message.unserialize(data)


class Client(object):
def __init__(self, host):
context = zmq.Context()
Expand Down
14 changes: 11 additions & 3 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def running(self):

self.server = rpc.Server()
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(self.noop)
self.greenlet.spawn(self.client_listener).link_exception(receiver=self.noop)

# listener that gathers info on how many locust users the slaves has spawned
def on_slave_report(client_id, data):
Expand All @@ -254,6 +254,9 @@ def on_quitting():
self.quit()
events.quitting += on_quitting

def noop(self, *args, **kw):
pass

@property
def user_count(self):
return sum([c.user_count for c in self.clients.itervalues()])
Expand Down Expand Up @@ -325,15 +328,20 @@ def slave_count(self):
return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)

class SlaveLocustRunner(DistributedLocustRunner):

def noop(self, *args, **kw):
pass

def __init__(self, *args, **kwargs):
super(SlaveLocustRunner, self).__init__(*args, **kwargs)
self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000))).hexdigest()

self.client = rpc.Client(self.master_host)
self.greenlet = Group()
self.greenlet.spawn(self.worker).link_exception(self.noop)

self.greenlet.spawn(self.worker).link_exception(receiver=self.noop)
self.client.send(Message("client_ready", None, self.client_id))
self.greenlet.spawn(self.stats_reporter).link_exception(self.noop)
self.greenlet.spawn(self.stats_reporter).link_exception(receiver=self.noop)

# register listener for when all locust users have hatched, and report it to the master node
def on_hatch_complete(count):
Expand Down
2 changes: 1 addition & 1 deletion locust/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def get_stripped_report(self):

def __str__(self):
try:
fail_percent = (self.num_failures/float(self.num_requests))*100
fail_percent = (self.num_failures/float(self.num_requests + self.num_failures))*100
except ZeroDivisionError:
fail_percent = 0

Expand Down
1 change: 0 additions & 1 deletion locust/test/requirements.txt

This file was deleted.

1 change: 1 addition & 0 deletions locust/test/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from test_taskratio import TestTaskRatio
from test_client import TestHttpSession
from test_web import TestWebUI
from test_average import MovingAverageTest

if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 4aea348

Please sign in to comment.