|
15 | 15 | #include "storage/tests/storage_e2e_fixture.h"
|
16 | 16 | #include "test_utils/fixture.h"
|
17 | 17 |
|
| 18 | +#include <seastar/core/future.hh> |
18 | 19 | #include <seastar/core/io_priority_class.hh>
|
19 | 20 | #include <seastar/core/lowres_clock.hh>
|
20 | 21 |
|
|
25 | 26 |
|
26 | 27 | using namespace std::chrono_literals;
|
27 | 28 |
|
| 29 | +namespace { |
| 30 | +ss::future<> force_roll_log(storage::disk_log_impl* log) { |
| 31 | + try { |
| 32 | + co_await log->force_roll(ss::default_priority_class()); |
| 33 | + } catch (...) { |
| 34 | + } |
| 35 | +} |
| 36 | + |
| 37 | +} // namespace |
| 38 | + |
28 | 39 | FIXTURE_TEST(test_compaction_segment_ms, storage_e2e_fixture) {
|
29 | 40 | test_local_cfg.get("log_segment_ms_min")
|
30 | 41 | .set_value(std::chrono::duration_cast<std::chrono::milliseconds>(1ms));
|
@@ -156,3 +167,31 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) {
|
156 | 167 | // final round of eviction.
|
157 | 168 | BOOST_REQUIRE_LE(log->segment_count(), 1);
|
158 | 169 | }
|
| 170 | + |
| 171 | +FIXTURE_TEST(test_concurrent_segment_roll_and_close, storage_e2e_fixture) { |
| 172 | + const auto topic_name = model::topic("tapioca"); |
| 173 | + const auto ntp = model::ntp(model::kafka_namespace, topic_name, 0); |
| 174 | + |
| 175 | + cluster::topic_properties props; |
| 176 | + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); |
| 177 | + wait_for_leader(ntp).get(); |
| 178 | + |
| 179 | + auto partition = app.partition_manager.local().get(ntp); |
| 180 | + auto* log = dynamic_cast<storage::disk_log_impl*>(partition->log().get()); |
| 181 | + auto seg = log->segments().back(); |
| 182 | + |
| 183 | + // Hold a read lock, which will force release_appender() to go through |
| 184 | + // release_appender_in_background() |
| 185 | + auto read_lock_holder = seg->read_lock().get(); |
| 186 | + |
| 187 | + auto roll_fut = force_roll_log(log); |
| 188 | + auto release_holder_fut = ss::sleep(100ms).then( |
| 189 | + [read_locker_holder = std::move(read_lock_holder)] {}); |
| 190 | + auto remove_segment_fut = remove_segment_permanently(log, seg); |
| 191 | + |
| 192 | + ss::when_all( |
| 193 | + std::move(roll_fut), |
| 194 | + std::move(remove_segment_fut), |
| 195 | + std::move(release_holder_fut)) |
| 196 | + .get(); |
| 197 | +} |
0 commit comments