From 4b6177ec9632cbe43994300125510d6f844b6fe3 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 22 Apr 2021 10:11:30 +0200 Subject: [PATCH 1/2] tests: add api tests with heavy concurrency (#53367) --- tests/test_api_concurrency.py | 174 ++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 tests/test_api_concurrency.py diff --git a/tests/test_api_concurrency.py b/tests/test_api_concurrency.py new file mode 100644 index 0000000..5a1eaa3 --- /dev/null +++ b/tests/test_api_concurrency.py @@ -0,0 +1,174 @@ +import datetime +import multiprocessing +import random +import statistics +import sys +import time +from urllib.parse import urljoin + +import pytest +import requests +from django.contrib.auth.models import User + +from chrono.agendas.models import Agenda, Desk, Event, MeetingType, TimePeriod + + +class TestMeetingsApi: + concurrency = 30 + + @pytest.fixture + def app(self, live_server, transactional_db): + """Use the threaded WSGI server provided by live_server then create + `self.concurrency` requests clients to produce contention during + concurrent fillslot on a single agenda. transactional_db is mandatory + for all threads and processes to see the same database state.""" + + user = User(username='john.doe', first_name=u'John', last_name=u'Doe', email='john.doe@example.net') + user.set_password('password') + user.save() + + agenda = Agenda.objects.create( + label='foo', kind='meetings', minimal_booking_delay=2, maximal_booking_delay=9 + ) + MeetingType.objects.create(agenda=agenda, label='mt30', duration=30) + MeetingType.objects.create(agenda=agenda, label='mt15', duration=5) + desk = Desk.objects.create(agenda=agenda, label='desk') + + for weekday in range(7): + TimePeriod.objects.create( + weekday=weekday, + start_time=datetime.time(8, 0), + end_time=datetime.time(12, 0), + desk=desk, + ) + TimePeriod.objects.create( + weekday=weekday, + start_time=datetime.time(14, 0), + end_time=datetime.time(18, 0), + desk=desk, + ) + + auth = ('john.doe', 'password') + + class App: + # "simulate" webtest app with requests + def get(self, url): + resp = requests.get(urljoin(str(live_server), url), auth=auth) + resp.raise_for_status() + return resp + + def post(self, url): + resp = requests.post(urljoin(str(live_server), url), auth=auth) + resp.raise_for_status() + return resp + + return App() + + def test_lots_of_clients(self, app, transactional_db): + # Create a meeting of 30 minutes from 09:45 to 10:15 and look for available + # 5 minutes meeting between 10:00 and 10:30, there should be only 3 if the + # exclusion from 30 minutes event is enforced. + + class Counters: + datetimes_timings_queue = multiprocessing.Queue() + fillslot_timings_queue = multiprocessing.Queue() + fillslot_failed = multiprocessing.Array('i', [0] * self.concurrency, lock=False) + fillslot_success = multiprocessing.Array('i', [0] * self.concurrency, lock=False) + + def worker(i): + mt_choices = ['mt15', 'mt30'] + + datetimes_timings = [] + fillslot_timings = [] + while True: + if not mt_choices: + break + mt = random.choice(mt_choices) + start = time.time() + resp = app.get('/api/agenda/foo/meetings/%s/datetimes/?hide_disabled=true' % mt) + datetimes_timings.append(time.time() - start) + available = len(resp.json()['data']) + if available == 0: + mt_choices.remove(mt) + continue + choice = random.randint(0, available - 1) + slot = resp.json()['data'][choice] + start = time.time() + resp = app.post(slot['api']['fillslot_url']) + fillslot_timings.append(time.time() - start) + if resp.json()['err']: + Counters.fillslot_failed[i] += 1 + else: + Counters.fillslot_success[i] += 1 + print( + 'Thread', + i, + 'choice', + slot['id'], + 'fillslot_failed', + Counters.fillslot_failed[i], + 'fillslot_success', + Counters.fillslot_success[i], + 'available', + available, + 'booked', + sum(Counters.fillslot_success), + ) + # stop when we have done at least 100 bookings + if sum(Counters.fillslot_success) > 100: + break + # yeld to scheduler + time.sleep(0.0001) + Counters.datetimes_timings_queue.put(datetimes_timings) + Counters.fillslot_timings_queue.put(fillslot_timings) + + threads = [multiprocessing.Process(target=worker, args=(i,)) for i in range(self.concurrency)] + start = time.time() + for thread in threads: + thread.start() + datetimes_timings = [] + for i in range(self.concurrency): + datetimes_timings.extend(Counters.datetimes_timings_queue.get()) + fillslot_timings = [] + for i in range(self.concurrency): + fillslot_timings.extend(Counters.fillslot_timings_queue.get()) + for thread in threads: + thread.join() + duration = time.time() - start + print('Fillslot failed', sum(Counters.fillslot_failed)) + print('Fillslot success', sum(Counters.fillslot_success)) + print( + 'Percent of failure', + '%2.0d %%' + % ( + sum(Counters.fillslot_failed) + / float(sum(Counters.fillslot_failed) + sum(Counters.fillslot_success)) + * 100 + ), + ) + print( + 'Fillslot per second', (sum(Counters.fillslot_failed) + sum(Counters.fillslot_success)) / duration + ) + args = ( + 'Datetimes', + 'avg duration', + statistics.mean(datetimes_timings), + ) + if sys.version_info >= (3, 8): + args += ( + 'quantiles', + statistics.quantiles(datetimes_timings, n=10), + ) + print(*args) + args = ( + 'Fillslot', + 'avg duration', + statistics.mean(fillslot_timings), + ) + if sys.version_info >= (3, 8): + args += ( + 'quantiles', + statistics.quantiles(fillslot_timings, n=10), + ) + print(*args) + assert Event.objects.count() == sum(Counters.fillslot_success) -- 2.31.0