Skip to content

Commit

Permalink
feat: support azure blob sync (#1428)
Browse files Browse the repository at this point in the history
## This PR

- adds support for Azure Blob Storage sync

Signed-off-by: Matthew Wilson <54033231+wilson-matthew@users.noreply.github.com>
  • Loading branch information
wilson-matthew authored Oct 23, 2024
1 parent 538da12 commit 5c39cfe
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 68 deletions.
10 changes: 10 additions & 0 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ require (
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.13 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand All @@ -65,6 +72,7 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand All @@ -80,10 +88,12 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
81 changes: 25 additions & 56 deletions core/go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"gocloud.dev/blob"
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
_ "gocloud.dev/blob/azureblob" // needed to initialize Azure Blob Storage driver
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
)

type Sync struct {
Expand Down
48 changes: 46 additions & 2 deletions core/pkg/sync/builder/syncbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
syncProviderGcs = "gcs"
syncProviderAzblob = "azblob"
)

var (
Expand All @@ -40,6 +41,7 @@ var (
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
regGcs *regexp.Regexp
regAzblob *regexp.Regexp
)

func init() {
Expand All @@ -49,6 +51,7 @@ func init() {
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
regGcs = regexp.MustCompile("^gs://.+?/")
regAzblob = regexp.MustCompile("^azblob://.+?/")
}

type ISyncBuilder interface {
Expand Down Expand Up @@ -111,11 +114,15 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo
case syncProviderGcs:
logger.Debug(fmt.Sprintf("using blob sync-provider with gcs driver for: %s", sourceConfig.URI))
return sb.newGcs(sourceConfig, logger), nil
case syncProviderAzblob:
logger.Debug(fmt.Sprintf("using blob sync-provider with azblob driver for: %s", sourceConfig.URI))
return sb.newAzblob(sourceConfig, logger)

default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'",
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with "+
"'%s', '%s', '%s', '%s', '%s', '%s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
syncProviderKubernetes, syncProviderHTTP, syncProviderGrpc, syncProviderGcs, syncProviderAzblob)
}
}

Expand Down Expand Up @@ -238,6 +245,43 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *
}
}

func (sb *SyncBuilder) newAzblob(config sync.SourceConfig, logger *logger.Logger) (*blobSync.Sync, error) {
// Required to generate the azblob service URL
storageAccountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
if storageAccountName == "" {
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT not set or is blank")
}
if regexp.MustCompile(`\s`).MatchString(storageAccountName) {
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT contains whitespace")
}

// Extract bucket uri and object name from the full URI:
// azblob://bucket/path/to/object results in azblob://bucket/ as bucketUri and
// path/to/object as an object name.
bucketURI := regAzblob.FindString(config.URI)
objectName := regAzblob.ReplaceAllString(config.URI, "")

// Defaults to 5 seconds if interval is not set.
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}

return &blobSync.Sync{
Bucket: bucketURI,
Object: objectName,

BlobURLMux: blob.DefaultURLMux(),

Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "azblob"),
),
Interval: interval,
Cron: cron.New(),
}, nil
}

type IK8sClientBuilder interface {
GetK8sClient() (dynamic.Interface, error)
}
Expand Down
94 changes: 94 additions & 0 deletions core/pkg/sync/builder/syncbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func Test_SyncsFromFromConfig(t *testing.T) {
{
name: "combined",
injectFunc: func(builder *SyncBuilder) {
t.Setenv("AZURE_STORAGE_ACCOUNT", "myaccount")

ctrl := gomock.NewController(t)

mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
Expand Down Expand Up @@ -236,6 +238,10 @@ func Test_SyncsFromFromConfig(t *testing.T) {
URI: "gs://bucket/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket/path/to/file",
Provider: syncProviderAzblob,
},
},
},
wantSyncs: []sync.ISync{
Expand All @@ -245,6 +251,7 @@ func Test_SyncsFromFromConfig(t *testing.T) {
&file.Sync{},
&kubernetes.Sync{},
&blob.Sync{},
&blob.Sync{},
},
wantErr: false,
},
Expand Down Expand Up @@ -324,3 +331,90 @@ func Test_GcsConfig(t *testing.T) {
})
}
}

