Skip to content

Commit

Permalink
Support secrets: core part (ydb-platform#4389)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored May 13, 2024
1 parent 418f880 commit 18ea57a
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 1 deletion.
1 change: 1 addition & 0 deletions ydb/core/protos/counters_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ enum ETxTypes {
TXTYPE_DROP_STREAM_RESULT = 8 [(TxTypeOpts) = {Name: "TxDropStreamResult"}];
TXTYPE_DROP_DST_RESULT = 9 [(TxTypeOpts) = {Name: "TxDropDstResult"}];
TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}];
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
}
2 changes: 2 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ option java_package = "ru.yandex.kikimr.proto";
message TStaticCredentials {
optional string User = 1;
optional string Password = 2 [(Ydb.sensitive) = true];
optional string PasswordSecretName = 3;
}

message TOAuthToken {
optional string Token = 1 [(Ydb.sensitive) = true];
optional string TokenSecretName = 2;
}

message TConnectionParams {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvDropStreamResult, Handle);
HFunc(TEvPrivate::TEvCreateDstResult, Handle);
HFunc(TEvPrivate::TEvDropDstResult, Handle);
HFunc(TEvPrivate::TEvResolveSecretResult, Handle);
HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvPrivate::TEvRunWorkers, Handle);
Expand Down Expand Up @@ -152,6 +153,11 @@ void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorCon
RunTxDropDstResult(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxResolveSecretResult(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class TController
void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRunWorkers::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -103,6 +104,7 @@ class TController
class TTxDropStreamResult;
class TTxCreateDstResult;
class TTxDropDstResult;
class TTxResolveSecretResult;

// tx runners
void RunTxInitSchema(const TActorContext& ctx);
Expand All @@ -117,6 +119,7 @@ class TController
void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);

// other
template <typename T>
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/tx/replication/controller/private_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,33 @@ TString TEvPrivate::TEvUpdateTenantNodes::ToString() const {
<< " }";
}

TEvPrivate::TEvResolveSecretResult::TEvResolveSecretResult(ui64 rid, const TString& secretValue)
: ReplicationId(rid)
, SecretValue(secretValue)
, Success(true)
{
}

TEvPrivate::TEvResolveSecretResult::TEvResolveSecretResult(ui64 rid, bool success, const TString& error)
: ReplicationId(rid)
, Success(success)
, Error(error)
{
Y_ABORT_UNLESS(!success);
}

TString TEvPrivate::TEvResolveSecretResult::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " ReplicationId: " << ReplicationId
<< " Success: " << Success
<< " Error: " << Error
<< " }";
}

