Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging: reading from multiple databases concurrently #226

Merged
merged 15 commits into from
Dec 27, 2024
83 changes: 44 additions & 39 deletions api/logging/v1/log_group_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/logging/v1/log_group_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 34 additions & 6 deletions app/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,18 +384,30 @@
return fmt.Errorf("validate `ydb`: %w", err)
}

if staticConfig := c.GetStatic(); staticConfig != nil {
if err := validateLoggingResolvingStaticConfig(staticConfig); err != nil {
return fmt.Errorf("validate `static`: %w", err)
}
} else {
return fmt.Errorf("missing `static` section")
if c.GetStatic() == nil && c.GetDynamic() == nil {
return fmt.Errorf("you should set either `static` or `dynamic` section")
}

Check warning on line 389 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L388-L389

Added lines #L388 - L389 were not covered by tests

if c.GetStatic() != nil && c.GetDynamic() != nil {
return fmt.Errorf("you should set either `static` or `dynamic` section, not both of them")
}

Check warning on line 393 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L392-L393

Added lines #L392 - L393 were not covered by tests

if err := validateLoggingResolvingStaticConfig(c.GetStatic()); err != nil {
return fmt.Errorf("validate `static`: %w", err)
}

Check warning on line 397 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L396-L397

Added lines #L396 - L397 were not covered by tests

if err := validateLoggingResolvingDynamicConfig(c.GetDynamic()); err != nil {
return fmt.Errorf("validate `dynamic`: %w", err)

Check warning on line 400 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L400

Added line #L400 was not covered by tests
}

return nil
}

