Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Version entity serialization mechanism and fix issue with int64 vals #2944

Merged
merged 19 commits into from
Jul 20, 2022
2 changes: 1 addition & 1 deletion go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
Expand Down
38 changes: 25 additions & 13 deletions go/internal/feast/onlinestore/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/feast-dev/feast/go/internal/feast/registry"
"sort"
"strconv"
"strings"

"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"github.com/spaolacci/murmur3"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
Expand All @@ -37,10 +38,15 @@ type RedisOnlineStore struct {

// Redis client connector
client *redis.Client

config *registry.RepoConfig
}

func NewRedisOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
store := RedisOnlineStore{project: project}
func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
store := RedisOnlineStore{
project: project,
config: config,
}

var address []string
var password string
Expand Down Expand Up @@ -161,7 +167,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
redisKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {

var key, err = buildRedisKey(r.project, entityKeys[i])
var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -270,16 +276,16 @@ func (r *RedisOnlineStore) Destruct() {

}

func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) {
serKey, err := serializeEntityKey(entityKey)
func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion)
if err != nil {
return nil, err
}
fullKey := append(*serKey, []byte(project)...)
return &fullKey, nil
}

func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
// Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table.

// Ensure that we have the right amount of join keys and entity values
Expand Down Expand Up @@ -316,7 +322,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
offset := (2 * len(keys)) + (i * 3)
value := m[keys[i]].GetVal()

valueBytes, valueTypeBytes, err := serializeValue(value)
valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion)
if err != nil {
return valueBytes, err
}
Expand All @@ -341,7 +347,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
return &entityKeyBuffer, nil
}

func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
switch x := (value).(type) {
case *types.Value_StringVal:
Expand All @@ -354,10 +360,16 @@ func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
return &valueBuffer, types.ValueType_INT32, nil
case *types.Value_Int64Val:
// TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :(
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
if entityKeySerializationVersion <= 1 {
// We unfortunately have to use 32 bit here for backward compatibility :(
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
} else {
valueBuffer := make([]byte, 8)
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
}
case nil:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
default:
Expand Down
25 changes: 21 additions & 4 deletions go/internal/feast/onlinestore/redisonlinestore_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package onlinestore

import (
"github.com/feast-dev/feast/go/internal/feast/registry"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -10,7 +11,11 @@ func TestNewRedisOnlineStore(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -23,7 +28,11 @@ func TestNewRedisOnlineStoreWithPassword(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,password=secret",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -34,7 +43,11 @@ func TestNewRedisOnlineStoreWithDB(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,db=1",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -45,7 +58,11 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,ssl=true",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand Down
15 changes: 8 additions & 7 deletions go/internal/feast/onlinestore/sqliteonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@ import (

_ "github.com/mattn/go-sqlite3"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
)

type SqliteOnlineStore struct {
// Feast project name
project string
path string
db *sql.DB
db_mu sync.Mutex
project string
path string
db *sql.DB
db_mu sync.Mutex
repoConfig *registry.RepoConfig
}

// Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath.
func NewSqliteOnlineStore(project string, repoConfig *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
store := SqliteOnlineStore{project: project}
store := SqliteOnlineStore{project: project, repoConfig: repoConfig}
if db_path, ok := onlineStoreConfig["path"]; !ok {
return nil, fmt.Errorf("cannot find sqlite path %s", db_path)
} else {
Expand Down Expand Up @@ -69,7 +70,7 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.
in_query := make([]string, len(entityKeys))
serialized_entities := make([]interface{}, len(entityKeys))
for i := 0; i < len(entityKeys); i++ {
serKey, err := serializeEntityKey(entityKeys[i])
serKey, err := serializeEntityKey(entityKeys[i], s.repoConfig.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions go/internal/feast/registry/repoconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type RepoConfig struct {
Flags map[string]interface{} `json:"flags"`
// RepoPath
RepoPath string `json:"repo_path"`
// EntityKeySerializationVersion
EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"`
}

type RegistryConfig struct {
Expand Down
Loading