Skip to content

aeon: add connection from the etcd/tcs #1108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* `--path` - to show module executables.

- `tt aeon connect` added tests for connect file/app.
- `tt aeon connect`: add connection from the etcd/tcs config.
- `tt pack `: support `.packignore` file to specify files that should not be included
in package (works the same as `.gitignore`).
- `tt tcm start`: add the tcm command.
Expand All @@ -24,6 +25,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Changed

- The following functions were moved from `cluster/cmd` to `lib/cluster`:
* CreateCollector → lib/cluster/cluster.go,
* ConnectEtcdUriOpts → lib/cluster/etcd.go,
* DoOnStorage → lib/cluster/etcd.go,
* MakeEtcdOptsFromUriOpts → lib/cluster/etcd.go,
* MakeConnectOptsFromUriOpts → lib/cluster/tarantool.go,
* ConnectTarantool → lib/cluster/tarantool.go.
- Added new submodule `lib/connect`.

### Fixed

- Arguments of an internal command are not parsed if it is forced over its existent
Expand Down
4 changes: 4 additions & 0 deletions cli/aeon/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type Ssl struct {

// ConnectCtx keeps context information for aeon connection.
type ConnectCtx struct {
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Password string
// Ssl group of paths to ssl key files.
Ssl Ssl
// Transport is a connection mode.
Expand Down
81 changes: 81 additions & 0 deletions cli/aeon/cmd/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cmd

import (
"errors"
"fmt"

"github.com/mitchellh/mapstructure"
"github.com/tarantool/tt/cli/util"
libcluster "github.com/tarantool/tt/lib/cluster"
libconnect "github.com/tarantool/tt/lib/connect"
)

// FillConnectCtx takes a ConnectCtx object and fills it with data from a
// collected configuration by given instanceName and libconnect.UriOpts.
// It returns an error if fails to collect a configuration,
// instantiate a cluster config or find an instance in the cluster.
func FillConnectCtx(connectCtx *ConnectCtx, uriOpts libconnect.UriOpts,
instanceName string, collectors libcluster.CollectorFactory) error {

connOpts := libcluster.ConnectOpts{
Username: connectCtx.Username,
Password: connectCtx.Password,
}
collector, cancel, err := libcluster.CreateCollector(collectors,
connOpts, uriOpts)
if err != nil {
return err
}
defer cancel()

config, err := collector.Collect()
if err != nil {
return fmt.Errorf("failed to collect a configuration: %w", err)
}

clusterConfig, err := libcluster.MakeClusterConfig(config)
if err != nil {
return err
}

result := libcluster.Instantiate(clusterConfig, instanceName)

dataSsl := []string{"roles_cfg", "aeon.grpc", "advertise"}
data, err := result.Get(dataSsl)
if err != nil {
return err
}

var advertise Advertise
err = mapstructure.Decode(data, &advertise)
if err != nil {
return err
}

if advertise.Uri == "" {
return errors.New("invalid connection url")
}

cleanedURL, err := util.RemoveScheme(advertise.Uri)
if err != nil {
return err
}

connectCtx.Network, connectCtx.Address = libconnect.ParseBaseURI(cleanedURL)

if (advertise.Params.Transport != "ssl") && (advertise.Params.Transport != "plain") {
return errors.New("transport must be ssl or plain")
}

if advertise.Params.Transport == "ssl" {
connectCtx.Transport = TransportSsl

connectCtx.Ssl = Ssl{
KeyFile: advertise.Params.KeyFile,
CertFile: advertise.Params.CertFile,
CaFile: advertise.Params.CaFile,
}
}

return nil
}
85 changes: 4 additions & 81 deletions cli/cluster/cmd/common.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package cmd

import (
"context"
"errors"
"fmt"
"os"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/tt/cli/cluster"
libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
libconnect "github.com/tarantool/tt/lib/connect"
)

// printRawClusterConfig prints a raw cluster configuration or an instance
Expand Down Expand Up @@ -137,87 +135,12 @@ func printConfig(config *libcluster.Config) {
fmt.Print(config.String())
}

// connectOpts is additional connect options specified by a user.
type connectOpts struct {
Username string
Password string
}

// connectTarantool establishes a connection to Tarantool.
func connectTarantool(uriOpts connect.UriOpts, connOpts connectOpts) (tarantool.Connector, error) {
if uriOpts.Username == "" && uriOpts.Password == "" {
uriOpts.Username = connOpts.Username
uriOpts.Password = connOpts.Password
if uriOpts.Username == "" {
uriOpts.Username = os.Getenv(connect.TarantoolUsernameEnv)
}
if uriOpts.Password == "" {
uriOpts.Password = os.Getenv(connect.TarantoolPasswordEnv)
}
}

dialer, connectorOpts, err := MakeConnectOptsFromUriOpts(uriOpts)
if err != nil {
return nil, err
}

ctx := context.Background()
if connectorOpts.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, connectorOpts.Timeout)
defer cancel()
}
conn, err := tarantool.Connect(ctx, dialer, connectorOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to tarantool: %w", err)
}
return conn, nil
}

