Skip to content

Commit

Permalink
Merge pull request #31 from snapp-incubator/30-improve-error-handling
Browse files Browse the repository at this point in the history
30 improve error handling
  • Loading branch information
mehditeymorian authored Jun 8, 2022
2 parents 734fa85 + c9579c4 commit f7e4cb0
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 37 deletions.
33 changes: 29 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ReconnectPolicy struct {
RetryInterval int // duration between retry intervals in milliseconds
}

//nolint:funlen
func NewClient(address string, topics []string, config *ClientConfig) (Client, error) {
processedConfig := processConfig(config)

Expand Down Expand Up @@ -68,15 +69,39 @@ func NewClient(address string, topics []string, config *ClientConfig) (Client, e
}

offer := internal.NewOffer(processedConfig.Token, topics)
bytes, _ := json.Marshal(offer) //nolint:errchkjson

stream, _ := connection.OpenUniStream()
bytes, err := json.Marshal(offer)
if err != nil {
log.Printf("failed to marshal offer: %+v\n", err)

return nil, internal.ErrFailedToMarshal
}

_ = internal.WriteData(bytes, stream)
stream, err := connection.OpenUniStream()
if err != nil {
log.Printf("failed to open send stream: %+v\n", err)
internal.CloseClientConnection(connection, internal.CodeFailedToCreateStream, internal.ErrFailedToCreateStream)

return nil, internal.ErrFailedToCreateStream
}

err = internal.WriteData(bytes, stream)
if err != nil {
log.Printf("failed to send offer to server: %+v\n", err)
internal.CloseClientConnection(connection, internal.CodeFailedToSendOffer, internal.ErrFailedToSendOffer)

return nil, internal.ErrFailedToSendOffer
}

_ = stream.Close()

receiveStream, _ := connection.AcceptUniStream(context.Background())
receiveStream, err := connection.AcceptUniStream(context.Background())
if err != nil {
log.Printf("failed to open receive stream: %+v\n", err)
internal.CloseClientConnection(connection, internal.CodeFailedToCreateStream, internal.ErrFailedToCreateStream)

return nil, internal.ErrFailedToCreateStream
}

reader := bufio.NewReader(receiveStream)
go client.AcceptEvents(reader)
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Jeffail/tunny v0.1.4
github.com/go-errors/errors v1.4.2
github.com/lucas-clemente/quic-go v0.27.1
github.com/prometheus/client_golang v1.12.2
github.com/stretchr/testify v1.7.2
)

