Skip to content

Commit

Permalink
feat(cmd): adding an example with pubsub
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <bob@vibioh.fr>
  • Loading branch information
ViBiOh committed Oct 6, 2023
1 parent 76d1cba commit e77cdc1
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
OTEL_SERVICE_NAME=httputils
HTTP_PORT=1080
HTTP_GRACE_DURATION=5s

HTTP_TELEMETRY_URL=
HTTP_REDIS_ADDRESS=
Expand Down
23 changes: 17 additions & 6 deletions cmd/http/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"encoding/json"
"fmt"
"log/slog"
"os"
"syscall"
"time"

"github.com/ViBiOh/httputils/v4/pkg/cron"
"github.com/ViBiOh/httputils/v4/pkg/redis"
amqp "github.com/rabbitmq/amqp091-go"
)

Expand All @@ -18,22 +18,33 @@ func startBackground(ctx context.Context, config configuration, client client, a

var closers []func()

go client.redis.Pull(ctx, "httputils:tasks", func(content string, err error) {
closePubSub := redis.SubscribeFor(ctx, client.redis, "httputils:tasks", func(content time.Time, err error) {
if err != nil {
slog.Error(err.Error())
os.Exit(1)
slog.Error("consume on pubsub", "err", err)

return
}

slog.Info("content=`" + content + "`")
slog.Info("time from pubsub", "content", content)
})

closers = append(closers, func() {
if err := closePubSub(context.Background()); err != nil {
slog.Error("close pubsub", "err", err)
}
})

speakingClock := cron.New().Each(5 * time.Minute).OnSignal(syscall.SIGUSR1).OnError(func(err error) {
speakingClock := cron.New().Each(15 * time.Second).OnSignal(syscall.SIGUSR1).OnError(func(err error) {
slog.Error("run cron", "err", err)
}).Now()

go speakingClock.Start(ctx, func(_ context.Context) error {
slog.Info("Clock is ticking")

if err := client.redis.PublishJSON(ctx, "httputils:tasks", time.Now()); err != nil {
slog.Error("publish on pubsub", "err", err)
}

return nil
})

Expand Down
6 changes: 3 additions & 3 deletions cmd/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func main() {
os.Exit(1)
}

stopBackground := startBackground(ctx, config, client, adapter)
defer stopBackground()

ctxEnd := client.health.End(ctx)

stopBackground := startBackground(ctxEnd, config, client, adapter)
defer stopBackground()

handler := newPort(ctxEnd, config, client, adapter)

appServer := server.New(config.appServer)
Expand Down
4 changes: 1 addition & 3 deletions pkg/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c *Cache[K, V]) subscribe(ctx context.Context) {
return
}

done, close := redis.SubscribeFor(ctx, c.read, c.channel, func(id K, err error) {
close := redis.SubscribeFor(ctx, c.read, c.channel, func(id K, err error) {
slog.Info("evicting from memory cache", "id", id, "channel", c.channel)
c.memory.Delete(id)
})
Expand All @@ -42,6 +42,4 @@ func (c *Cache[K, V]) subscribe(ctx context.Context) {
if err := close(cntxt.WithoutDeadline(ctx)); err != nil {
slog.Error("close subscriber", "err", err)
}

<-done
}
8 changes: 2 additions & 6 deletions pkg/redis/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,15 @@ func (s Service) Subscribe(ctx context.Context, channel string) (<-chan *redis.M
}
}

func SubscribeFor[T any](ctx context.Context, client Subscriber, channel string, handler func(T, error)) (<-chan struct{}, func(context.Context) error) {
func SubscribeFor[T any](ctx context.Context, client Subscriber, channel string, handler func(T, error)) func(context.Context) error {
subscription, unsubscribe := client.Subscribe(ctx, channel)

done := make(chan struct{})

go func() {
defer close(done)

for item := range subscription {
var instance T
handler(instance, json.Unmarshal([]byte(item.Payload), &instance))
}
}()

return done, unsubscribe
return unsubscribe
}

0 comments on commit e77cdc1

Please sign in to comment.