From f1f122a535e9dc5eb60d3472c8a55bdb2a0102d8 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 22 Apr 2021 10:11:30 +0200 Subject: [PATCH 1/2] tests: add api tests with heavy concurrency (#53367) --- tests/test_api_concurrency.py | 209 ++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 tests/test_api_concurrency.py diff --git a/tests/test_api_concurrency.py b/tests/test_api_concurrency.py new file mode 100644 index 0000000..8ae8536 --- /dev/null +++ b/tests/test_api_concurrency.py @@ -0,0 +1,209 @@ +import datetime +import multiprocessing +import random +import re +import statistics +import sys +import time +from urllib.parse import urljoin + +import mock +import pytest +import requests +from django.contrib.auth.models import User +from django.utils.functional import cached_property + +from chrono.agendas.models import Agenda, Desk, Event, MeetingType, TimePeriod + + +class TestMeetingsApi: + concurrency = 4 + + @pytest.fixture + def app(self, settings, live_server, transactional_db): + """Use the threaded WSGI server provided by live_server then create + `self.concurrency` requests clients to produce contention during + concurrent fillslot on a single agenda. transactional_db is mandatory + for all threads and processes to see the same database state.""" + + settings.REST_FRAMEWORK['DEFAULT_AUTHENTICATION_CLASSES'] = [ + 'rest_framework.authentication.SessionAuthentication' + ] + user = User(username='john.doe', first_name=u'John', last_name=u'Doe', email='john.doe@example.net') + user.set_password('password') + user.save() + + agenda = Agenda.objects.create( + label='foo', kind='meetings', minimal_booking_delay=2, maximal_booking_delay=9 + ) + MeetingType.objects.create(agenda=agenda, label='mt30', duration=30) + MeetingType.objects.create(agenda=agenda, label='mt15', duration=5) + desk = Desk.objects.create(agenda=agenda, label='desk') + + for weekday in range(7): + TimePeriod.objects.create( + weekday=weekday, + start_time=datetime.time(8, 0), + end_time=datetime.time(12, 0), + desk=desk, + ) + TimePeriod.objects.create( + weekday=weekday, + start_time=datetime.time(14, 0), + end_time=datetime.time(18, 0), + desk=desk, + ) + + class App: + # "simulate" webtest app with requests + logged = False + + @cached_property + def session(self): + return requests.Session() + + def login(self): + if self.logged: + return + response = self.get('/login/') + csrfmiddlewaretoken = re.search(r'csrfmiddlewaretoken.*value="([^"]*)', response.text).group( + 1 + ) + response = self.post( + '/login/', + { + 'csrfmiddlewaretoken': csrfmiddlewaretoken, + 'username': 'john.doe', + 'password': 'password', + }, + allow_redirects=False, + ) + assert response.status_code == 302 + assert self.session.cookies['sessionid'] + self.logged = True + + def get(self, url): + resp = self.session.get(urljoin(str(live_server), url)) + resp.raise_for_status() + return resp + + def post(self, url, *args, **kwargs): + resp = self.session.post(urljoin(str(live_server), url), *args, **kwargs) + resp.raise_for_status() + return resp + + with mock.patch('rest_framework.authentication.SessionAuthentication.enforce_csrf'): + # prevent CSRF check on SessionAuthentication + yield App() + + def test_lots_of_clients(self, app, transactional_db): + # Create a meeting of 30 minutes from 09:45 to 10:15 and look for available + # 5 minutes meeting between 10:00 and 10:30, there should be only 3 if the + # exclusion from 30 minutes event is enforced. + + app.login() + app.session.close() + + class Counters: + datetimes_timings_queue = multiprocessing.Queue() + fillslot_timings_queue = multiprocessing.Queue() + fillslot_failed = multiprocessing.Array('i', [0] * self.concurrency, lock=False) + fillslot_success = multiprocessing.Array('i', [0] * self.concurrency, lock=False) + + def worker(i): + mt_choices = ['mt15', 'mt30'] + + datetimes_timings = [] + fillslot_timings = [] + while True: + if not mt_choices: + break + mt = random.choice(mt_choices) + start = time.time() + resp = app.get('/api/agenda/foo/meetings/%s/datetimes/?hide_disabled=true' % mt) + datetimes_timings.append(time.time() - start) + available = len(resp.json()['data']) + if available == 0: + mt_choices.remove(mt) + continue + choice = random.randint(0, available - 1) + slot = resp.json()['data'][choice] + start = time.time() + resp = app.post(slot['api']['fillslot_url']) + fillslot_timings.append(time.time() - start) + if resp.json()['err']: + Counters.fillslot_failed[i] += 1 + else: + Counters.fillslot_success[i] += 1 + print( + 'Thread', + i, + 'choice', + slot['id'], + 'fillslot_failed', + Counters.fillslot_failed[i], + 'fillslot_success', + Counters.fillslot_success[i], + 'available', + available, + 'booked', + sum(Counters.fillslot_success), + ) + # stop when we have done at least 100 bookings + if sum(Counters.fillslot_success) > 200: + break + # yeld to scheduler + # time.sleep(0.0001) + Counters.datetimes_timings_queue.put(datetimes_timings) + Counters.fillslot_timings_queue.put(fillslot_timings) + + threads = [multiprocessing.Process(target=worker, args=(i,)) for i in range(self.concurrency)] + start = time.time() + for thread in threads: + thread.start() + for thread in threads: + thread.join() + datetimes_timings = [] + while not Counters.datetimes_timings_queue.empty(): + datetimes_timings.extend(Counters.datetimes_timings_queue.get()) + fillslot_timings = [] + while not Counters.fillslot_timings_queue.empty(): + fillslot_timings.extend(Counters.fillslot_timings_queue.get()) + duration = time.time() - start + print('Fillslot failed', sum(Counters.fillslot_failed)) + print('Fillslot success', sum(Counters.fillslot_success)) + print( + 'Percent of failure', + '%2.0d %%' + % ( + sum(Counters.fillslot_failed) + / float(sum(Counters.fillslot_failed) + sum(Counters.fillslot_success)) + * 100 + ), + ) + print( + 'Fillslot per second', (sum(Counters.fillslot_failed) + sum(Counters.fillslot_success)) / duration + ) + args = ( + 'Datetimes', + 'avg duration', + statistics.mean(datetimes_timings), + ) + if sys.version_info >= (3, 8): + args += ( + 'quantiles', + statistics.quantiles(datetimes_timings, n=10), + ) + print(*args) + args = ( + 'Fillslot', + 'avg duration', + statistics.mean(fillslot_timings), + ) + if sys.version_info >= (3, 8): + args += ( + 'quantiles', + statistics.quantiles(fillslot_timings, n=10), + ) + print(*args) + assert Event.objects.count() == sum(Counters.fillslot_success) -- 2.31.0