Skip to content

Commit

Permalink
api: write a connection schema getter
Browse files Browse the repository at this point in the history
Write a helper function to load the actual schema for the user.

Previously we stored actual schema in a private `schemaResolver`
field and `Schema` field was used only to get a current schema.
But now because of the new function, we don't need to store the
`Schema` as a different field. So `Schema` was also removed.

To update the schema, used needs to use `GetSchema` + `SetSchema`
in pair. `SetSchema` is a renamed `OverrideSchema`.

Closes #7
  • Loading branch information
DerekBum committed Nov 20, 2023
1 parent 6225ec4 commit 66c5253
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 35 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support `IPROTO_FEATURE_SPACE_AND_INDEX_NAMES` for Tarantool
version >= 3.0.0-alpha1 (#338). It allows to use space and index names
in requests instead of their IDs.
- `GetSchema` function to get the actual schema (#7)

### Changed

Expand All @@ -51,6 +52,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
instead of `crud.OptUint` (#342)
- Change all `Upsert` and `Update` requests to accept `*tarantool.Operations`
as `ops` parameters instead of `interface{}` (#348)
- Change `OverrideSchema()` to `SetSchema()` (#7)

### Deprecated

Expand All @@ -70,6 +72,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- UUID_extId (#158)
- IPROTO constants (#158)
- Code() method from the Request interface (#158)
- `Schema` field from the `Connection` struct (#7)

### Fixed

Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

#### Connection schema

* Removed `Schema` field from the `Connection` struct. Instead, new
`GetSchema(Connector)` function was added to get the actual connection
schema on demand.
* `OverrideSchema` method renamed to the `SetSchema`.

#### Protocol changes

* `iproto.Feature` type used instead of `ProtocolFeature`.
Expand Down
27 changes: 16 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ type Connection struct {
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// schemaResolver contains a SchemaResolver implementation.
schemaResolver SchemaResolver
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -436,12 +434,14 @@ func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err

// TODO: reload schema after reconnect.
if !conn.opts.SkipSchema {
if err = conn.loadSchema(); err != nil {
schema, err := GetSchema(conn)
if err != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.closeConnection(err, true)
return nil, err
}
conn.SetSchema(schema)
}

return conn, err
Expand Down Expand Up @@ -1302,15 +1302,20 @@ func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
}

// OverrideSchema sets Schema for the connection.
func (conn *Connection) OverrideSchema(s *Schema) {
if s != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()
// SetSchema sets Schema for the connection.
func (conn *Connection) SetSchema(s Schema) {
spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.Schema = s
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()

conn.schemaResolver = &loadedSchemaResolver{
Schema: &s,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
}

Expand Down
24 changes: 22 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,10 @@ func ExampleSchema() {
conn := exampleConnect(opts)
defer conn.Close()

schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand All @@ -1080,13 +1083,30 @@ func ExampleSchema() {
// Space 2 ID 616 schematest
}

// Example demonstrates how to update the connection schema.
func ExampleConnection_SetSchema() {
conn := exampleConnect(opts)
defer conn.Close()

// Get the actual schema.
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
// Update the current schema to match the actual one.
conn.SetSchema(schema)
}

// Example demonstrates how to retrieve information with space schema.
func ExampleSpace() {
conn := exampleConnect(opts)
defer conn.Close()

// Save Schema to a local variable to avoid races
schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand Down
28 changes: 8 additions & 20 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"

"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)
Expand Down Expand Up @@ -340,16 +339,17 @@ func (indexField *IndexField) DecodeMsgpack(d *msgpack.Decoder) error {
return errors.New("unexpected schema format (index fields)")
}

func (conn *Connection) loadSchema() (err error) {
schema := new(Schema)
// GetSchema returns the actual schema for the connection.
func GetSchema(conn Connector) (Schema, error) {
schema := Schema{}
schema.SpacesById = make(map[uint32]*Space)
schema.Spaces = make(map[string]*Space)

// Reload spaces.
var spaces []*Space
err = conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
err := conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
if err != nil {
return err
return Schema{}, err
}
for _, space := range spaces {
schema.SpacesById[space.Id] = space
Expand All @@ -360,31 +360,19 @@ func (conn *Connection) loadSchema() (err error) {
var indexes []*Index
err = conn.SelectTyped(vindexSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &indexes)
if err != nil {
return err
return Schema{}, err
}
for _, index := range indexes {
spaceId := index.SpaceId
if _, ok := schema.SpacesById[spaceId]; ok {
schema.SpacesById[spaceId].IndexesById[index.Id] = index
schema.SpacesById[spaceId].Indexes[index.Name] = index
} else {
return errors.New("concurrent schema update")
return Schema{}, errors.New("concurrent schema update")
}
}

spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.lockShards()
conn.Schema = schema
conn.schemaResolver = &loadedSchemaResolver{
Schema: schema,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
conn.unlockShards()

return nil
return schema, nil
}

// resolveSpaceNumber tries to resolve a space number.
Expand Down
23 changes: 21 additions & 2 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1851,6 +1851,19 @@ func TestConnection_DoWithStrangerConn(t *testing.T) {
}
}

func TestGetSchema(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

s, err := GetSchema(conn)
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
if s.Version != 0 || s.Spaces[spaceName].Id != spaceNo {
t.Errorf("GetSchema() returns incorrect schema")
}
}

func TestNewPreparedFromResponse(t *testing.T) {
var (
ErrNilResponsePassed = fmt.Errorf("passed nil response")
Expand Down Expand Up @@ -1882,7 +1895,10 @@ func TestSchema(t *testing.T) {
defer conn.Close()

// Schema
schema := conn.Schema
schema, err := GetSchema(conn)
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
if schema.SpacesById == nil {
t.Errorf("schema.SpacesById is nil")
}
Expand Down Expand Up @@ -2028,7 +2044,10 @@ func TestSchema_IsNullable(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

schema := conn.Schema
schema, err := GetSchema(conn)
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
if schema.Spaces == nil {
t.Errorf("schema.Spaces is nil")
}
Expand Down

0 comments on commit 66c5253

Please sign in to comment.