Skip to content

Commit

Permalink
Increase reading buffer size on large messages (#74) (#84)
Browse files Browse the repository at this point in the history
* Increase reading buffer size on large messages  (#74)

When larger than MaxBufferSize messages are being sent, the whole message processing
is failing badly. This can easily happen when socket plugin is used in Ceilometer metrics
or events processing.

Resolves: rhbz#2016460

Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=2016460
(cherry picked from commit f13e241)
(cherry picked from commit 4af768f)

* Ci opstools fix (#85) (#88) (#90)

* Fix centos-opstools repo in CI

* Update .github/workflows/integration.yml

Co-authored-by: Leif Madsen <[email protected]>
Co-authored-by: Matthias Runge <[email protected]>

(cherry picked from commit 04fc691)
(cherry picked from commit 63f8c2c)
(cherry picked from commit ecd07f5)

Co-authored-by: Martin Mágr <[email protected]>

Co-authored-by: Leif Madsen <[email protected]>
  • Loading branch information
paramite and leifmadsen authored Feb 4, 2022
1 parent ecd07f5 commit 0120e4c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 7 deletions.
27 changes: 20 additions & 7 deletions plugins/transport/socket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/infrawatch/sg-core/pkg/transport"
)

const maxBufferSize = 16384
const (
maxBufferSize = 65535
)

var (
msgCount int64
Expand Down Expand Up @@ -70,22 +72,27 @@ type Socket struct {
// Run implements type Transport
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {

msgBuffer := make([]byte, maxBufferSize)
var laddr net.UnixAddr

laddr.Name = s.conf.Path
laddr.Net = "unixgram"

os.Remove(s.conf.Path)

pc, err := net.ListenUnixgram("unixgram", &laddr)
if err != nil {
s.logger.Errorf(err, "failed to listen on unix socket %s", laddr.Name)
s.logger.Errorf(err, "failed to bind unix socket %s", laddr.Name)
return
}
// create socket file if it does not exist
skt, err := pc.File()
if err != nil {
s.logger.Errorf(err, "failed to retrieve file handle for %s", laddr.Name)
return
}
skt.Close()

s.logger.Infof("socket listening on %s", laddr.Name)
go func() {
go func(maxBuffSize int64) {
msgBuffer := make([]byte, maxBuffSize)
for {
n, err := pc.Read(msgBuffer)
if err != nil || n < 1 {
Expand All @@ -96,6 +103,11 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
return
}

// whole buffer was used, so we are potentially handling larger message
if n == len(msgBuffer) {
s.logger.Warnf("full read buffer used")
}

if s.conf.DumpMessages.Enabled {
_, err := s.dumpBuf.Write(msgBuffer[:n])
if err != nil {
Expand All @@ -107,10 +119,11 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
}
s.dumpBuf.Flush()
}

w(msgBuffer[:n])
msgCount++
}
}()
}(maxBufferSize)

for {
select {
Expand Down
83 changes: 83 additions & 0 deletions plugins/transport/socket/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"context"
"io/ioutil"
"net"
"os"
"path"
"sync"
"testing"
"time"

"github.com/infrawatch/apputils/logging"
"github.com/stretchr/testify/require"
"gopkg.in/go-playground/assert.v1"
)

const regularBuffSize = 16384

func TestSocketTransport(t *testing.T) {
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

logpath := path.Join(tmpdir, "test.log")
logger, err := logging.NewLogger(logging.DEBUG, logpath)
require.NoError(t, err)

sktpath := path.Join(tmpdir, "socket")
skt, err := os.OpenFile(sktpath, os.O_RDWR|os.O_CREATE, os.ModeSocket|os.ModePerm)
require.NoError(t, err)
defer skt.Close()

trans := Socket{
conf: configT{
Path: sktpath,
},
logger: &logWrapper{
l: logger,
},
}

t.Run("test large message transport", func(t *testing.T) {
msg := make([]byte, regularBuffSize)
addition := "wubba lubba dub dub"
for i := 0; i < regularBuffSize; i++ {
msg[i] = byte('X')
}
msg[regularBuffSize-1] = byte('$')
msg = append(msg, []byte(addition)...)

// verify transport
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
go trans.Run(ctx, func(mess []byte) {
wg.Add(1)
strmsg := string(mess)
assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message
assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct
wg.Done()
}, make(chan bool))

// wait for socket file to be created
for {
stat, err := os.Stat(sktpath)
require.NoError(t, err)
if stat.Mode()&os.ModeType == os.ModeSocket {
break
}
time.Sleep(250 * time.Millisecond)
}

// write to socket
wskt, err := net.DialUnix("unixgram", nil, &net.UnixAddr{Name: sktpath, Net: "unixgram"})
require.NoError(t, err)
_, err = wskt.Write(msg)
require.NoError(t, err)

cancel()
wg.Wait()
wskt.Close()
})
}

0 comments on commit 0120e4c

Please sign in to comment.