Skip to content

Commit

Permalink
fix(yesod): tested pull feed
Browse files Browse the repository at this point in the history
  • Loading branch information
MuZhou233 committed Jul 1, 2024
1 parent 0576ac4 commit e3cb3c2
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 45 deletions.
2 changes: 1 addition & 1 deletion app/sephirah/cmd/sephirah/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion app/sephirah/internal/biz/bizangela/angela.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type AngelaRepo interface {
UpsertAppInfos(context.Context, []*modelgebura.AppInfo) error
AccountPurchaseAppInfos(context.Context, model.InternalID, []model.InternalID) error
UpsertFeed(context.Context, *modelfeed.Feed) error
UpsertFeedItems(context.Context, []*modelfeed.Item, model.InternalID) ([]string, error)
CheckNewFeedItems(context.Context, []*modelfeed.Item, model.InternalID) ([]string, error)
UpsertFeedItems(context.Context, []*modelfeed.Item, model.InternalID) error
UpdateFeedPullStatus(context.Context, *modelyesod.FeedConfig) error
GetFeedItem(context.Context, model.InternalID) (*modelfeed.Item, error)
GetFeedActions(context.Context, model.InternalID) ([]*modelyesod.FeedActionSet, error)
Expand Down
44 changes: 33 additions & 11 deletions app/sephirah/internal/biz/bizangela/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/model/converter"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelangela"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
"github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod"
"github.com/tuihub/librarian/internal/lib/libcodec"
"github.com/tuihub/librarian/internal/lib/libmq"
"github.com/tuihub/librarian/internal/model/modelfeed"
porter "github.com/tuihub/protos/pkg/librarian/porter/v1"
Expand Down Expand Up @@ -88,7 +86,7 @@ func NewPullFeedTopic( //nolint:gocognit // TODO
fc.LatestPullMessage = fmt.Sprintf("UpsertFeed failed: %s", err.Error())
return err
}
newItemGUIDs, err := a.repo.UpsertFeedItems(ctx, feed.Items, feed.ID)
newItemGUIDs, err := a.repo.CheckNewFeedItems(ctx, feed.Items, feed.ID)
if err != nil {
fc.LatestPullMessage = fmt.Sprintf("UpsertFeedItems failed: %s", err.Error())
return err
Expand All @@ -100,7 +98,7 @@ func NewPullFeedTopic( //nolint:gocognit // TODO
if slices.Contains(newItemGUIDs, item.GUID) {
err = parse.Publish(ctx, modelangela.FeedItemPostprocess{
FeedID: feed.ID,
ItemID: item.ID,
Item: item,
SystemNotify: p.SystemNotify,
})
}
Expand All @@ -116,36 +114,55 @@ func NewPullFeedTopic( //nolint:gocognit // TODO
func NewFeedItemPostprocessTopic( //nolint:gocognit // TODO
a *AngelaBase,
notify *libmq.Topic[modelangela.NotifyRouter],
systemNotify *libmq.Topic[modelnetzach.SystemNotify],
) *libmq.Topic[modelangela.FeedItemPostprocess] {
return libmq.NewTopic[modelangela.FeedItemPostprocess](
"FeedItemPostprocess",
func(ctx context.Context, p *modelangela.FeedItemPostprocess) error {
item, err := a.repo.GetFeedItem(ctx, p.ItemID)
if err != nil {
return err
notifyMsg := p.SystemNotify
if notifyMsg == nil {
notifyMsg = new(modelnetzach.SystemNotify)
}
notifyMsg.Notification.Content = ""
notifyMsg.Notification.Level = modelnetzach.SystemNotificationLevelError
defer func() {
if p.SystemNotify != nil && len(p.SystemNotify.Notification.Content) > 0 {
p.SystemNotify.Notification.Content = fmt.Sprintf(
"[%d] %s",
p.Item.ID,
p.SystemNotify.Notification.Content,
)
_ = systemNotify.PublishFallsLocalCall(ctx, *p.SystemNotify)
}
}()

item := p.Item
actionSets, err := a.repo.GetFeedActions(ctx, p.FeedID)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("GetFeedActions failed: %s", err.Error())
return err
}
builtin := bizyesod.GetBuiltinActionMap(ctx)
item, err = bizyesod.RequiredStartAction(ctx, item)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("RequiredStartAction failed: %s", err.Error())
return err
}
for _, actions := range actionSets {
for _, action := range actions.Actions {
var config modeltiphereth.FeatureRequest
err = libcodec.Unmarshal(libcodec.JSON, []byte(action.ConfigJSON), &config)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("%s Unmarshal failed: %s", action.ID, err.Error())
return err
}
if f, ok := builtin[action.ID]; ok { //nolint:nestif // TODO
item, err = f(ctx, config, item)
item, err = f(ctx, action, item)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("%s Exec failed: %s", action.ID, err.Error())
return err
}
if item == nil {
notifyMsg.Notification.Content = fmt.Sprintf("%s Filtered", action.ID)
notifyMsg.Notification.Level = modelnetzach.SystemNotificationLevelWarning
return nil
}
} else if a.supv.CheckFeedItemAction(action) {
Expand All @@ -158,9 +175,12 @@ func NewFeedItemPostprocessTopic( //nolint:gocognit // TODO
},
)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("%s Exec failed: %s", action.ID, err.Error())
return err
}
if resp.GetItem() != nil {
notifyMsg.Notification.Content = fmt.Sprintf("%s Filtered", action.ID)
notifyMsg.Notification.Level = modelnetzach.SystemNotificationLevelWarning
return nil
}
item = converter.ToBizFeedItem(resp.GetItem())
Expand All @@ -169,10 +189,12 @@ func NewFeedItemPostprocessTopic( //nolint:gocognit // TODO
}
item, err = bizyesod.RequiredEndAction(ctx, item)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("RequiredEndAction failed: %s", err.Error())
return err
}
_, err = a.repo.UpsertFeedItems(ctx, []*modelfeed.Item{item}, p.FeedID)
err = a.repo.UpsertFeedItems(ctx, []*modelfeed.Item{item}, p.FeedID)
if err != nil {
notifyMsg.Notification.Content = fmt.Sprintf("UpsertFeedItems failed: %s", err.Error())
return err
}
_ = notify.Publish(ctx, modelangela.NotifyRouter{
Expand Down
10 changes: 5 additions & 5 deletions app/sephirah/internal/biz/bizyesod/builtin_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func RequiredStartAction(ctx context.Context, item *modelfeed.Item) (*modelfeed.

func GetBuiltinActionMap(
ctx context.Context,
) map[string]func(context.Context, modeltiphereth.FeatureRequest, *modelfeed.Item) (*modelfeed.Item, error) {
return map[string]func(context.Context, modeltiphereth.FeatureRequest, *modelfeed.Item) (*modelfeed.Item, error){
) map[string]func(context.Context, *modeltiphereth.FeatureRequest, *modelfeed.Item) (*modelfeed.Item, error) {
return map[string]func(context.Context, *modeltiphereth.FeatureRequest, *modelfeed.Item) (*modelfeed.Item, error){
simpleKeywordFilterActionID: simpleKeywordFilterAction,
keywordFilterActionID: keywordFilterAction,
descriptionGeneratorActionID: descriptionGeneratorAction,
Expand Down Expand Up @@ -124,7 +124,7 @@ func parseDigestAction(_ context.Context, item *modelfeed.Item) (*modelfeed.Item

func simpleKeywordFilterAction(
_ context.Context,
request modeltiphereth.FeatureRequest,
request *modeltiphereth.FeatureRequest,
item *modelfeed.Item,
) (*modelfeed.Item, error) {
config := new(modelyesod.SimpleKeywordFilterActionConfig)
Expand Down Expand Up @@ -154,12 +154,12 @@ func simpleKeywordFilterAction(
return item, nil
}

func keywordFilterAction(ctx context.Context, _ modeltiphereth.FeatureRequest, item *modelfeed.Item) (*modelfeed.Item, error) {
func keywordFilterAction(ctx context.Context, _ *modeltiphereth.FeatureRequest, item *modelfeed.Item) (*modelfeed.Item, error) {
// TODO: impl
return item, nil
}

func descriptionGeneratorAction(_ context.Context, _ modeltiphereth.FeatureRequest, item *modelfeed.Item) (*modelfeed.Item, error) {
func descriptionGeneratorAction(_ context.Context, _ *modeltiphereth.FeatureRequest, item *modelfeed.Item) (*modelfeed.Item, error) {
if len(item.Description) > 0 {
return item, nil
}
Expand Down
40 changes: 20 additions & 20 deletions app/sephirah/internal/data/angela.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,7 @@ func (a *angelaRepo) UpsertFeed(ctx context.Context, f *modelfeed.Feed) error {
})
}

func (a *angelaRepo) UpsertFeedItems(
ctx context.Context,
items []*modelfeed.Item,
feedID model.InternalID,
) ([]string, error) {
func (a *angelaRepo) CheckNewFeedItems(ctx context.Context, items []*modelfeed.Item, feedID model.InternalID) ([]string, error) {
guids := make([]string, 0, len(items))
for _, item := range items {
guids = append(guids, item.GUID)
Expand All @@ -233,6 +229,24 @@ func (a *angelaRepo) UpsertFeedItems(
if err != nil {
return nil, err
}
existItemMap := make(map[string]bool)
res := make([]string, 0, len(items)-len(existItems))
for _, item := range existItems {
existItemMap[item.GUID] = true
}
for _, item := range items {
if _, exist := existItemMap[item.GUID]; !exist {
res = append(res, item.GUID)
}
}
return res, nil
}

func (a *angelaRepo) UpsertFeedItems(
ctx context.Context,
items []*modelfeed.Item,
feedID model.InternalID,
) error {
il := make([]*ent.FeedItemCreate, len(items))
for i, item := range items {
il[i] = a.data.db.FeedItem.Create().
Expand All @@ -258,7 +272,7 @@ func (a *angelaRepo) UpsertFeedItems(
il[i].SetPublishedParsed(time.Now())
}
}
err = a.data.db.FeedItem.CreateBulk(il...).
return a.data.db.FeedItem.CreateBulk(il...).
OnConflict(
sql.ConflictColumns(feeditem.FieldFeedID, feeditem.FieldGUID),
//
Expand All @@ -269,20 +283,6 @@ func (a *angelaRepo) UpsertFeedItems(
// }),
sql.DoNothing(),
).Exec(ctx)
if err != nil {
return nil, err
}
existItemMap := make(map[string]bool)
res := make([]string, 0, len(items)-len(existItems))
for _, item := range existItems {
existItemMap[item.GUID] = true
}
for _, item := range items {
if _, exist := existItemMap[item.GUID]; !exist {
res = append(res, item.GUID)
}
}
return res, nil
}

func (a *angelaRepo) UpdateFeedPullStatus(ctx context.Context, conf *modelyesod.FeedConfig) error {
Expand Down
2 changes: 1 addition & 1 deletion app/sephirah/internal/data/yesod.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (y *yesodRepo) ListFeedConfigs( //nolint:gocognit //TODO
}
feedConfig.ActionSets = actionSets
res = append(res, &modelyesod.FeedWithConfig{
FeedConfig: converter.ToBizFeedConfig(config),
FeedConfig: feedConfig,
Feed: converter.ToBizFeed(config.Edges.Feed),
})
}
Expand Down
2 changes: 1 addition & 1 deletion app/sephirah/internal/model/modelangela/modelangela.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type NotifyPush struct {

type FeedItemPostprocess struct {
FeedID model.InternalID
ItemID model.InternalID
Item *modelfeed.Item
SystemNotify *modelnetzach.SystemNotify
}

Expand Down
8 changes: 4 additions & 4 deletions app/sephirah/internal/model/modelyesod/builtin_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package modelyesod
import "github.com/invopop/jsonschema"

type SimpleKeywordFilterActionConfig struct {
TitleInclude []string `json:"title_include" jsonschema:"title=Title include"`
TitleExclude []string `json:"title_exclude" jsonschema:"title=Title exclude"`
ContentInclude []string `json:"content_include" jsonschema:"title=Content include"`
ContentExclude []string `json:"content_exclude" jsonschema:"title=Content exclude"`
TitleInclude []string `json:"title_include,omitempty" jsonschema:"title=Title include"`
TitleExclude []string `json:"title_exclude,omitempty" jsonschema:"title=Title exclude"`
ContentInclude []string `json:"content_include,omitempty" jsonschema:"title=Content include"`
ContentExclude []string `json:"content_exclude,omitempty" jsonschema:"title=Content exclude"`
}

func GetSimpleKeywordFilterActionConfigSchema() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion app/sephirah/pkg/service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e3cb3c2

Please sign in to comment.