diff --git a/core/node/rpc/sync/client/local.go b/core/node/rpc/sync/client/local.go index 9e1e646a8..10b8158e2 100644 --- a/core/node/rpc/sync/client/local.go +++ b/core/node/rpc/sync/client/local.go @@ -2,6 +2,7 @@ package client import ( "context" + "github.com/ethereum/go-ethereum/common" "github.com/linkdata/deadlock" . "github.com/towns-protocol/towns/core/node/base" diff --git a/core/node/rpc/sync/client/syncer_set.go b/core/node/rpc/sync/client/syncer_set.go index a3d6568da..9a53ed3f5 100644 --- a/core/node/rpc/sync/client/syncer_set.go +++ b/core/node/rpc/sync/client/syncer_set.go @@ -177,7 +177,6 @@ func (ss *SyncerSet) Run() { ss.muSyncers.Unlock() ss.syncerTasks.Wait() // background syncers finished -> safe to close messages channel - close(ss.messages) // close will cause the sync operation to send the SYNC_CLOSE message to the client } func (ss *SyncerSet) AddInitialStreams() { diff --git a/core/node/rpc/sync/operation.go b/core/node/rpc/sync/operation.go index 4d29ad11e..924f27037 100644 --- a/core/node/rpc/sync/operation.go +++ b/core/node/rpc/sync/operation.go @@ -95,8 +95,10 @@ func (syncOp *StreamSyncOperation) Run( ) error { log := logging.FromCtx(syncOp.ctx).With("syncId", syncOp.SyncID) + messagesSendToClient := 0 + log.Info("Stream sync operation start") - defer log.Info("Stream sync operation stopped") + defer log.Infow("Stream sync operation stopped", "send", messagesSendToClient) cookies, err := client.ValidateAndGroupSyncCookies(req.Msg.GetSyncPos()) if err != nil { @@ -131,6 +133,8 @@ func (syncOp *StreamSyncOperation) Run( return err } + messagesSendToClient++ + log.Debug("Pending messages in sync operation", "count", len(messages)) case <-syncOp.ctx.Done():