@@ -53,15 +53,14 @@ type BucketStores interface {
5353
5454// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore.
5555type ThanosBucketStores struct {
56- logger log.Logger
57- cfg tsdb.BlocksStorageConfig
58- limits * validation.Overrides
59- bucket objstore.Bucket
60- logLevel logging.Level
61- bucketStoreMetrics * BucketStoreMetrics
62- cortexBucketStoreMetrics * CortexBucketStoreMetrics
63- metaFetcherMetrics * MetadataFetcherMetrics
64- shardingStrategy ShardingStrategy
56+ logger log.Logger
57+ cfg tsdb.BlocksStorageConfig
58+ limits * validation.Overrides
59+ bucket objstore.Bucket
60+ logLevel logging.Level
61+ bucketStoreMetrics * BucketStoreMetrics
62+ metaFetcherMetrics * MetadataFetcherMetrics
63+ shardingStrategy ShardingStrategy
6564
6665 // Index cache shared across all tenants.
6766 indexCache storecache.IndexCache
@@ -95,6 +94,12 @@ type ThanosBucketStores struct {
9594
9695 // Keeps number of inflight requests
9796 inflightRequests * util.InflightRequestTracker
97+
98+ // Metrics.
99+ syncTimes prometheus.Histogram
100+ syncLastSuccess prometheus.Gauge
101+ tenantsDiscovered prometheus.Gauge
102+ tenantsSynced prometheus.Gauge
98103}
99104
100105var ErrTooManyInflightRequests = status .Error (codes .ResourceExhausted , "too many inflight requests in store gateway" )
@@ -128,21 +133,37 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi
128133 }).Set (float64 (cfg .BucketStore .MaxConcurrent ))
129134
130135 u := & ThanosBucketStores {
131- logger : logger ,
132- cfg : cfg ,
133- limits : limits ,
134- bucket : cachingBucket ,
135- shardingStrategy : shardingStrategy ,
136- stores : map [string ]* store.BucketStore {},
137- storesErrors : map [string ]error {},
138- logLevel : logLevel ,
139- bucketStoreMetrics : NewBucketStoreMetrics (),
140- cortexBucketStoreMetrics : NewCortexBucketStoreMetrics (reg ),
141- metaFetcherMetrics : NewMetadataFetcherMetrics (),
142- queryGate : queryGate ,
143- partitioner : newGapBasedPartitioner (cfg .BucketStore .PartitionerMaxGapBytes , reg ),
144- userTokenBuckets : make (map [string ]* util.TokenBucket ),
145- inflightRequests : util .NewInflightRequestTracker (),
136+ logger : logger ,
137+ cfg : cfg ,
138+ limits : limits ,
139+ bucket : cachingBucket ,
140+ shardingStrategy : shardingStrategy ,
141+ stores : map [string ]* store.BucketStore {},
142+ storesErrors : map [string ]error {},
143+ logLevel : logLevel ,
144+ bucketStoreMetrics : NewBucketStoreMetrics (),
145+ metaFetcherMetrics : NewMetadataFetcherMetrics (),
146+ queryGate : queryGate ,
147+ partitioner : newGapBasedPartitioner (cfg .BucketStore .PartitionerMaxGapBytes , reg ),
148+ userTokenBuckets : make (map [string ]* util.TokenBucket ),
149+ inflightRequests : util .NewInflightRequestTracker (),
150+ syncTimes : promauto .With (reg ).NewHistogram (prometheus.HistogramOpts {
151+ Name : "cortex_bucket_stores_blocks_sync_seconds" ,
152+ Help : "The total time it takes to perform a sync stores" ,
153+ Buckets : []float64 {0.1 , 1 , 10 , 30 , 60 , 120 , 300 , 600 , 900 },
154+ }),
155+ syncLastSuccess : promauto .With (reg ).NewGauge (prometheus.GaugeOpts {
156+ Name : "cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds" ,
157+ Help : "Unix timestamp of the last successful blocks sync." ,
158+ }),
159+ tenantsDiscovered : promauto .With (reg ).NewGauge (prometheus.GaugeOpts {
160+ Name : "cortex_bucket_stores_tenants_discovered" ,
161+ Help : "Number of tenants discovered in the bucket." ,
162+ }),
163+ tenantsSynced : promauto .With (reg ).NewGauge (prometheus.GaugeOpts {
164+ Name : "cortex_bucket_stores_tenants_synced" ,
165+ Help : "Number of tenants synced." ,
166+ }),
146167 }
147168 u .userScanner , err = users .NewScanner (cfg .UsersScanner , bucketClient , logger , reg )
148169 if err != nil {
@@ -232,9 +253,9 @@ func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f f
232253
233254func (u * ThanosBucketStores ) syncUsersBlocks (ctx context.Context , f func (context.Context , * store.BucketStore ) error ) (returnErr error ) {
234255 defer func (start time.Time ) {
235- u .cortexBucketStoreMetrics . syncTimes .Observe (time .Since (start ).Seconds ())
256+ u .syncTimes .Observe (time .Since (start ).Seconds ())
236257 if returnErr == nil {
237- u .cortexBucketStoreMetrics . syncLastSuccess .SetToCurrentTime ()
258+ u .syncLastSuccess .SetToCurrentTime ()
238259 }
239260 }(time .Now ())
240261
@@ -259,8 +280,8 @@ func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context
259280 includeUserIDs [userID ] = struct {}{}
260281 }
261282
262- u .cortexBucketStoreMetrics . tenantsDiscovered .Set (float64 (len (userIDs )))
263- u .cortexBucketStoreMetrics . tenantsSynced .Set (float64 (len (includeUserIDs )))
283+ u .tenantsDiscovered .Set (float64 (len (userIDs )))
284+ u .tenantsSynced .Set (float64 (len (includeUserIDs )))
264285
265286 // Create a pool of workers which will synchronize blocks. The pool size
266287 // is limited in order to avoid to concurrently sync a lot of tenants in
0 commit comments