Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Remove async and retry; replace by connection pool
Browse files Browse the repository at this point in the history
Sébastien GLON committed Sep 6, 2016
1 parent 002ce83 commit 435858f
Showing 8 changed files with 461 additions and 354 deletions.
17 changes: 12 additions & 5 deletions examples/flume/client.go
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ import (

func main() {
//t.SkipNow()
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:true, NettyHost:"10.98.80.113"})
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:false, NettyHost:"192.168.11.152"})
if err != nil {
log.Fatal(err)
}
@@ -24,20 +24,27 @@ func main() {
headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
flumeRecord.Set("headers", headers)
flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))
requestor := goavro.NewRequestor(protocol, transceiver)

requestor := goavro.NewRequestor(protocol, transceiver)

flumeRecord.Set("body", []byte("test 1"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request: ", err)
log.Fatal("Request 1: ", err)
}

log.Printf("Test 1 OK")


time.Sleep(5 * time.Second)
flumeRecord.Set("body", []byte("test 2"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request: ", err)
log.Fatal("Request 2: ", err)
}
log.Printf("Test 2 OK")

}

49 changes: 22 additions & 27 deletions requestor.go
Original file line number Diff line number Diff line change
@@ -45,14 +45,17 @@ func init() {
}

func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Requestor {
return &Requestor{

r := &Requestor{
local_protocol: localProto,
transceiver: transceiver,
// remote_protocol: nil,
// remote_hash: nil,
send_protocol: false,
send_handshake: true,
}
transceiver.InitHandshake(r.write_handshake_request, r.read_handshake_response)
return r
}


@@ -72,12 +75,7 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
frame1 := new(bytes.Buffer)
frame2 := new(bytes.Buffer)

err := a.write_handshake_request(frame1)
if err!=nil {
return err
}

err = a.write_call_requestHeader(message_name, frame1)
err := a.write_call_requestHeader(message_name, frame1)
if err!=nil {
return err
}
@@ -91,33 +89,24 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
responses, err := a.transceiver.Transceive(buffer_writers)

if err!=nil {
return err
return fmt.Errorf("Fail to transceive %v", err)
}
//buffer_decoder := bytes.NewBuffer(decoder)
// process the handshake and call response

if len(responses) >0 {
ok, err := a.read_handshake_response(responses[0])
a.read_call_responseCode(responses[1])
if err != nil {
return err
}
a.send_handshake = !ok

if ok {
a.read_call_responseCode(responses[1])
if err != nil {
return err
}
// a.Request(message_name, request_datum)
}
// a.Request(message_name, request_datum)
}
return nil
}

func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {
if !a.send_handshake {
return nil
}
func (a *Requestor) write_handshake_request() (handshake []byte ,err error) {
buffer := new(bytes.Buffer)
defer buffer.Write(handshake)
local_hash :=a.local_protocol.MD5
remote_name := a.remote_protocol.Name
remote_hash := REMOTE_HASHES[remote_name]
@@ -128,30 +117,36 @@ func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {

record, err := NewRecord(RecordSchema(handshakeRequestshema))
if err != nil {
return fmt.Errorf("Avro fail to init record handshakeRequest %v",err)
err = fmt.Errorf("Avro fail to init record handshakeRequest %v",err)
return
}

record.Set("clientHash", local_hash)
record.Set("serverHash", remote_hash)
record.Set("meta", make(map[string]interface{}))
codecHandshake, err := NewCodec(handshakeRequestshema)
if err != nil {
return fmt.Errorf("Avro fail to get codec handshakeRequest %v",err)
err = fmt.Errorf("Avro fail to get codec handshakeRequest %v",err)
return
}

if a.send_protocol {
json, err := a.local_protocol.Json()
if err!=nil {
return err
return nil ,err
}
record.Set("clientProtocol", json)
}



if err = codecHandshake.Encode(buffer, record); err !=nil {
return fmt.Errorf("Encode handshakeRequest ",err)
err = fmt.Errorf("Encode handshakeRequest ",err)
return
}

return nil

return
}

func (a *Requestor) write_call_request(message_name string, request_datum interface{}, frame io.Writer) (err error) {
261 changes: 143 additions & 118 deletions requestor_test.go
Original file line number Diff line number Diff line change
@@ -6,18 +6,15 @@ import (
"bytes"
"reflect"
netty "github.com/sebglon/goavro/transceiver/netty"
"github.com/sebglon/goavro/transceiver"
"runtime"
"strconv"
)

func TestWrite_handshake_request(t *testing.T) {
//t.SkipNow()
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

transceiver, err := netty.NewTransceiver(netty.Config{})
transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT})
if err != nil {
t.Fatal(err)
}
@@ -27,11 +24,11 @@ func TestWrite_handshake_request(t *testing.T) {
}
requestor := NewRequestor(protocol, transceiver)

bb := new(bytes.Buffer)
requestor.write_handshake_request(bb)

hds, err := requestor.write_handshake_request()
// conn.Write(bb.Bytes())
t.Logf("Handshake_request size %v %x\n",bb.Len(), bb.Bytes())
t.Logf( "Handshake_request %v\n", bb.String())
t.Logf("Handshake_request size %v %x\n",len(hds), hds)
t.Logf( "Handshake_request %v\n", hds)

refHandshake := []byte("\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x00\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x02\x00")
//bytes := bb.Bytes()
@@ -61,7 +58,7 @@ func TestRead_handshake_reponse(t *testing.T) {
if err != nil {
t.Fatal(err)
}
record.Set("match", Enum{"match","BOTH"})
record.Set("match", Enum{"match", "BOTH"})
record.Set("serverProtocol", nil)
record.Set("serverHash", nil)
record.Set("meta", nil)
@@ -73,14 +70,12 @@ func TestRead_handshake_reponse(t *testing.T) {
}
t.Logf("Encode HandshakeResponse %v", bb.Bytes())


_, err = codecHandshake.Decode(bytes.NewReader(bb.Bytes()))
if err != nil {
t.Fatal(err)
}


transceiver, err := netty.NewTransceiver(netty.Config{})
transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT})
if err != nil {
t.Fatal(err)
}
@@ -95,120 +90,150 @@ func TestRead_handshake_reponse(t *testing.T) {
if err != nil {
t.Fatal(err)
}

}

type Conn struct {
bytes.Buffer
}
func (c *Conn) Close() error {
return nil
}

func TestWrite_call_request(t *testing.T) {
//t.SkipNow()

func TestWrite_call_request(t * testing.T) {
//t.SkipNow()
transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT})

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
}
requestor := NewRequestor(protocol, transceiver)

bb := new(bytes.Buffer)
datum, err := protocol.NewRecord("AvroFlumeEvent")
if err != nil {
t.Fatal(err)
}

headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
datum.Set("headers", headers)
datum.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))

