Skip to content

Commit

Permalink
feat: support azure blob sync
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Wilson <54033231+wilson-matthew@users.noreply.github.com>
  • Loading branch information
wilson-matthew committed Oct 21, 2024
1 parent 538da12 commit ced1139
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 67 deletions.
10 changes: 10 additions & 0 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ require (
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.13 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand All @@ -65,6 +72,7 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand All @@ -80,10 +88,12 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
81 changes: 25 additions & 56 deletions core/go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"gocloud.dev/blob"
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
_ "gocloud.dev/blob/azureblob" // needed to initialize Azure Blob Storage driver
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
)

type Sync struct {
Expand Down
47 changes: 45 additions & 2 deletions core/pkg/sync/builder/syncbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
syncProviderGcs = "gcs"
syncProviderAzblob = "azblob"
)

var (
Expand All @@ -40,6 +41,7 @@ var (
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
regGcs *regexp.Regexp
regAzblob *regexp.Regexp
)

func init() {
Expand All @@ -49,6 +51,7 @@ func init() {
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
regGcs = regexp.MustCompile("^gs://.+?/")
regAzblob = regexp.MustCompile("^azblob://.+?/")
}

type ISyncBuilder interface {
Expand Down Expand Up @@ -111,11 +114,14 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo
case syncProviderGcs:
logger.Debug(fmt.Sprintf("using blob sync-provider with gcs driver for: %s", sourceConfig.URI))
return sb.newGcs(sourceConfig, logger), nil
case syncProviderAzblob:
logger.Debug(fmt.Sprintf("using blob sync-provider with azblob driver for: %s", sourceConfig.URI))
return sb.newAzblob(sourceConfig, logger)

default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'",
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', '%s', '%s', '%s', '%s' or '%s'",
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
syncProviderKubernetes, syncProviderHTTP, syncProviderGrpc, syncProviderGcs, syncProviderAzblob)
}
}

Expand Down Expand Up @@ -238,6 +244,43 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *
}
}

func (sb *SyncBuilder) newAzblob(config sync.SourceConfig, logger *logger.Logger) (*blobSync.Sync, error) {
// Required to generate the azblob service URL
storageAccountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
if storageAccountName == "" {
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT not set or is blank")
}
if regexp.MustCompile(`\s`).MatchString(storageAccountName) {
return nil, fmt.Errorf("environment variable AZURE_STORAGE_ACCOUNT contains whitespace")
}

// Extract bucket uri and object name from the full URI:
// azblob://bucket/path/to/object results in azblob://bucket/ as bucketUri and
// path/to/object as an object name.
bucketURI := regAzblob.FindString(config.URI)
objectName := regAzblob.ReplaceAllString(config.URI, "")

// Defaults to 5 seconds if interval is not set.
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}

return &blobSync.Sync{
Bucket: bucketURI,
Object: objectName,

BlobURLMux: blob.DefaultURLMux(),

Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "azblob"),
),
Interval: interval,
Cron: cron.New(),
}, nil
}

type IK8sClientBuilder interface {
GetK8sClient() (dynamic.Interface, error)
}
Expand Down
94 changes: 94 additions & 0 deletions core/pkg/sync/builder/syncbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func Test_SyncsFromFromConfig(t *testing.T) {
{
name: "combined",
injectFunc: func(builder *SyncBuilder) {
t.Setenv("AZURE_STORAGE_ACCOUNT", "myaccount")

ctrl := gomock.NewController(t)

mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl)
Expand Down Expand Up @@ -236,6 +238,10 @@ func Test_SyncsFromFromConfig(t *testing.T) {
URI: "gs://bucket/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket/path/to/file",
Provider: syncProviderAzblob,
},
},
},
wantSyncs: []sync.ISync{
Expand All @@ -245,6 +251,7 @@ func Test_SyncsFromFromConfig(t *testing.T) {
&file.Sync{},
&kubernetes.Sync{},
&blob.Sync{},
&blob.Sync{},
},
wantErr: false,
},
Expand Down Expand Up @@ -324,3 +331,90 @@ func Test_GcsConfig(t *testing.T) {
})
}
}

