Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat ipc #62

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Correct test failed
Sébastien GLON committed Sep 6, 2016

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 279575f00737ac416eba978a823b9af3cc310070
20 changes: 14 additions & 6 deletions transceiver/netty/Netty.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,11 @@ import (
"sync"
"github.com/sebglon/goavro/transceiver"
"fmt"
"log"
)

type NettyTransceiver struct {
transceiver.Pool
pool *transceiver.Pool
mu sync.Mutex
pending []byte
alreadyCalled bool
@@ -20,20 +20,25 @@ type NettyTransceiver struct {
}
func NewTransceiver(config transceiver.Config) (f* NettyTransceiver, err error){
f = &NettyTransceiver{}
f.pool, err = transceiver.NewPool(config)
pool , err := transceiver.NewPool(config)
if err !=nil {
return
}
f.Pool =*pool
return
}

func (t NettyTransceiver) InitHandshake(writer transceiver.WriteHandshake,reader transceiver.ReadHandshake ) {
func (t *NettyTransceiver) InitHandshake(writer transceiver.WriteHandshake,reader transceiver.ReadHandshake ) {
t.writeHandShake=writer
t.readHandshake=reader
}



func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
func (t *NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
nettyFrame := new(bytes.Buffer)
t.Pack(nettyFrame, requests)
log.Printf("%#v",t.Pool)
conn, pc, err := t.Pool.Conn()

if err!=nil {
@@ -43,6 +48,9 @@ func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, erro

if !conn.IsChecked() {
frame0 := requests[0]
if t.writeHandShake ==nil {
return nil, fmt.Errorf("InitHandshake not called before Transceive")
}
handshake, err := t.writeHandShake()
if err!=nil {
return nil, err
@@ -55,7 +63,7 @@ func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, erro
}
}

bodyBytes, err := t.pool.Call(conn, pc, nettyFrame.Bytes())
bodyBytes, err := t.Pool.Call(conn, pc, nettyFrame.Bytes())
if err != nil {
return nil, err
}
@@ -67,7 +75,7 @@ func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, erro
return nil, err
}

if !conn.IsChecked() {
if !conn.IsChecked() && len(resps)>1{
ok, err := t.readHandshake(resps[0])
if err!=nil {
return nil, err
8 changes: 5 additions & 3 deletions transceiver/netty/Netty_test.go
Original file line number Diff line number Diff line change
@@ -9,8 +9,8 @@ import (
"runtime"
"net"
"github.com/sebglon/goavro/transceiver"

"strconv"
"io"
)

const (
@@ -64,10 +64,11 @@ func EchoFunc(conn net.Conn) {
}

func TestTransceive(t *testing.T) {
f, err := NewTransceiver(transceiver.Config{Network:NETWORK, SocketPath:ADDR, Host:HOST, Port:PORT})
f, err := NewTransceiver(transceiver.Config{Network:NETWORK, Host:HOST, Port:PORT})
if err != nil {
t.Fatal(err)
}
f.InitHandshake(func()([]byte, error){return make([]byte,1), nil},func(io.Reader)(bool, error){return true, nil})


msg := "This is test writing."
@@ -126,4 +127,5 @@ func TestUnpack(t *testing.T) {
t.Fatalf("Reponse message not equals (0) %x/%x; (1) %x/%x","\x0a", respons[0], "\x0b", respons[1])
}

}
}

7 changes: 5 additions & 2 deletions transceiver/pool.go
Original file line number Diff line number Diff line change
@@ -76,10 +76,13 @@ func NewPool(config Config) (*Pool, error) {
if err != nil {
return nil, err
}
return &Pool{

pool := &Pool{
pool: p,
Config: config,
}, nil
}
log.Printf("%#v",pool.pool)
return pool, nil

}