requestor.write_call_request("append", datum, bb)
// conn.Write(bb.Bytes())
t.Logf("\nCall_request size %v %v\n", bb.Len(), bb.Bytes())
t.Logf("\nCall_request %v\n", bb.String())

codec, err := protocol.MessageRequestCodec("append")
if err != nil {
t.Fatal(err)
}
value, err := codec.Decode(bb)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(datum, value) {
t.Fatalf("Request not equals to ref %x, %x", datum, value)
}
}

func TestWrite_call_requestHeader(t * testing.T) {
//t.SkipNow()
transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT})

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
}
requestor := NewRequestor(protocol, transceiver)

bb := new(bytes.Buffer)

requestor.write_call_requestHeader("append", bb)

refHeader := []byte("\x00\x0c\x61\x70\x70\x65\x6e\x64")
bytes := bb.Bytes()
if !reflect.DeepEqual(refHeader, bytes) {
t.Fatalf("Request_Header not equals to ref %n%x, %n%x", len(refHeader), refHeader, len(bytes), bytes)
}
}

func TestRead_call_responseMessage(t * testing.T) {

transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT})

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
}
requestor := NewRequestor(protocol, transceiver)

codec, err := protocol.MessageResponseCodec("append")
if err != nil {
t.Fatal(err)
}
bb := new(bytes.Buffer)
codec.Encode(bb, Enum{"Status", "OK"})
t.Logf("Bytes for OK %x", bb.Bytes())

err = requestor.read_call_responseMessage("append", bb)
if err != nil {
t.Fatal(err)
}

