test_tar_upload_pool.py
import pytest

from wal_e import exception
from wal_e import worker


class FakeTarPartition(object):
    """Implements enough protocol to test concurrency semantics."""

    def __init__(self, num_members, explosive=False):
        self._explosive = explosive
        self.num_members = num_members

    def __len__(self):
        return self.num_members


class FakeUploader(object):
    """A no-op uploader that allows for fault injection."""

    def __call__(self, tpart):
        if tpart._explosive:
            raise tpart._explosive

        return tpart


class Explosion(Exception):
    """Marker type of injected faults."""
    pass


def make_pool(max_concurrency, max_members):
    """Set up a pool with a FakeUploader."""
    return worker.TarUploadPool(FakeUploader(),
                                max_concurrency, max_members)


def test_simple():
    """Simple case of uploading one partition."""
    pool = make_pool(1, 1)
    pool.put(FakeTarPartition(1))
    pool.join()


def test_not_enough_resources():
    """Detect if a too-large segment can never complete."""
    pool = make_pool(1, 1)

    with pytest.raises(exception.UserCritical):
        pool.put(FakeTarPartition(2))

    pool.join()


def test_simple_concurrency():
    """Try a pool that cannot execute all submitted jobs at once."""
    pool = make_pool(1, 1)

    for i in range(3):
        pool.put(FakeTarPartition(1))

    pool.join()


def test_fault_midstream():
    """Test if a previous upload fault is detected in calling .put.

    This case is seen while pipelining many uploads in excess of the
    maximum concurrency.

    NB: This test is critical to prevent failed uploads from failing
    to notify a caller that the entire backup is incomplete.
    """
    pool = make_pool(1, 1)

    # Set up an upload doomed to fail.
    tpart = FakeTarPartition(1, explosive=Explosion('Boom'))
    pool.put(tpart)

    # Try to receive the error through adding another upload.
    tpart = FakeTarPartition(1)

    with pytest.raises(Explosion):
        pool.put(tpart)


def test_fault_join():
    """Test if a fault is detected when .join is used.

    This case is seen at the end of a series of uploads.

    NB: This test is critical to prevent failed uploads from failing
    to notify a caller that the entire backup is incomplete.
    """
    pool = make_pool(1, 1)

    # Set up an upload doomed to fail.
    tpart = FakeTarPartition(1, explosive=Explosion('Boom'))
    pool.put(tpart)

    # Try to receive the error while finishing up.
    with pytest.raises(Explosion):
        pool.join()


def test_put_after_join():
    """New jobs cannot be submitted after a .join.

    This is mostly a re-check to detect programming errors.
    """
    pool = make_pool(1, 1)
    pool.join()

    with pytest.raises(exception.UserCritical):
        pool.put(FakeTarPartition(1))


def test_pool_concurrent_success():
    pool = make_pool(4, 4)

    for i in range(30):
        pool.put(FakeTarPartition(1))

    pool.join()


def test_pool_concurrent_failure():
    pool = make_pool(4, 4)
    parts = [FakeTarPartition(1) for i in range(30)]

    exc = Explosion('boom')
    parts[27]._explosive = exc

    with pytest.raises(Explosion) as e:
        for part in parts:
            pool.put(part)

        pool.join()

    assert e.value is exc
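

# A minimal usage sketch (not part of the upstream test suite): the
# put/join pattern the tests above exercise, using only what they
# already demonstrate about worker.TarUploadPool -- a constructor
# taking (uploader, max_concurrency, max_members), .put() accepting
# any object that supports len(), and .join() blocking until all
# outstanding uploads finish while re-raising any fault.  The helper
# name upload_all is hypothetical and not part of wal-e.
def upload_all(partitions, max_concurrency=4, max_members=4):
    pool = worker.TarUploadPool(FakeUploader(),
                                max_concurrency, max_members)

    for part in partitions:
        # .put may re-raise a fault from an earlier, failed upload
        # (see test_fault_midstream).
        pool.put(part)

    # .join surfaces any fault not yet observed (see test_fault_join).
    pool.join()

    # Example: upload_all([FakeTarPartition(1) for _ in range(10)])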