Skip to content

Commit

Permalink
Merge pull request #173 from SiaFoundation/nate/add-event-reporter
Browse files Browse the repository at this point in the history
Add session reporter
  • Loading branch information
n8maninger authored Sep 20, 2023
2 parents 1051268 + 170ff3b commit 1349d5a
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 517 deletions.
16 changes: 15 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.sia.tech/hostd/host/metrics"
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/wallet"
"go.sia.tech/jape"
"go.sia.tech/siad/modules"
Expand Down Expand Up @@ -106,6 +107,14 @@ type (
AcceptTransactionSet(txns []types.Transaction) error
}

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(rhp.SessionSubscriber)
Unsubscribe(rhp.SessionSubscriber)

Active() []rhp.Session
}

// An api provides an HTTP API for the host
api struct {
hostKey types.PublicKey
Expand All @@ -123,14 +132,15 @@ type (
wallet Wallet
metrics Metrics
settings Settings
sessions RHPSessionReporter

volumeJobs volumeJobs
checks integrityCheckJobs
}
)

// NewServer initializes the API
func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler {
func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler {
api := &api{
hostKey: hostKey,
name: name,
Expand All @@ -145,6 +155,7 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C
metrics: m,
settings: s,
wallet: w,
sessions: rsr,
log: log,

checks: integrityCheckJobs{
Expand Down Expand Up @@ -195,6 +206,9 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C
"DELETE /volumes/:id": api.handleDeleteVolume,
"DELETE /volumes/:id/cancel": api.handleDELETEVolumeCancelOp,
"PUT /volumes/:id/resize": api.handlePUTVolumeResize,
// session endpoints
"GET /sessions": api.handleGETSessions,
"GET /sessions/subscribe": api.handleGETSessionsSubscribe,
// tpool endpoints
"GET /tpool/fee": api.handleGETTPoolFee,
// wallet endpoints
Expand Down
45 changes: 45 additions & 0 deletions api/rhpsessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package api

import (
"context"
"encoding/json"

"go.sia.tech/hostd/rhp"
"go.sia.tech/jape"
"go.uber.org/zap"
"nhooyr.io/websocket"
)

type rhpSessionSubscriber struct {
conn *websocket.Conn
}

func (rs *rhpSessionSubscriber) ReceiveSessionEvent(event rhp.SessionEvent) {
buf, err := json.Marshal(event)
if err != nil {
return
}
rs.conn.Write(context.Background(), websocket.MessageText, buf)
}

func (a *api) handleGETSessions(c jape.Context) {
c.Encode(a.sessions.Active())
}

func (a *api) handleGETSessionsSubscribe(c jape.Context) {
wsc, err := websocket.Accept(c.ResponseWriter, c.Request, &websocket.AcceptOptions{
OriginPatterns: []string{"*"},
})
if err != nil {
a.log.Warn("failed to accept websocket connection", zap.Error(err))
return
}
defer wsc.Close(websocket.StatusNormalClosure, "")

// subscribe the websocket conn
sub := &rhpSessionSubscriber{
conn: wsc,
}
a.sessions.Subscribe(sub)
defer a.sessions.Unsubscribe(sub)
}
2 changes: 1 addition & 1 deletion cmd/hostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func main() {
auth := jape.BasicAuth(cfg.HTTP.Password)
web := http.Server{
Handler: webRouter{
api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.metrics, node.settings, node.w, log.Named("api"))),
api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))),
ui: hostd.Handler(),
},
ReadTimeout: 30 * time.Second,
Expand Down
16 changes: 10 additions & 6 deletions cmd/hostd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type node struct {
registry *registry.Manager
storage *storage.VolumeManager

sessions *rhp.SessionReporter
rhp2Monitor *rhp.DataRecorder
rhp2 *rhpv2.SessionHandler
rhp3Monitor *rhp.DataRecorder
Expand All @@ -64,17 +65,17 @@ func (n *node) Close() error {
return nil
}

func startRHP2(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cs rhpv2.ChainManager, tp rhpv2.TransactionPool, w rhpv2.Wallet, cm rhpv2.ContractManager, sr rhpv2.SettingsReporter, sm rhpv2.StorageManager, monitor rhp.DataMonitor, log *zap.Logger) (*rhpv2.SessionHandler, error) {
rhp2, err := rhpv2.NewSessionHandler(l, hostKey, rhp3Addr, cs, tp, w, cm, sr, sm, monitor, discardMetricReporter{}, log)
func startRHP2(l net.Listener, hostKey types.PrivateKey, rhp3Addr string, cs rhpv2.ChainManager, tp rhpv2.TransactionPool, w rhpv2.Wallet, cm rhpv2.ContractManager, sr rhpv2.SettingsReporter, sm rhpv2.StorageManager, monitor rhp.DataMonitor, sessions *rhp.SessionReporter, log *zap.Logger) (*rhpv2.SessionHandler, error) {
rhp2, err := rhpv2.NewSessionHandler(l, hostKey, rhp3Addr, cs, tp, w, cm, sr, sm, monitor, sessions, log)
if err != nil {
return nil, err
}
go rhp2.Serve()
return rhp2, nil
}

func startRHP3(l net.Listener, hostKey types.PrivateKey, cs rhpv3.ChainManager, tp rhpv3.TransactionPool, w rhpv3.Wallet, am rhpv3.AccountManager, cm rhpv3.ContractManager, rm rhpv3.RegistryManager, sr rhpv3.SettingsReporter, sm rhpv3.StorageManager, monitor rhp.DataMonitor, log *zap.Logger) (*rhpv3.SessionHandler, error) {
rhp3, err := rhpv3.NewSessionHandler(l, hostKey, cs, tp, w, am, cm, rm, sm, sr, monitor, discardMetricReporter{}, log)
func startRHP3(l net.Listener, hostKey types.PrivateKey, cs rhpv3.ChainManager, tp rhpv3.TransactionPool, w rhpv3.Wallet, am rhpv3.AccountManager, cm rhpv3.ContractManager, rm rhpv3.RegistryManager, sr rhpv3.SettingsReporter, sm rhpv3.StorageManager, monitor rhp.DataMonitor, sessions *rhp.SessionReporter, log *zap.Logger) (*rhpv3.SessionHandler, error) {
rhp3, err := rhpv3.NewSessionHandler(l, hostKey, cs, tp, w, am, cm, rm, sm, sr, monitor, sessions, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -179,14 +180,16 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva
}
registryManager := registry.NewManager(hostKey, db, logger.Named("registry"))

sessions := rhp.NewSessionReporter()

rhp2Monitor := rhp.NewDataRecorder(&rhp2MonitorStore{db}, logger.Named("rhp2Monitor"))
rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, logger.Named("rhpv2"))
rhp2, err := startRHP2(rhp2Listener, hostKey, rhp3Listener.Addr().String(), cm, tp, w, contractManager, sr, sm, rhp2Monitor, sessions, logger.Named("rhpv2"))
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp2: %w", err)
}

rhp3Monitor := rhp.NewDataRecorder(&rhp3MonitorStore{db}, logger.Named("rhp3Monitor"))
rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, logger.Named("rhpv3"))
rhp3, err := startRHP3(rhp3Listener, hostKey, cm, tp, w, accountManager, contractManager, registryManager, sr, sm, rhp3Monitor, sessions, logger.Named("rhpv3"))
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to start rhp3: %w", err)
}
Expand All @@ -206,6 +209,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva
storage: sm,
registry: registryManager,

sessions: sessions,
rhp2Monitor: rhp2Monitor,
rhp2: rhp2,
rhp3Monitor: rhp3Monitor,
Expand Down
86 changes: 0 additions & 86 deletions host/financials/types.go

This file was deleted.

11 changes: 5 additions & 6 deletions internal/test/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/persist/sqlite"
"go.sia.tech/hostd/rhp"
rhpv2 "go.sia.tech/hostd/rhp/v2"
rhpv3 "go.sia.tech/hostd/rhp/v3"
"go.sia.tech/hostd/wallet"
Expand All @@ -26,10 +27,6 @@ import (

const blocksPerMonth = 144 * 30

type stubMetricReporter struct{}

func (stubMetricReporter) Report(any) (_ error) { return }

type stubDataMonitor struct{}

func (stubDataMonitor) ReadBytes(n int) {}
Expand Down Expand Up @@ -214,13 +211,15 @@ func NewHost(privKey types.PrivateKey, dir string, node *Node, log *zap.Logger)
registry := registry.NewManager(privKey, db, log.Named("registry"))
accounts := accounts.NewManager(db, settings)

rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), node.cm, node.tp, wallet, contracts, settings, storage, stubDataMonitor{}, stubMetricReporter{}, log.Named("rhpv2"))
sessions := rhp.NewSessionReporter()

rhpv2, err := rhpv2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), node.cm, node.tp, wallet, contracts, settings, storage, stubDataMonitor{}, sessions, log.Named("rhpv2"))
if err != nil {
return nil, fmt.Errorf("failed to create rhpv2 session handler: %w", err)
}
go rhpv2.Serve()

rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, node.cm, node.tp, wallet, accounts, contracts, registry, storage, settings, stubDataMonitor{}, stubMetricReporter{}, log.Named("rhpv3"))
rhpv3, err := rhpv3.NewSessionHandler(rhp3Listener, privKey, node.cm, node.tp, wallet, accounts, contracts, registry, storage, settings, stubDataMonitor{}, sessions, log.Named("rhpv3"))
if err != nil {
return nil, fmt.Errorf("failed to create rhpv3 session handler: %w", err)
}
Expand Down
Loading

0 comments on commit 1349d5a

Please sign in to comment.