codec.Encode(bb, Enum{"Status", "FAILED"})
t.Logf("Bytes for FAILED %x", bb.Bytes())
err = requestor.read_call_responseMessage("append", bb)
if err == nil || err.Error() != "Reponse failure. status == FAILED" {
t.Fatalf("Status FAILED can return error")
}

}


const (
RECV_BUF_LEN = 1024
NETWORK = "tcp"
HOST = "127.0.0.1"
PORT=6666
ADDR="127.0.0.1:6666"
)

transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
func init() {
numProcs := runtime.NumCPU()
if numProcs < 2 {
numProcs = 2
}
requestor := NewRequestor(protocol, transceiver)
runtime.GOMAXPROCS(numProcs)

bb := new(bytes.Buffer)
datum, err := protocol.NewRecord("AvroFlumeEvent")
listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT))
if err != nil {
t.Fatal(err)
}

headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
datum.Set("headers", headers)
datum.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))



requestor.write_call_request("append", datum,bb)
// conn.Write(bb.Bytes())
t.Logf("\nCall_request size %v %v\n",bb.Len(), bb.Bytes())
t.Logf("\nCall_request %v\n", bb.String())

codec, err := protocol.MessageRequestCodec("append")
if err != nil {
t.Fatal(err)
}
value, err := codec.Decode(bb)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(datum, value) {
t.Fatalf("Request not equals to ref %x, %x", datum, value)
println("error listening:", err.Error())
}
go func() {
for {
conn, err := listener.Accept()
if err != nil {
println("Error accept:", err.Error())
return
}
go EchoFunc(conn)
}
}()
}

func TestWrite_call_requestHeader(t *testing.T) {
//t.SkipNow()
transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
}
requestor := NewRequestor(protocol, transceiver)

bb := new(bytes.Buffer)

requestor.write_call_requestHeader("append", bb)

refHeader := []byte("\x00\x0c\x61\x70\x70\x65\x6e\x64")
bytes := bb.Bytes()
if !reflect.DeepEqual(refHeader, bytes) {
t.Fatalf("Request_Header not equals to ref %n%x, %n%x", len(refHeader), refHeader, len(bytes), bytes)
}
}

