Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid reconnecting live nbd sockets; resolves #3138 #3172

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoint_proxy/server/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ void TBootstrap::Init()
Options.NbdRequestTimeout,
Options.NbdReconnectDelay,
Options.WithoutLibnl,
Options.RestartEvents,
},
CreateWallClockTimer(),
Scheduler,
Expand Down
10 changes: 8 additions & 2 deletions cloud/blockstore/libs/endpoint_proxy/server/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,26 @@ TOptions::TOptions()
.StoreResult(&StoredEndpointsPath);

Opts.AddLongOption("nbd-request-timeout")
.OptionalArgument("NUM")
.RequiredArgument("NUM")
.Handler1T<TString>([this] (const auto& s) {
NbdRequestTimeout = TDuration::Parse(s);
});

Opts.AddLongOption("nbd-reconnect-delay")
.OptionalArgument("NUM")
.RequiredArgument("NUM")
.Handler1T<TString>([this] (const auto& s) {
NbdReconnectDelay = TDuration::Parse(s);
});

Opts.AddLongOption("without-libnl")
.NoArgument()
.SetFlag(&WithoutLibnl);

Opts.AddLongOption(
"restart-events",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

К сожалению без этого не получилось надежно воспроизвести проблему в тесте

"debug option. issue multiple restart events for each io error")
.RequiredArgument("NUM")
.StoreResult(&RestartEvents);
}

void TOptions::Parse(int argc, char** argv)
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoint_proxy/server/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TOptions final: TOptionsBase
TDuration NbdRequestTimeout = TDuration::Minutes(10);
TDuration NbdReconnectDelay = TDuration::MilliSeconds(100);
bool WithoutLibnl = false;
ui32 RestartEvents = 1;

TOptions();

Expand Down
38 changes: 28 additions & 10 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ struct TEndpoint: std::enable_shared_from_this<TEndpoint>
NBD::IDevicePtr NbdDevice;
std::unique_ptr<TNetworkAddress> ListenAddress;
IProxyRequestStatsPtr RequestStats;

TString UnixSocketPath;
TString InternalUnixSocketPath;
TString NbdDevicePath;

NBD::TStorageOptions NbdOptions;
std::atomic<ui64> Generation = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -191,16 +190,19 @@ struct TRestartAlarmContext: TRequestContextBase
{
std::weak_ptr<TEndpoint> Endpoint;
grpc::ServerCompletionQueue& CQ;
ui64 Generation;
TBackoffDelayProvider Backoff;
grpc::Alarm Alarm;

TRestartAlarmContext(
std::weak_ptr<TEndpoint> endpoint,
grpc::ServerCompletionQueue& cq,
ui64 generation,
TDuration minReconnectDelay,
TDuration maxReconnectDelay)
: Endpoint(std::move(endpoint))
, CQ(cq)
, Generation(generation)
, Backoff{minReconnectDelay, maxReconnectDelay}
{
SetAlarm();
Expand Down Expand Up @@ -275,27 +277,36 @@ struct TServer: IEndpointProxyServer
{
std::weak_ptr<TEndpoint> Endpoint;
grpc::ServerCompletionQueue& CQ;
ui64 Generation;
TDuration ReconnectDelay;
ui32 RestartEvents;

TErrorHandler(
std::weak_ptr<TEndpoint> endpoint,
grpc::ServerCompletionQueue& cq,
TDuration reconnectDelay)
ui64 generation,
TDuration reconnectDelay,
ui32 restartEvents)
: Endpoint(std::move(endpoint))
, CQ(cq)
, Generation(generation)
, ReconnectDelay(reconnectDelay)
, RestartEvents(restartEvents)
{}

void ProcessException(std::exception_ptr e) override
{
Y_UNUSED(e);

// context will be held by grpc until CQ->Next returns it
new TRestartAlarmContext(
Endpoint,
CQ,
ReconnectDelay,
MAX_RECONNECT_DELAY);
for (ui32 i = 0; i < RestartEvents; i++) {
// context will be held by grpc until CQ->Next returns it
new TRestartAlarmContext(
Endpoint,
CQ,
Generation,
ReconnectDelay,
MAX_RECONNECT_DELAY);
}
}
};

Expand Down Expand Up @@ -945,7 +956,9 @@ struct TServer: IEndpointProxyServer
std::make_shared<TErrorHandler>(
ep.shared_from_this(),
*CQ,
Config.NbdReconnectDelay),
ep.Generation,
Config.NbdReconnectDelay,
Config.RestartEvents),
ep.NbdOptions);

