Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ jobs:
strategy:
matrix:
go:
- ^1.21
- ^1.22
- ^1.23
- ^1.24
- ^1.25
- ^1.26
- ^1
steps:

Expand Down Expand Up @@ -110,9 +108,9 @@ jobs:
strategy:
matrix:
os:
- debian:10
- debian:11
- debian:12
- debian:13
- ubuntu:20.04
- ubuntu:22.04
- ubuntu:24.04
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Image which contains the binary artefacts
#
FROM golang:bookworm AS build
FROM golang:trixie AS build


COPY . ./grafsy
Expand Down
14 changes: 7 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c Client) saveChannelToRetry(ch chan string, size int, carbonAddr string)
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
c.Lc.lg.Println(err.Error())
c.Lc.lg.Println(err)
}
defer f.Close()

Expand All @@ -93,7 +93,7 @@ func (c Client) saveChannelToRetry(ch chan string, size int, carbonAddr string)
_, err = f.WriteString(<-ch + "\n")
if err != nil {
dropped++
c.Lc.lg.Println(err.Error())
c.Lc.lg.Println(err)
} else {
saved++
}
Expand Down Expand Up @@ -125,11 +125,11 @@ func (c Client) removeOldDataFromRetryFile(carbonAddr string) error {
// Attempt to send metric to graphite server via connection
func (c *Client) tryToSendToGraphite(metric string, carbonAddr string, conn net.Conn) error {
// If at any point "HOSTNAME" was used instead of real hostname - replace it
metric = strings.Replace(metric, "HOSTNAME", c.Lc.hostname, -1)
metric = strings.ReplaceAll(metric, "HOSTNAME", c.Lc.hostname)

_, err := conn.Write([]byte(metric + "\n"))
if err != nil {
c.Lc.lg.Println("Write to server failed:", err.Error())
c.Lc.lg.Println("Write to server failed:", err)
return err
}
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].sent, 1)
Expand All @@ -156,7 +156,7 @@ func (c Client) runBackend(carbonAddr string) {
// Try to dial to Graphite server. If ClientSendInterval is 10 seconds - dial should be no longer than 1 second
conn, err := net.DialTimeout("tcp", carbonAddr, time.Duration(c.Conf.ConnectTimeout)*time.Second)
if err != nil {
c.Lc.lg.Println("Can not connect to graphite server: ", err.Error())
c.Lc.lg.Println("Can not connect to graphite server:", err)
c.saveChannelToRetry(monChannel, len(monChannel), carbonAddr)
c.saveChannelToRetry(mainChannel, len(mainChannel), carbonAddr)
c.removeOldDataFromRetryFile(carbonAddr)
Expand All @@ -166,7 +166,7 @@ func (c Client) runBackend(carbonAddr string) {
// We set dead line for connection to write. It should be the rest of we have for client interval
err = conn.SetWriteDeadline(time.Now().Add(time.Duration(c.Conf.ClientSendInterval-c.Conf.ConnectTimeout-1) * time.Second))
if err != nil {
c.Lc.lg.Println("Can not set deadline for connection: ", err.Error())
c.Lc.lg.Println("Can not set deadline for connection:", err)
connectionFailed = true
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func (c Client) runBackend(carbonAddr string) {
bufSize = len(mainChannel)

if !connectionFailed {
for processedMainBuff := 0; processedMainBuff < bufSize; processedMainBuff = processedMainBuff + 1 {
for processedMainBuff := 0; processedMainBuff < bufSize; processedMainBuff++ {
metric := <-mainChannel

err = c.tryToSendToGraphite(metric, carbonAddr, conn)
Expand Down
23 changes: 11 additions & 12 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"

"github.com/pelletier/go-toml/v2"
"github.com/pkg/errors"
)

// ConfigPath is the default path to the configuration file
Expand Down Expand Up @@ -154,7 +153,7 @@ func (conf *Config) LoadConfig(configFile string) error {

if conf.ClientSendInterval < 1 || conf.AggrInterval < 1 || conf.AggrPerSecond < 1 ||
conf.MetricsPerSecond < 1 || conf.ConnectTimeout < 1 {
return errors.New("ClientSendInterval, AggrInterval, AggrPerSecond, ClientSendInterval, " +
return fmt.Errorf("ClientSendInterval, AggrInterval, AggrPerSecond, ClientSendInterval, " +
"MetricsPerSecond, ConnectTimeout must be greater than 0")
}

Expand Down Expand Up @@ -182,17 +181,17 @@ func (conf *Config) prepareEnvironment() error {
// Create directory with default permissions first
err = os.MkdirAll(conf.MetricDir, os.ModePerm)
if err != nil {
return errors.Wrap(err, "Failed to create MetricDir: "+conf.MetricDir)
return fmt.Errorf("failed to create MetricDir %s: %w", conf.MetricDir, err)
}
// Then explicitly set the desired permissions to avoid issues with umask
err = os.Chmod(conf.MetricDir, 0777|os.ModeSticky)
if err != nil {
return errors.Wrap(err, "Failed to chmod MetricDir after creation: "+conf.MetricDir)
return fmt.Errorf("failed to chmod MetricDir after creation %s: %w", conf.MetricDir, err)
}
} else {
err = os.Chmod(conf.MetricDir, 0777|os.ModeSticky)
if err != nil {
return errors.Wrap(err, "Failed to chmod MetricDir: "+conf.MetricDir)
return fmt.Errorf("failed to chmod MetricDir %s: %w", conf.MetricDir, err)
}
}

Expand All @@ -204,21 +203,21 @@ func (conf *Config) prepareEnvironment() error {
if conf.UseACL {
err := setACL(conf.MetricDir)
if err != nil {
return errors.Wrap(err, "Can not set ACLs for dir "+conf.MetricDir)
return fmt.Errorf("can not set ACLs for dir %s: %w", conf.MetricDir, err)
}
}

if _, err := os.Stat(filepath.Dir(conf.Log)); conf.Log != "-" || os.IsNotExist(err) {
if err = os.MkdirAll(filepath.Dir(conf.Log), os.ModePerm); err != nil {
return errors.Wrap(err, "Can not create logfile's dir "+filepath.Dir(conf.Log))
return fmt.Errorf("can not create logfile's dir %s: %w", filepath.Dir(conf.Log), err)
}
}

// Check if servers in CarbonAddrs are resolvable
for _, carbonAddr := range conf.CarbonAddrs {
_, err := net.ResolveTCPAddr("tcp", carbonAddr)
if err != nil {
return errors.New("Could not resolve an address from CarbonAddrs: " + err.Error())
return fmt.Errorf("could not resolve an address from CarbonAddrs: %w", err)
}
}

Expand All @@ -239,7 +238,7 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {

err := conf.prepareEnvironment()
if err != nil {
return nil, errors.Wrap(err, "Can not prepare environment")
return nil, fmt.Errorf("can not prepare environment: %w", err)
}

/*
Expand All @@ -265,7 +264,7 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
} else {
f, err = os.OpenFile(conf.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0660)
if err != nil {
log.Println("Can not open file", conf.Log, err.Error())
log.Println("Can not open file", conf.Log, err)
os.Exit(1)
}
}
Expand All @@ -275,9 +274,9 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
if hostname == "" {
hostname, err = os.Hostname()
if err != nil {
return nil, errors.New("Can not resolve the hostname: " + err.Error())
return nil, fmt.Errorf("can not resolve the hostname: %w", err)
}
hostname = strings.Replace(hostname, ".", "_", -1)
hostname = strings.ReplaceAll(hostname, ".", "_")
}

// There are 4 metrics per backend in client and 3 in server stats
Expand Down
7 changes: 4 additions & 3 deletions config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
package grafsy

import (
"fmt"

"github.com/naegelejd/go-acl"
"github.com/pkg/errors"
)

func setACL(metricDir string) error {
ac, err := acl.Parse("user::rw group::rw mask::r other::r")
if err != nil {
return errors.New("Unable to parse acl: " + err.Error())
return fmt.Errorf("unable to parse acl: %w", err)
}
err = ac.SetFileDefault(metricDir)
if err != nil {
return errors.New("Unable to set acl: " + err.Error())
return fmt.Errorf("unable to set acl: %w", err)
}
return nil
}
2 changes: 1 addition & 1 deletion config_noacl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build noacl
//go:build noacl

package grafsy

Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
module github.com/leoleovich/grafsy

go 1.21.0
go 1.24

require (
github.com/naegelejd/go-acl v0.0.0-20200406162857-ebe394c522e5
github.com/pelletier/go-toml/v2 v2.2.3
github.com/pkg/errors v0.9.1
github.com/pelletier/go-toml/v2 v2.2.4
)
14 changes: 2 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/naegelejd/go-acl v0.0.0-20200406162857-ebe394c522e5 h1:R10+S1Knv6udBjyDYU84+zD5R8qLWg7wSH/ddhJSzX4=
github.com/naegelejd/go-acl v0.0.0-20200406162857-ebe394c522e5/go.mod h1:nMzsOoQWESVMF6s+hAF8Qnc14fUIpL7pfmGm6MV8B2g=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
5 changes: 1 addition & 4 deletions grafsy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"os"
"sync"

"github.com/leoleovich/grafsy"
)
Expand Down Expand Up @@ -52,11 +51,9 @@ func main() {
Mon: mon,
}

var wg sync.WaitGroup
go mon.Run()
go srv.Run()
go cli.Run()

wg.Add(3)
wg.Wait()
select {}
}
2 changes: 1 addition & 1 deletion grafsy_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func acceptAndReport(l net.Listener, ch chan string) error {
conBuf := bufio.NewReader(conn)
for {
metric, err := conBuf.ReadString('\n')
ch <- strings.Replace(strings.Replace(metric, "\r", "", -1), "\n", "", -1)
ch <- strings.TrimRight(metric, "\r\n")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *Monitoring) generateOwnMonitoring() {
}

for _, carbonAddr := range m.Conf.CarbonAddrs {
carbonAddrString := strings.Replace(carbonAddr, ".", "_", -1)
carbonAddrString := strings.ReplaceAll(carbonAddr, ".", "_")
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.dropped %v %v", path, carbonAddrString, m.clientStat[carbonAddr].dropped, now))
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.from_retry %v %v", path, carbonAddrString, m.clientStat[carbonAddr].fromRetry, now))
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.saved %v %v", path, carbonAddrString, m.clientStat[carbonAddr].saved, now))
Expand Down
27 changes: 12 additions & 15 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -85,7 +84,7 @@ func (s Server) aggrMetricsWithPrefix() {
}

select {
case s.Lc.mainChannel <- fmt.Sprintf("%s %.2f %d", strings.Replace(metricName, prefix, "", -1), value, aggrTimestamp):
case s.Lc.mainChannel <- fmt.Sprintf("%s %.2f %d", strings.ReplaceAll(metricName, prefix, ""), value, aggrTimestamp):
default:
s.Lc.lg.Printf("Too many metrics in the main queue (%d). I can not append aggregated metrics", len(s.Lc.mainChannel))
dropped++
Expand Down Expand Up @@ -161,7 +160,7 @@ func (s Server) handleRequest(conn net.Conn) {
s.Mon.Increase(&s.Mon.serverStat.net, 1)
metric, err := conBuf.ReadString('\n')
// Even if error occurred we still put "metric" into analysis, cause it can be a valid metric, but without \n
s.cleanAndUseIncomingData([]string{strings.Replace(strings.Replace(metric, "\r", "", -1), "\n", "", -1)})
s.cleanAndUseIncomingData([]string{strings.TrimRight(metric, "\r\n")})
if err != nil {
return
}
Expand All @@ -174,12 +173,12 @@ func (s Server) handleDirMetrics() {
for ; ; time.Sleep(time.Duration(s.Conf.ClientSendInterval) * time.Second) {
entries, err := os.ReadDir(s.Conf.MetricDir)
if err != nil {
panic(err.Error())
panic(err)
}
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
panic(err.Error())
panic(err)
}
resultsList, _ := readMetricsFromFile(s.Conf.MetricDir + "/" + info.Name())
s.Mon.Increase(&s.Mon.serverStat.dir, len(resultsList))
Expand All @@ -193,18 +192,18 @@ func (s *Server) handleListener(addr *net.TCPAddr) {
// Listen for incoming connections.
l, err := net.ListenTCP("tcp", addr)
if err != nil {
s.Lc.lg.Println("Failed to run server:", err.Error())
s.Lc.lg.Println("Failed to run server:", err)
os.Exit(1)
} else {
s.Lc.lg.Println("Server is running")
}

s.Lc.lg.Println("Server is running")
defer l.Close()

for {
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
s.Lc.lg.Println("Error accepting: ", err.Error())
s.Lc.lg.Println("Error accepting:", err)
os.Exit(1)
}
// Handle connections in a new goroutine.
Expand All @@ -224,20 +223,20 @@ func (s *Server) resolveBind() []*net.TCPAddr {
// Resolve hostname to ips
h, p, err := net.SplitHostPort(s.Conf.LocalBind)
if err != nil {
s.Lc.lg.Println("Failed to split bind address:", err.Error())
s.Lc.lg.Println("Failed to split bind address:", err)
os.Exit(1)
}

ips, err := net.LookupIP(h)
if err != nil {
s.Lc.lg.Println("Failed to lookup IPs:", err.Error())
s.Lc.lg.Println("Failed to lookup IPs:", err)
os.Exit(1)
}

// Resolve named ports
port, err := net.LookupPort("tcp", p)
if err != nil {
s.Lc.lg.Println("Failed to lookup port:", err.Error())
s.Lc.lg.Println("Failed to lookup port:", err)
os.Exit(1)
}

Expand Down Expand Up @@ -266,7 +265,5 @@ func (s *Server) Run() {
// Run goroutine for aggr metrics with prefix
go s.aggrMetricsWithPrefix()

wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
select {}
}
Loading