Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Remove distinction between primary and archive storage in jaeger-v1 #6567

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
befc6bd
Change ES Options To Only Hold One Namespace
mahadzaryab1 Jan 17, 2025
ab1180b
Add Configuration Fields For Suffixes
mahadzaryab1 Jan 17, 2025
e1d6f7d
Adjust ES Factory To Hold One Config
mahadzaryab1 Jan 17, 2025
5172646
Change Meta Factory To Initialize Archive Separately
mahadzaryab1 Jan 18, 2025
820b360
Phase Out Distinction In Cassandra Factory
mahadzaryab1 Jan 18, 2025
6dcd077
Remove Comment
mahadzaryab1 Jan 18, 2025
2dc7d19
Remove Archive Factory From Blackhole
mahadzaryab1 Jan 18, 2025
00c105c
Remove Archive Factory From Memory
mahadzaryab1 Jan 18, 2025
b57e488
Remove Archive Helper From Adapter
mahadzaryab1 Jan 18, 2025
e90efe7
Remove Concept of Archive From GRPC Storage
mahadzaryab1 Jan 18, 2025
1780ea5
Remove Usages of InitArchiveStorage
mahadzaryab1 Jan 18, 2025
1468260
Remove Archive Factory
mahadzaryab1 Jan 18, 2025
43b7604
Fix Cassandra Integration Test
mahadzaryab1 Jan 18, 2025
7c4c43e
Fix ES Integration Test
mahadzaryab1 Jan 18, 2025
0b2280a
Fix GRPC Integration Tests
mahadzaryab1 Jan 18, 2025
5a06df6
Fix Typo
mahadzaryab1 Jan 18, 2025
6936344
Fix Integration Test Cleanup
mahadzaryab1 Jan 18, 2025
9b8571d
Move Init Function To Factory Struct
mahadzaryab1 Jan 18, 2025
8fc4680
Refactor Cassandra Configuration
mahadzaryab1 Jan 18, 2025
71273b5
Refactor ES Configuration
mahadzaryab1 Jan 18, 2025
03a0768
Simplify Construction of Factory
mahadzaryab1 Jan 18, 2025
2195e1d
Remove Unused Archive Fields
mahadzaryab1 Jan 18, 2025
42c5751
Fix GRPC Handler And Client Tests
mahadzaryab1 Jan 19, 2025
0e4f7f7
Fix ES Unit Tests
mahadzaryab1 Jan 19, 2025
a14a649
Fix Cassandra Tests
mahadzaryab1 Jan 19, 2025
bac94c8
Fix Blackhole Test
mahadzaryab1 Jan 19, 2025
f049b5a
Fix Meta Factory Tests
mahadzaryab1 Jan 19, 2025
efb4353
Fix GRPC Factory Tests
mahadzaryab1 Jan 19, 2025
3a271cb
Remove Archive Mocks
mahadzaryab1 Jan 19, 2025
2120635
Run Formatter
mahadzaryab1 Jan 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
}
if conf.IsSet("cassandra") {
cfg.Cassandra = &cassandra.Options{
Primary: cassandra.NamespaceConfig{
NamespaceConfig: cassandra.NamespaceConfig{

Check warning on line 77 in cmd/jaeger/internal/extension/jaegerstorage/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/config.go#L77

Added line #L77 was not covered by tests
Configuration: casCfg.DefaultConfiguration(),
Enabled: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ backends:
`)
cfg := createDefaultConfig().(*Config)
require.NoError(t, conf.Unmarshal(cfg))
assert.NotEmpty(t, cfg.TraceBackends["some_storage"].Cassandra.Primary.Connection.Servers)
assert.NotEmpty(t, cfg.TraceBackends["some_storage"].Cassandra.Configuration.Connection.Servers)
}

func TestConfigDefaultElasticsearch(t *testing.T) {
Expand Down
18 changes: 11 additions & 7 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down Expand Up @@ -139,9 +139,13 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
}

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory *storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
ar, aw := storageFactory.InitArchiveStorage(logger)
if ar != nil && aw != nil {
opts.ArchiveSpanReader = ar
opts.ArchiveSpanWriter = aw
} else {
logger.Info("Archive storage not initialized")
}

Expand All @@ -150,13 +154,13 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseF
return opts
}

func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(storageFactory storage.BaseFactory, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(storageFactory *storage.Factory, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
opts := &v2querysvc.QueryServiceOptions{}

ar, aw := v1adapter.InitializeArchiveStorage(storageFactory, logger)
ar, aw := storageFactory.InitArchiveStorage(logger)
if ar != nil && aw != nil {
opts.ArchiveTraceReader = ar
opts.ArchiveTraceWriter = aw
opts.ArchiveTraceReader = v1adapter.NewTraceReader(ar)
opts.ArchiveTraceWriter = v1adapter.NewTraceWriter(aw)
} else {
logger.Info("Archive storage not initialized")
}
Expand Down
33 changes: 0 additions & 33 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (
"errors"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
Expand Down Expand Up @@ -164,36 +161,6 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
return false
}
reader, err := archiveFactory.CreateArchiveSpanReader()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return false
}
writer, err := archiveFactory.CreateArchiveSpanWriter()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return false
}
opts.ArchiveSpanReader = reader
opts.ArchiveSpanWriter = writer
return true
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptions) hasArchiveStorage() bool {
return opts.ArchiveSpanReader != nil && opts.ArchiveSpanWriter != nil
Expand Down
52 changes: 1 addition & 51 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,64 +451,14 @@ func TestGetCapabilitiesWithSupportsArchive(t *testing.T) {

type fakeStorageFactory1 struct{}

type fakeStorageFactory2 struct {
fakeStorageFactory1
r spanstore.Reader
w spanstore.Writer
rErr error
wErr error
}

func (*fakeStorageFactory1) Initialize(metrics.Factory, *zap.Logger) error {
return nil
}
func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) { return nil, nil }
func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil }
func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil }

func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr }
func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr }

var (
_ storage.Factory = new(fakeStorageFactory1)
_ storage.ArchiveFactory = new(fakeStorageFactory2)
)

func TestInitArchiveStorageErrors(t *testing.T) {
opts := &QueryServiceOptions{}
logger := zap.NewNop()

assert.False(t, opts.InitArchiveStorage(new(fakeStorageFactory1), logger))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{rErr: errors.New("error")},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: storage.ErrArchiveStorageNotConfigured},
logger,
))
assert.False(t, opts.InitArchiveStorage(
&fakeStorageFactory2{wErr: errors.New("error")},
logger,
))
}

func TestInitArchiveStorage(t *testing.T) {
opts := &QueryServiceOptions{}
logger := zap.NewNop()
reader := &spanstoremocks.Reader{}
writer := &spanstoremocks.Writer{}
assert.True(t, opts.InitArchiveStorage(
&fakeStorageFactory2{r: reader, w: writer},
logger,
))
assert.Equal(t, reader, opts.ArchiveSpanReader)
assert.Equal(t, writer, opts.ArchiveSpanWriter)
}
var _ storage.Factory = new(fakeStorageFactory1)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func runQueryService(t *testing.T, esURL string) *Server {
}))
f.InitFromViper(v, flagsSvc.Logger)
// set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc
f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true
f.Options.Config.Authentication.BearerTokenAuthentication.AllowFromContext = true
require.NoError(t, f.Initialize(telset.Metrics, telset.Logger))
defer f.Close()

Expand Down
16 changes: 6 additions & 10 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -37,7 +36,7 @@ type Server struct {
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
func NewServer(options *Options, storageFactory *storage.Factory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
if err != nil {
return nil, err
Expand All @@ -55,7 +54,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
}, nil
}

func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
func createGRPCHandler(f *storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
Expand All @@ -76,12 +75,9 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH
StreamingSpanWriter: func() spanstore.Writer { return nil },
}

// borrow code from Query service for archive storage
qOpts := &querysvc.QueryServiceOptions{}
// when archive storage not initialized (returns false), the reader/writer will be nil
_ = qOpts.InitArchiveStorage(f, logger)
impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader }
impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter }
ar, aw := f.InitArchiveStorage(logger)
impl.ArchiveSpanReader = func() spanstore.Reader { return ar }
impl.ArchiveSpanWriter = func() spanstore.Writer { return aw }

handler := shared.NewGRPCHandler(impl)
return handler, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ type Configuration struct {
// UseReadWriteAliases, if set to true, will use read and write aliases for indices.
// Use this option with Elasticsearch rollover API. It requires an external component
// to create aliases before startup and then performing its management.
UseReadWriteAliases bool `mapstructure:"use_aliases"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
ReadAliasSuffix string `mapstructure:"-"`
WriteAliasSuffix string `mapstructure:"-"`
// CreateIndexTemplates, if set to true, creates index templates at application startup.
// This configuration should be set to false when templates are installed manually.
CreateIndexTemplates bool `mapstructure:"create_mappings"`
Expand Down
12 changes: 4 additions & 8 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,10 @@ const (
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)

// TODO badger could implement archive storage
// _ storage.ArchiveFactory = (*Factory)(nil)

_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
)

Expand Down
16 changes: 2 additions & 14 deletions plugin/storage/blackhole/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
)
// interface comformance checks
var _ storage.Factory = (*Factory)(nil)

// Factory implements storage.Factory and creates blackhole storage components.
type Factory struct {
Expand Down Expand Up @@ -48,16 +46,6 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return f.store, nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
Expand Down
6 changes: 0 additions & 6 deletions plugin/storage/blackhole/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ func TestStorageFactory(t *testing.T) {
writer, err := f.CreateSpanWriter()
require.NoError(t, err)
assert.Equal(t, f.store, writer)
reader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.Equal(t, f.store, reader)
writer, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.Equal(t, f.store, writer)
depReader, err := f.CreateDependencyReader()
require.NoError(t, err)
assert.Equal(t, f.store, depReader)
Expand Down
Loading
Loading