// connectEtcd establishes a connection to etcd.
func connectEtcd(uriOpts connect.UriOpts, connOpts connectOpts) (*clientv3.Client, error) {
etcdOpts := MakeEtcdOptsFromUriOpts(uriOpts)
if etcdOpts.Username == "" && etcdOpts.Password == "" {
etcdOpts.Username = connOpts.Username
etcdOpts.Password = connOpts.Password
if etcdOpts.Username == "" {
etcdOpts.Username = os.Getenv(connect.EtcdUsernameEnv)
}
if etcdOpts.Password == "" {
etcdOpts.Password = os.Getenv(connect.EtcdPasswordEnv)
}
}

etcdcli, err := libcluster.ConnectEtcd(etcdOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %w", err)
}
return etcdcli, nil
}

// doOnStorage determines a storage based on the opts.
func doOnStorage(connOpts connectOpts, opts connect.UriOpts,
tarantoolFunc func(tarantool.Connector) error, etcdFunc func(*clientv3.Client) error) error {
etcdcli, errEtcd := connectEtcd(opts, connOpts)
if errEtcd == nil {
return etcdFunc(etcdcli)
}

conn, errTarantool := connectTarantool(opts, connOpts)
if errTarantool == nil {
return tarantoolFunc(conn)
}

return fmt.Errorf("failed to establish a connection to tarantool or etcd: %w, %w",
errTarantool, errEtcd)
}

// createPublisherAndCollector creates a new data publisher and collector based on UriOpts.
func createPublisherAndCollector(
publishers libcluster.DataPublisherFactory,
collectors libcluster.CollectorFactory,
connOpts connectOpts,
opts connect.UriOpts) (libcluster.DataPublisher, libcluster.Collector, func(), error) {
connOpts libcluster.ConnectOpts,
opts libconnect.UriOpts) (libcluster.DataPublisher, libcluster.Collector, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Params["key"], opts.Timeout

var (
Expand Down Expand Up @@ -265,7 +188,7 @@ func createPublisherAndCollector(
return nil
}

if err := doOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
if err := libcluster.DoOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
return nil, nil, nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion cli/cluster/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func PublishUri(publishCtx PublishCtx, opts connect.UriOpts) error {
return err
}

connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: publishCtx.Username,
Password: publishCtx.Password,
}
Expand Down
12 changes: 6 additions & 6 deletions cli/cluster/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func pickPatchKey(keys []string, force bool, pathMsg string) (int, error) {
func createDataCollectorAndKeyPublisher(
collectors libcluster.DataCollectorFactory,
publishers libcluster.DataPublisherFactory,
opts connect.UriOpts, connOpts connectOpts) (
opts connect.UriOpts, connOpts libcluster.ConnectOpts) (
libcluster.DataCollector, replicaset.DataPublisher, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Params["key"], opts.Timeout
var (
Expand Down Expand Up @@ -145,7 +145,7 @@ func createDataCollectorAndKeyPublisher(
return nil
}

if err := doOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
if err := libcluster.DoOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
return nil, nil, nil, err
}

Expand All @@ -158,7 +158,7 @@ func Promote(url string, ctx PromoteCtx) error {
if err != nil {
return fmt.Errorf("invalid URL %q: %w", url, err)
}
connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: ctx.Username,
Password: ctx.Password,
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func Demote(url string, ctx DemoteCtx) error {
if err != nil {
return fmt.Errorf("invalid URL %q: %w", url, err)
}
connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: ctx.Username,
Password: ctx.Password,
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func Expel(url string, ctx ExpelCtx) error {
if err != nil {
return fmt.Errorf("invalid URL %q: %w", url, err)
}
connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: ctx.Username,
Password: ctx.Password,
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func ChangeRole(url string, ctx RolesChangeCtx, action replicaset.RolesChangerAc
if err != nil {
return fmt.Errorf("invalid URL %q: %w", url, err)
}
connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: ctx.Username,
Password: ctx.Password,
}
Expand Down
2 changes: 1 addition & 1 deletion cli/cluster/cmd/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ShowCtx struct {

// ShowUri shows a configuration from URI.
func ShowUri(showCtx ShowCtx, opts connect.UriOpts) error {
connOpts := connectOpts{
connOpts := libcluster.ConnectOpts{
Username: showCtx.Username,
Password: showCtx.Password,
}
Expand Down
Loading