func Test_AzblobConfig(t *testing.T) {
lg := logger.NewLogger(nil, false)
defaultInterval := uint32(5)
tests := []struct {
name string
uri string
interval uint32
storageAccount string
expectedBucket string
expectedObject string
expectedInterval uint32
wantErr bool
}{
{
name: "simple path",
uri: "azblob://bucket/path/to/object",
interval: 10,
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "path/to/object",
expectedInterval: 10,
wantErr: false,
},
{
name: "default interval",
uri: "azblob://bucket/path/to/object",
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "path/to/object",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "no object set", // Blob syncer will return error when fetching
uri: "azblob://bucket/",
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "malformed uri", // Blob syncer will return error when opening bucket
uri: "malformed",
storageAccount: "myaccount",
expectedBucket: "",
expectedObject: "malformed",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "storage account not set", // Sync builder will fail and return error
uri: "azblob://bucket/path/to/object",
storageAccount: "",
wantErr: true,
},
{
name: "storage account contains whitespace", // Sync builder will fail and return error
uri: "azblob://bucket/path/to/object",
storageAccount: "my account",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Setenv("AZURE_STORAGE_ACCOUNT", tt.storageAccount)
azblobSync, err := NewSyncBuilder().newAzblob(sync.SourceConfig{
URI: tt.uri,
Interval: tt.interval,
}, lg)

if (err != nil) != tt.wantErr {
t.Errorf("newAzblob() error = %v, wantErr %v", err, tt.wantErr)
return
}

if (err != nil) && (tt.wantErr == true) {
return
}

require.Equal(t, tt.expectedBucket, azblobSync.Bucket)
require.Equal(t, tt.expectedObject, azblobSync.Object)
require.Equal(t, int(tt.expectedInterval), int(azblobSync.Interval))
})
}
}
7 changes: 6 additions & 1 deletion core/pkg/sync/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) {
URI: uri,
Provider: syncProviderGcs,
})
case regAzblob.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderAzblob,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', 'gs://' or 'core.openfeature.dev'", uri)
"'http(s)://', 'grpc(s)://', 'gs://', 'azblob://' or 'core.openfeature.dev'", uri)
}
}
return syncProvidersParsed, nil
Expand Down
12 changes: 11 additions & 1 deletion core/pkg/sync/builder/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestParseSource(t *testing.T) {
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
{"uri":"host:port","provider":"grpc"},
{"uri":"default/my-crd","provider":"kubernetes"},
{"uri":"gs://bucket-name/path/to/file","provider":"gcs"}
{"uri":"gs://bucket-name/path/to/file","provider":"gcs"},
{"uri":"azblob://bucket-name/path/to/file","provider":"azblob"}
]`,
expectErr: false,
out: []sync.SourceConfig{
Expand All @@ -54,6 +55,10 @@ func TestParseSource(t *testing.T) {
URI: "gs://bucket-name/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
},
},
"multiple-syncs-with-options": {
Expand Down Expand Up @@ -188,6 +193,7 @@ func TestParseSyncProviderURIs(t *testing.T) {
"grpcs://secure-grpc",
"core.openfeature.dev/default/my-crd",
"gs://bucket-name/path/to/file",
"azblob://bucket-name/path/to/file",
},
expectErr: false,
out: []sync.SourceConfig{
Expand Down Expand Up @@ -217,6 +223,10 @@ func TestParseSyncProviderURIs(t *testing.T) {
URI: "gs://bucket-name/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
},
},
"empty": {
Expand Down
18 changes: 17 additions & 1 deletion docs/concepts/syncs.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ See [sync source](../reference/sync-configuration.md#source-configuration) confi

### GCS sync

The GCS sync provider fetches flags from a GCS blob and periodically poll the GCS for the flag definition updates.
The GCS sync provider fetches flags from a GCS blob and periodically polls the GCS for the flag definition updates.
It uses [application default credentials](https://cloud.google.com/docs/authentication/application-default-credentials) if they
are [configured](https://cloud.google.com/docs/authentication/provide-credentials-adc) to authorize the calls to GCS.

Expand All @@ -85,6 +85,22 @@ In this example, `gs://my-bucket/my-flags.json` is expected to be a valid GCS UR
The polling interval can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.

### Azure Blob sync

The Azure Blob sync provider fetches flags from an Azure Blob Storage blob and periodically polls the blob for the flag definition updates.
It uses [environment variables](https://pkg.go.dev/gocloud.dev/blob/azureblob#hdr-URLs) to set the Storage Account name and to
authorize the calls to Azure Blob Storage.

```shell
flagd start --uri azblob://my-container/my-flags.json
```

In this example, assuming the environment variable AZURE_STORAGE_ACCOUNT is set to `myaccount`, and other options are not set, the service URL will be:
`https://myaccount.blob.core.windows.net/my-container/my-flags.json`.
This is expected be a valid service URL accessible by flagd (either by being public or together with environment variable credentials).
The polling interval can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.

## Merging

Flagd can be configured to read from multiple sources at once, when this is the case flagd will merge all flag definition into a single
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ flagd start [flags]
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
-g, --sync-port int32 gRPC Sync port (default 8015)
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC) or FeatureFlag custom resource. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
```

### Options inherited from parent commands
Expand Down
Loading

0 comments on commit 5c39cfe

Please sign in to comment.