Projet

Général

Profil

0001-utils-add-an-atomic_write-context-manager-32413.patch

Benjamin Dauvergne, 18 avril 2019 11:33

Télécharger (4,39 ko)

Voir les différences:

Subject: [PATCH 1/3] utils: add an atomic_write() context manager (#32413)

 passerelle/utils/files.py | 53 +++++++++++++++++++++++++++++++++++++++
 tests/test_utils_files.py | 48 +++++++++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+)
 create mode 100644 passerelle/utils/files.py
 create mode 100644 tests/test_utils_files.py
passerelle/utils/files.py
1
# Copyright (C) 2018  Entr'ouvert
2
#
3
# This program is free software: you can redistribute it and/or modify it
4
# under the terms of the GNU Affero General Public License as published
5
# by the Free Software Foundation, either version 3 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15

  
16
import os.path
17
import contextlib
18
import tempfile
19
import errno
20

  
21
from django.conf import settings
22

  
23

  
24
@contextlib.contextmanager
25
def atomic_write(filepath, **kwargs):
26
    '''Return a file descriptor to a temporary file using NamedTemporaryFile
27
       which will be atomically renamed to filepath if possible.
28

  
29
       Atomic renaming is only possible on the same filesystem, so the
30
       temporary file will be created in the same directory as the target file
31

  
32
       You can pass any possible argument to NamedTemporaryFile with kwargs.
33
    '''
34

  
35
    tmp_dir = kwargs.pop('dir', None)
36
    if not tmp_dir:
37
        tmp_dir = os.path.join(settings.MEDIA_ROOT, 'tmp')
38
        if not os.path.exists(tmp_dir):
39
            try:
40
                os.makedirs(tmp_dir)
41
            except OSError as e:
42
                if e.errno != errno.EEXIST:
43
                    raise
44
    fd = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False, **kwargs)
45
    try:
46
        with fd:
47
            yield fd
48
            fd.flush()
49
            os.fsync(fd.fileno())
50
        os.rename(fd.name, filepath)
51
    except Exception:
52
        os.unlink(fd.name)
53
        raise
tests/test_utils_files.py
1
# Copyright (C) 2018  Entr'ouvert
2
#
3
# This program is free software: you can redistribute it and/or modify it
4
# under the terms of the GNU Affero General Public License as published
5
# by the Free Software Foundation, either version 3 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15

  
16
import os
17
import errno
18

  
19
import pytest
20

  
21
from passerelle.utils.files import atomic_write
22

  
23

  
24
def test_atomic_write(settings, tmpdir):
25
    settings.MEDIA_ROOT = str(tmpdir.mkdir('media'))
26
    target_dir = tmpdir.mkdir('target')
27
    filepath = str(target_dir.join('test'))
28

  
29
    assert not os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp'))
30
    with pytest.raises(Exception):
31
        with atomic_write(filepath) as fd:
32
            fd.write('coucou')
33
            raise Exception()
34
    assert os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp'))
35
    assert not os.path.exists(filepath)
36
    assert os.listdir(str(target_dir)) == []
37

  
38
    with atomic_write(filepath) as fd:
39
        fd.write('coucou')
40

  
41
    assert os.path.exists(filepath)
42
    with open(filepath) as fd:
43
        assert fd.read() == 'coucou'
44

  
45
    with pytest.raises(OSError) as exc_info:
46
        with atomic_write(filepath, dir=str(tmpdir) + '/tmp2/') as fd:
47
            fd.write('coucou')
48
    assert exc_info.value.errno == errno.ENOENT
0
-