From 6a35b16e14b8503b9a119cb5d3ee4b9f3e3e01f5 Mon Sep 17 00:00:00 2001 From: zwwhdls Date: Sat, 14 Dec 2024 00:30:05 +0800 Subject: [PATCH] make post meilisearch sync Signed-off-by: zwwhdls --- api/doc.go | 153 +++++++++++++++++++----------------- cmd/main.go | 4 + pkg/service/chain.go | 3 +- pkg/store/docstore/meili.go | 68 +++++++++------- pkg/utils/util.go | 29 +++++++ 5 files changed, 159 insertions(+), 98 deletions(-) diff --git a/api/doc.go b/api/doc.go index 1a15caa..a63c9fa 100644 --- a/api/doc.go +++ b/api/doc.go @@ -19,6 +19,7 @@ package api import ( "fmt" "strconv" + "strings" "github.com/gin-gonic/gin" @@ -44,6 +45,9 @@ func (s *HttpServer) store() gin.HandlerFunc { // store the document doc := body.ToDocument() if err := s.chain.Store(c, doc); err != nil { + if strings.Contains(err.Error(), "already exists") { + return + } c.String(500, fmt.Sprintf("store document error: %s", err)) return } @@ -123,85 +127,22 @@ func (s *HttpServer) get() gin.HandlerFunc { func (s *HttpServer) filter() gin.HandlerFunc { return func(c *gin.Context) { - namespace := c.Param("namespace") - page, err := strconv.Atoi(c.DefaultQuery("page", "0")) - if err != nil { - c.String(400, fmt.Sprintf("invalid page number: %s", c.Query("page"))) - return - } - pageSize, err := strconv.Atoi(c.DefaultQuery("pageSize", "10")) - if err != nil { - c.String(400, fmt.Sprintf("invalid pagesize: %s", c.Query("page"))) + docQuery := getFilterQuery(c) + if docQuery == nil { return } - docQuery := DocQuery{ - Namespace: namespace, - Source: c.Query("source"), - WebUrl: c.Query("webUrl"), - ParentID: c.Query("parentID"), - Search: c.Query("search"), - HitsPerPage: int64(pageSize), - Page: int64(page), - Sort: c.DefaultQuery("sort", "createdAt"), - Desc: c.DefaultQuery("desc", "false") == "true", - } - createAtStart := c.Query("createAtStart") - if createAtStart != "" { - createAtStartTimestamp, err := strconv.Atoi(createAtStart) - if err != nil { - c.String(400, fmt.Sprintf("invalid createAtStart: %s", c.Query("page"))) - return - } - docQuery.CreatedAtStart = utils.ToPtr(int64(createAtStartTimestamp)) - } - createAtEnd := c.Query("createAtEnd") - if createAtEnd != "" { - createAtEndTimestamp, err := strconv.Atoi(createAtEnd) - if err != nil { - c.String(400, fmt.Sprintf("invalid createAtEnd: %s", c.Query("page"))) - return - } - docQuery.ChangedAtEnd = utils.ToPtr(int64(createAtEndTimestamp)) - } - updatedAtStart := c.Query("updatedAtStart") - if updatedAtStart != "" { - updatedAtStartTimestamp, err := strconv.Atoi(updatedAtStart) - if err != nil { - c.String(400, fmt.Sprintf("invalid updatedAtStart: %s", c.Query("page"))) - return - } - docQuery.ChangedAtStart = utils.ToPtr(int64(updatedAtStartTimestamp)) - } - updatedAtEnd := c.Query("updatedAtEnd") - if updatedAtEnd != "" { - updatedAtEndTimestamp, err := strconv.Atoi(updatedAtEnd) - if err != nil { - c.String(400, fmt.Sprintf("invalid updatedAtEnd: %s", c.Query("page"))) - return - } - docQuery.ChangedAtEnd = utils.ToPtr(int64(updatedAtEndTimestamp)) - } - fuzzyName := c.Query("fuzzyName") - if fuzzyName != "" { - docQuery.FuzzyName = &fuzzyName - } - if c.Query("unRead") != "" { - docQuery.UnRead = utils.ToPtr(c.Query("unRead") == "true") - } - if c.Query("mark") != "" { - docQuery.Mark = utils.ToPtr(c.Query("mark") == "true") - } docs, err := s.chain.Search(c, docQuery.ToQuery(), docQuery.GetAttrQueries()) if err != nil { c.String(500, fmt.Sprintf("search document error: %s", err)) return } + + var docWithAttrs []DocumentWithAttr ids := []string{} for _, doc := range docs { ids = append(ids, doc.EntryId) } - - allAttrs, err := s.chain.ListDocumentAttrs(c, namespace, ids) + allAttrs, err := s.chain.ListDocumentAttrs(c, docQuery.Namespace, ids) if err != nil { c.String(500, fmt.Sprintf("list document attrs error: %s", err)) return @@ -214,7 +155,6 @@ func (s *HttpServer) filter() gin.HandlerFunc { attrsMap[attr.EntryId] = append(attrsMap[attr.EntryId], attr) } - var docWithAttrs []DocumentWithAttr for _, document := range docs { docWithAttr := DocumentWithAttr{Document: document} attrs := attrsMap[document.EntryId] @@ -233,11 +173,84 @@ func (s *HttpServer) filter() gin.HandlerFunc { } docWithAttrs = append(docWithAttrs, docWithAttr) } - c.JSON(200, docWithAttrs) } } +func getFilterQuery(c *gin.Context) *DocQuery { + namespace := c.Param("namespace") + page, err := strconv.Atoi(c.DefaultQuery("page", "0")) + if err != nil { + c.String(400, fmt.Sprintf("invalid page number: %s", c.Query("page"))) + return nil + } + pageSize, err := strconv.Atoi(c.DefaultQuery("pageSize", "10")) + if err != nil { + c.String(400, fmt.Sprintf("invalid pagesize: %s", c.Query("page"))) + return nil + } + + docQuery := DocQuery{ + Namespace: namespace, + Source: c.Query("source"), + WebUrl: c.Query("webUrl"), + ParentID: c.Query("parentID"), + Search: c.Query("search"), + HitsPerPage: int64(pageSize), + Page: int64(page), + Sort: c.DefaultQuery("sort", "createdAt"), + Desc: c.DefaultQuery("desc", "false") == "true", + } + + createAtStart := c.Query("createAtStart") + if createAtStart != "" { + createAtStartTimestamp, err := strconv.Atoi(createAtStart) + if err != nil { + c.String(400, fmt.Sprintf("invalid createAtStart: %s", c.Query("page"))) + return nil + } + docQuery.CreatedAtStart = utils.ToPtr(int64(createAtStartTimestamp)) + } + createAtEnd := c.Query("createAtEnd") + if createAtEnd != "" { + createAtEndTimestamp, err := strconv.Atoi(createAtEnd) + if err != nil { + c.String(400, fmt.Sprintf("invalid createAtEnd: %s", c.Query("page"))) + return nil + } + docQuery.ChangedAtEnd = utils.ToPtr(int64(createAtEndTimestamp)) + } + updatedAtStart := c.Query("updatedAtStart") + if updatedAtStart != "" { + updatedAtStartTimestamp, err := strconv.Atoi(updatedAtStart) + if err != nil { + c.String(400, fmt.Sprintf("invalid updatedAtStart: %s", c.Query("page"))) + return nil + } + docQuery.ChangedAtStart = utils.ToPtr(int64(updatedAtStartTimestamp)) + } + updatedAtEnd := c.Query("updatedAtEnd") + if updatedAtEnd != "" { + updatedAtEndTimestamp, err := strconv.Atoi(updatedAtEnd) + if err != nil { + c.String(400, fmt.Sprintf("invalid updatedAtEnd: %s", c.Query("page"))) + return nil + } + docQuery.ChangedAtEnd = utils.ToPtr(int64(updatedAtEndTimestamp)) + } + fuzzyName := c.Query("fuzzyName") + if fuzzyName != "" { + docQuery.FuzzyName = &fuzzyName + } + if c.Query("unRead") != "" { + docQuery.UnRead = utils.ToPtr(c.Query("unRead") == "true") + } + if c.Query("mark") != "" { + docQuery.Mark = utils.ToPtr(c.Query("mark") == "true") + } + return &docQuery +} + func (s *HttpServer) delete() gin.HandlerFunc { return func(c *gin.Context) { namespace := c.Param("namespace") diff --git a/cmd/main.go b/cmd/main.go index e559cb9..01315ff 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,9 +17,12 @@ package main import ( + "path" + "github.com/spf13/cobra" "github.com/basenana/friday/cmd/apps" + "github.com/basenana/friday/config" "github.com/basenana/friday/pkg/utils/logger" ) @@ -36,6 +39,7 @@ func init() { RootCmd.AddCommand(apps.ServeCmd) RootCmd.AddCommand(apps.AgentCmd) + RootCmd.PersistentFlags().StringVar(&config.FilePath, "config", path.Join(config.LocalUserPath(), config.DefaultConfigBase), "friday config file") } func main() { diff --git a/pkg/service/chain.go b/pkg/service/chain.go index 71bad69..1eea671 100644 --- a/pkg/service/chain.go +++ b/pkg/service/chain.go @@ -18,6 +18,7 @@ package service import ( "context" + "fmt" "go.uber.org/zap" @@ -64,7 +65,7 @@ func (c *Chain) Store(ctx context.Context, document *doc.Document) error { return err } else if d != nil { c.Log.Debugf("document already exists: %+v", d.String()) - return nil + return fmt.Errorf("document already exists: %+v", d.String()) } for _, plugin := range c.Plugins { err := plugin.Run(ctx, document) diff --git a/pkg/store/docstore/meili.go b/pkg/store/docstore/meili.go index c955bd8..4ea048d 100644 --- a/pkg/store/docstore/meili.go +++ b/pkg/store/docstore/meili.go @@ -27,6 +27,7 @@ import ( "github.com/basenana/friday/config" "github.com/basenana/friday/pkg/models/doc" + "github.com/basenana/friday/pkg/utils" "github.com/basenana/friday/pkg/utils/logger" ) @@ -56,20 +57,41 @@ func NewMeiliClient(conf config.Config) (DocStoreInterface, error) { index: index, client: client, } + return meiliClient, meiliClient.init() +} + +func (c *MeiliClient) init() error { filterableAttrs := append(doc.DocFilterableAttrs, doc.DocAttrFilterableAttrs...) - t, err := client.Index(conf.MeiliConfig.Index).UpdateFilterableAttributes(&filterableAttrs) + + attrs, err := c.index.GetFilterableAttributes() if err != nil { - return nil, err + return err } - if err = meiliClient.wait(context.TODO(), t.TaskUID); err != nil { - return nil, err + if !utils.Equal(filterableAttrs, attrs) { + t, err := c.index.UpdateFilterableAttributes(&filterableAttrs) + if err != nil { + return err + } + if err = c.wait(context.TODO(), t.TaskUID); err != nil { + return err + } } + sortAttrs := doc.DocSortAttrs - t, err = client.Index(conf.MeiliConfig.Index).UpdateSortableAttributes(&sortAttrs) + crtSortAttrs, err := c.index.GetSortableAttributes() if err != nil { - return nil, err + return err } - return meiliClient, meiliClient.wait(context.TODO(), t.TaskUID) + if !utils.Equal(sortAttrs, crtSortAttrs) { + t, err := c.index.UpdateSortableAttributes(&sortAttrs) + if err != nil { + return err + } + if err = c.wait(context.TODO(), t.TaskUID); err != nil { + return err + } + } + return nil } func (c *MeiliClient) Store(ctx context.Context, docPtr doc.DocPtrInterface) error { @@ -79,11 +101,9 @@ func (c *MeiliClient) Store(ctx context.Context, docPtr doc.DocPtrInterface) err c.log.Error(err) return err } - go func() { - if err := c.wait(ctx, task.TaskUID); err != nil { - c.log.Errorf("store document with entryId %s error: %s", docPtr.EntryID(), err) - } - }() + if err := c.wait(ctx, task.TaskUID); err != nil { + c.log.Errorf("store document with entryId %s error: %s", docPtr.EntryID(), err) + } return nil } @@ -134,11 +154,9 @@ func (c *MeiliClient) Update(ctx context.Context, document *doc.Document) error c.log.Error(err) return err } - go func() { - if err := c.wait(ctx, t.TaskUID); err != nil { - c.log.Errorf("update document %s error: %s", document.ID, err) - } - }() + if err := c.wait(ctx, t.TaskUID); err != nil { + c.log.Errorf("update document %s error: %s", document.ID, err) + } return nil } @@ -149,11 +167,9 @@ func (c *MeiliClient) Delete(ctx context.Context, docId string) error { c.log.Error(err) return err } - go func() { - if err := c.wait(ctx, t.TaskUID); err != nil { - c.log.Errorf("delete document %s error: %s", docId, err) - } - }() + if err := c.wait(ctx, t.TaskUID); err != nil { + c.log.Errorf("delete document %s error: %s", docId, err) + } return nil } @@ -169,11 +185,9 @@ func (c *MeiliClient) DeleteByFilter(ctx context.Context, aqs doc.DocumentAttrQu c.log.Error(err) return err } - go func() { - if err := c.wait(ctx, t.TaskUID); err != nil { - c.log.Errorf("delete document by filter error: %s", err) - } - }() + if err := c.wait(ctx, t.TaskUID); err != nil { + c.log.Errorf("delete document by filter error: %s", err) + } return nil } diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 78b5186..efe88b3 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -16,6 +16,35 @@ package utils +import ( + "reflect" + "sort" +) + func ToPtr[T any](t T) *T { return &t } + +func Equal(a []string, b *[]string) bool { + if b == nil { + return false + } + aa := deDup(a) + bb := deDup(*b) + sort.Sort(sort.StringSlice(aa)) + sort.Sort(sort.StringSlice(bb)) + return reflect.DeepEqual(aa, bb) +} + +func deDup(res []string) []string { + uniqueMap := make(map[string]bool) + result := []string{} + + for _, str := range res { + if _, exists := uniqueMap[str]; !exists { + uniqueMap[str] = true + result = append(result, str) + } + } + return result +}