func TestRead_call_responseMessage(t *testing.T) {


transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
func EchoFunc(conn net.Conn) {
for {
buf := make([]byte, RECV_BUF_LEN)
n, err := conn.Read(buf)
if err != nil {
println("Error reading:", err.Error())
return
}
println("received ", n, " bytes of data =", string(buf))
n, err = conn.Write(buf)
if err != nil {
println("Error writing:", err.Error())
return
}
println("sended ", n, " bytes of data =", string(buf))
}
requestor := NewRequestor(protocol, transceiver)


codec, err := protocol.MessageResponseCodec("append")
if err != nil {
t.Fatal(err)
}
bb := new(bytes.Buffer)
codec.Encode(bb, Enum{"Status", "OK"})
t.Logf("Bytes for OK %x",bb.Bytes() )


err = requestor.read_call_responseMessage("append", bb)
if err != nil {
t.Fatal(err)
}

codec.Encode(bb, Enum{"Status", "FAILED"})
t.Logf("Bytes for FAILED %x",bb.Bytes() )
err = requestor.read_call_responseMessage("append", bb)
if err == nil || err.Error() != "Reponse failure. status == FAILED"{
t.Fatalf("Status FAILED can return error")
}

}


25 changes: 24 additions & 1 deletion transceiver/Tranceiver.go
Original file line number Diff line number Diff line change
@@ -3,9 +3,32 @@ package transceiver
import (
"bytes"
"io"
"time"
)

type WriteHandshake func() ([]byte, error)
type ReadHandshake func(io.Reader) (bool, error)
type Transceiver interface {
Transceive(request []bytes.Buffer) ([]io.Reader, error)
InitHandshake(WriteHandshake, ReadHandshake )



}



type Config struct {
Port int `json:"port"`
Host string `json:"host"`
Network string `json:"network"`
SocketPath string `json:"socket_path"`
Timeout time.Duration `json:"timeout"`
AsyncConnect bool `json:"async_connect"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
InitialCap int `json:"initial_cap"`
MaxCap int `json:"max_cap"`
}

}
46 changes: 46 additions & 0 deletions transceiver/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package transceiver

import (
"io"
"net"
"strconv"
)

type HandShakeConn interface {
io.ReadWriteCloser
IsChecked() bool
Checked(bool)
GetConn() (net.Conn, error)
}

type Connection struct {
net.Conn
checked bool
bad bool
}


func NewConnection(config Config) (*Connection, error) {

conn := &Connection{}
var err error
switch config.Network {
case "tcp":
conn.Conn, err = net.DialTimeout(config.Network, config.Host+":"+strconv.Itoa(config.Port), config.Timeout)
case "unix":
conn.Conn, err = net.DialTimeout(config.Network, config.SocketPath, config.Timeout)
default:
err = net.UnknownNetworkError(config.Network)
}

return conn, err
}

func (c *Connection) Checked(check bool) {
c.checked = check
}

func (c *Connection) IsChecked() bool{
return c.checked
}

250 changes: 60 additions & 190 deletions transceiver/netty/Netty.go
Original file line number Diff line number Diff line change
@@ -2,108 +2,85 @@ package netty

import (
"bytes"
"net"
"encoding/binary"
"fmt"
"io"
"sync"
"time"
"strconv"
"math"
"os"
"github.com/sebglon/goavro/transceiver"
"fmt"
)

type NettyTransceiver struct {
transceiver.Pool
pool *transceiver.Pool
mu sync.Mutex
pending []byte
alreadyCalled bool
writeHandShake transceiver.WriteHandshake
readHandshake transceiver.ReadHandshake
}
func NewTransceiver(config transceiver.Config) (f* NettyTransceiver, err error){
f = &NettyTransceiver{}
f.pool, err = transceiver.NewPool(config)
return
}

func (t NettyTransceiver) InitHandshake(writer transceiver.WriteHandshake,reader transceiver.ReadHandshake ) {
t.writeHandShake=writer
t.readHandshake=reader
}

const (
defaultHost = "127.0.0.1"
defaultNetwork = "tcp"
defaultSocketPath = ""
defaultPort = 63001
defaultTimeout = 3 * time.Second
defaultBufferLimit = 8 * 1024 * 1024
defaultRetryWait = 500
defaultMaxRetry = 13
defaultReconnectWaitIncreRate = 1.5
)

type Config struct {
NettyPort int `json:"netty_port"`
NettyHost string `json:"netty_host"`
NettyNetwork string `json:"netty_network"`
NettySocketPath string `json:"netty_socket_path"`
Timeout time.Duration `json:"timeout"`
AsyncConnect bool `json:"async_connect"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
}

func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
nettyFrame := new(bytes.Buffer)
t.Pack(nettyFrame, requests)
conn, pc, err := t.Pool.Conn()

type NettyTransceiver struct {
Config
Conn io.ReadWriteCloser
reconnecting bool
mu sync.Mutex
pending []byte
}
func NewTransceiver(config Config) (f* NettyTransceiver, err error){
if config.NettyNetwork == "" {
config.NettyNetwork = defaultNetwork
}
if config.NettyHost == "" {
config.NettyHost = defaultHost
}
if config.NettyPort == 0 {
config.NettyPort = defaultPort
}
if config.NettySocketPath == "" {
config.NettySocketPath = defaultSocketPath
}
if config.Timeout == 0 {
config.Timeout = defaultTimeout
if err!=nil {
return nil, err
}
if config.BufferLimit == 0 {
config.BufferLimit = defaultBufferLimit
}
if config.RetryWait == 0 {
config.RetryWait = defaultRetryWait
}
if config.MaxRetry == 0 {
config.MaxRetry = defaultMaxRetry
defer pc.Close()

if !conn.IsChecked() {
frame0 := requests[0]
handshake, err := t.writeHandShake()
if err!=nil {
return nil, err
}

requests[0].Reset()
_, err = requests[0].Write(append( handshake, frame0.Bytes()...))
if err!=nil {
return nil, err
}
}
if config.AsyncConnect {
f = &NettyTransceiver{Config: config, reconnecting: true}
f.reconnect()
} else {
f = &NettyTransceiver{Config: config, reconnecting: false}
err = f.connect()

bodyBytes, err := t.pool.Call(conn, pc, nettyFrame.Bytes())
if err != nil {
return nil, err
}
return
}

func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
nettyFrame := new(bytes.Buffer)
t.Pack(nettyFrame, requests)

// Send request
t.pending = append(t.pending, nettyFrame.Bytes()...)
if err := t.send(); err != nil {
t.close()
if len(t.pending) > t.Config.BufferLimit {
t.flushBuffer()

resps, err := t.Unpack(bodyBytes)
if err != nil {
return nil, err
}

if !conn.IsChecked() {
ok, err := t.readHandshake(resps[0])
if err!=nil {
return nil, err
}
conn.Checked(ok)
if !ok {
return nil, fmt.Errorf("Fail to validate Handshake")
}
return resps[1:], nil
} else {
t.flushBuffer()
return resps, nil
}

// Read Response
bodyBytes := make([]byte, 1024)
t.receive(bodyBytes)
// if err!=nil {
// return nil, fmt.Errorf("Fail to read on socket %v", err)
// }
return t.Unpack(bodyBytes)
}

func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
@@ -142,110 +119,3 @@ func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) {

return result, nil
}

// connect establishes a new connection using the specified transport.
func (f *NettyTransceiver) connect() (error) {
var err error
switch f.Config.NettyNetwork {
case "tcp":
f.Conn, err = net.DialTimeout(f.Config.NettyNetwork, f.Config.NettyHost+":"+strconv.Itoa(f.Config.NettyPort), f.Config.Timeout)
case "unix":
f.Conn, err = net.DialTimeout(f.Config.NettyNetwork, f.Config.NettySocketPath, f.Config.Timeout)
default:
err = net.UnknownNetworkError(f.Config.NettyNetwork)
}
return err
}


func e(x, y float64) int {
return int(math.Pow(x, y))
}

func (f *NettyTransceiver) reconnect() {
go func() {

for i := 0; ; i++ {

err := f.connect()
if err == nil {
f.mu.Lock()
f.reconnecting = false
f.mu.Unlock()
break
} else {
if i == f.Config.MaxRetry {
panic("Netty#reconnect: failed to reconnect!")
}
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
}
}
}()
}

func (f *NettyTransceiver) flushBuffer() {
f.mu.Lock()
defer f.mu.Unlock()
f.pending = f.pending[0:0]
}


// Close closes the connection.
func (f *NettyTransceiver) Close() (err error) {
if len(f.pending) > 0 {
err = f.send()
}
f.close()
return
}

// close closes the connection.
func (f *NettyTransceiver) close() (err error) {
if f.Conn != nil {
f.mu.Lock()
defer f.mu.Unlock()
} else {
return
}
if f.Conn != nil {
f.Conn.Close()
f.Conn = nil
}
return
}

func (f *NettyTransceiver) send() (err error) {
if f.Conn == nil {
if f.reconnecting == false {
f.mu.Lock()
f.reconnecting = true
f.mu.Unlock()
f.reconnect()
}
err = fmt.Errorf("Netty#send: can't send logs, client is reconnecting")
} else {
f.mu.Lock()
_, err = f.Conn.Write(f.pending)
f.mu.Unlock()
}
return
}

func (f *NettyTransceiver) receive(resp []byte) (err error) {

if f.Conn == nil {
if f.reconnecting == false {
f.mu.Lock()
f.reconnecting = true
f.mu.Unlock()
f.reconnect()
}
err = fmt.Errorf("Netty#receive: can't send logs, client is reconnecting")
} else {
f.mu.Lock()
_, err = f.Conn.Read(resp)
f.mu.Unlock()
}
return
}
32 changes: 19 additions & 13 deletions transceiver/netty/Netty_test.go
Original file line number Diff line number Diff line change
@@ -8,19 +8,19 @@ import (
"io/ioutil"
"runtime"
"net"
"github.com/sebglon/goavro/transceiver"

"strconv"
)

const (
RECV_BUF_LEN = 1024
NETWORK = "tcp"
HOST = "127.0.0.1"
PORT=6666
ADDR="127.0.0.1:6666"
)

type Conn struct {
bytes.Buffer
}

func (c *Conn) Close() error {
return nil
}

func init() {
numProcs := runtime.NumCPU()
@@ -29,7 +29,7 @@ func init() {
}
runtime.GOMAXPROCS(numProcs)

listener, err := net.Listen("tcp", "0.0.0.0:6666")
listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT))
if err != nil {
println("error listening:", err.Error())
}
@@ -54,24 +54,30 @@ func EchoFunc(conn net.Conn) {
return
}
println("received ", n, " bytes of data =", string(buf))
n, err = conn.Write(buf)
if err != nil {
println("Error writing:", err.Error())
return
}
println("sended ", n, " bytes of data =", string(buf))
}
}

func TestTransceive(t *testing.T) {
f := &NettyTransceiver{Config: Config{}, reconnecting: false}
f, err := NewTransceiver(transceiver.Config{Network:NETWORK, SocketPath:ADDR, Host:HOST, Port:PORT})
if err != nil {
t.Fatal(err)
}

buf := &Conn{}
f.Conn = buf

msg := "This is test writing."
bmsg := make([]bytes.Buffer, 1)
bmsg[0] = *bytes.NewBuffer([]byte(msg))

resp, err := f.Transceive(bmsg)
if err != nil {
t.Error(err)
t.Fatal(err.Error())
}

brcv := make([]byte, len([]byte(msg)))
resp[0].Read(brcv)
rcv := string(brcv)
135 changes: 135 additions & 0 deletions transceiver/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package transceiver

import (
"gopkg.in/fatih/pool.v2"
"net"
"fmt"
"sync"
"errors"
"log"
"time"
)


var (
errPoolClosed = errors.New("Avro transceiver: Pool Closed")
)
type Pool struct {
Config
pool pool.Pool
mu sync.RWMutex
closed bool
}

const (
defaultHost = "127.0.0.1"
defaultNetwork = "tcp"
defaultSocketPath = ""
defaultPort = 63001
defaultTimeout = 3 * time.Second
defaultBufferLimit = 8 * 1024 * 1024
defaultRetryWait = 500
defaultMaxRetry = 13
defaultInitialCap = 2
defaultMaxCap = 5
defaultReconnectWaitIncreRate = 1.5
)

func NewPool(config Config) (*Pool, error) {
if config.Network == "" {
config.Network = defaultNetwork
}
if config.Host == "" {
config.Host = defaultHost
}
if config.Port == 0 {
config.Port = defaultPort
}
if config.SocketPath == "" {
config.SocketPath = defaultSocketPath
}
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.BufferLimit == 0 {
config.BufferLimit = defaultBufferLimit
}
if config.RetryWait == 0 {
config.RetryWait = defaultRetryWait
}
if config.MaxRetry == 0 {
config.MaxRetry = defaultMaxRetry
}
if config.InitialCap == 0 {
config.InitialCap = defaultInitialCap
}
if config.MaxCap == 0 {
config.MaxCap = defaultMaxCap
}
p, err := pool.NewChannelPool(config.InitialCap,config.MaxCap, func() (net.Conn, error) {
conn, err := NewConnection(config)
if err != nil {
return nil, fmt.Errorf("\nFail to init connec, %#v \n%v",config,err)
}
return conn, err
})
if err != nil {
return nil, err
}
return &Pool{
pool: p,
Config: config,
}, nil

}

func (p *Pool) Conn() (*Connection, *pool.PoolConn, error) {
p.mu.RLock()
defer p.mu.RUnlock()

if p.closed {
return nil, nil, errPoolClosed
}


nc, err := p.pool.Get()
if err != nil {
return nil, nil, err
}

log.Printf(" %T %#v", nc,nc)

pc, ok := nc.(*pool.PoolConn)
if !ok {
// This should never happen!
return nil, nil, fmt.Errorf("Invalid connection in pool")
}

conn, ok := pc.Conn.(*Connection)
if !ok {
// This should never happen!
return nil, nil, fmt.Errorf("Invalid connection in pool")
}

return conn, pc, nil
}

func (p *Pool) Call(conn *Connection, pc *pool.PoolConn, req []byte) (resp []byte, err error) {
if err != nil {
return
}
defer pc.Close()

_,err= conn.Write(req)

if err != nil {
return nil, err
}
resp = make([]byte, 1024)
_,err = conn.Read(resp)

if err != nil {
return nil, err
}
return
}

0 comments on commit 435858f

Please sign in to comment.