bool TEvPrivate::TEvResolveSecretResult::IsSuccess() const {
return Success;
}

}

Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) {
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/replication/controller/private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TEvPrivate {
EvResolveTenantResult,
EvUpdateTenantNodes,
EvRunWorkers,
EvResolveSecretResult,

EvEnd,
};
Expand Down Expand Up @@ -129,6 +130,19 @@ struct TEvPrivate {
struct TEvRunWorkers: public TEventLocal<TEvRunWorkers, EvRunWorkers> {
};

struct TEvResolveSecretResult: public TEventLocal<TEvResolveSecretResult, EvResolveSecretResult> {
const ui64 ReplicationId;
const TString SecretValue;
const bool Success;
const TString Error;

explicit TEvResolveSecretResult(ui64 rid, const TString& secretValue);
explicit TEvResolveSecretResult(ui64 rid, bool success, const TString& error);
TString ToString() const override;

bool IsSuccess() const;
};

}; // TEvPrivate

}
32 changes: 31 additions & 1 deletion ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "private_events.h"
#include "replication.h"
#include "secret_resolver.h"
#include "target_discoverer.h"
#include "target_table.h"
#include "tenant_resolver.h"
Expand All @@ -18,6 +19,14 @@ namespace NKikimr::NReplication::NController {
class TReplication::TImpl {
friend class TReplication;

void ResolveSecret(const TString& secretName, const TActorContext& ctx) {
if (SecretResolver) {
return;
}

SecretResolver = ctx.Register(CreateSecretResolver(ctx.SelfID, ReplicationId, PathId, secretName));
}

template <typename... Args>
ITarget* CreateTarget(ui64 id, ETargetKind kind, Args&&... args) const {
switch (kind) {
Expand Down Expand Up @@ -95,9 +104,15 @@ class TReplication::TImpl {

switch (params.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
if (!params.GetStaticCredentials().HasPassword()) {
return ResolveSecret(params.GetStaticCredentials().GetPasswordSecretName(), ctx);
}
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials()));
break;
case NKikimrReplication::TConnectionParams::kOAuthToken:
if (!params.GetOAuthToken().HasToken()) {
return ResolveSecret(params.GetOAuthToken().GetTokenSecretName(), ctx);
}
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken()));
break;
default:
Expand Down Expand Up @@ -139,7 +154,7 @@ class TReplication::TImpl {
target->Shutdown(ctx);
}

for (auto* x : TVector<TActorId*>{&TargetDiscoverer, &TenantResolver, &YdbProxy}) {
for (auto* x : TVector<TActorId*>{&SecretResolver, &TargetDiscoverer, &TenantResolver, &YdbProxy}) {
if (auto actorId = std::exchange(*x, {})) {
ctx.Send(actorId, new TEvents::TEvPoison());
}
Expand All @@ -165,6 +180,7 @@ class TReplication::TImpl {
TString Issue;
ui64 NextTargetId = 1;
THashMap<ui64, THolder<ITarget>> Targets;
TActorId SecretResolver;
TActorId YdbProxy;
TActorId TenantResolver;
TActorId TargetDiscoverer;
Expand Down Expand Up @@ -261,6 +277,20 @@ ui64 TReplication::GetNextTargetId() const {
return Impl->NextTargetId;
}

void TReplication::UpdateSecret(const TString& secretValue) {
auto& params = *Impl->Config.MutableSrcConnectionParams();
switch (params.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
params.MutableStaticCredentials()->SetPassword(secretValue);
break;
case NKikimrReplication::TConnectionParams::kOAuthToken:
params.MutableOAuthToken()->SetToken(secretValue);
break;
default:
Y_ABORT("unreachable");
}
}

void TReplication::SetTenant(const TString& value) {
Impl->Tenant = value;
Impl->TenantResolver = {};
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
void SetNextTargetId(ui64 value);
ui64 GetNextTargetId() const;

void UpdateSecret(const TString& secretValue);

void SetTenant(const TString& value);
const TString& GetTenant() const;

Expand Down
120 changes: 120 additions & 0 deletions ydb/core/tx/replication/controller/secret_resolver.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "logging.h"
#include "private_events.h"
#include "secret_resolver.h"

#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/services/metadata/secret/fetcher.h>
#include <ydb/services/metadata/secret/secret.h>
#include <ydb/services/metadata/secret/snapshot.h>
#include <ydb/services/metadata/service.h>

namespace NKikimr::NReplication::NController {

class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
static NMetadata::NFetcher::ISnapshotsFetcher::TPtr SnapshotFetcher() {
return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto* response = ev->Get()->Request.Get();

Y_ABORT_UNLESS(response->ResultSet.size() == 1);
const auto& entry = response->ResultSet.front();

LOG_T("Handle " << ev->Get()->ToString()
<< ": entry# " << entry.ToString());

switch (entry.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
break;
default:
LOG_W("Unexpected status"
<< ": entry# " << entry.ToString());
return Schedule(RetryInterval, new TEvents::TEvWakeup);
}

if (!entry.SecurityObject) {
return Reply(false, "Empty security object");
}

SecretId = NMetadata::NSecret::TSecretId(entry.SecurityObject->GetOwnerSID(), SecretName);
Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()),
new NMetadata::NProvider::TEvAskSnapshot(SnapshotFetcher()));
}

void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
const auto* snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();

TString secretValue;
if (!snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(SecretId), secretValue)) {
return Reply(false, TStringBuilder() << "Secret '" << SecretName << "' not found");
}

Reply(secretValue);
}

template <typename... Args>
void Reply(Args&&... args) {
Send(Parent, new TEvPrivate::TEvResolveSecretResult(ReplicationId, std::forward<Args>(args)...));
PassAway();
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_SECRET_RESOLVER;
}

explicit TSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName)
: Parent(parent)
, ReplicationId(rid)
, PathId(pathId)
, SecretName(secretName)
, LogPrefix("SecretResolver", ReplicationId)
{
}

void Bootstrap() {
if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
return Reply(false, "Metadata service is not active");
}

auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();

auto& entry = request->ResultSet.emplace_back();
entry.TableId = PathId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.RedirectRequired = false;

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
Become(&TThis::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
sFunc(TEvents::TEvWakeup, Bootstrap);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TActorId Parent;
const ui64 ReplicationId;
const TPathId PathId;
const TString SecretName;
const TActorLogPrefix LogPrefix;

static constexpr auto RetryInterval = TDuration::Seconds(1);
NMetadata::NSecret::TSecretId SecretId;

}; // TSecretResolver

IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName) {
return new TSecretResolver(parent, rid, pathId, secretName);
}

}
9 changes: 9 additions & 0 deletions ydb/core/tx/replication/controller/secret_resolver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/base/defs.h>

namespace NKikimr::NReplication::NController {

IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName);

}
Loading

0 comments on commit 18ea57a

Please sign in to comment.