1
|
import configparser
|
2
|
import os
|
3
|
import random
|
4
|
import shutil
|
5
|
import tempfile
|
6
|
|
7
|
import psycopg2
|
8
|
|
9
|
from webtest import TestApp
|
10
|
from quixote import cleanup, get_publisher
|
11
|
from django.conf import settings
|
12
|
|
13
|
from wcs.qommon.publisher import set_publisher_class
|
14
|
import wcs
|
15
|
import wcs.wsgi
|
16
|
from wcs.qommon.http_request import HTTPRequest
|
17
|
from wcs import publisher, compat, sql
|
18
|
|
19
|
import wcs.middleware
|
20
|
|
21
|
wcs.middleware.AfterJobsMiddleware.ASYNC = False
|
22
|
|
23
|
|
24
|
def create_temporary_pub():
|
25
|
config = configparser.ConfigParser()
|
26
|
APP_DIR = tempfile.mkdtemp()
|
27
|
compat.CompatWcsPublisher.APP_DIR = APP_DIR
|
28
|
compat.CompatWcsPublisher.DATA_DIR = os.path.abspath(
|
29
|
os.path.join(os.path.dirname(wcs.__file__), '..', 'data')
|
30
|
)
|
31
|
compat.CompatWcsPublisher.cronjobs = None
|
32
|
config.add_section('extra')
|
33
|
config.set('extra', 'auquotidien', os.path.join(os.path.dirname(__file__), '..', 'auquotidien'))
|
34
|
compat.CompatWcsPublisher._initialized = False
|
35
|
compat.CompatWcsPublisher.configure(config)
|
36
|
compat.CompatWcsPublisher.init_publisher_class()
|
37
|
pub = compat.CompatWcsPublisher.create_publisher()
|
38
|
pub.app_dir = os.path.join(APP_DIR, 'example.net')
|
39
|
os.mkdir(pub.app_dir)
|
40
|
|
41
|
# set classes
|
42
|
pub.user_class = sql.SqlUser
|
43
|
pub.role_class = sql.Role
|
44
|
pub.token_class = sql.Token
|
45
|
pub.tracking_code_class = sql.TrackingCode
|
46
|
pub.session_class = sql.Session
|
47
|
pub.custom_view_class = sql.CustomView
|
48
|
pub.snapshot_class = sql.Snapshot
|
49
|
pub.loggederror_class = sql.LoggedError
|
50
|
|
51
|
conn = psycopg2.connect(user=os.environ['USER'], dbname='postgres')
|
52
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
53
|
i = 0
|
54
|
while True:
|
55
|
dbname = 'wcstests%d' % random.randint(0, 100000)
|
56
|
try:
|
57
|
cur = conn.cursor()
|
58
|
cur.execute('CREATE DATABASE %s' % dbname)
|
59
|
break
|
60
|
except psycopg2.Error:
|
61
|
if i < 5:
|
62
|
i += 1
|
63
|
continue
|
64
|
raise
|
65
|
finally:
|
66
|
cur.close()
|
67
|
|
68
|
pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
|
69
|
pub.write_cfg()
|
70
|
|
71
|
sql.do_user_table()
|
72
|
sql.do_role_table()
|
73
|
sql.do_tokens_table()
|
74
|
sql.do_tracking_code_table()
|
75
|
sql.do_session_table()
|
76
|
sql.do_transient_data_table()
|
77
|
sql.do_custom_views_table()
|
78
|
sql.do_snapshots_table()
|
79
|
sql.do_loggederrors_table()
|
80
|
sql.do_meta_table()
|
81
|
sql.init_global_table()
|
82
|
conn.close()
|
83
|
|
84
|
return pub
|
85
|
|
86
|
|
87
|
def clean_temporary_pub():
|
88
|
pub = get_publisher()
|
89
|
conn = psycopg2.connect(user=os.environ['USER'], dbname='postgres')
|
90
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
91
|
try:
|
92
|
cur = conn.cursor()
|
93
|
cur.execute('DROP DATABASE %s' % pub.cfg['postgresql']['database'])
|
94
|
cur.close()
|
95
|
except psycopg2.Error as e:
|
96
|
print(e)
|
97
|
shutil.rmtree(pub.app_dir)
|
98
|
pub.cleanup()
|
99
|
|
100
|
|
101
|
def get_app(pub, https=False):
|
102
|
extra_environ = {'HTTP_HOST': 'example.net', 'REMOTE_ADDR': '127.0.0.1'}
|
103
|
if https:
|
104
|
settings.SECURE_PROXY_SSL_HEADER = ('HTTPS', 'on')
|
105
|
extra_environ['HTTPS'] = 'on'
|
106
|
else:
|
107
|
extra_environ['HTTPS'] = 'off'
|
108
|
return TestApp(wcs.wsgi.application, extra_environ=extra_environ)
|
109
|
|
110
|
|
111
|
def login(app, username='admin', password='admin'):
|
112
|
login_page = app.get('/login/')
|
113
|
login_form = login_page.forms['login-form']
|
114
|
login_form['username'] = username
|
115
|
login_form['password'] = password
|
116
|
resp = login_form.submit()
|
117
|
assert resp.status_int == 302
|
118
|
return app
|