Expand All @@ -24,7 +25,6 @@ require (
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand All @@ -35,7 +35,6 @@ require (
golang.org/x/tools v0.1.1 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
Expand Down
4 changes: 3 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (c *Client) AcceptEvents(reader *bufio.Reader) {

var event Event
if err = json.Unmarshal(bytes, &event); err != nil {
checkError(err)
log.Printf("failed to unmarshal event: %+v\n", err)

continue
}

switch {
Expand Down
17 changes: 15 additions & 2 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"encoding/json"
"errors"
"log"
)

type Error struct {
Expand All @@ -12,11 +13,20 @@ type Error struct {

const ErrorTopic = "error"

var ErrNotAuthorized = errors.New("not authorized")
var (
ErrNotAuthorized = errors.New("not authorized")
ErrFailedToCreateStream = errors.New("failed to create send/receive stream to client")
ErrFailedToReadOffer = errors.New("failed to read offer from client")
ErrFailedToSendOffer = errors.New("failed to send offer to server")
ErrFailedToMarshal = errors.New("failed to marshal/unmarshal data")
)

const (
CodeNotAuthorized = iota + 1
CodeTopicNotAvailable
CodeFailedToCreateStream
CodeFailedToSendOffer
CodeUnknown
)

func NewErr(code int, data map[string]any) *Error {
Expand All @@ -30,7 +40,10 @@ func UnmarshalError(bytes []byte) Error {
var e Error

if err := json.Unmarshal(bytes, &e); err != nil {
checkError(err)
log.Printf("failed to unmarshal error: %+v\n", err)

e.Code = CodeUnknown
e.Data = make(map[string]any)
}

return e
Expand Down
85 changes: 63 additions & 22 deletions internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,25 @@ func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc) {
func (s *Server) AcceptClients() {
for {
background := context.Background()

connection, err := s.Listener.Accept(background)
checkError(err)
if err != nil {
log.Printf("failed to accept new client: %+v\n", err)

continue
}

log.Println("found a new client")

client := NewSubscriber(connection)
var client *Subscriber

client, err = NewSubscriber(connection)
if err != nil {
log.Printf("failed to handle new subscriber: %+v\n", err)

continue
}

go s.handleClient(client)
}
}
Expand All @@ -90,47 +104,64 @@ func (s *Server) AcceptClients() {
func (s *Server) handleClient(client *Subscriber) {
isValid := s.Authenticator.Authenticate(client.Token)
if !isValid {
log.Println("client is not authenticated")
log.Println("client is not valid")

code := quic.ApplicationErrorCode(CodeNotAuthorized)
err := client.connection.CloseWithError(code, ErrNotAuthorized.Error())
checkError(err)
CloseClientConnection(client.connection, CodeNotAuthorized, ErrNotAuthorized)

return
}

log.Println("client is authenticated")

sendStream, err := client.connection.OpenUniStream()
checkError(err)
if err != nil {
log.Printf("failed to open send stream to client: %+v\n", err)
CloseClientConnection(client.connection, CodeUnknown, err)

return
}

s.addClientTopicsToEventSources(client, sendStream)
}

// addClientTopicsToEventSources adds the client's sendStream to the eventSources.
func (s *Server) addClientTopicsToEventSources(client *Subscriber, sendStream quic.SendStream) {
for _, topic := range client.Topics {
if ok := s.Authorizer.Authorize(client.Token, topic); !ok {
log.Printf("client is not authorized for %s", topic)
valid, err := s.isTopicValid(client, sendStream, topic)
if err != nil {
log.Printf("failed to send error to client: %+v\n", err)

continue
break
}

if _, ok := s.EventSources[topic]; ok {
if valid {
s.EventSources[topic].Subscribers = append(s.EventSources[topic].Subscribers, sendStream)
s.Metrics.IncSubscriber(topic)
} else {
e := NewErr(CodeTopicNotAvailable, map[string]any{
"topic": topic,
})
errBytes, _ := json.Marshal(e) //nolint:errchkjson
errEvent := NewEvent(ErrorTopic, errBytes)
err := WriteData(errEvent, sendStream)
checkError(err)
}
}
}

// isTopicValid check whether topic exists and client is authorized on it or not.
func (s *Server) isTopicValid(client *Subscriber, sendStream quic.SendStream, topic string) (bool, error) {
if _, ok := s.EventSources[topic]; !ok {
log.Printf("topic doesn't exists: %s\n", topic)

err := SendError(sendStream, NewErr(CodeTopicNotAvailable, map[string]any{"topic": topic}))

return false, err
}

if !s.Authorizer.Authorize(client.Token, topic) {
log.Printf("client is not authorized for topic: %s", topic)

err := SendError(sendStream, NewErr(CodeNotAuthorized, map[string]any{"topic": topic}))

return false, err
}

return true, nil
}

// GenerateEventSources generates eventSources for each topic.
func (s *Server) GenerateEventSources(topics []string) {
for _, topic := range topics {
Expand All @@ -143,8 +174,18 @@ func (s *Server) GenerateEventSources(topics []string) {
}
}

func checkError(err error) {
if err != nil {
log.Println(err.Error())
// SendError send input error to client.
func SendError(sendStream quic.SendStream, e *Error) error {
errBytes, _ := json.Marshal(e) //nolint:errchkjson
errEvent := NewEvent(ErrorTopic, errBytes)

return WriteData(errEvent, sendStream)
}

func CloseClientConnection(connection quic.Connection, code int, err error) {
appCode := quic.ApplicationErrorCode(code)

if err = connection.CloseWithError(appCode, err.Error()); err != nil {
log.Printf("failed to close connection with client: %+v\n", err)
}
}
18 changes: 12 additions & 6 deletions internal/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@ func NewOffer(token string, topics []string) Offer {
return Offer{Token: token, Topics: topics}
}

func NewSubscriber(connection quic.Connection) *Subscriber {
func NewSubscriber(connection quic.Connection) (*Subscriber, error) {
stream, err := connection.AcceptUniStream(context.Background())
checkError(err)
if err != nil {
return nil, ErrFailedToCreateStream
}

reader := bufio.NewReader(stream)

bytes, err := reader.ReadBytes(DELIMITER)
checkError(err)
if err != nil {
return nil, ErrFailedToReadOffer
}

var offer Offer
err = json.Unmarshal(bytes, &offer)
checkError(err)
if err := json.Unmarshal(bytes, &offer); err != nil {
return nil, ErrFailedToMarshal
}

return &Subscriber{
connection: connection,
Token: offer.Token,
Topics: offer.Topics,
}
}, nil
}

0 comments on commit f7e4cb0

Please sign in to comment.