Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
pref: use redis nx to avoid hotspot invalid (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkskj authored Aug 28, 2023
1 parent ecb8ada commit 7ca64b9
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 48 deletions.
91 changes: 72 additions & 19 deletions cmd/interaction/dal/cache/comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func AddComment(ctx context.Context, key string, comment *db.Comment) (err error
}

func AddComments(ctx context.Context, key string, comments *[]db.Comment) (err error) {
var zComments []redis.Z
for _, comment := range *comments {
data, err := comment.MarshalMsg(nil)
zComments := make([]redis.Z, len(*comments))
for i := 0; i < len(*comments); i++ {
data, err := (*comments)[i].MarshalMsg(nil)
if err != nil {
klog.Error(err)
return err
}
zComments = append(zComments, redis.Z{Score: float64(comment.CreatedAt.Unix()), Member: data})
zComments[i] = redis.Z{Score: float64((*comments)[i].CreatedAt.Unix()), Member: data}
}
pipe := RedisClient.TxPipeline()
err = pipe.ZAdd(ctx, key, zComments...).Err()
Expand Down Expand Up @@ -98,6 +98,35 @@ func AddComments(ctx context.Context, key string, comments *[]db.Comment) (err e
return err
}

func AddNoData(ctx context.Context, key string) (err error) {
zData := redis.Z{}
pipe := RedisClient.TxPipeline()
err = pipe.ZAdd(ctx, key, zData).Err()
if err != nil {
klog.Error(err)
return err
}
err = pipe.Expire(ctx, key, constants.NoDataExpiredTime).Err()
if err != nil {
klog.Error(err)
return err
}
cmders, err := pipe.Exec(ctx)
if err != nil {
klog.Error(err)
return err
}
for _, cmder := range cmders {
err = cmder.Err()
if err != nil {
klog.Error(err)
return err
}
}
klog.Infof("Add NoData: videoId %v \n", key)
return err
}

func DeleteComment(ctx context.Context, key string, comment *db.Comment) (err error) {
data, err := comment.MarshalMsg(nil)
if err != nil {
Expand All @@ -113,24 +142,28 @@ func DeleteComment(ctx context.Context, key string, comment *db.Comment) (err er
return
}

func CountComments(ctx context.Context, key string) (count int64, err error) {
lastTime, err := RedisClient.TTL(ctx, key).Result()
if err != nil {
return 0, err
func GetCount(ctx context.Context, key string) (ok bool, count string, err error) {
count, err = RedisClient.Get(ctx, GetCountKey(key)).Result()
if err == redis.Nil {
klog.Infof("Count comment: videoId %v count nil ! \n", key)
return false, count, nil
}
if lastTime < constants.CommentExpiredTime/2 {
err = RedisClient.Expire(ctx, key, constants.CommentExpiredTime).Err()
if err != nil {
return 0, err
}
if err == nil {
klog.Infof("Count comment: videoId %v count %v \n", key, count)
} else {
klog.Error(err)
}
count, err = RedisClient.ZCard(ctx, key).Result()
return true, count, err
}

func SetCount(ctx context.Context, key string, count int64) (err error) {
err = RedisClient.Set(ctx, GetCountKey(key), count, constants.CommentExpiredTime).Err()
if err != nil {
klog.Error(err)
} else {
klog.Infof("Count comment: videoId %v count %v \n", key, count)
return err
}
return
klog.Infof("Set Count: videoId %v \n", key)
return err
}

func IsExistComment(ctx context.Context, key string) (exist int64, err error) {
Expand All @@ -143,12 +176,32 @@ func IsExistComment(ctx context.Context, key string) (exist int64, err error) {
return
}

func DeleteComments(ctx context.Context, key string) (err error) {
func Delete(ctx context.Context, key string) (err error) {
err = RedisClient.Del(ctx, key).Err()
if err != nil {
klog.Error(err)
} else {
klog.Infof("Delete comments: videoId %v \n", key)
klog.Infof("Delete : %v \n", key)
}
return
}

func Lock(ctx context.Context, key string) (ok bool, err error) {
ok, err = RedisClient.SetNX(ctx, key, 1, constants.LockTime).Result()
if err != nil {
klog.Error(err)
} else {
klog.Infof("Lock: set %v %v \n", key, ok)
}
return
}

func AddCount(ctx context.Context, increment int64, videoID string) (err error) {
err = RedisClient.IncrBy(ctx, GetCountKey(videoID), increment).Err()
if err != nil {
klog.Error(err)
} else {
klog.Infof("Add count: videoId %v increment %v\n", videoID, increment)
}
return
}
18 changes: 17 additions & 1 deletion cmd/interaction/dal/cache/keys.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package cache

import "fmt"
import (
"fmt"

"github.com/ozline/tiktok/pkg/constants"
)

var keyPattern = "%s::%d"

Expand All @@ -11,3 +15,15 @@ func GetVideoKey(videoID int64) string {
func GetUserKey(userID int64) string {
return fmt.Sprintf(keyPattern, "favorite", userID)
}

func GetCommentNXKey(videoID string) string {
return fmt.Sprintf("%s:%s", constants.CommentNXKey, videoID)
}

func GetCountKey(videoID string) string {
return fmt.Sprintf("%s:%s", constants.CommentCountKey, videoID)
}

func GetCountNXKey(videoID string) string {
return fmt.Sprintf("%s:%s", constants.CountNXKey, videoID)
}
4 changes: 2 additions & 2 deletions cmd/interaction/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *InteractionServiceImpl) CommentList(ctx context.Context, req *interacti
return resp, nil
}

commentsResp, err := service.NewInteractionService(ctx).GetComments(req)
commentsResp, err := service.NewInteractionService(ctx).GetComments(req, 0)

if err != nil {
resp.Base = pack.BuildBaseResp(err)
Expand All @@ -169,7 +169,7 @@ func (s *InteractionServiceImpl) CommentCount(ctx context.Context, req *interact
}
}

count, err := service.NewInteractionService(ctx).CountComments(req)
count, err := service.NewInteractionService(ctx).CountComments(req, 0)
if err != nil {
resp.Base = pack.BuildBaseResp(err)
return resp, nil
Expand Down
42 changes: 35 additions & 7 deletions cmd/interaction/service/count_comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,57 @@ package service

import (
"strconv"
"time"

"github.com/cloudwego/kitex/pkg/klog"

"github.com/ozline/tiktok/pkg/constants"

"github.com/ozline/tiktok/cmd/interaction/dal/cache"
"github.com/ozline/tiktok/cmd/interaction/dal/db"
"github.com/ozline/tiktok/kitex_gen/interaction"
)

func (s *InteractionService) CountComments(req *interaction.CommentCountRequest) (count int64, err error) {
func (s *InteractionService) CountComments(req *interaction.CommentCountRequest, times int) (count int64, err error) {
videoId := req.VideoId

key := strconv.FormatInt(videoId, 10)
exist, err := cache.IsExistComment(s.ctx, key)
exist, rCount, err := cache.GetCount(s.ctx, key)
if err != nil {
return 0, err
}

if exist == 1 {
count, err = cache.CountComments(s.ctx, key)
if exist {
count, err = strconv.ParseInt(rCount, 10, 64)
} else {
lockKey := cache.GetCountNXKey(key)
ok, err := cache.Lock(s.ctx, lockKey)
if err != nil {
return 0, err
}
if !ok && times < constants.MaxRetryTimes {
klog.Infof("count %v times", times+1)
time.Sleep(constants.LockWaitTime)
return s.CountComments(req, times+1)
}
count, err = db.CountCommentsByVideoID(s.ctx, videoId)
if err != nil {
return 0, err
}
err = cache.SetCount(s.ctx, key, count)
if err != nil {
return 0, err
}
if ok {
err = cache.Delete(s.ctx, lockKey)
if err != nil {
return 0, err
}
}
}

if err != nil {
return 0, err
if count < 0 {
count = 0
}
return
return count, err
}
2 changes: 1 addition & 1 deletion cmd/interaction/service/count_comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestCommentCount(t *testing.T) {
VideoId: 1,
Token: &token,
}
_, err := interactionService.CountComments(req)
_, err := interactionService.CountComments(req, 0)
if err != nil {
t.Logf("err: [%v] \n", err)
t.Error(err)
Expand Down
15 changes: 12 additions & 3 deletions cmd/interaction/service/create_comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,21 @@ func (s *InteractionService) CreateComment(req *interaction.CommentActionRequest
klog.Error(e)
}
}()
exist, err := cache.IsExistComment(ctx, key)
err := cache.Delete(ctx, key)
return err
})
eg.Go(func() error {
defer func() {
if e := recover(); e != nil {
klog.Error(e)
}
}()
ok, _, err := cache.GetCount(ctx, key)
if err != nil {
return err
}
if exist == 1 {
err = cache.DeleteComments(ctx, key)
if ok {
err = cache.AddCount(ctx, 1, key)
}
return err
})
Expand Down
15 changes: 12 additions & 3 deletions cmd/interaction/service/delete_comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,21 @@ func (s *InteractionService) DeleteComment(req *interaction.CommentActionRequest
klog.Error(e)
}
}()
exist, err := cache.IsExistComment(s.ctx, key)
err := cache.Delete(s.ctx, key)
return err
})
eg.Go(func() error {
defer func() {
if e := recover(); e != nil {
klog.Error(e)
}
}()
ok, _, err := cache.GetCount(ctx, key)
if err != nil {
return err
}
if exist == 1 {
err = cache.DeleteComments(s.ctx, key)
if ok {
err = cache.AddCount(ctx, -1, key)
}
return err
})
Expand Down
33 changes: 29 additions & 4 deletions cmd/interaction/service/get_comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import (
"strconv"
"time"

"github.com/ozline/tiktok/pkg/constants"

"github.com/cloudwego/kitex/pkg/klog"

"github.com/ozline/tiktok/cmd/interaction/dal/cache"
"github.com/ozline/tiktok/cmd/interaction/dal/db"
"github.com/ozline/tiktok/cmd/interaction/pack"
"github.com/ozline/tiktok/cmd/interaction/rpc"
"github.com/ozline/tiktok/kitex_gen/interaction"
"github.com/ozline/tiktok/kitex_gen/user"
"github.com/ozline/tiktok/pkg/constants"
"golang.org/x/sync/errgroup"
)

func (s *InteractionService) GetComments(req *interaction.CommentListRequest) ([]*interaction.Comment, error) {
func (s *InteractionService) GetComments(req *interaction.CommentListRequest, times int) ([]*interaction.Comment, error) {
var comments []db.Comment
key := strconv.FormatInt(req.VideoId, 10)
exist, err := cache.IsExistComment(s.ctx, key)
Expand All @@ -30,6 +28,9 @@ func (s *InteractionService) GetComments(req *interaction.CommentListRequest) ([
if err != nil {
return nil, err
}
if len(*rComments) == 1 && (*rComments)[0].Score == 0 {
return []*interaction.Comment{}, nil
}
for i := 0; i < len(*rComments); i++ {
var comment db.Comment
_, err = comment.UnmarshalMsg([]byte((*rComments)[i].Member.(string)))
Expand All @@ -40,6 +41,17 @@ func (s *InteractionService) GetComments(req *interaction.CommentListRequest) ([
comments = append(comments, comment)
}
} else {
lockKey := cache.GetCommentNXKey(key)
ok, err := cache.Lock(s.ctx, lockKey)
if err != nil {
return nil, err
}
if !ok && times < constants.MaxRetryTimes {
klog.Infof("get %v times", times+1)
time.Sleep(constants.LockWaitTime)
return s.GetComments(req, times+1)
}

comments, err = db.GetCommentsByVideoID(s.ctx, req.VideoId)
if err != nil {
return nil, err
Expand All @@ -50,6 +62,19 @@ func (s *InteractionService) GetComments(req *interaction.CommentListRequest) ([
if err != nil {
return nil, err
}
} else {
klog.Infof("no comments for video %d", req.VideoId)
err = cache.AddNoData(s.ctx, key)
if err != nil {
return nil, err
}
}

if ok {
err = cache.Delete(s.ctx, lockKey)
if err != nil {
return nil, err
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/interaction/service/get_comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func TestCommentsGet(t *testing.T) {
Token: token,
}

_, err = interactionService.GetComments(req2)
_, err = interactionService.GetComments(req2, 0)

if err != nil {
t.Logf("err: [%v] \n", err)
t.Error(err)
t.Fail()
}
_, err = interactionService.GetComments(req2)
_, err = interactionService.GetComments(req2, 0)

if err != nil {
t.Logf("err: [%v] \n", err)
Expand Down
Loading

0 comments on commit 7ca64b9

Please sign in to comment.