Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
add: init mq
Browse files Browse the repository at this point in the history
  • Loading branch information
wushiling50 committed Sep 8, 2023
1 parent b57e055 commit daf0943
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
34 changes: 22 additions & 12 deletions cmd/follow/dal/mq/follow_mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,30 @@ func (f *FollowMQ) Consume(ctx context.Context) (<-chan amqp.Delivery, error) {
}

// TODO:sync resolve msg
func (s *SyncFollow) SyncFollowMQ(ctx context.Context) error {
defer FollowMQCli.ReleaseRes()
msgs, err := FollowMQCli.Consume(ctx)
if err != nil {
return err
}

// type SyncFollow struct {
// }
var forever chan struct{}

// func (s *SyncFollow) RunTaskCreate(ctx context.Context) error {
// msgs, err := FollowMQCli.Consume(ctx)
// if err != nil {
// return err
// }
// var forever chan struct{}
go func() {
for msg := range msgs {
klog.Infof("Resolve msg: %s", msg.Body)
// TODO:落库处理
msg.Ack(false)

Check failure on line 90 in cmd/follow/dal/mq/follow_mq.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `msg.Ack` is not checked (errcheck)
}
}()

// //TODO:
<-forever

// <-forever
return nil
}

// return nil
// }
// 释放资源,建议获取实例后配合defer使用
func (f *FollowMQ) ReleaseRes() {
f.conn.Close()
f.channel.Close()
}
16 changes: 16 additions & 0 deletions cmd/follow/dal/mq/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mq

import (
"context"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/ozline/tiktok/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -18,6 +20,9 @@ type FollowMQ struct {
queueName string
}

type SyncFollow struct {
}

var (
Rmq *RabbitMQ
FollowMQCli *FollowMQ
Expand All @@ -43,6 +48,9 @@ func Init() {
klog.Error(err)
return
}

ctx := context.Background()
go run(ctx)
}

func FollowMQInit() (*FollowMQ, error) {
Expand All @@ -57,3 +65,11 @@ func FollowMQInit() (*FollowMQ, error) {
queueName: "FollowQueue", // 加入consts
}, nil
}

func run(ctx context.Context) {
fSync := new(SyncFollow)
err := fSync.SyncFollowMQ(ctx)
if err != nil {
klog.Infof("RunTaskCreate:%s", err)
}
}

0 comments on commit daf0943

Please sign in to comment.