// TODO fix StartEndpoint signature - it's actually synchronous
Expand Down Expand Up @@ -1007,6 +1020,11 @@ struct TServer: IEndpointProxyServer
bool ProcessAlarm(TRestartAlarmContext* context)
{
if (auto ep = context->Endpoint.lock()) {
if (context->Generation != ep->Generation) {
return false;
}
ep->Generation++;

const auto tag = TStringBuilder()
<< "UnixSocketPath: " << ep->UnixSocketPath.Quote() << " - ";

Expand Down
5 changes: 4 additions & 1 deletion cloud/blockstore/libs/endpoint_proxy/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct TEndpointProxyServerConfig
TDuration NbdRequestTimeout;
TDuration NbdReconnectDelay;
bool WithoutLibnl;
ui32 RestartEvents;

TEndpointProxyServerConfig(
ui16 port,
Expand All @@ -42,7 +43,8 @@ struct TEndpointProxyServerConfig
TString storedEndpointsPath,
TDuration nbdRequestTimeout,
TDuration nbdReconnectDelay,
bool withoutLibnl)
bool withoutLibnl,
ui32 restartEvents)
: Port(port)
, SecurePort(securePort)
, RootCertsFile(std::move(rootCertsFile))
Expand All @@ -54,6 +56,7 @@ struct TEndpointProxyServerConfig
, NbdRequestTimeout(nbdRequestTimeout)
, NbdReconnectDelay(nbdReconnectDelay)
, WithoutLibnl(withoutLibnl)
, RestartEvents(restartEvents)
{
}
};
Expand Down
88 changes: 87 additions & 1 deletion cloud/blockstore/tests/e2e-tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def init(
stored_endpoints_path=None,
nbd_request_timeout=None,
nbd_reconnect_delay=None,
restart_interval=None,
proxy_restart_events=None,
):
server_config_patch = TServerConfig()
server_config_patch.NbdEnabled = True
Expand Down Expand Up @@ -71,11 +73,13 @@ def init(
server_app_config=server,
storage_config_patches=None,
use_in_memory_pdisks=True,
restart_interval=restart_interval,
with_endpoint_proxy=with_endpoint_proxy,
with_netlink=with_netlink,
stored_endpoints_path=stored_endpoints_path,
nbd_request_timeout=nbd_request_timeout,
nbd_reconnect_delay=nbd_reconnect_delay)
nbd_reconnect_delay=nbd_reconnect_delay,
proxy_restart_events=proxy_restart_events)

client_config_path = Path(yatest_common.output_path()) / "client-config.txt"
client_config = TClientAppConfig()
Expand Down Expand Up @@ -127,6 +131,88 @@ def log_called_process_error(exc):
)


def test_multiple_errors():
env, run = init(
restart_interval=5,
with_netlink=True,
with_endpoint_proxy=True,
nbd_request_timeout="1s",
proxy_restart_events=2)

volume_name = "test-disk"
block_size = 4096
blocks_count = 1024
nbd_device = "/dev/nbd0"
socket_path = "/tmp/nbd.sock"
runtime = 10
numjobs = blocks_count

try:
result = run(
"createvolume",
"--disk-id",
volume_name,
"--blocks-count",
str(blocks_count),
"--block-size",
str(block_size),
)
assert result.returncode == 0

result = run(
"startendpoint",
"--disk-id",
volume_name,
"--socket",
socket_path,
"--ipc-type",
"nbd",
"--persistent",
"--nbd-device",
nbd_device
)
assert result.returncode == 0

proc = subprocess.Popen(
[
"fio",
"--name=fio",
"--ioengine=sync",
"--direct=1",
"--time_based=1",
"--rw=randrw",
"--rwmixread=50",
"--filename=" + nbd_device,
"--runtime=" + str(runtime),
"--blocksize=" + str(block_size),
"--numjobs=" + str(numjobs),
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
proc.communicate(timeout=60)
assert proc.returncode == 0

except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise

finally:
run(
"stopendpoint",
"--socket",
socket_path,
)

result = run(
"destroyvolume",
"--disk-id",
volume_name,
input=volume_name,
)

cleanup_after_test(env)


def test_stop_start():
env, run = init(
with_netlink=True,
Expand Down
6 changes: 5 additions & 1 deletion cloud/blockstore/tests/python/lib/endpoint_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ def __init__(
with_netlink,
stored_endpoints_path,
nbd_request_timeout,
nbd_reconnect_delay
nbd_reconnect_delay,
restart_events,
):
command = [yatest_common.binary_path(
"cloud/blockstore/apps/endpoint_proxy/blockstore-endpoint-proxy")]
Expand All @@ -30,6 +31,9 @@ def __init__(
if nbd_reconnect_delay:
command += ["--nbd-reconnect-delay", nbd_reconnect_delay]

if restart_events:
command += ["--restart-events", str(restart_events)]

super(EndpointProxy, self).__init__(
commands=[command],
cwd=working_dir,
Expand Down
4 changes: 3 additions & 1 deletion cloud/blockstore/tests/python/lib/loadtest_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
stored_endpoints_path=None,
nbd_request_timeout=None,
nbd_reconnect_delay=None,
proxy_restart_events=None,
):

self.__endpoint = endpoint
Expand Down Expand Up @@ -131,7 +132,8 @@ def __init__(
with_netlink=with_netlink,
stored_endpoints_path=stored_endpoints_path,
nbd_request_timeout=nbd_request_timeout,
nbd_reconnect_delay=nbd_reconnect_delay)
nbd_reconnect_delay=nbd_reconnect_delay,
restart_events=proxy_restart_events)

if run_kikimr:
self.nbs.setup_cms(self.kikimr_cluster.client)
Expand Down
Loading