Skip to content

JetCache-go is a Go cache framework. | 多级缓存 | 异步刷新 | 防穿透 | 指标采集 | 自动降级 | 批量查询 |

License

Notifications You must be signed in to change notification settings

go-impatient/jetcache-go

 
 

Repository files navigation

Build Status codeCov Go Repport Card License

Translate to: 简体中文

Introduction

jetcache-go is a general-purpose cache access framework based on go-redis/cache. It implements the core features of the Java version of JetCache, including:

  • ✅ Flexible combination of two-level caching: You can use memory, Redis, or your own custom storage method.
  • ✅ The Once interface adopts the singleflight pattern, which is highly concurrent and thread-safe.
  • ✅ By default, MsgPack is used for encoding and decoding values. Optional sonic and native json.
  • ✅ The default local cache implementation includes Ristretto and FreeCache.
  • ✅ The default distributed cache implementation is based on go-redis/v9, and you can also customize your own implementation.
  • ✅ You can customize the errNotFound error and use placeholders to prevent cache penetration by caching empty results.
  • ✅ Supports asynchronous refreshing of distributed caches.
  • ✅ Metrics collection: By default, it prints statistical metrics (QPM, Hit, Miss, Query, QueryFail) through logs.
  • ✅ Automatic degradation of distributed cache query failures.
  • ✅ The MGet interface supports the Load function. In a distributed caching scenario, the Pipeline mode is used to improve performance. (v1.1.0+)
  • ✅ Invalidate local caches (in all Go processes) after updates (v1.1.1+)

Installation

To start using the latest version of jetcache-go, you can import the library into your project:

go get github.com/mgtv-tech/jetcache-go

Getting started

Basic Usage

package cache_test

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/mgtv-tech/jetcache-go"
	"github.com/mgtv-tech/jetcache-go/local"
	"github.com/mgtv-tech/jetcache-go/remote"
	"github.com/redis/go-redis/v9"
)

var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

type object struct {
	Str string
	Num int
}

func Example_basicUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound))

	ctx := context.TODO()
	key := "mykey:1"
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	var wanted object
	if err := mycache.Get(ctx, key, &wanted); err == nil {
		fmt.Println(wanted)
	}
	// Output: {mystring 42}

	mycache.Close()
}

func Example_advancedUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRefreshDuration(time.Minute))

	ctx := context.TODO()
	key := "mykey:1"
	obj := new(object)
	if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
		cache.Do(func(ctx context.Context) (any, error) {
			return mockDBGetObject(1)
		})); err != nil {
		panic(err)
	}
	fmt.Println(obj)
	// Output: &{mystring 42}

	mycache.Close()
}

func Example_mGetUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
	)
	cacheT := cache.NewT[int, *object](mycache)

	ctx := context.TODO()
	key := "mget"
	ids := []int{1, 2, 3}

	ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
		return mockDBMGetObject(ids)
	})

	var b bytes.Buffer
	for _, id := range ids {
		b.WriteString(fmt.Sprintf("%v", ret[id]))
	}
	fmt.Println(b.String())
	// Output: &{mystring 1}&{mystring 2}<nil>

	cacheT.Close()
}

func Example_syncLocalUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	sourceID := "12345678" // Unique identifier for this cache instance
	channelName := "syncLocalChannel"
	pubSub := ring.Subscribe(context.Background(), channelName)

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
		cache.WithSourceId(sourceID),
		cache.WithSyncLocal(true),
		cache.WithEventHandler(func(event *cache.Event) {
			// Broadcast local cache invalidation for the received keys
			bs, _ := json.Marshal(event)
			ring.Publish(context.Background(), channelName, string(bs))
		}),
	)
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	go func() {
		for {
			msg := <-pubSub.Channel()
			var event *cache.Event
			if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				panic(err)
			}
			fmt.Println(event.Keys)

			// Invalidate local cache for received keys (except own events)
			if event.SourceID != sourceID {
				for _, key := range event.Keys {
					mycache.DeleteFromLocalCache(key)
				}
			}
		}
	}()

	// Output: [mykey]
	mycache.Close()
	time.Sleep(time.Second)
}

func mockDBGetObject(id int) (*object, error) {
	if id > 100 {
		return nil, errRecordNotFound
	}
	return &object{Str: "mystring", Num: 42}, nil
}

func mockDBMGetObject(ids []int) (map[int]*object, error) {
	ret := make(map[int]*object)
	for _, id := range ids {
		if id == 3 {
			continue
		}
		ret[id] = &object{Str: "mystring", Num: id}
	}
	return ret, nil
}

Configure settings

