Merge pull request #2 from mathieu-pousse/multi-region
YoannMa authored Jul 20, 2022
2 parents baeff46 + 5982637 commit d8d7795
Showing 7 changed files with 183 additions and 124 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -5,5 +5,6 @@
# vendor/

bin/
scaleway_exporter

*.env
*.env
2 changes: 1 addition & 1 deletion Makefile
@@ -1,7 +1,7 @@
PLATFORM=local

REPOSITORY=yoannm/scaleway_exporter
VERSION=0.1.0
VERSION=0.2.0

export DOCKER_BUILDKIT=1
export COMPOSE_DOCKER_CLI_BUILD=1
19 changes: 17 additions & 2 deletions README.md
@@ -2,15 +2,30 @@

Prometheus exporter for various metrics about your [Scaleway Elements](https://www.scaleway.com/en/elements/) loadbalancers and managed databases, written in Go.

## How to

```
$ export SCALEWAY_ACCESS_KEY=<access key goes here>
$ export SCALEWAY_SECRET_KEY=<secret key goes here>
$ ./scaleway_exporter
level=info ts=2022-07-19T13:25:40.352520863Z caller=main.go:83 msg="Scaleway Region is set to ALL"
level=info ts=2022-07-19T13:25:40.352550422Z caller=main.go:89 msg="starting scaleway_exporter" version= revision= buildDate= goVersion=go1.18.3
level=info ts=2022-07-19T13:25:40.352691527Z caller=main.go:145 msg=listening addr=:9503
```

By default, all collectors (buckets, databases, load balancers) are enabled and scrape every Scaleway region.
If needed, you can disable individual collectors by passing the `disable-bucket-collector`, `disable-database-collector` or `disable-loadbalancer-collector` flags on the command line.
You can also restrict scraping to a single region by setting the environment variable `SCALEWAY_REGION`, for instance `SCALEWAY_REGION=fr-par`.
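
For example, to scrape only `fr-par` and skip the bucket collector (the double-dash flag form below is an assumption; check the binary's help output for the exact syntax):

```
$ export SCALEWAY_ACCESS_KEY=<access key goes here>
$ export SCALEWAY_SECRET_KEY=<secret key goes here>
$ SCALEWAY_REGION=fr-par ./scaleway_exporter --disable-bucket-collector
```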

## TODO

- [ ] Add more documentation
- [ ] Example prometheus rules
- [ ] Example grafana dashboard
- [ ] Proper CI
- [ ] Cross Region metrics pulling
- [x] Cross Region metrics pulling
- [ ] More metrics? (Container Registry size is available)
- [ ] Ability to filter the kind of product (only database for example)
- [x] Ability to filter the kind of product (only database for example)
- [ ] Register a new default port, as the current one is already used by [another Scaleway Exporter](https://github.com/promhippie/scw_exporter)? (see the [Prometheus documentation](https://github.com/prometheus/prometheus/wiki/Default-port-allocations))

## Acknowledgements
156 changes: 87 additions & 69 deletions collector/bucket.go
@@ -22,51 +22,61 @@ import (

// BucketCollector collects metrics about all buckets.
type BucketCollector struct {
logger log.Logger
errors *prometheus.CounterVec
client *scw.Client
region *scw.Region
s3Client *s3.S3
timeout time.Duration
logger log.Logger
errors *prometheus.CounterVec
endpoints []Endpoint
timeout time.Duration

ObjectCount *prometheus.Desc
Bandwidth *prometheus.Desc
StorageUsageStandard *prometheus.Desc
StorageUsageGlacier *prometheus.Desc
}

type Endpoint struct {
client *scw.Client
region scw.Region
s3Client *s3.S3
}

// NewBucketCollector returns a new BucketCollector.
func NewBucketCollector(logger log.Logger, errors *prometheus.CounterVec, client *scw.Client, timeout time.Duration) *BucketCollector {
func NewBucketCollector(logger log.Logger, errors *prometheus.CounterVec, client *scw.Client, timeout time.Duration, regions []scw.Region) *BucketCollector {
errors.WithLabelValues("bucket").Add(0)

region, _ := client.GetDefaultRegion()

accessKey, _ := client.GetAccessKey()

secretKey, _ := client.GetSecretKey()

newSession, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
Region: aws.String(fmt.Sprint(region)),
})
endpoints := make([]Endpoint, len(regions))

if err != nil {
_ = level.Error(logger).Log("msg", "can't create a S3 client", "err", err)
os.Exit(1)
}
for i, region := range regions {

s3Client := s3.New(newSession, &aws.Config{
Endpoint: aws.String("https://s3." + fmt.Sprint(region) + ".scw.cloud"),
S3ForcePathStyle: aws.Bool(true),
})
newSession, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
Region: aws.String(fmt.Sprint(region)),
})

if err != nil {
_ = level.Error(logger).Log("msg", "can't create a S3 client", "err", err)
os.Exit(1)
}

s3Client := s3.New(newSession, &aws.Config{
Endpoint: aws.String("https://s3." + fmt.Sprint(region) + ".scw.cloud"),
S3ForcePathStyle: aws.Bool(true),
})

endpoints[i] = Endpoint{
client: client,
s3Client: s3Client,
region: region,
}
}
return &BucketCollector{
region: &region,
logger: logger,
errors: errors,
client: client,
s3Client: s3Client,
timeout: timeout,
logger: logger,
errors: errors,
endpoints: endpoints,
timeout: timeout,

ObjectCount: prometheus.NewDesc(
"scaleway_s3_object_total",
@@ -142,13 +152,15 @@ type HandleSimpleMetricOptions struct {
MetricName MetricName
Desc *prometheus.Desc
labels []string
Endpoint Endpoint
}

type HandleMultiMetricsOptions struct {
Bucket string
MetricName MetricName
DescMatrix map[string]*prometheus.Desc
labels []string
Endpoint Endpoint
}

// Collect is called by the Prometheus registry when collecting metrics.
@@ -157,69 +169,72 @@ func (c *BucketCollector) Collect(ch chan<- prometheus.Metric) {
_, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

buckets, err := c.s3Client.ListBuckets(&s3.ListBucketsInput{})
for _, endpoint := range c.endpoints {

if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch the list of buckets", "err", err)
buckets, err := endpoint.s3Client.ListBuckets(&s3.ListBucketsInput{})

return
}
if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch the list of buckets", "err", err)

scwReq := &scw.ScalewayRequest{
Method: "POST",
Path: "/object-private/v1/regions/" + fmt.Sprint(c.region) + "/buckets-info/",
}
return
}

var bucketNames []string
scwReq := &scw.ScalewayRequest{
Method: "POST",
Path: "/object-private/v1/regions/" + fmt.Sprint(endpoint.region) + "/buckets-info/",
}

for _, bucket := range buckets.Buckets {
var bucketNames []string

bucketNames = append(bucketNames, *bucket.Name)
}
for _, bucket := range buckets.Buckets {

projectId := strings.Split(*buckets.Owner.ID, ":")[0]
bucketNames = append(bucketNames, *bucket.Name)
}

_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("found %d buckets under projectID %s : %s", len(bucketNames), projectId, bucketNames))
projectId := strings.Split(*buckets.Owner.ID, ":")[0]

err = scwReq.SetBody(&BucketInfoRequestBody{ProjectId: projectId, BucketsName: bucketNames})
_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("found %d buckets under projectID %s : %s", len(bucketNames), projectId, bucketNames))

if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch details of buckets", "err", err)
err = scwReq.SetBody(&BucketInfoRequestBody{ProjectId: projectId, BucketsName: bucketNames})

return
}
if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch details of buckets", "err", err)

var response BucketInfoList
return
}

err = c.client.Do(scwReq, &response)
var response BucketInfoList

if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch details of buckets", "err", err)
err = endpoint.client.Do(scwReq, &response)

return
}
if err != nil {
c.errors.WithLabelValues("bucket").Add(1)
_ = level.Warn(c.logger).Log("msg", "can't fetch details of buckets", "err", err)

var wg sync.WaitGroup
defer wg.Wait()
return
}

for name, bucket := range response.Buckets {
var wg sync.WaitGroup
defer wg.Wait()

wg.Add(1)
for name, bucket := range response.Buckets {

_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Fetching metrics for bucket : %s", name))
wg.Add(1)

go c.FetchMetricsForBucket(&wg, ch, name, bucket)
_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Fetching metrics for bucket : %s", name))

go c.FetchMetricsForBucket(&wg, ch, name, bucket, endpoint)
}
}
}

func (c *BucketCollector) FetchMetricsForBucket(parentWg *sync.WaitGroup, ch chan<- prometheus.Metric, name string, bucket BucketInfo) {
func (c *BucketCollector) FetchMetricsForBucket(parentWg *sync.WaitGroup, ch chan<- prometheus.Metric, name string, bucket BucketInfo, endpoint Endpoint) {

defer parentWg.Done()

labels := []string{name, fmt.Sprint(c.region), fmt.Sprint(bucket.IsPublic)}
labels := []string{name, fmt.Sprint(endpoint.region), fmt.Sprint(bucket.IsPublic)}

// TODO check if it is possible to add bucket tag as labels
//for _, tags := range instance.Tags {
Expand All @@ -236,20 +251,23 @@ func (c *BucketCollector) FetchMetricsForBucket(parentWg *sync.WaitGroup, ch cha
MetricName: ObjectCount,
labels: labels,
Desc: c.ObjectCount,
Endpoint: endpoint,
})

go c.HandleSimpleMetric(&wg, ch, &HandleSimpleMetricOptions{
Bucket: name,
MetricName: BytesSent,
labels: labels,
Desc: c.Bandwidth,
Endpoint: endpoint,
})

go c.HandleMultiMetrics(&wg, ch, &HandleMultiMetricsOptions{
Bucket: name,
MetricName: StorageUsage,
labels: labels,
DescMatrix: map[string]*prometheus.Desc{"STANDARD": c.StorageUsageStandard, "GLACIER": c.StorageUsageGlacier},
Endpoint: endpoint,
})
}

@@ -259,7 +277,7 @@ func (c *BucketCollector) HandleSimpleMetric(parentWg *sync.WaitGroup, ch chan<-

var response Metric

err := c.FetchMetric(options.Bucket, options.MetricName, &response)
err := c.FetchMetric(options.Bucket, options.MetricName, &response, options.Endpoint)

if err != nil {

@@ -304,7 +322,7 @@ func (c *BucketCollector) HandleMultiMetrics(parentWg *sync.WaitGroup, ch chan<-

var response Metric

err := c.FetchMetric(options.Bucket, options.MetricName, &response)
err := c.FetchMetric(options.Bucket, options.MetricName, &response, options.Endpoint)

if err != nil {

@@ -355,7 +373,7 @@ func (c *BucketCollector) HandleMultiMetrics(parentWg *sync.WaitGroup, ch chan<-
}
}

func (c *BucketCollector) FetchMetric(Bucket string, MetricName MetricName, response *Metric) error {
func (c *BucketCollector) FetchMetric(Bucket string, MetricName MetricName, response *Metric, endpoint Endpoint) error {

query := url.Values{}

Expand All @@ -365,11 +383,11 @@ func (c *BucketCollector) FetchMetric(Bucket string, MetricName MetricName, resp

scwReq := &scw.ScalewayRequest{
Method: "GET",
Path: "/object-private/v1/regions/" + fmt.Sprint(c.region) + "/buckets/" + Bucket + "/metrics",
Path: "/object-private/v1/regions/" + fmt.Sprint(endpoint.region) + "/buckets/" + Bucket + "/metrics",
Query: query,
}

err := c.client.Do(scwReq, &response)
err := endpoint.client.Do(scwReq, &response)

if err != nil {

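Taken together, the changes to `collector/bucket.go` replace the collector's single `region`/`s3Client` pair with a slice of per-region `Endpoint` values, so one `BucketCollector` can scrape every region in a single `Collect` pass. The sketch below shows how a caller such as `main.go` (not part of this diff) might build and register the multi-region collector; the module path, counter name, label name, timeout and hard-coded region list are illustrative assumptions, not code from this pull request.

```
// A hedged sketch, not code from this pull request: the module path, counter
// name, label name, region list and timeout are illustrative assumptions.
package main

import (
	"net/http"
	"os"
	"time"

	"github.com/go-kit/log" // must match the log package the collector expects
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/scaleway/scaleway-sdk-go/scw"

	"github.com/yoannma/scaleway_exporter/collector" // hypothetical import path
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// The exporter reads its credentials from SCALEWAY_ACCESS_KEY / SCALEWAY_SECRET_KEY.
	client, err := scw.NewClient(scw.WithAuth(
		os.Getenv("SCALEWAY_ACCESS_KEY"),
		os.Getenv("SCALEWAY_SECRET_KEY"),
	))
	if err != nil {
		_ = logger.Log("msg", "cannot create Scaleway client", "err", err)
		os.Exit(1)
	}

	// Shared error counter, one label value per collector ("bucket", "database", ...).
	errCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "scaleway_exporter_errors_total", // assumed metric name
		Help: "Errors encountered while collecting Scaleway metrics.",
	}, []string{"collector"})
	prometheus.MustRegister(errCounter)

	// Scrape every region; SCALEWAY_REGION would narrow this down to a single entry.
	regions := []scw.Region{scw.RegionFrPar, scw.RegionNlAms, scw.RegionPlWaw}

	// One BucketCollector instance now covers all requested regions.
	prometheus.MustRegister(collector.NewBucketCollector(logger, errCounter, client, 10*time.Second, regions))

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9503", nil)
}
```

In the real exporter the region list is presumably derived from `SCALEWAY_REGION` when it is set, which matches the new README text and the "Scaleway Region is set to ALL" log line above.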
