Update bench #1

Open · wants to merge 1 commit into main
28 changes: 9 additions & 19 deletions Makefile
@@ -1,37 +1,27 @@
-OS := $(shell uname | awk '{print tolower($$0)}')
-
 .PHONY: generate
 generate:
-	pushd ./bench && \
-	rm ./bench-data/*.nljson 2> /dev/null || true && \
-	go mod download && go run ./generate.go ./logger.go -file-count 256 -line-count 4096 && \
-	popd
+	rm -f ./bench-data/*.nljson || true
+	go run ./bench/generate -file-count 256 -line-count 4096
 
 .PHONY: bench
 bench:
-	go mod download && go run ./bench/bench.go ./bench/logger.go
+	go run ./bench/server
 
 .PHONY: bench-file-d
 bench-file-d:
-	pushd ./file.d && \
-	rm offsets || true && \
-	./file.d_$(OS) --config config.yaml && \
-	popd
+	rm ./file.d/offsets || true
+	./file.d/file.d --config ./file.d/config.yaml
 
 .PHONY: bench-fluent-bit
 bench-fluent-bit:
 	./fluentbit/fluent-bit fluent-bit -c fluentbit/config.toml
 
 .PHONY: bench-filebeat
 bench-filebeat:
-	pushd ./filebeat && \
-	rm -r data || true && \
-	./filebeat_$(OS) -c config.yaml && \
-	popd
+	rm -rf ./filebeat/data || true
+	./filebeat/filebeat -c ./filebeat/config.yaml
 
 .PHONY: bench-vector
 bench-vector:
-	pushd ./vector && \
-	rm -r ./logs || true && \
-	./vector_$(OS) --config config.toml && \
-	popd
+	rm ./logs/checkpoints.json || true
+	./vector/vector --config ./vector/config.toml
22 changes: 13 additions & 9 deletions README.md
@@ -1,21 +1,25 @@
 # Benchmarks
 
 Pipeline: File > JSON > Elastic Search
 
 Supported tools for benchmarking:
-* File.d
-* Vector
-* Filebeat
-
-Works only on MacOS.
+* File.d v0.25.2
+* Vector v0.38.0
+* Filebeat 7.17.13
+
+Works only on x86-64 MacOS/Linux.
 
 How to run:
 
 * Download a log collector binary to its folder
 * Run `make generate` to generate the JSON bench data (only needed once)
 * Run `make bench` to start the Elasticsearch mock service
 * Switch to another terminal tab and run one of `make bench-file-d`, `make bench-vector`, or `make bench-filebeat`
 * Watch the output of the Elasticsearch mock service
 
-2.6 GHz 6-Core Intel Core i7 results:
-* File.d — 251.48 Mb/s
-* Vector — 21.50 Mb/s
-* Filebeat — 64.75 Mb/s
+AMD Ryzen 9 5950X (4.9 GHz, 16-core) and Samsung 980 PRO M.2 SSD (7000 MB/s read, 5000 MB/s write) results:
+
+* File.d — 500.48 Mb/s
+* Vector — 323.14 Mb/s
+* Filebeat — 75.42 Mb/s
37 changes: 20 additions & 17 deletions bench/generate.go → bench/generate/main.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	insaneJSON "github.com/vitkovskii/insane-json"
+	"go.uber.org/zap"
 )
 
 var (
@@ -29,6 +30,14 @@ var (
 	workersCount = runtime.GOMAXPROCS(0)
 )
 
+var logger = func() *zap.SugaredLogger {
+	lg, err := zap.NewDevelopment()
+	if err != nil {
+		panic(err)
+	}
+	return lg.Sugar()
+}()
+
 func main() {
 	flag.Parse()
 	rand.Seed(time.Now().UnixNano()) // constant number is intended to repeat output
@@ -43,7 +52,7 @@ func main() {
 
 	jobs := make(chan int)
 
-	err = os.MkdirAll("./../bench-data", os.ModePerm)
+	err = os.MkdirAll("./bench-data", os.ModePerm)
 	if err != nil {
 		logger.Info(err.Error())
 		os.Exit(1)
@@ -69,25 +78,19 @@ func main() {
 func getJSONFixtures() ([]*insaneJSON.Root, error) {
 	jsonFixtures := make([]*insaneJSON.Root, 0, 0)
 
-	//root, err := insaneJSON.DecodeFile("./fixtures/canada.json")
-	//if err != nil {
-	//	return nil, err
-	//}
-	//jsonFixtures = append(jsonFixtures, root)
-	//
-	//root, err = insaneJSON.DecodeFile("./fixtures/citm.json")
-	//if err != nil {
-	//	return nil, err
-	//}
-	//jsonFixtures = append(jsonFixtures, root)
-
-	root, err := insaneJSON.DecodeFile("./fixtures/twitter.json")
+	root, err := insaneJSON.DecodeFile("./bench/fixtures/twitter.json")
 	if err != nil {
 		return nil, err
 	}
 	jsonFixtures = append(jsonFixtures, root)
 
-	root, err = insaneJSON.DecodeFile("./fixtures/unknown.json")
+	root, err = insaneJSON.DecodeFile("./bench/fixtures/unknown.json")
 	if err != nil {
 		return nil, err
 	}
@@ -154,7 +157,7 @@ func worker(fileName string, lineCount int, fields []string, values []string, jo
 	queue := make([]*insaneJSON.Node, 0)
 	jsonOut := make([]byte, 0)
 	for j := range jobs {
-		fullFileName := fmt.Sprintf("../bench-data/%s-%04d.nljson", fileName, j)
+		fullFileName := fmt.Sprintf("./bench-data/%s-%04d.nljson", fileName, j)
 
 		curAvg := avgSizeTarget
 		buf = buf[:0]
@@ -221,20 +224,20 @@ func genFillNodes(root *insaneJSON.Root, node *insaneJSON.Node, fields []string,
 		val float64
 		t   string
 	}{
-		{0.2,"obj"},
-		{0.2,"arr"},
-		{0.3,"str"},
-		{0.1,"int"},
-		{0.1,"flt"},
-		{0.1,"bool"},
+		{0.2, "obj"},
+		{0.2, "arr"},
+		{0.3, "str"},
+		{0.1, "int"},
+		{0.1, "flt"},
+		{0.1, "bool"},
 	}
 	r := rand.Float64()
 
 	for _, x := range probs {
 		if r < x.val {
 			return x.t
 		}
-		r-=x.val
+		r -= x.val
 	}
 
 	return "null"
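A note on the type table above: `genFillNodes` picks a JSON value type by cumulative-probability sampling — draw `r` in [0, 1), subtract each bucket's weight until `r` falls below one, and return that bucket. A minimal standalone sketch of the same technique (names like `pickType` are illustrative, not part of this PR):

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickType maps a uniform draw r in [0, 1) onto a weighted set of JSON
// value types, mirroring the cumulative-subtraction loop in genFillNodes.
func pickType(r float64) string {
	probs := []struct {
		val float64
		t   string
	}{
		{0.2, "obj"},
		{0.2, "arr"},
		{0.3, "str"},
		{0.1, "int"},
		{0.1, "flt"},
		{0.1, "bool"},
	}
	for _, x := range probs {
		if r < x.val {
			return x.t
		}
		r -= x.val // shift the draw into the next bucket
	}
	return "null" // fallback; effectively unreachable while the weights sum to 1
}

func main() {
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[pickType(rand.Float64())]++
	}
	fmt.Println(counts) // roughly 20% obj, 20% arr, 30% str, 10% each int/flt/bool
}
```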
32 changes: 0 additions & 32 deletions bench/logger.go

This file was deleted.

23 changes: 16 additions & 7 deletions bench/bench.go → bench/server/main.go
@@ -10,12 +10,13 @@ import (
 	"time"
 
 	"github.com/valyala/fasthttp"
+	"go.uber.org/zap"
 )
 
 var (
 	addr             = flag.String("addr", "127.0.0.1:9200", "TCP address to listen to")
 	debug            = flag.Bool("debug", false, "Verbose logging")
-	filebeatBulkSize = flag.Int("filebeat", 50, "Filebeat bulk_max_size")
+	filebeatBulkSize = flag.Int("filebeat", 1000, "Filebeat bulk_max_size")
 	duration         = flag.Duration("duration", time.Second*10, "Benchmark duration")
 	stats            = &Stats{}
 	firstReq         = true
@@ -29,11 +30,19 @@ var (
 	PathEmpty               = []byte("/")
 	PathLicense             = []byte("/_license")
 	PathXpack               = []byte("/_xpack")
-	PathFilebeatTemplate    = []byte("/_template/filebeat-7.15.2")
-	PathCatFilebeatTemplate = []byte("/_cat/templates/filebeat-7.15.2")
+	PathFilebeatTemplate    = []byte("/_template/filebeat-")
+	PathCatFilebeatTemplate = []byte("/_cat/templates/filebeat-")
 	PathBulk                = []byte("/_bulk")
 )
 
+var logger = func() *zap.SugaredLogger {
+	lg, err := zap.NewDevelopment()
+	if err != nil {
+		panic(err)
+	}
+	return lg.Sugar()
+}()
+
 func main() {
 	flag.Parse()
 	termChan := make(chan os.Signal, 2)
@@ -42,7 +51,7 @@ func main() {
 	srv := fasthttp.Server{
 		Handler:            requestHandler,
 		Name:               "default",
-		MaxRequestBodySize: fasthttp.DefaultMaxRequestBodySize,
+		MaxRequestBodySize: 1024 * 1024 * 100, // 100 MiB
 	}
 
 	logger.Infof("started, waiting for first request")
@@ -176,14 +185,14 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
 			"security":{"available":true,"enabled":false},"slm":{"available":true,"enabled":true},"spatial":{"available":true,"enabled":true},
 			"sql":{"available":true,"enabled":true},"transform":{"available":true,"enabled":true},"voting_only":{"available":true,"enabled":true},
 			"watcher":{"available":false,"enabled":true}},"tagline":"You know, for X"}`)
-		case bytes.Equal(path, PathCatFilebeatTemplate):
+		case bytes.HasPrefix(path, PathCatFilebeatTemplate):
 			ctx.SetBody(filebeatTemplate)
 		}
 		return
 	}
 
 	// Filebeat puts template
-	if bytes.Equal(method, MethodPut) && bytes.Equal(path, PathFilebeatTemplate) {
+	if bytes.Equal(method, MethodPut) && bytes.HasPrefix(path, PathFilebeatTemplate) {
 		filebeatTemplate = ctx.PostBody()
 		ctx.SetBodyString("{\"took\":0,\"errors\":false}\n")
 		return
@@ -248,7 +257,7 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
 		return
 	}
 
-	ctx.Error("Bad Request", fasthttp.StatusBadRequest)
+	ctx.Write([]byte("{}"))
 }
 
 func dumpReport(stats *Stats) {
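Review note: switching from `bytes.Equal` to `bytes.HasPrefix` means the mock accepts template paths from any Filebeat version (`/_template/filebeat-7.15.2`, `/_template/filebeat-7.17.13`, ...), and the larger `MaxRequestBodySize` lets it swallow big `_bulk` payloads. A stripped-down sketch of that routing, assuming only the fasthttp calls visible in this diff (this is not the full benchmark server):

```go
package main

import (
	"bytes"
	"log"

	"github.com/valyala/fasthttp"
)

var pathFilebeatTemplate = []byte("/_template/filebeat-")

func handler(ctx *fasthttp.RequestCtx) {
	// Any versioned template path matches: /_template/filebeat-7.15.2,
	// /_template/filebeat-7.17.13, and so on.
	if bytes.HasPrefix(ctx.Path(), pathFilebeatTemplate) {
		ctx.SetBodyString(`{"took":0,"errors":false}`)
		return
	}
	// Answer everything else with an empty JSON object, as the updated
	// handler does instead of returning 400 Bad Request.
	ctx.SetBodyString("{}")
}

func main() {
	srv := fasthttp.Server{
		Handler:            handler,
		MaxRequestBodySize: 1024 * 1024 * 100, // accept large _bulk payloads (100 MiB)
	}
	log.Fatal(srv.ListenAndServe("127.0.0.1:9200"))
}
```

With this sketch running, a PUT to any versioned path, e.g. `/_template/filebeat-8.0.0`, gets the canned acknowledgement.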
6 changes: 3 additions & 3 deletions file.d/config.yaml
@@ -2,13 +2,13 @@ pipelines:
   benchmark:
     settings:
       decoder: json
-      capacity: 1024
+      capacity: 2048 # more capacity = more memory usage, but more throughput
     input:
       type: file
       persistence_mode: async
-      watching_dir: ./../bench-data/
+      watching_dir: ./bench-data/
       filename_pattern: "*.nljson"
-      offsets_file: ./offsets
+      offsets_file: ./file.d/offsets
       offsets_op: reset
     output:
       type: elasticsearch
Binary file removed file.d/file.d_darwin
4 changes: 2 additions & 2 deletions filebeat/config.yaml
@@ -4,11 +4,11 @@ filebeat.inputs:
   json.add_error_key: true
   json.message_key: message
   paths:
-    - ../bench-data/*.nljson
+    - ./bench-data/*.nljson
 
 output.elasticsearch:
   hosts: ["http://127.0.0.1:9200"]
-  bulk_max_size: 50
+  bulk_max_size: 1000
 
 logging.level: info
 logging.to_files: false
Binary file removed filebeat/filebeat_darwin
12 changes: 8 additions & 4 deletions vector/config.toml
@@ -2,13 +2,17 @@ data_dir = "./"
 
 [sources.logs]
 type = "file"
-include = ["../bench-data/*.nljson"]
+include = ["./bench-data/*.nljson"]
 data_dir = './'
 fingerprinting.strategy = "device_and_inode"
 
-[transforms.logs_parse_json]
-inputs = [ "logs" ]
-type = "json_parser"
+[transforms.json_parser]
+type = "remap"
+inputs = ["logs"]
+drop_on_error = false
+source = '''
+. |= object!(parse_json!(.message))
+'''

Author's comment on this hunk: the `json_parser` transform was removed in Vector v0.12, so the config now uses a `remap` transform instead.
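For readers who don't know VRL: `. |= object!(parse_json!(.message))` parses the raw line held in `.message` as JSON and merges the resulting fields into the event. A rough Go equivalent of that per-event operation (illustrative only, not Vector code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// An event roughly as the file source might emit it: the raw JSON line in "message".
	event := map[string]any{
		"message": `{"level":"info","user":"alice"}`,
		"file":    "bench-data/bench-0001.nljson",
	}

	// parse_json!(.message)
	var parsed map[string]any
	if err := json.Unmarshal([]byte(event["message"].(string)), &parsed); err != nil {
		panic(err) // remap handles failures per drop_on_error instead
	}

	// . |= object!(...) merges the parsed fields into the event.
	for k, v := range parsed {
		event[k] = v
	}
	fmt.Println(event) // map[file:... level:info message:... user:alice]
}
```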

 [sinks.elasticsearch]
 type = "elasticsearch"
Binary file removed vector/vector_darwin