diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index 6d756dd8c2d2..b953c62bfe2e 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -34,4 +34,5 @@ enum ETxTypes { TXTYPE_DROP_STREAM_RESULT = 8 [(TxTypeOpts) = {Name: "TxDropStreamResult"}]; TXTYPE_DROP_DST_RESULT = 9 [(TxTypeOpts) = {Name: "TxDropDstResult"}]; TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}]; + TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}]; } diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 723df30059c1..17227e475f5e 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -7,10 +7,12 @@ option java_package = "ru.yandex.kikimr.proto"; message TStaticCredentials { optional string User = 1; optional string Password = 2 [(Ydb.sensitive) = true]; + optional string PasswordSecretName = 3; } message TOAuthToken { optional string Token = 1 [(Ydb.sensitive) = true]; + optional string TokenSecretName = 2; } message TConnectionParams { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index f222d65e8966..d9112a62af6f 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -52,6 +52,7 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvDropStreamResult, Handle); HFunc(TEvPrivate::TEvCreateDstResult, Handle); HFunc(TEvPrivate::TEvDropDstResult, Handle); + HFunc(TEvPrivate::TEvResolveSecretResult, Handle); HFunc(TEvPrivate::TEvResolveTenantResult, Handle); HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); HFunc(TEvPrivate::TEvRunWorkers, Handle); @@ -152,6 +153,11 @@ void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorCon RunTxDropDstResult(ev, ctx); } +void TController::Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxResolveSecretResult(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 8a44d5434849..f0749ac0f1a6 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -75,6 +75,7 @@ class TController void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvRunWorkers::TPtr& ev, const TActorContext& ctx); @@ -103,6 +104,7 @@ class TController class TTxDropStreamResult; class TTxCreateDstResult; class TTxDropDstResult; + class TTxResolveSecretResult; // tx runners void RunTxInitSchema(const TActorContext& ctx); @@ -117,6 +119,7 @@ class TController void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx); void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx); void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); + void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx); // other template diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index a3313312ef8b..f57d77bf0f20 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -181,6 +181,33 @@ TString TEvPrivate::TEvUpdateTenantNodes::ToString() const { << " }"; } +TEvPrivate::TEvResolveSecretResult::TEvResolveSecretResult(ui64 rid, const TString& secretValue) + : ReplicationId(rid) + , SecretValue(secretValue) + , Success(true) +{ +} + +TEvPrivate::TEvResolveSecretResult::TEvResolveSecretResult(ui64 rid, bool success, const TString& error) + : ReplicationId(rid) + , Success(success) + , Error(error) +{ + Y_ABORT_UNLESS(!success); +} + +TString TEvPrivate::TEvResolveSecretResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " Success: " << Success + << " Error: " << Error + << " }"; +} + +bool TEvPrivate::TEvResolveSecretResult::IsSuccess() const { + return Success; +} + } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index c19ec8e8ce46..6bb2c45fdb19 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -21,6 +21,7 @@ struct TEvPrivate { EvResolveTenantResult, EvUpdateTenantNodes, EvRunWorkers, + EvResolveSecretResult, EvEnd, }; @@ -129,6 +130,19 @@ struct TEvPrivate { struct TEvRunWorkers: public TEventLocal { }; + struct TEvResolveSecretResult: public TEventLocal { + const ui64 ReplicationId; + const TString SecretValue; + const bool Success; + const TString Error; + + explicit TEvResolveSecretResult(ui64 rid, const TString& secretValue); + explicit TEvResolveSecretResult(ui64 rid, bool success, const TString& error); + TString ToString() const override; + + bool IsSuccess() const; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index 8d5645ec8db5..71e47bc4f1f4 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -1,5 +1,6 @@ #include "private_events.h" #include "replication.h" +#include "secret_resolver.h" #include "target_discoverer.h" #include "target_table.h" #include "tenant_resolver.h" @@ -18,6 +19,14 @@ namespace NKikimr::NReplication::NController { class TReplication::TImpl { friend class TReplication; + void ResolveSecret(const TString& secretName, const TActorContext& ctx) { + if (SecretResolver) { + return; + } + + SecretResolver = ctx.Register(CreateSecretResolver(ctx.SelfID, ReplicationId, PathId, secretName)); + } + template ITarget* CreateTarget(ui64 id, ETargetKind kind, Args&&... args) const { switch (kind) { @@ -95,9 +104,15 @@ class TReplication::TImpl { switch (params.GetCredentialsCase()) { case NKikimrReplication::TConnectionParams::kStaticCredentials: + if (!params.GetStaticCredentials().HasPassword()) { + return ResolveSecret(params.GetStaticCredentials().GetPasswordSecretName(), ctx); + } ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials())); break; case NKikimrReplication::TConnectionParams::kOAuthToken: + if (!params.GetOAuthToken().HasToken()) { + return ResolveSecret(params.GetOAuthToken().GetTokenSecretName(), ctx); + } ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken())); break; default: @@ -139,7 +154,7 @@ class TReplication::TImpl { target->Shutdown(ctx); } - for (auto* x : TVector{&TargetDiscoverer, &TenantResolver, &YdbProxy}) { + for (auto* x : TVector{&SecretResolver, &TargetDiscoverer, &TenantResolver, &YdbProxy}) { if (auto actorId = std::exchange(*x, {})) { ctx.Send(actorId, new TEvents::TEvPoison()); } @@ -165,6 +180,7 @@ class TReplication::TImpl { TString Issue; ui64 NextTargetId = 1; THashMap> Targets; + TActorId SecretResolver; TActorId YdbProxy; TActorId TenantResolver; TActorId TargetDiscoverer; @@ -261,6 +277,20 @@ ui64 TReplication::GetNextTargetId() const { return Impl->NextTargetId; } +void TReplication::UpdateSecret(const TString& secretValue) { + auto& params = *Impl->Config.MutableSrcConnectionParams(); + switch (params.GetCredentialsCase()) { + case NKikimrReplication::TConnectionParams::kStaticCredentials: + params.MutableStaticCredentials()->SetPassword(secretValue); + break; + case NKikimrReplication::TConnectionParams::kOAuthToken: + params.MutableOAuthToken()->SetToken(secretValue); + break; + default: + Y_ABORT("unreachable"); + } +} + void TReplication::SetTenant(const TString& value) { Impl->Tenant = value; Impl->TenantResolver = {}; diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index a8324707988a..e2fc71a19ac3 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -109,6 +109,8 @@ class TReplication: public TSimpleRefCount { void SetNextTargetId(ui64 value); ui64 GetNextTargetId() const; + void UpdateSecret(const TString& secretValue); + void SetTenant(const TString& value); const TString& GetTenant() const; diff --git a/ydb/core/tx/replication/controller/secret_resolver.cpp b/ydb/core/tx/replication/controller/secret_resolver.cpp new file mode 100644 index 000000000000..cbc289ec9fb4 --- /dev/null +++ b/ydb/core/tx/replication/controller/secret_resolver.cpp @@ -0,0 +1,120 @@ +#include "logging.h" +#include "private_events.h" +#include "secret_resolver.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NReplication::NController { + +class TSecretResolver: public TActorBootstrapped { + static NMetadata::NFetcher::ISnapshotsFetcher::TPtr SnapshotFetcher() { + return std::make_shared(); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto* response = ev->Get()->Request.Get(); + + Y_ABORT_UNLESS(response->ResultSet.size() == 1); + const auto& entry = response->ResultSet.front(); + + LOG_T("Handle " << ev->Get()->ToString() + << ": entry# " << entry.ToString()); + + switch (entry.Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + break; + default: + LOG_W("Unexpected status" + << ": entry# " << entry.ToString()); + return Schedule(RetryInterval, new TEvents::TEvWakeup); + } + + if (!entry.SecurityObject) { + return Reply(false, "Empty security object"); + } + + SecretId = NMetadata::NSecret::TSecretId(entry.SecurityObject->GetOwnerSID(), SecretName); + Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), + new NMetadata::NProvider::TEvAskSnapshot(SnapshotFetcher())); + } + + void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { + const auto* snapshot = ev->Get()->GetSnapshotAs(); + + TString secretValue; + if (!snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(SecretId), secretValue)) { + return Reply(false, TStringBuilder() << "Secret '" << SecretName << "' not found"); + } + + Reply(secretValue); + } + + template + void Reply(Args&&... args) { + Send(Parent, new TEvPrivate::TEvResolveSecretResult(ReplicationId, std::forward(args)...)); + PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_SECRET_RESOLVER; + } + + explicit TSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName) + : Parent(parent) + , ReplicationId(rid) + , PathId(pathId) + , SecretName(secretName) + , LogPrefix("SecretResolver", ReplicationId) + { + } + + void Bootstrap() { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + return Reply(false, "Metadata service is not active"); + } + + auto request = MakeHolder(); + + auto& entry = request->ResultSet.emplace_back(); + entry.TableId = PathId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.RedirectRequired = false; + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const ui64 ReplicationId; + const TPathId PathId; + const TString SecretName; + const TActorLogPrefix LogPrefix; + + static constexpr auto RetryInterval = TDuration::Seconds(1); + NMetadata::NSecret::TSecretId SecretId; + +}; // TSecretResolver + +IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName) { + return new TSecretResolver(parent, rid, pathId, secretName); +} + +} diff --git a/ydb/core/tx/replication/controller/secret_resolver.h b/ydb/core/tx/replication/controller/secret_resolver.h new file mode 100644 index 000000000000..6408469c7633 --- /dev/null +++ b/ydb/core/tx/replication/controller/secret_resolver.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NKikimr::NReplication::NController { + +IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName); + +} diff --git a/ydb/core/tx/replication/controller/tx_resolve_secret_result.cpp b/ydb/core/tx/replication/controller/tx_resolve_secret_result.cpp new file mode 100644 index 000000000000..2a25030e2a4c --- /dev/null +++ b/ydb/core/tx/replication/controller/tx_resolve_secret_result.cpp @@ -0,0 +1,73 @@ +#include "controller_impl.h" + +namespace NKikimr::NReplication::NController { + +class TController::TTxResolveSecretResult: public TTxBase { + TEvPrivate::TEvResolveSecretResult::TPtr Ev; + TReplication::TPtr Replication; + +public: + explicit TTxResolveSecretResult(TController* self, TEvPrivate::TEvResolveSecretResult::TPtr& ev) + : TTxBase("TxResolveSecretResult", self) + , Ev(ev) + { + } + + TTxType GetTxType() const override { + return TXTYPE_RESOLVE_SECRET_RESULT; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + + const auto rid = Ev->Get()->ReplicationId; + + Replication = Self->Find(rid); + if (!Replication) { + CLOG_W(ctx, "Unknown replication" + << ": rid# " << rid); + return true; + } + + if (Replication->GetState() != TReplication::EState::Ready) { + CLOG_W(ctx, "Replication state mismatch" + << ": rid# " << rid + << ", state# " << Replication->GetState()); + return true; + } + + if (Ev->Get()->IsSuccess()) { + CLOG_N(ctx, "Secret resolved" + << ": rid# " << rid); + Replication->UpdateSecret(Ev->Get()->SecretValue); + } else { + CLOG_E(ctx, "Resolve secret error" + << ": rid# " << rid + << ", error# " << Ev->Get()->Error); + Replication->SetState(TReplication::EState::Error, Ev->Get()->Error); + + NIceDb::TNiceDb db(txc.DB); + db.Table().Key(Replication->GetId()).Update( + NIceDb::TUpdate(Replication->GetState()), + NIceDb::TUpdate(Replication->GetIssue()) + ); + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete"); + + if (Replication) { + Replication->Progress(ctx); + } + } + +}; // TTxResolveSecretResult + +void TController::RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxResolveSecretResult(this, ev), ctx); +} + +} diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 1500893b4ac4..951cfc1e3828 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -10,6 +10,7 @@ PEERDIR( ydb/core/tx/replication/ydb_proxy ydb/core/util ydb/core/ydb_convert + ydb/services/metadata library/cpp/json ) @@ -21,6 +22,7 @@ SRCS( nodes_manager.cpp private_events.cpp replication.cpp + secret_resolver.cpp session_info.cpp stream_creator.cpp stream_remover.cpp @@ -41,6 +43,7 @@ SRCS( tx_drop_stream_result.cpp tx_init.cpp tx_init_schema.cpp + tx_resolve_secret_result.cpp ) GENERATE_ENUM_SERIALIZATION(replication.h) diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index f0c35abadf36..585fa7b2810d 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1029,5 +1029,6 @@ message TActivity { REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR = 633; REPLICATION_S3_WRITER = 634; BACKUP_CONTROLLER_TABLET = 635; + REPLICATION_CONTROLLER_SECRET_RESOLVER = 636; }; };