Skip to content

Commit

Permalink
Convert migration options to enum; other PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Aug 8, 2020
1 parent ce549da commit 99458a1
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 37 deletions.
40 changes: 21 additions & 19 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ import (
var (
// defaultNumProcessorsPerCPU is the default number of processors per CPU.
defaultNumProcessorsPerCPU = 0.125

// defaultMigrationConcurrency is the default number of concurrent workers to perform migrations
defaultMigrationConcurrency = 10
)

// BootstrapConfiguration specifies the config for bootstrappers.
Expand Down Expand Up @@ -75,43 +72,48 @@ type BootstrapFilesystemConfiguration struct {
// NumProcessorsPerCPU is the number of processors per CPU.
NumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"`

// Migrations configuration
Migrations *BootstrapMigrations `yaml:"migrations"`
// Migration configuration specifies what version, if any, existing data filesets should be migrated to
// if necessary
Migration *BootstrapMigrationConfiguration `yaml:"migration"`
}

func (c BootstrapFilesystemConfiguration) numCPUs() int {
return int(math.Ceil(float64(c.NumProcessorsPerCPU * float64(runtime.NumCPU()))))
}

func (c BootstrapFilesystemConfiguration) migrations() BootstrapMigrations {
if cfg := c.Migrations; cfg != nil {
func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfiguration {
if cfg := c.Migration; cfg != nil {
return *cfg
}
return BootstrapMigrations{Concurrency: defaultMigrationConcurrency}
return BootstrapMigrationConfiguration{}
}

func newDefaultBootstrapFilesystemConfiguration() BootstrapFilesystemConfiguration {
return BootstrapFilesystemConfiguration{
NumProcessorsPerCPU: defaultNumProcessorsPerCPU,
Migrations: &BootstrapMigrations{},
Migration: &BootstrapMigrationConfiguration{},
}
}

// BootstrapMigrations specifies configuration for data migrations during bootstrapping
type BootstrapMigrations struct {
// ToVersion1_1Task indicates that we should attempt to upgrade filesets to
// what’s expected of 1.1 files
ToVersion1_1 bool `yaml:"toVersion1_1"`
// BootstrapMigrationConfiguration specifies configuration for data migrations during bootstrapping
type BootstrapMigrationConfiguration struct {
// ToVersion indicates that we should attempt to upgrade filesets to
// what’s expected of the specified version
ToVersion migration.MigrateVersion `yaml:"toVersion"`

// Concurrency sets the number of concurrent workers performing migrations
Concurrency int `yaml:"concurrency"`
}

// NewOptions generates migration.Options from the configuration
func (m BootstrapMigrations) NewOptions() migration.Options {
return migration.NewOptions().
SetToVersion1_1(m.ToVersion1_1).
SetConcurrency(m.Concurrency)
func (m BootstrapMigrationConfiguration) NewOptions() migration.Options {
opts := migration.NewOptions().SetToVersion(m.ToVersion)

if m.Concurrency > 0 {
opts = opts.SetConcurrency(m.Concurrency)
}

return opts
}

// BootstrapCommitlogConfiguration specifies config for the commitlog bootstrapper.
Expand Down Expand Up @@ -215,7 +217,7 @@ func (bsc BootstrapConfiguration) New(
SetBoostrapDataNumProcessors(fsCfg.numCPUs()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migrations().NewOptions())
SetMigrationOptions(fsCfg.migration().NewOptions())
if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
return nil, err
}
Expand Down
35 changes: 28 additions & 7 deletions src/dbnode/persist/fs/migration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,45 @@

package migration

import "errors"

// defaultMigrationConcurrency is the default number of concurrent workers to perform migrations
const defaultMigrationConcurrency = 10

var (
errConcurrencyInvalid = errors.New("concurrency value valid. must be >= 1")
)

type options struct {
toVersion1_1 bool
concurrency int
toVersion MigrateVersion
concurrency int
}

// NewOptions creates new migration options
func NewOptions() Options {
return &options{}
return &options{
concurrency: defaultMigrationConcurrency,
}
}

func (o *options) Validate() error {
if err := ValidateMigrateVersion(o.toVersion); err != nil {
return err
}
if o.concurrency < 1 {
return errConcurrencyInvalid
}
return nil
}

func (o *options) SetToVersion1_1(value bool) Options {
func (o *options) SetToVersion(value MigrateVersion) Options {
opts := *o
opts.toVersion1_1 = value
opts.toVersion = value
return &opts
}

func (o *options) ToVersion1_1() bool {
return o.toVersion1_1
func (o *options) ToVersion() MigrateVersion {
return o.toVersion
}

func (o *options) SetConcurrency(value int) Options {
Expand Down
22 changes: 15 additions & 7 deletions src/dbnode/persist/fs/migration/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,26 @@ import (
"github.com/stretchr/testify/require"
)

func TestOptionsToVersion1_1(t *testing.T) {
func TestOptionsToVersion(t *testing.T) {
opts := NewOptions()
require.False(t, opts.ToVersion1_1())
require.Equal(t, MigrateVersionNone, opts.ToVersion())

opts = opts.SetToVersion1_1(true)
require.True(t, opts.ToVersion1_1())
opts = opts.SetToVersion(MigrateVersion_1_1)
require.Equal(t, MigrateVersion_1_1, opts.ToVersion())
}

func TestOptionsConcurrency(t *testing.T) {
opts := NewOptions()
require.Equal(t, 0, opts.Concurrency())

opts = opts.SetConcurrency(10)
require.Equal(t, 10, opts.Concurrency())

opts = opts.SetConcurrency(100)
require.Equal(t, 100, opts.Concurrency())
}

func TestOptionsValidate(t *testing.T) {
opts := NewOptions()
require.NoError(t, opts.Validate())

require.Error(t, opts.SetToVersion(2).Validate())
require.Error(t, opts.SetConcurrency(0).Validate())
}
80 changes: 76 additions & 4 deletions src/dbnode/persist/fs/migration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,89 @@

package migration

import "fmt"

// Options represents the options for migrations
type Options interface {
// SetToVersion1_1 sets the toVersion1_1 migration option
SetToVersion1_1(value bool) Options
// Validate validates migration options
Validate() error

// SetToVersion sets the toVersion migration option
SetToVersion(value MigrateVersion) Options

// ToVersion1_1Task returns the value of toVersion1_1 migration option
ToVersion1_1() bool
// ToVersion returns the value of toVersion migration option
ToVersion() MigrateVersion

// SetConcurrency sets the number of concurrent workers performing migrations
SetConcurrency(value int) Options

// Concurrency gets the number of concurrent workers performing migrations
Concurrency() int
}

// MigrateVersion is an enum that corresponds to the major and minor version number to migrate data files to
type MigrateVersion uint

const (
// MigrateVersionNone indicates node should not attempt to perform any migrations
MigrateVersionNone MigrateVersion = iota
// MigrateVersion_1_1 indicates node should attempt to migrate data files up to version 1.1
MigrateVersion_1_1
)

var (
validMigrateVersions = []MigrateVersion{
MigrateVersionNone,
MigrateVersion_1_1,
}
)

func (m *MigrateVersion) String() string {
switch *m {
case MigrateVersionNone:
return "none"
case MigrateVersion_1_1:
return "1.1"
default:
return "unknown"
}
}

// ParseMigrateVersion parses a string for a MigrateVersion
func ParseMigrateVersion(str string) (MigrateVersion, error) {
for _, valid := range validMigrateVersions {
if str == valid.String() {
return valid, nil
}
}

return 0, fmt.Errorf("unrecognized migrate version: %v", str)
}

// ValidateMigrateVersion validates a stored metrics type.
func ValidateMigrateVersion(m MigrateVersion) error {
for _, valid := range validMigrateVersions {
if valid == m {
return nil
}
}

return fmt.Errorf("invalid migrate version '%v': should be one of %v",
m, validMigrateVersions)
}

// UnmarshalYAML unmarshals a migrate version
func (m *MigrateVersion) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
}

if value, err := ParseMigrateVersion(str); err == nil {
*m = value
return nil
}

return fmt.Errorf("invalid MigrateVersion '%s' valid types are: %v",
str, validMigrateVersions)
}
65 changes: 65 additions & 0 deletions src/dbnode/persist/fs/migration/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2020 Uber Technologies, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package migration

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
)

func TestParseMigrateVersion(t *testing.T) {
v, err := ParseMigrateVersion("none")
require.NoError(t, err)
require.Equal(t, MigrateVersionNone, v)

v, err = ParseMigrateVersion("1.1")
require.NoError(t, err)
require.Equal(t, MigrateVersion_1_1, v)
}

func TestValidateMigrateVersion(t *testing.T) {
err := ValidateMigrateVersion(MigrateVersion_1_1)
require.NoError(t, err)

err = ValidateMigrateVersion(2)
require.Error(t, err)
}

func TestUnmarshalYAML(t *testing.T) {
type config struct {
Version MigrateVersion `yaml:"version"`
}

for _, value := range validMigrateVersions {
str := fmt.Sprintf("version: %s\n", value.String())
var cfg config
require.NoError(t, yaml.Unmarshal([]byte(str), &cfg))
require.Equal(t, value, cfg.Version)
}

var cfg config
require.Error(t, yaml.Unmarshal([]byte("version: abc"), &cfg))
}
7 changes: 7 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
errCompactorNotSet = errors.New("compactor not set")
errIndexOptionsNotSet = errors.New("index options not set")
errFilesystemOptionsNotSet = errors.New("filesystem options not set")
errMigrationOptionsNotSet = errors.New("migration options not set")

// NB(r): Bootstrapping data doesn't use large amounts of memory
// that won't be released, so its fine to do this as fast as possible.
Expand Down Expand Up @@ -102,6 +103,12 @@ func (o *options) Validate() error {
if o.fsOpts == nil {
return errFilesystemOptionsNotSet
}
if o.migrationOpts == nil {
return errMigrationOptionsNotSet
}
if err := o.migrationOpts.Validate(); err != nil {
return err
}
return nil
}

Expand Down

0 comments on commit 99458a1

Please sign in to comment.