func Test_AzblobConfig(t *testing.T) {
lg := logger.NewLogger(nil, false)
defaultInterval := uint32(5)
tests := []struct {
name string
uri string
interval uint32
storageAccount string
expectedBucket string
expectedObject string
expectedInterval uint32
wantErr bool
}{
{
name: "simple path",
uri: "azblob://bucket/path/to/object",
interval: 10,
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "path/to/object",
expectedInterval: 10,
wantErr: false,
},
{
name: "default interval",
uri: "azblob://bucket/path/to/object",
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "path/to/object",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "no object set", // Blob syncer will return error when fetching
uri: "azblob://bucket/",
storageAccount: "myaccount",
expectedBucket: "azblob://bucket/",
expectedObject: "",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "malformed uri", // Blob syncer will return error when opening bucket
uri: "malformed",
storageAccount: "myaccount",
expectedBucket: "",
expectedObject: "malformed",
expectedInterval: defaultInterval,
wantErr: false,
},
{
name: "storage account not set", // Sync builder will fail and return error
uri: "azblob://bucket/path/to/object",
storageAccount: "",
wantErr: true,
},
{
name: "storage account contains whitespace", // Sync builder will fail and return error
uri: "azblob://bucket/path/to/object",
storageAccount: "my account",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Setenv("AZURE_STORAGE_ACCOUNT", tt.storageAccount)
azblobSync, err := NewSyncBuilder().newAzblob(sync.SourceConfig{
URI: tt.uri,
Interval: tt.interval,
}, lg)

if (err != nil) != tt.wantErr {
t.Errorf("newAzblob() error = %v, wantErr %v", err, tt.wantErr)
return
}

if (err != nil) && (tt.wantErr == true) {
return
}

require.Equal(t, tt.expectedBucket, azblobSync.Bucket)
require.Equal(t, tt.expectedObject, azblobSync.Object)
require.Equal(t, int(tt.expectedInterval), int(azblobSync.Interval))
})
}
}
7 changes: 6 additions & 1 deletion core/pkg/sync/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) {
URI: uri,
Provider: syncProviderGcs,
})
case regAzblob.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderAzblob,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', 'gs://' or 'core.openfeature.dev'", uri)
"'http(s)://', 'grpc(s)://', 'gs://', 'azblob://' or 'core.openfeature.dev'", uri)
}
}
return syncProvidersParsed, nil
Expand Down
12 changes: 11 additions & 1 deletion core/pkg/sync/builder/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestParseSource(t *testing.T) {
{"uri":"http://test.com","provider":"http","bearerToken":":)"},
{"uri":"host:port","provider":"grpc"},
{"uri":"default/my-crd","provider":"kubernetes"},
{"uri":"gs://bucket-name/path/to/file","provider":"gcs"}
{"uri":"gs://bucket-name/path/to/file","provider":"gcs"},
{"uri":"azblob://bucket-name/path/to/file","provider":"azblob"}
]`,
expectErr: false,
out: []sync.SourceConfig{
Expand All @@ -54,6 +55,10 @@ func TestParseSource(t *testing.T) {
URI: "gs://bucket-name/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
},
},
"multiple-syncs-with-options": {
Expand Down Expand Up @@ -188,6 +193,7 @@ func TestParseSyncProviderURIs(t *testing.T) {
"grpcs://secure-grpc",
"core.openfeature.dev/default/my-crd",
"gs://bucket-name/path/to/file",
"azblob://bucket-name/path/to/file",
},
expectErr: false,
out: []sync.SourceConfig{
Expand Down Expand Up @@ -217,6 +223,10 @@ func TestParseSyncProviderURIs(t *testing.T) {
URI: "gs://bucket-name/path/to/file",
Provider: syncProviderGcs,
},
{
URI: "azblob://bucket-name/path/to/file",
Provider: syncProviderAzblob,
},
},
},
"empty": {
Expand Down
18 changes: 17 additions & 1 deletion docs/concepts/syncs.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ See [sync source](../reference/sync-configuration.md#source-configuration) confi

### GCS sync

The GCS sync provider fetches flags from a GCS blob and periodically poll the GCS for the flag definition updates.
The GCS sync provider fetches flags from a GCS blob and periodically polls the GCS for the flag definition updates.
It uses [application default credentials](https://cloud.google.com/docs/authentication/application-default-credentials) if they
are [configured](https://cloud.google.com/docs/authentication/provide-credentials-adc) to authorize the calls to GCS.

Expand All @@ -85,6 +85,22 @@ In this example, `gs://my-bucket/my-flags.json` is expected to be a valid GCS UR
The polling interval can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.

### Azure Blob sync

