diff --git a/tasks/actorstate/market/deal_state.go b/tasks/actorstate/market/deal_state.go index 902046a2d..45217cb8d 100644 --- a/tasks/actorstate/market/deal_state.go +++ b/tasks/actorstate/market/deal_state.go @@ -54,15 +54,6 @@ func (DealStateExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, n return out, nil } - changed, err := ec.CurrState.StatesChanged(ec.PrevState) - if err != nil { - return nil, fmt.Errorf("checking for deal state changes: %w", err) - } - - if !changed { - return nil, nil - } - changes, err := market.DiffDealStates(ctx, ec.Store, ec.PrevState, ec.CurrState) if err != nil { return nil, fmt.Errorf("diffing deal states: %w", err) diff --git a/tasks/messages/parsedmessage/task.go b/tasks/messages/parsedmessage/task.go index dd7f91191..f4e4fcdad 100644 --- a/tasks/messages/parsedmessage/task.go +++ b/tasks/messages/parsedmessage/task.go @@ -147,6 +147,14 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut Method: method, Params: params, } + + sectorIds, err := parseParamsInDetail(method, params) + if err != nil { + log.Errorf("method: %v, params: %v, error: %v", method, params, err) + } + if len(sectorIds) > 0 { + log.Infof("method: %v, params: %v, sector_ids: %v", method, params, sectorIds) + } parsedMessageResults = append(parsedMessageResults, pm) } } diff --git a/tasks/messages/parsedmessage/util.go b/tasks/messages/parsedmessage/util.go new file mode 100644 index 000000000..d9453a23a --- /dev/null +++ b/tasks/messages/parsedmessage/util.go @@ -0,0 +1,85 @@ +package parsedmessage + +import ( + "encoding/json" + + // Other necessary imports, possibly including types from the Lily project + + minertypes "github.com/filecoin-project/go-state-types/builtin/v14/miner" +) + +type SectorNumber struct { + Elemcount int64 + Rle []uint64 +} + +type ProveCommitAggregateParams struct { + AggregateProof []byte + SectorNumbers SectorNumber +} + +func decodeRLE(rle []uint64) []uint64 { + result := make([]uint64, 0) + current := uint64(0) + + for i, count := range rle { + if i%2 == 1 { // Odd indices represent runs of 1s + for j := uint64(0); j < count; j++ { + result = append(result, current+j) + } + } + current += count + } + + return result +} + +func parseParamsInDetail(method string, params string) ([]uint64, error) { + sectorNumbers := []uint64{} + + switch method { + case "ProveCommitAggregate": + var aggregateParams ProveCommitAggregateParams + if err := json.Unmarshal([]byte(params), &aggregateParams); err != nil { + return sectorNumbers, err + } + // Assuming AggregateProveCommitParams has a field SectorNumbers which is a slice + sectorNumbers = decodeRLE(aggregateParams.SectorNumbers.Rle) + + case "ProveCommitSector": + var sectorParams minertypes.ProveCommitSectorParams + if err := json.Unmarshal([]byte(params), §orParams); err != nil { + return sectorNumbers, err + } + sectorNumbers = []uint64{uint64(sectorParams.SectorNumber)} + + case "ProveCommitSectors3": + var sectors3Params minertypes.ProveCommitSectors3Params + if err := json.Unmarshal([]byte(params), §ors3Params); err != nil { + return sectorNumbers, err + } + // Assuming ProveCommitSectors3Params has a field SectorNumbers which is a slice + if len(sectors3Params.SectorActivations) > 0 { + for _, sector := range sectors3Params.SectorActivations { + sectorNumbers = append(sectorNumbers, uint64(sector.SectorNumber)) + } + } + + case "ProveCommitSectorsNI": + var sectorsNIParams minertypes.ProveCommitSectorsNIParams + if err := json.Unmarshal([]byte(params), §orsNIParams); err != nil { + return sectorNumbers, err + } + // Assuming ProveCommitSectorsNIParams has a field SectorNumbers which is a slice + if len(sectorsNIParams.Sectors) > 0 { + for _, sector := range sectorsNIParams.Sectors { + sectorNumbers = append(sectorNumbers, uint64(sector.SealerID)) + } + } + + default: + return sectorNumbers, nil + } + + return sectorNumbers, nil +} diff --git a/tasks/messages/receipt/task.go b/tasks/messages/receipt/task.go index 9eb7d4350..fe8a19c74 100644 --- a/tasks/messages/receipt/task.go +++ b/tasks/messages/receipt/task.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -18,6 +19,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +var log = logging.Logger("lily/tasks/receipt") + type Task struct { node tasks.DataSource } @@ -96,6 +99,8 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut parsedReturn, _, err := util.ParseReturn(rec.Return, msg.VMMessage().Method, toCode) if err == nil { rcpt.ParsedReturn = parsedReturn + } else { + log.Errorf("got error during parsed_return: %v", err) } } receiptResults = append(receiptResults, rcpt)