Skip to content

Commit

Permalink
feat: support use porter without consul
Browse files Browse the repository at this point in the history
  • Loading branch information
MuZhou233 committed Jul 19, 2024
1 parent ea37298 commit 30efdaf
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 89 deletions.
8 changes: 4 additions & 4 deletions app/sephirah/cmd/sephirah/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 24 additions & 17 deletions app/sephirah/internal/client/porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"fmt"

"github.com/tuihub/librarian/internal/conf"
"github.com/tuihub/librarian/internal/lib/libapp"
Expand All @@ -16,53 +15,61 @@ import (

type Porter struct {
porter.LibrarianPorterServiceClient
checker *libapp.HealthChecker
checker libapp.HealthChecker
}

func NewPorter(
client porter.LibrarianPorterServiceClient,
consul *conf.Consul,
porter *conf.Porter,
) (*Porter, error) {
checker, err := libapp.NewHealthChecker("porter", consul)
if err != nil {
return nil, err
}
if libapp.IsEmptyHealthChecker(checker) {
checker, err = newStaticDiscovery(porter)
if err != nil {
return nil, err
}
}
return &Porter{
LibrarianPorterServiceClient: client,
checker: checker,
}, nil
}

func (p *Porter) GetServiceAddresses(ctx context.Context) ([]string, error) {
instances, err := p.checker.GetAliveInstances()
if err != nil {
return nil, err
}
res := make([]string, 0, len(instances))
for _, instance := range instances {
res = append(res, fmt.Sprintf("%s:%d", instance.Service.Address, instance.Service.Port))
}
return res, nil
return p.checker.GetAliveInstances()
}

func NewPorterClient(c *conf.Consul) (porter.LibrarianPorterServiceClient, error) {
func NewPorterClient(c *conf.Consul, p *conf.Porter, app *libapp.Settings) (porter.LibrarianPorterServiceClient, error) {
r, err := libapp.NewDiscovery(c)
if err != nil {
return nil, err
}
conn, err := grpc.DialInsecure(
context.Background(),
if libapp.IsEmptyDiscovery(r) {
r, err = newStaticDiscovery(p)
if err != nil {
return nil, err
}
}
middlewares := []grpc.ClientOption{
grpc.WithEndpoint("discovery:///porter"),
grpc.WithDiscovery(r),
grpc.WithNodeFilter(
newPorterNameFilter(),
newPorterAddressFilter(),
newPorterFastFailFilter(),
),
grpc.WithMiddleware(
recovery.Recovery(),
),
grpc.WithTimeout(libtime.Minute),
}
if app.EnablePanicRecovery {
middlewares = append(middlewares, grpc.WithMiddleware(recovery.Recovery()))
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares...,
)
cli := porter.NewLibrarianPorterServiceClient(conn)
return cli, err
Expand Down
78 changes: 78 additions & 0 deletions app/sephirah/internal/client/porter_static_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package client

import (
"context"
"fmt"
"net/url"

"github.com/tuihub/librarian/internal/conf"

"github.com/go-kratos/kratos/v2/registry"
)

type staticDiscovery struct {
serviceInstances []*registry.ServiceInstance
watcherCount int
watcherCh chan struct{}
}

func newStaticDiscovery(c *conf.Porter) (*staticDiscovery, error) {
if c == nil {
c = new(conf.Porter)
}
var serviceInstances []*registry.ServiceInstance
for i, addr := range c.GetAddress() {
parsed, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("porter address %s invalid: %w", addr, err)
}
if parsed.Scheme != "grpc" && parsed.Scheme != "grpcs" {
return nil, fmt.Errorf("porter address %s is not a valid gRPC address", addr)
}
serviceInstances = append(serviceInstances, &registry.ServiceInstance{
ID: fmt.Sprintf("porter-%d", i),
Name: "porter",
Version: "",
Metadata: nil,
Endpoints: []string{addr},
})
}
return &staticDiscovery{
serviceInstances: serviceInstances,
watcherCount: 0,
watcherCh: make(chan struct{}),
}, nil
}

func (s *staticDiscovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
return s.serviceInstances, nil
}

func (s *staticDiscovery) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
return s, nil
}

func (s *staticDiscovery) Next() ([]*registry.ServiceInstance, error) {
if s.watcherCount > 0 {
<-s.watcherCh
}
s.watcherCount++
return s.serviceInstances, nil
}

func (s *staticDiscovery) Stop() error {
s.watcherCh <- struct{}{}
return nil
}

func (s *staticDiscovery) GetAliveInstances() ([]string, error) {
res := make([]string, 0, len(s.serviceInstances))
for _, instance := range s.serviceInstances {
parsed, err := url.Parse(instance.Endpoints[0])
if err != nil {
continue
}
res = append(res, fmt.Sprintf("%s:%s", parsed.Hostname(), parsed.Port()))
}
return res, nil
}
2 changes: 2 additions & 0 deletions app/sephirah/internal/supervisor/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
)

// fixme: current filter methods will lead to calling disabled instances.

func (s *Supervisor) CheckAccountPlatform(platform string) bool {
for _, p := range s.featureSummary.AccountPlatforms {
if p.ID == platform {
Expand Down
6 changes: 4 additions & 2 deletions app/sephirah/internal/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewSupervisor(
refreshMu: sync.Mutex{},
featureSummary: new(modeltiphereth.ServerFeatureSummary),
muFeatureSummary: sync.RWMutex{},
trustedAddresses: c.GetTrustedAddress(),
trustedAddresses: c.GetTrusted(),
systemNotify: systemNotify,
}, nil
}
Expand All @@ -80,7 +80,9 @@ func (s *Supervisor) UpdateKnownInstances(instances []*modeltiphereth.PorterInst
func (s *Supervisor) RefreshAliveInstances(
ctx context.Context,
) ([]*modeltiphereth.PorterInstance, error) {
s.refreshMu.Lock()
if !s.refreshMu.TryLock() {
return nil, errors.New("refreshing")
}
defer s.refreshMu.Unlock()

if s.knownInstances == nil {
Expand Down
4 changes: 2 additions & 2 deletions app/sephirah/pkg/service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions cmd/librarian/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ func searcherClientSelector(
conf *conf.Librarian_EnableServiceDiscovery,
c *conf.Consul,
inproc *inprocgrpc.InprocClients,
app *libapp.Settings,
) (searcher.LibrarianSearcherServiceClient, error) {
if conf.GetSearcher() {
return client.NewSearcherClient(c)
return client.NewSearcherClient(c, app)
}
return inproc.Searcher, nil
}
Expand All @@ -153,9 +154,10 @@ func minerClientSelector(
conf *conf.Librarian_EnableServiceDiscovery,
c *conf.Consul,
inproc *inprocgrpc.InprocClients,
app *libapp.Settings,
) (miner.LibrarianMinerServiceClient, error) {
if conf.GetMiner() {
return client.NewMinerClient(c)
return client.NewMinerClient(c, app)
}
return inproc.Miner, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/librarian/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 28 additions & 19 deletions internal/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,64 @@ import (
"github.com/go-kratos/kratos/v2/transport/grpc"
)

func NewMapperClient(c *conf.Consul) (mapper.LibrarianMapperServiceClient, error) {
func NewMapperClient(c *conf.Consul, app *libapp.Settings) (mapper.LibrarianMapperServiceClient, error) {
r, err := libapp.NewDiscovery(c)
if err != nil {
return nil, err
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares := []grpc.ClientOption{
grpc.WithEndpoint("discovery:///mapper"),
grpc.WithDiscovery(r),
grpc.WithNodeFilter(libapp.NewNodeFilter()),
grpc.WithMiddleware(
recovery.Recovery(),
),
}
if app.EnablePanicRecovery {
middlewares = append(middlewares, grpc.WithMiddleware(recovery.Recovery()))
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares...,
)
cli := mapper.NewLibrarianMapperServiceClient(conn)
return cli, err
}

func NewSearcherClient(c *conf.Consul) (searcher.LibrarianSearcherServiceClient, error) {
func NewSearcherClient(c *conf.Consul, app *libapp.Settings) (searcher.LibrarianSearcherServiceClient, error) {
r, err := libapp.NewDiscovery(c)
if err != nil {
return nil, err
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares := []grpc.ClientOption{
grpc.WithEndpoint("discovery:///searcher"),
grpc.WithDiscovery(r),
grpc.WithNodeFilter(libapp.NewNodeFilter()),
grpc.WithMiddleware(
recovery.Recovery(),
),
}
if app.EnablePanicRecovery {
middlewares = append(middlewares, grpc.WithMiddleware(recovery.Recovery()))
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares...,
)
cli := searcher.NewLibrarianSearcherServiceClient(conn)
return cli, err
}

func NewMinerClient(c *conf.Consul) (miner.LibrarianMinerServiceClient, error) {
func NewMinerClient(c *conf.Consul, app *libapp.Settings) (miner.LibrarianMinerServiceClient, error) {
r, err := libapp.NewDiscovery(c)
if err != nil {
return nil, err
}
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///porter"),
middlewares := []grpc.ClientOption{
grpc.WithEndpoint("discovery:///miner"),
grpc.WithDiscovery(r),
grpc.WithNodeFilter(libapp.NewNodeFilter()),
grpc.WithMiddleware(
recovery.Recovery(),
),
}
if app.EnablePanicRecovery {
middlewares = append(middlewares, grpc.WithMiddleware(recovery.Recovery()))
}
conn, err := grpc.DialInsecure(
context.Background(),
middlewares...,
)
cli := miner.NewLibrarianMinerServiceClient(conn)
return cli, err
Expand Down
Loading

0 comments on commit 30efdaf

Please sign in to comment.