The Azure Blob sync provider fetches flags from an Azure Blob Storage blob and periodically polls the blob for the flag definition updates.
It uses [environment variables](https://pkg.go.dev/gocloud.dev/blob/azureblob#hdr-URLs) to set the Storage Account name and to
authorize the calls to Azure Blob Storage.

```shell
flagd start --uri azblob://my-container/my-flags.json
```

In this example, assuming the environment variable AZURE_STORAGE_ACCOUNT is set to `myaccount`, and other options are not set, the service URL will be:
"https://myaccount.blob.core.windows.net/my-container/my-flags.json".
This is expected be a valid service URL accessible by flagd (either by being public or together with environment variable credentials).
The polling interval can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.

## Merging

Flagd can be configured to read from multiple sources at once, when this is the case flagd will merge all flag definition into a single
Expand Down
13 changes: 9 additions & 4 deletions docs/reference/sync-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ See [syncs](../concepts/syncs.md) for a conceptual overview.

## URI patterns

Any URI passed to flagd via the `--uri` (`-f`) flag must follow one of the 4 following patterns with prefixes to ensure that
Any URI passed to flagd via the `--uri` (`-f`) flag must follow one of the 6 following patterns with prefixes to ensure that
it is passed to the correct implementation:

| Implied Sync Provider | Prefix | Example |
Expand All @@ -18,6 +18,7 @@ it is passed to the correct implementation:
| `http` | `http(s)://` | `https://my-flags.com/flags` |
| `grpc` | `grpc(s)://` | `grpc://my-flags-server` |
| `gcs` | `gs://` | `gs://my-bucket/my-flags.json` |
| `azblob` | `azblob://` | `azblob://my-container/my-flags.json` |

## Source Configuration

Expand All @@ -31,10 +32,10 @@ Alternatively, these configurations can be passed to flagd via config file, spec
| Field | Type | Note |
| ----------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| uri | required `string` | Flag configuration source of the sync |
| provider | required `string` | Provider type - `file`, `fsnotify`, `fileinfo`, `kubernetes`, `http`, `grpc` or `gcs` |
| provider | required `string` | Provider type - `file`, `fsnotify`, `fileinfo`, `kubernetes`, `http`, `grpc`, `gcs` or `azblob` |
| authHeader | optional `string` | Used for http sync; set this to include the complete `Authorization` header value for any authentication scheme (e.g., "Bearer token_here", "Basic base64_credentials", etc.). Cannot be used with `bearerToken` |
| bearerToken | optional `string` | (Deprecated) Used for http sync; token gets appended to `Authorization` header with [bearer schema](https://www.rfc-editor.org/rfc/rfc6750#section-2.1). Cannot be used with `authHeader` |
| interval | optional `uint32` | Used for http and gcs syncs; requests will be made at this interval. Defaults to 5 seconds. |
| interval | optional `uint32` | Used for http, gcs and azblob syncs; requests will be made at this interval. Defaults to 5 seconds. |
| tls | optional `boolean` | Enable/Disable secure TLS connectivity. Currently used only by gRPC sync. Default (ex: if unset) is false, which will use an insecure connection |
| providerID | optional `string` | Value binds to grpc connection's providerID field. gRPC server implementations may use this to identify connecting flagd instance |
| selector | optional `string` | Value binds to grpc connection's selector field. gRPC server implementations may use this to filter flag configurations |
Expand Down Expand Up @@ -65,6 +66,7 @@ Sync providers:
- `grpc`(insecure) - grpc-source:8080
- `grpcs`(secure) - my-flag-source:8080
- `gcs` - gs://my-bucket/my-flags.json
- `azblob` - azblob://my-container/my-flags.json

Startup command:

Expand All @@ -80,7 +82,8 @@ Startup command:
{"uri":"grpc-source:8080","provider":"grpc"},
{"uri":"my-flag-source:8080","provider":"grpc", "maxMsgSize": 5242880},
{"uri":"my-flag-source:8080","provider":"grpc", "certPath": "/certs/ca.cert", "tls": true, "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"},
{"uri":"gs://my-bucket/my-flag.json","provider":"gcs"}]'
{"uri":"gs://my-bucket/my-flag.json","provider":"gcs"},
{"uri":"azblob://my-container/my-flag.json","provider":"azblob"}]'
```

Configuration file,
Expand Down Expand Up @@ -111,4 +114,6 @@ sources:
selector: "source=database,app=weatherapp"
- uri: gs://my-bucket/my-flag.json
provider: gcs
- uri: azblob://my-container/my-flags.json
provider: azblob
```
Loading

0 comments on commit ced1139

Please sign in to comment.