Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

fix:the message consume once problem #134

Merged
merged 15 commits into from
Sep 7, 2023
7 changes: 3 additions & 4 deletions cmd/chat/dal/db/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/cloudwego/kitex/pkg/klog"
"github.com/ozline/tiktok/kitex_gen/chat"
"github.com/ozline/tiktok/pkg/errno"
"gorm.io/gorm"
"gorm.io/hints"
)
Expand All @@ -26,7 +25,7 @@ type MiddleMessage struct {
ToUserId int64
FromUserId int64
Content string
IsRead int
IsReadNum []int64
CreatedAt string
}
type MessageBuild struct {
Expand All @@ -44,10 +43,10 @@ func GetMessageList(ctx context.Context, to_user_id int64, from_user_id int64) (
Find(&messageListFormMysql).Error
if err != nil {
// add some logs
klog.Error("err happen ==>", err)
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errno.UserExistedError
return nil, errors.New("user not found")
}
klog.Errorf("get message_list error: %v\n", err)
return nil, err
}
// 回写redis --先返回信息,然后送到mq进行异步处理
Expand Down
2 changes: 1 addition & 1 deletion cmd/chat/dal/mq/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MiddleMessage struct {
ToUserId int64
FromUserId int64
Content string
IsRead int
IsReadNum []int64
CreatedAt string
}

Expand Down
31 changes: 19 additions & 12 deletions cmd/chat/service/get_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ import (

// Get Messages history list
func (c *ChatService) GetMessages(req *chat.MessageListRequest, user_id int64) ([]*db.Message, error) {
mq.Mu.Lock()
defer mq.Mu.Unlock()
messageList := make(db.MessageArray, 0)
// redis ZSET
// RedisDB.WithContext(ctx)
key := strconv.FormatInt(req.ToUserId, 10) + "-" + strconv.FormatInt(user_id, 10)
revkey := strconv.FormatInt(user_id, 10) + "-" + strconv.FormatInt(req.ToUserId, 10)
no_empty := 0
mq.Mu.Lock()
defer mq.Mu.Unlock()
msg_array, err := cacheMessageDeal(c.ctx, key, &no_empty)
msg_array, err := cacheMessageDeal(c.ctx, key, revkey, &no_empty, user_id)
if err != nil {
klog.Error(err)
return nil, err
}
messageList = append(messageList, msg_array...)
rev_msg_array, err := cacheMessageDeal(c.ctx, revkey, &no_empty)
rev_msg_array, err := cacheMessageDeal(c.ctx, revkey, key, &no_empty, user_id)
if err != nil {
klog.Error(err)
return nil, err
Expand All @@ -47,7 +47,7 @@ func (c *ChatService) GetMessages(req *chat.MessageListRequest, user_id int64) (
klog.Info("no new")
return nil, nil
}
klog.Info("db search")
klog.Info("db search no_empty==", no_empty, " len(message_List)=", messageList.Len())
messages, err := db.GetMessageList(c.ctx, req.ToUserId, user_id)
if err != nil {
klog.Error(err)
Expand All @@ -67,12 +67,12 @@ func (c *ChatService) GetMessages(req *chat.MessageListRequest, user_id int64) (
}

for _, val := range message {
val.IsRead = 1
val.IsReadNum = append(val.IsReadNum, user_id)
mes, _ := sonic.Marshal(val)
key := strconv.FormatInt(val.FromUserId, 10) + "-" + strconv.FormatInt(val.ToUserId, 10)
cre_time, _ := time.ParseInLocation(time.RFC3339, val.CreatedAt, time.Local)

err := cache.RedisDB.HSet(context.TODO(), key, cre_time.UnixMilli(), mes).Err()
err := cache.MessageInsert(c.ctx, key, revkey, cre_time.UnixMilli(), string(mes))
if err != nil {
klog.Info(err)
continue
Expand All @@ -81,7 +81,7 @@ func (c *ChatService) GetMessages(req *chat.MessageListRequest, user_id int64) (
return messages, nil
}

func cacheMessageDeal(ctx context.Context, key string, isempty *int) (db.MessageArray, error) {
func cacheMessageDeal(ctx context.Context, key string, revkey string, isempty *int, user_id int64) (db.MessageArray, error) {
msg_array := make(db.MessageArray, 0)
if ok := cache.MessageExist(ctx, key); ok != 0 {
// 查询 a->b的消息
Expand All @@ -104,15 +104,22 @@ func cacheMessageDeal(ctx context.Context, key string, isempty *int) (db.Message
klog.Error(err)
return err
}
if tempMessage.IsRead == 1 {
return nil
for _, id := range tempMessage.IsReadNum {
if id == 0 {
continue
} else if id == user_id {
return nil
}
}
// if tempMessage.IsRead == 1 {
// return nil
// }
err = db.Convert(message, tempMessage)
if err != nil {
klog.Error(err)
return err
}
tempMessage.IsRead = 1
tempMessage.IsReadNum = append(tempMessage.IsReadNum, user_id)
redis_msg, err := sonic.Marshal(tempMessage)
if err != nil {
klog.Error(err)
Expand All @@ -123,7 +130,7 @@ func cacheMessageDeal(ctx context.Context, key string, isempty *int) (db.Message
klog.Error(err)
return err
}
err = cache.RedisDB.HSet(ctx, key, cre_time.UnixMilli(), redis_msg).Err()
err = cache.MessageInsert(ctx, key, revkey, cre_time.UnixMilli(), string(redis_msg))
if err != nil {
klog.Error(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/chat/service/send_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (c *ChatService) SendMessage(req *chat.MessagePostRequest, user_id int64, c
Id: db.SF.NextVal(),
ToUserId: req.ToUserId,
FromUserId: user_id,
IsRead: 0,
IsReadNum: make([]int64, 0),
Content: req.Content,
CreatedAt: create_at,
}
Expand Down
27 changes: 22 additions & 5 deletions test/chat/get_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,37 @@ func testGetMessage(t *testing.T) {
}
}
func testPolling(t *testing.T) {
token, err := utils.CreateToken(from_user_id)
f_token, err := utils.CreateToken(from_user_id)
if err != nil {
t.Error(err)
t.Fail()
}
req := &chat.MessageListRequest{
Token: token,
t_token, err := utils.CreateToken(to_user_id)
if err != nil {
t.Error(err)
t.Fail()
}
f_req := &chat.MessageListRequest{
Token: f_token,
ToUserId: to_user_id,
}
t_req := &chat.MessageListRequest{
Token: t_token,
ToUserId: from_user_id,
}
res := make([]*db.Message, 0)
var eg errgroup.Group
for i := 0; i < 10; i++ {
eg.Go(func() error {
resp, err := chatService.GetMessages(req, from_user_id)
resp, err := chatService.GetMessages(f_req, from_user_id)
if err != nil {
return err
}
res = append(res, resp...)
return nil
})
eg.Go(func() error {
resp, err := chatService.GetMessages(t_req, to_user_id)
if err != nil {
return err
}
Expand All @@ -58,7 +75,7 @@ func testPolling(t *testing.T) {
}
for _, v := range res {
t.Log("=======================")
t.Log(v.Content)
t.Log("content===>", v.Content)
}
t.Log("count===>", len(res))
}
Expand Down
4 changes: 2 additions & 2 deletions test/chat/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
)

var (
from_user_id int64 = 2
to_user_id int64 = 3
from_user_id int64 = 3
to_user_id int64 = 2
content_get string = "test get"
content_post string = "test post"
create_at string = time.Now().Format(time.RFC3339)
Expand Down
Loading