Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
fix: chat service timestamp error (#116)
Browse files Browse the repository at this point in the history
* Update init.go

* Update get_message.go

---------

Co-authored-by: ozline <ozlinex@outlook.com>
  • Loading branch information
XZ0730 and ozline authored Aug 31, 2023
1 parent 780429c commit cb43cdc
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 26 deletions.
10 changes: 9 additions & 1 deletion cmd/chat/dal/cache/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
redis "github.com/redis/go-redis/v9"
)

func MessageInsert(ctx context.Context, key string, score float64, member string) error {
func MessageInsert(ctx context.Context, key string, revkey string, score float64, member string) error {
// 先判断是否key是否存在,如果存在则判断过期时间是否小于十天,小于则加时间,大于则不加时间
if ok := MessageExist(ctx, key); ok != 0 {
err := RedisDB.ZAdd(context.TODO(), key, redis.Z{
Expand All @@ -27,6 +27,10 @@ func MessageInsert(ctx context.Context, key string, score float64, member string
if err != nil {
return err
}
err = RedisDB.Expire(ctx, revkey, time.Hour*24*30).Err()
if err != nil {
return err
}
return nil
}
}
Expand All @@ -42,6 +46,10 @@ func MessageInsert(ctx context.Context, key string, score float64, member string
if err != nil {
return err
}
err = RedisDB.Expire(ctx, revkey, time.Hour*24*30).Err()
if err != nil {
return err
}
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/chat/dal/cache/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"time"

"github.com/ozline/tiktok/config"
"github.com/panjf2000/ants/v2"
redis "github.com/redis/go-redis/v9"
"gorm.io/gorm"
)

var (
RedisDB *redis.Client
RedisDB *redis.Client
AntsPool *ants.PoolWithFunc
)

type Message struct {
Expand Down
13 changes: 9 additions & 4 deletions cmd/chat/dal/db/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/ozline/tiktok/kitex_gen/chat"
"gorm.io/gorm"
"gorm.io/hints"
)
Expand All @@ -27,9 +28,13 @@ type MiddleMessage struct {
CreatedAt string
UpdatedAt string
}
type MessageBuild struct {
MessageElem *Message
MessageList []*chat.Message
}
type MessageArray []*Message

func GetMessageList(ctx context.Context, to_user_id int64, from_user_id int64) ([]*Message, bool, error) {
func GetMessageList(ctx context.Context, to_user_id int64, from_user_id int64) ([]*Message, error) {
messageListFormMysql := make([]*Message, 0)
err := DB.WithContext(ctx).Clauses(hints.UseIndex("to_user_id", "from_user_id")).
Select("id", "to_user_id", "from_user_id", "content", "created_at").
Expand All @@ -40,12 +45,12 @@ func GetMessageList(ctx context.Context, to_user_id int64, from_user_id int64) (
// add some logs
klog.Info("err happen")
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, false, errors.New("user not found")
return nil, errors.New("user not found")
}
return nil, false, err
return nil, err
}
// 回写redis --先返回信息,然后送到mq进行异步处理
return messageListFormMysql, true, nil
return messageListFormMysql, nil
}

func (d *DBAction) InsertMessage(message *Message) error {
Expand Down
2 changes: 2 additions & 0 deletions cmd/chat/dal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/ozline/tiktok/cmd/chat/dal/cache"
"github.com/ozline/tiktok/cmd/chat/dal/db"
"github.com/ozline/tiktok/cmd/chat/dal/mq"
"github.com/ozline/tiktok/pkg/ants"
)

func Init() {
Expand All @@ -12,4 +13,5 @@ func Init() {
mq.InitRabbitMQ()
mq.InitMessageMQ()
mq.InitChatMQ()
ants.Init()
}
3 changes: 2 additions & 1 deletion cmd/chat/dal/mq/chat_mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (c *ChatMQ) DealWithMessageToUser(msg <-chan amqp.Delivery) {
continue
}
key := strconv.FormatInt(message.FromUserId, 10) + "-" + strconv.FormatInt(message.ToUserId, 10)
err = cache.MessageInsert(context.TODO(), key, float64(message.CreatedAt.Unix()), string(req.Body))
revkey := strconv.FormatInt(message.ToUserId, 10) + "-" + strconv.FormatInt(message.FromUserId, 10)
err = cache.MessageInsert(context.TODO(), key, revkey, float64(message.CreatedAt.Unix()), string(req.Body))
if err != nil {
klog.Info(err)
continue
Expand Down
35 changes: 35 additions & 0 deletions cmd/chat/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"context"
"testing"
"time"

"github.com/ozline/tiktok/cmd/chat/dal"
"github.com/ozline/tiktok/config"
"github.com/ozline/tiktok/kitex_gen/chat"
"github.com/ozline/tiktok/pkg/utils"
)

func TestHadnlerGet(t *testing.T) {
t.Log("result===>")
config.InitForTest()
dal.Init()
msi := new(MessageServiceImpl)
token, err := utils.CreateToken(3)
if err != nil {
t.Error(err)
t.Fail()
}
mlr, err := msi.MessageList(context.Background(), &chat.MessageListRequest{
Token: token,
ToUserId: 2,
})
if err != nil {
t.Error(err)
t.Fail()
}
t.Log("result===>", mlr.MessageList)
t.Log("result===>", mlr.Total)
time.Sleep(2 * time.Second)
}
15 changes: 8 additions & 7 deletions cmd/chat/pack/chat.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
package pack

import (
"time"
"fmt"

"github.com/ozline/tiktok/cmd/chat/dal/db"
"github.com/ozline/tiktok/kitex_gen/chat"
)

type MessageBuildArray []*chat.Message

func BuildMessage(data []*db.Message) []*chat.Message {
if data == nil {
return make([]*chat.Message, 0)
}

res := make([]*chat.Message, 0)
res := make(MessageBuildArray, 0)
for _, val := range data {
create_time := val.CreatedAt.Format(time.DateTime)
message := &chat.Message{
create_at := fmt.Sprintf("%v", val.CreatedAt.UnixMilli())
msg := &chat.Message{
Id: val.Id,
ToUserId: val.ToUserId,
FromUserId: val.FromUserId,
Content: val.Content,
CreateTime: &create_time,
CreateTime: &create_at,
}
res = append(res, message)
res = append(res, msg)
}
return res
}
22 changes: 10 additions & 12 deletions cmd/chat/service/get_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,21 @@ func (c *ChatService) GetMessages(req *chat.MessageListRequest, user_id int64) (
return messageList, nil
}

messages, ok, err := db.GetMessageList(c.ctx, req.ToUserId, user_id)
messages, err := db.GetMessageList(c.ctx, req.ToUserId, user_id)
if err != nil {
klog.Info(err)
return nil, err
}

mq_message, err := sonic.Marshal(messages)
if err != nil {
klog.Info(err)
return nil, err
}
if ok {
mq_message, err := sonic.Marshal(messages)
if err != nil {
klog.Info(err)
return nil, err
}
err = mq.MessageMQCli.Publish(c.ctx, string(mq_message))
if err != nil {
klog.Info(err)
return messages, err
}
err = mq.MessageMQCli.Publish(c.ctx, string(mq_message))
if err != nil {
klog.Info(err)
return messages, err
}

return messages, nil
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
github.com/kitex-contrib/registry-etcd v0.1.0
github.com/kitex-contrib/tracer-opentracing v0.0.3
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants v1.3.0
github.com/panjf2000/ants/v2 v2.8.1
github.com/rabbitmq/amqp091-go v1.8.1
github.com/redis/go-redis/v9 v9.0.5
github.com/spf13/viper v1.16.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,10 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ=
github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
23 changes: 23 additions & 0 deletions pkg/ants/ants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ants

import (
"errors"
"sync"

"github.com/panjf2000/ants"
)

var (
AntsPool *ants.PoolWithFunc
Wg sync.WaitGroup
)

func Init() {
ants_Pool, err := ants.NewPoolWithFunc(500, func(payload interface{}) {
defer Wg.Done()
})
if err != nil {
panic(errors.New("[ants goroutine init error]"))
}
AntsPool = ants_Pool
}

0 comments on commit cb43cdc

Please sign in to comment.