Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 48cde7c

Browse files
Add batching for traces
Batching will help to achieve better ingest performance, especially if traces are sent one by one (which is the case for Jaeger collector). Batch is flushed either on timeout or when full. Adds async support for traces meaning that client doesn't need to wait for DB write. This increases ingest performance with a small risk of data loss. New CLI flag `tracing.async-acks` added. Flags to control batch size: `tracing.max-batch-size` and `tracing.batch-timeout`. Flags to control batch workers: `tracing.batch-workers`
1 parent 51b6326 commit 48cde7c

28 files changed

+808
-195
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ We use the following categories for changes:
1717

1818
### Added
1919
- Implement Jaeger gRPC remote storage writer interface [#1543]
20+
- Batching for traces to improve ingest performance along with CLI flags for better control [#1554]
2021
- Helm chart now ships a JSON Schema for imposing a structure of the values.yaml file [#1551]
2122

2223
### Changed

docs/configuration.md

+16-7
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,22 @@ The following subsections cover all CLI flags which promscale supports. You can
4242

4343
### General flags
4444

45-
| Flag | Type | Default | Description |
46-
|---------------------------------|:------------------------------:|:-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
47-
| cache.memory-target | unsigned-integer or percentage | 80% | Target for max amount of memory to use. Specified in bytes or as a percentage of system memory (e.g. 80%). |
48-
| config | string | config.yml | YAML configuration file path for Promscale. |
49-
| enable-feature | string | "" | Enable one or more experimental promscale features (as a comma-separated list). Current experimental features are `promql-at-modifier`, `promql-negative-offset` and `promql-per-step-stats`. For more information, please consult the following resources: [promql-at-modifier](https://prometheus.io/docs/prometheus/latest/feature_flags/#modifier-in-promql), [promql-negative-offset](https://prometheus.io/docs/prometheus/latest/feature_flags/#negative-offset-in-promql), [promql-per-step-stats](https://prometheus.io/docs/prometheus/latest/feature_flags/#per-step-stats). |
50-
| thanos.store-api.server-address | string | "" (disabled) | Address to listen on for Thanos Store API endpoints. |
51-
| tracing.otlp.server-address | string | ":9202" | Address to listen on for OpenTelemetry OTLP GRPC server. |
45+
| Flag | Type | Default | Description |
46+
|---------------------------------------------------------------------------------------------------|:------------------------------:|:-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
47+
| cache.memory-target | unsigned-integer or percentage | 80% | Target for max amount of memory to use. Specified in bytes or as a percentage of system memory (e.g. 80%). |
48+
| config | string | config.yml | YAML configuration file path for Promscale. |
49+
| enable-feature | string | "" | Enable one or more experimental promscale features (as a comma-separated list). Current experimental features are `promql-at-modifier`, `promql-negative-offset` and `promql-per-step-stats`. For more information, please consult the following resources: [promql-at-modifier](https://prometheus.io/docs/prometheus/latest/feature_flags/#modifier-in-promql), [promql-negative-offset](https://prometheus.io/docs/prometheus/latest/feature_flags/#negative-offset-in-promql), [promql-per-step-stats](https://prometheus.io/docs/prometheus/latest/feature_flags/#per-step-stats). |
50+
| thanos.store-api.server-address | string | "" (disabled) | Address to listen on for Thanos Store API endpoints. |
51+
| tracing.otlp.server-address | string | ":9202" | Address |
52+
| to listen on for OpenTelemetry OTLP GRPC server. | | | |
53+
| | | | |
54+
| tracing.async-acks | boolean | false | |
55+
| Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of | | | |
56+
| traces data in the database. This increases throughput at the cost of a small chance of data loss | | | |
57+
| tracing.max-batch-size | integer | 5000 | Maximum size of trace batch that is written to DB |
58+
| tracing.batch-timeout | duration | 250ms | Timeout after new trace batch is created |
59+
60+
| tracing.batch-workers | integer | num of available cpus | Number of workers responsible for creating trace batches. Defaults to number of CPUs.
5261

5362
### Auth flags
5463

pkg/jaeger/store/store.go

+11
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func (p *Store) SpanWriter() spanstore.Writer {
4444
return p
4545
}
4646

47+
func (p *Store) StreamingSpanWriter() spanstore.Writer {
48+
return p
49+
}
50+
4751
func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
4852
batches := []*model.Batch{
4953
{
@@ -57,6 +61,13 @@ func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
5761
return p.inserter.IngestTraces(ctx, traces)
5862
}
5963

64+
// Close performs graceful shutdown of SpanWriter on Jaeger collector shutdown.
65+
// In our case we have nothing to do
66+
// Noop impl avoid getting GRPC error message when Jaeger collector shuts down.
67+
func (p *Store) Close() error {
68+
return nil
69+
}
70+
6071
func (p *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
6172
code := "5xx"
6273
start := time.Now()

pkg/pgclient/client.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,12 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
199199
c := ingestor.Cfg{
200200
NumCopiers: numCopiers,
201201
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
202-
AsyncAcks: cfg.AsyncAcks,
202+
MetricsAsyncAcks: cfg.MetricsAsyncAcks,
203+
TracesAsyncAcks: cfg.TracesAsyncAcks,
203204
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
205+
TracesBatchTimeout: cfg.TracesBatchTimeout,
206+
TracesMaxBatchSize: cfg.TracesMaxBatchSize,
207+
TracesBatchWorkers: cfg.TracesBatchWorkers,
204208
}
205209

206210
var writerConn pgxconn.PgxConn

pkg/pgclient/config.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/jackc/pgx/v4"
1717
"github.com/timescale/promscale/pkg/limits"
1818
"github.com/timescale/promscale/pkg/pgmodel/cache"
19+
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
1920
"github.com/timescale/promscale/pkg/version"
2021
)
2122

@@ -31,7 +32,8 @@ type Config struct {
3132
SslMode string
3233
DbConnectionTimeout time.Duration
3334
IgnoreCompressedChunks bool
34-
AsyncAcks bool
35+
MetricsAsyncAcks bool
36+
TracesAsyncAcks bool
3537
WriteConnections int
3638
WriterPoolSize int
3739
WriterSynchronousCommit bool
@@ -40,6 +42,9 @@ type Config struct {
4042
UsesHA bool
4143
DbUri string
4244
EnableStatementsCache bool
45+
TracesBatchTimeout time.Duration
46+
TracesMaxBatchSize int
47+
TracesBatchWorkers int
4348
}
4449

4550
const (
@@ -79,7 +84,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
7984
fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
8085
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
8186
"However, setting this to true will save your resources that may be required during decompression. ")
82-
fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics to database. "+
87+
fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+
8388
"By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.")
8489
fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
8590
"allowed by the database.")
@@ -92,7 +97,11 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
9297
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
9398
fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
9499
"Disable if using PgBouncer")
95-
fs.BoolVar(&cfg.AsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
100+
fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
101+
fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
102+
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
103+
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
104+
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
96105
return cfg
97106
}
98107

0 commit comments

Comments
 (0)