func validateLoggingResolvingStaticConfig(c *config.TLoggingConfig_TStaticResolving) error {
if c == nil {
return nil
}

Check warning on line 409 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L408-L409

Added lines #L408 - L409 were not covered by tests

if len(c.Databases) == 0 {
// it's kind of OK to have empty list of databases
return nil
Expand Down Expand Up @@ -429,6 +441,22 @@
return nil
}

func validateLoggingResolvingDynamicConfig(c *config.TLoggingConfig_TDynamicResolving) error {
if c == nil {
return nil
}

if c.LoggingEndpoint.Host == "" {
return fmt.Errorf("missing `logging_endpoint.host`")
}

Check warning on line 451 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L449-L451

Added lines #L449 - L451 were not covered by tests

if c.LoggingEndpoint.Port == 0 {
return fmt.Errorf("missing `logging_endpoint.port`")
}

Check warning on line 455 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L453-L455

Added lines #L453 - L455 were not covered by tests

return nil

Check warning on line 457 in app/server/config/config.go

View check run for this annotation

Codecov / codecov/patch

app/server/config/config.go#L457

Added line #L457 was not covered by tests
}

func validateExponentialBackoff(c *config.TExponentialBackoffConfig) error {
if c == nil {
return fmt.Errorf("required section is missing")
Expand Down
18 changes: 8 additions & 10 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (dsc *DataSourceCollection) DoReadSplit(
}
}

func (dsc *DataSourceCollection) Close() error {
return dsc.rdbms.Close()
}

func readSplit[T paging.Acceptor](
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
Expand All @@ -101,34 +105,28 @@ func readSplit[T paging.Acceptor](
return fmt.Errorf("new columnar buffer factory: %w", err)
}

trafficTracker := paging.NewTrafficTracker[T](cfg.Paging)

sink, err := paging.NewSink(
sinkFactory := paging.NewSinkFactory[T](
stream.Context(),
logger,
trafficTracker,
cfg.Paging,
columnarBufferFactory,
readLimiterFactory.MakeReadLimiter(logger),
int(cfg.Paging.PrefetchQueueCapacity),
)
if err != nil {
return fmt.Errorf("new sink: %w", err)
}

streamer := streaming.NewStreamer(
logger,
stream,
request,
split,
sink,
sinkFactory,
dataSource,
)

if err := streamer.Run(); err != nil {
return fmt.Errorf("run paging streamer: %w", err)
}

readStats := trafficTracker.DumpStats(true)
readStats := sinkFactory.FinalStats()

logger.Debug(
"split reading finished",
Expand Down
5 changes: 3 additions & 2 deletions app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Factory[T paging.Acceptor] interface {
logger *zap.Logger,
dataSourceType api_common.EGenericDataSourceKind,
) (DataSource[T], error)
Close() error
}

// DataSource is an abstraction over external data storage that is available for data and metadata extraction.
Expand All @@ -37,8 +38,8 @@ type DataSource[T paging.Acceptor] interface {
logger *zap.Logger,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sink paging.Sink[T],
)
sinkFactory paging.SinkFactory[T],
) error
}

type TypeMapper interface {
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
_ *zap.Logger,
_ *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
pagingWriter paging.Sink[T],
) {
m.Called(split, pagingWriter)
sinkFactory paging.SinkFactory[T],
) error {
return m.Called(split, sinkFactory).Error(0)

Check warning on line 35 in app/server/datasource/mock.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/mock.go#L34-L35

Added lines #L34 - L35 were not covered by tests
}
11 changes: 9 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

type connectionHTTP struct {
*sql.DB
logger common.QueryLogger
logger common.QueryLogger
databaseName string
tableName string
}

var _ rdbms_utils.Rows = (*rows)(nil)
Expand Down Expand Up @@ -70,11 +72,16 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row
return &rows{Rows: out}, nil
}

func (c *connectionHTTP) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

func makeConnectionHTTP(
ctx context.Context,
logger *zap.Logger,
cfg *config.TClickHouseConfig,
dsi *api_common.TGenericDataSourceInstance,
tableName string,
queryLogger common.QueryLogger,
) (rdbms_utils.Connection, error) {
opts := &clickhouse.Options{
Expand Down Expand Up @@ -122,5 +129,5 @@ func makeConnectionHTTP(
conn.SetMaxOpenConns(maxOpenConns)
conn.SetConnMaxLifetime(connMaxLifetime)

return &connectionHTTP{DB: conn, logger: queryLogger}, nil
return &connectionHTTP{DB: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil
}
35 changes: 24 additions & 11 deletions app/server/datasource/rdbms/clickhouse/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,39 @@
}

func (c *connectionManager) Make(
params *rdbms_utils.ConnectionParamsMakeParams,
) (rdbms_utils.Connection, error) {
dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger

if dsi.GetCredentials().GetBasic() == nil {
params *rdbms_utils.ConnectionManagerMakeParams,
) ([]rdbms_utils.Connection, error) {
if params.DataSourceInstance.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}

switch dsi.Protocol {
var (
conn rdbms_utils.Connection
err error
)

switch params.DataSourceInstance.Protocol {
case api_common.EGenericProtocol_NATIVE:
return makeConnectionNative(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger))
conn, err = makeConnectionNative(
params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger))
case api_common.EGenericProtocol_HTTP:
return makeConnectionHTTP(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger))
conn, err = makeConnectionHTTP(
params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger))
default:
return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", dsi.Protocol)
return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", params.DataSourceInstance.Protocol)

Check warning on line 42 in app/server/datasource/rdbms/clickhouse/connection_manager.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/clickhouse/connection_manager.go#L42

Added line #L42 was not covered by tests
}

if err != nil {
return nil, fmt.Errorf("make connection: %w", err)
}

return []rdbms_utils.Connection{conn}, nil
}

func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) {
common.LogCloserError(logger, conn, "close clickhouse connection")
func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) {
for _, conn := range cs {
common.LogCloserError(logger, conn, "close clickhouse connection")
}
}

func NewConnectionManager(
Expand Down
11 changes: 9 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ var _ rdbms_utils.Connection = (*connectionNative)(nil)

type connectionNative struct {
driver.Conn
logger common.QueryLogger
logger common.QueryLogger
databaseName string
tableName string
}

var _ rdbms_utils.Rows = (*rowsNative)(nil)
Expand Down Expand Up @@ -72,11 +74,16 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R
return &rowsNative{Rows: out}, nil
}

func (c *connectionNative) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

func makeConnectionNative(
ctx context.Context,
logger *zap.Logger,
cfg *config.TClickHouseConfig,
dsi *api_common.TGenericDataSourceInstance,
tableName string,
queryLogger common.QueryLogger,
) (rdbms_utils.Connection, error) {
opts := &clickhouse.Options{
Expand Down Expand Up @@ -117,5 +124,5 @@ func makeConnectionNative(
return nil, fmt.Errorf("conn ping: %w", err)
}

return &connectionNative{Conn: conn, logger: queryLogger}, nil
return &connectionNative{Conn: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil
}
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/clickhouse/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) {
return f.SanitiseIdentifier(params.TableName), nil
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

func NewSQLFormatter() rdbms_utils.SQLFormatter {
Expand Down
Loading
Loading