// Options are used to store cache options.
Options struct {
    name                       string             // Cache name, used for log identification and metric reporting
    remote                     remote.Remote      // Remote is distributed cache, such as Redis.
    local                      local.Local        // Local is memory cache, such as FreeCache.
    codec                      string             // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
    errNotFound                error              // Error to return for cache miss. Used to prevent cache penetration.
    remoteExpiry               time.Duration      // Remote cache ttl, Default is 1 hour.
    notFoundExpiry             time.Duration      // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
    offset                     time.Duration      // Expiration time jitter factor for cache misses.
    refreshDuration            time.Duration      // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
    stopRefreshAfterLastAccess time.Duration      // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
    refreshConcurrency         int                // Maximum number of concurrent cache refreshes. Default is 4.
    statsDisabled              bool               // Flag to disable cache statistics.
    statsHandler               stats.Handler      // Metrics statsHandler collector.
    sourceID                   string             // Unique identifier for cache instance.
    syncLocal                  bool               // Enable events for syncing local cache (only for "Both" cache type).
    eventChBufSize             int                // Buffer size for event channel (default: 100).
    eventHandler               func(event *Event) // Function to handle local cache invalidation events.
}

Cache metrics collection and statistics

You can implement the stats.Handler interface and register it with the Cache component to customize metric collection, for example, using Prometheus to collect metrics. We have provided a default implementation that logs the statistical metrics, as shown below:

2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache       |         qpm|   hit_ratio|         hit|        miss|       query|  query_fail
------------+------------+------------+------------+------------+------------+------------
bench       |   216440123|     100.00%|   216439867|         256|         256|           0|
bench_local |   216440123|     100.00%|   216434970|        5153|           -|           -|
bench_remote|        5153|      95.03%|        4897|         256|           -|           -|
------------+------------+------------+------------+------------+------------+------------

Custom Logger

import "github.com/mgtv-tech/jetcache-go/logger"

// Set your Logger
logger.SetDefaultLogger(l logger.Logger)

Custom Encoding and Decoding

import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/encoding"
)

// Register your codec
encoding.RegisterCodec(codec Codec)

// Set your codec name
mycache := cache.New("any",
    cache.WithRemote(...),
    cache.WithCodec(yourCodecName string))

Usage Scenarios

Automatic Cache Refresh

jetcache-go provides automatic cache refresh capability to prevent cache avalanche and database overload when cache misses occur. It is suitable for scenarios with a small number of keys, low real-time requirements, and high loading overhead. The code below specifies a refresh every minute, and stops refreshing after 1 hour without access. If the cache is Redis or the last level of a multi-level cache is Redis, the cache loading behavior is globally unique, which means that only one server is refreshing at a time regardless of the number of servers, to reduce the load on the backend.

mycache := cache.New(cache.WithName("any"),
       // ...
       // cache.WithRefreshDuration sets the asynchronous refresh interval
       cache.WithRefreshDuration(time.Minute),
       // cache.WithStopRefreshAfterLastAccess sets the time to cancel the refresh task after the cache key is not accessed
        cache.WithStopRefreshAfterLastAccess(time.Hour))

// `Once` interface starts automatic refresh by `cache.Refresh(true)`
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
    return mockDBGetObject(1)
})) 

MGet Batch Query

MGet utilizes golang generics and the Load function to provide a user-friendly way to batch query entities corresponding to IDs in a multi-level cache. If the cache is Redis or the last level of a multi-level cache is Redis, Pipeline is used to implement read and write operations to improve performance. It's worth noting that for abnormal scenarios (IO exceptions, serialization exceptions, etc.), our design philosophy is to provide lossy services as much as possible to prevent cache penetration.

mycache := cache.New(cache.WithName("any"),
       // ...
       cache.WithRemoteExpiry(time.Minute),
    )
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mykey"
ids := []int{1, 2, 3}

ret := mycache.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
    return mockDBMGetObject(ids)
})

Codec Selection

jetcache-go implements three serialization and deserialization (codec) methods by default: sonicMsgPack, and native json.

Selection Guide:

  • For high-performance encoding and decoding: If the local cache hit rate is extremely high, but the deserialization operation of converting byte arrays to objects in the local cache consumes a lot of CPU, choose sonic.
  • For balanced performance and extreme storage space: Choose MsgPack, which uses MsgPack encoding and decoding. Content > 64 bytes will be compressed with snappy.

Tip: Remember to import the necessary packages as needed to register the codec.

 _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"

Plugin

Plugin Project: jetcache-go-plugin, Welcome to contribute to this project. Currently, the following features have been implemented:

Contributing

Everyone is welcome to help improve jetcache-go. If you have any questions, suggestions, or want to add other features, please submit an issue or PR directly.

Please follow these steps to submit a PR:

  • Clone the repository
  • Create a new branch: name it feature-xxx for new features or bug-xxx for bug fixes
  • Describe the changes in detail in the PR

Contact

If you have any questions, please contact daoshenzzg@163.com.

About

JetCache-go is a Go cache framework. | 多级缓存 | 异步刷新 | 防穿透 | 指标采集 | 自动降级 | 批量查询 |

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 99.6%
  • Makefile 0.4%