10
10
#include " kafka/server/tests/produce_consume_utils.h"
11
11
#include " model/fundamental.h"
12
12
#include " random/generators.h"
13
- #include " redpanda/tests/fixture.h"
13
+ #include " storage/disk_log_impl.h"
14
+ #include " storage/segment.h"
15
+ #include " storage/tests/storage_e2e_fixture.h"
14
16
#include " test_utils/fixture.h"
15
- #include " test_utils/scoped_config.h"
16
17
18
+ #include < seastar/core/future.hh>
19
+ #include < seastar/core/io_priority_class.hh>
17
20
#include < seastar/core/lowres_clock.hh>
18
21
19
22
#include < boost/test/tools/old/interface.hpp>
23
26
24
27
using namespace std ::chrono_literals;
25
28
26
- struct storage_e2e_fixture : public redpanda_thread_fixture {
27
- scoped_config test_local_cfg;
28
- };
29
-
30
29
namespace {
31
-
32
- // Produces to the given fixture's partition for 10 seconds.
33
- ss::future<> produce_to_fixture (
34
- storage_e2e_fixture* fix, model::topic topic_name, int * incomplete) {
35
- tests::kafka_produce_transport producer (co_await fix->make_kafka_client ());
36
- co_await producer.start ();
37
- const int cardinality = 10 ;
38
- auto now = ss::lowres_clock::now ();
39
- while (ss::lowres_clock::now () < now + 5s) {
40
- for (int i = 0 ; i < cardinality; i++) {
41
- co_await producer.produce_to_partition (
42
- topic_name, model::partition_id (0 ), tests::kv_t::sequence (i, 1 ));
43
- }
30
+ ss::future<> force_roll_log (storage::disk_log_impl* log) {
31
+ try {
32
+ co_await log ->force_roll (ss::default_priority_class ());
33
+ } catch (...) {
44
34
}
45
- *incomplete -= 1 ;
46
35
}
36
+
47
37
} // namespace
48
38
49
39
FIXTURE_TEST (test_compaction_segment_ms, storage_e2e_fixture) {
@@ -69,7 +59,7 @@ FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) {
69
59
produces.reserve (5 );
70
60
int incomplete = 5 ;
71
61
for (int i = 0 ; i < 5 ; i++) {
72
- auto fut = produce_to_fixture (this , topic_name, &incomplete);
62
+ auto fut = produce_to_fixture (topic_name, &incomplete);
73
63
produces.emplace_back (std::move (fut));
74
64
}
75
65
auto partition = app.partition_manager .local ().get (ntp);
@@ -177,3 +167,31 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) {
177
167
// final round of eviction.
178
168
BOOST_REQUIRE_LE (log ->segment_count (), 1 );
179
169
}
170
+
171
+ FIXTURE_TEST (test_concurrent_segment_roll_and_close, storage_e2e_fixture) {
172
+ const auto topic_name = model::topic (" tapioca" );
173
+ const auto ntp = model::ntp (model::kafka_namespace, topic_name, 0 );
174
+
175
+ cluster::topic_properties props;
176
+ add_topic ({model::kafka_namespace, topic_name}, 1 , props).get ();
177
+ wait_for_leader (ntp).get ();
178
+
179
+ auto partition = app.partition_manager .local ().get (ntp);
180
+ auto * log = dynamic_cast <storage::disk_log_impl*>(partition->log ().get ());
181
+ auto seg = log ->segments ().back ();
182
+
183
+ // Hold a read lock, which will force release_appender() to go through
184
+ // release_appender_in_background()
185
+ auto read_lock_holder = seg->read_lock ().get ();
186
+
187
+ auto roll_fut = force_roll_log (log );
188
+ auto release_holder_fut = ss::sleep (100ms).then (
189
+ [read_locker_holder = std::move (read_lock_holder)] {});
190
+ auto remove_segment_fut = remove_segment_permanently (log , seg);
191
+
192
+ ss::when_all (
193
+ std::move (roll_fut),
194
+ std::move (remove_segment_fut),
195
+ std::move (release_holder_fut))
196
+ .get ();
197
+ }
0 commit comments