diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 3c42f197..4bd53959 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -66,7 +66,7 @@ RUN umask 0002 \ && VERSION=8.0.302 bash install.sh \ # GO && cd /opt/features/src/go \ - && VERSION=1.22.0 bash install.sh \ + && VERSION=1.23.1 bash install.sh \ && chown -R "${USERNAME}" "${GOROOT}" "${GOPATH}" \ # GitHub CLI && cd /opt/features/src/github-cli/ \ diff --git a/.notice-metadata.txt b/.notice-metadata.txt index a37a043b..18c30a71 100644 --- a/.notice-metadata.txt +++ b/.notice-metadata.txt @@ -1,5 +1,5 @@ -87a95a0f0985d9dd78841d4b8f253e1854ccc576dcb4f25d05884fe6dc66d439 cli/go.sum +d6d044a089a2292b836923b0750f640624a60c497f400426a2520fc61d7936af cli/go.sum d806a45ac319a052749da22ec42c6a08e358dbb6f6086f5b33e761e6ab39aa5b server/ControlPlane/packages.lock.json 83e9d72c489371ac10d1509e640c5daaafe4bef2be498456b4342a0700fe6608 server/DataPlane/packages.lock.json e518c00c2e96699b9cf588fe7504950aa2ca33a0ed1d7e53e0bb2b5b0dbce2c4 scripts/generate-notice.sh -0e323214bb9d10373505100678e361103ee4715e7a1c6bc95094f25cf00e1194 NOTICE.txt +3486c92776853d05aa57bb64d0ef7ea46120380bb35b2f5deb4cb3884620052b NOTICE.txt diff --git a/Makefile.cloud b/Makefile.cloud index f5bffaef..578b74ea 100644 --- a/Makefile.cloud +++ b/Makefile.cloud @@ -100,12 +100,17 @@ set-localsettings: set-context "cloudStorage": { "storageAccounts": $$(echo $${helm_values} | jq -c '.buffers.storageAccounts') }, - "bufferSidecarImage": "$$(echo '${ENVIRONMENT_CONFIG_JSON}' | jq -r '.api.helm.tyger.values.bufferSidecarImage')" + "bufferSidecarImage": "$$(echo '${ENVIRONMENT_CONFIG_JSON}' | jq -r '.api.helm.tyger.values.bufferSidecarImage')", + "bufferCopierImage": "$$(echo '${ENVIRONMENT_CONFIG_JSON}' | jq -r '.api.helm.tyger.values.bufferCopierImage')" }, "database": { - "connectionString": "Host=$$(echo $${helm_values} | jq -r '.database.host'); Database=$$(echo $${helm_values} | jq -r '.database.databaseName'); 
Port=$$(echo $${helm_values} | jq -r '.database.port'); Username=$$(az account show | jq -r '.user.name'); SslMode=VerifyFull", + "host": "$$(echo $${helm_values} | jq -r '.database.host')", + "databaseName": "$$(echo $${helm_values} | jq -r '.database.databaseName')", + "port": "$$(echo $${helm_values} | jq -r '.database.port')", + "username": "$$(az account show | jq -r '.user.name')", "autoMigrate": ${AUTO_MIGRATE}, - "tygerServerRoleName": "$$(echo $${helm_values} | jq -r '.identity.tygerServer.name')" + "tygerServerRoleName": "$$(echo $${helm_values} | jq -r '.identity.tygerServer.name')", + "tygerServerIdentity": "$$(echo $${helm_values} | jq -r '.identity.tygerServer.name')" } } EOF diff --git a/Makefile.docker b/Makefile.docker index 8fc2199b..cfa0087e 100644 --- a/Makefile.docker +++ b/Makefile.docker @@ -42,7 +42,8 @@ set-localsettings: ensure-data-plane-cert } }, "database": { - "connectionString": "Host=/opt/tyger/database; Username=tyger-server", + "host": "/opt/tyger/database", + "databaseName": "tyger-server", "autoMigrate": "true", "tygerServerRoleName": "tyger-server" } diff --git a/NOTICE.txt b/NOTICE.txt index de7a8fd1..03d6126e 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -349,6 +349,32 @@ SOFTWARE. ================================================================================ +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob + + MIT License + + Copyright (c) Microsoft Corporation. All rights reserved. 
+ + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE + +================================================================================ + github.com/AzureAD/microsoft-authentication-library-for-go/apps MIT License @@ -12206,6 +12232,114 @@ limitations under the License. 
================================================================================ +github.com/jackc/pgpassfile + +Copyright (c) 2019 Jack Christensen + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +================================================================================ + +github.com/jackc/pgservicefile + +Copyright (c) 2020 Jack Christensen + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +================================================================================ + +github.com/jackc/pgx/v5 + +Copyright (c) 2013-2021 Jack Christensen + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +================================================================================ + +github.com/jackc/puddle/v2 + +Copyright (c) 2018 Jack Christensen + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +================================================================================ + github.com/jmoiron/sqlx Copyright (c) 2013, Jason Moiron @@ -18918,7 +19052,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. golang.org/x/crypto -Copyright (c) 2009 The Go Authors. All rights reserved. +Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -18930,7 +19064,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. 
nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. @@ -19046,7 +19180,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. golang.org/x/sync -Copyright (c) 2009 The Go Authors. All rights reserved. +Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -19058,7 +19192,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. @@ -19078,7 +19212,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. golang.org/x/sys/unix -Copyright (c) 2009 The Go Authors. All rights reserved. +Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -19090,7 +19224,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. @@ -19110,7 +19244,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. golang.org/x/term -Copyright (c) 2009 The Go Authors. All rights reserved. 
+Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -19122,7 +19256,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. @@ -19142,7 +19276,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. golang.org/x/text -Copyright (c) 2009 The Go Authors. All rights reserved. +Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -19154,7 +19288,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. @@ -25844,6 +25978,10 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI ================================================================================ ** Microsoft.Extensions.ApiDescription.Server; version 6.0.5 -- +(c) 2008 VeriSign, Inc. 
+(c) Microsoft Corporation +Copyright James Newton-King 2008 +Copyright James Newton-King 2008 Json.NET MIT License diff --git a/cli/Dockerfile b/cli/Dockerfile index c1fc7b85..e75c5cc8 100644 --- a/cli/Dockerfile +++ b/cli/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM mcr.microsoft.com/oss/go/microsoft/golang:1.22.0-bullseye as go-build +FROM --platform=$BUILDPLATFORM mcr.microsoft.com/oss/go/microsoft/golang:1.23.1-bullseye as go-build ARG TARGETARCH BUILDOS WORKDIR /go/src/app @@ -7,7 +7,7 @@ ARG TYGER_VERSION="a" RUN --mount=type=cache,target=/root/.cache/go-build go mod download COPY . . RUN --mount=type=cache,target=/root/.cache/go-build \ - CGO_ENABLED=0 GOOS=${BUILDOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w -X main.version=${TYGER_VERSION}" -v -o /go/bin/dist/${BUILDOS}/${TARGETARCH}/ ./cmd/buffer-sidecar ./cmd/tyger + CGO_ENABLED=0 GOOS=${BUILDOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w -X main.version=${TYGER_VERSION}" -v -o /go/bin/dist/${BUILDOS}/${TARGETARCH}/ ./cmd/buffer-sidecar ./cmd/tyger ./cmd/buffer-copier FROM mcr.microsoft.com/cbl-mariner/distroless/minimal:2.0-nonroot.20240112 as buffer-sidecar ARG TARGETARCH BUILDOS @@ -21,3 +21,10 @@ ARG TARGETARCH BUILDOS WORKDIR /app COPY --from=go-build /go/bin/dist/${BUILDOS}/${TARGETARCH}/tyger . ENTRYPOINT ["/app/tyger"] + +FROM mcr.microsoft.com/cbl-mariner/distroless/minimal:2.0-nonroot.20240112 as buffer-copier +ARG TARGETARCH BUILDOS + +WORKDIR /app +COPY --from=go-build /go/bin/dist/${BUILDOS}/${TARGETARCH}/buffer-copier . +ENTRYPOINT ["/app/buffer-copier"] diff --git a/cli/cmd/buffer-copier/export.go b/cli/cmd/buffer-copier/export.go new file mode 100644 index 00000000..b3aea279 --- /dev/null +++ b/cli/cmd/buffer-copier/export.go @@ -0,0 +1,416 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +package main + +import ( + "context" + "crypto/sha256" + "encoding/base32" + "fmt" + "iter" + "os" + "strings" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/jackc/pgx/v5" + "github.com/microsoft/tyger/cli/internal/dataplane" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "golang.org/x/sync/semaphore" +) + +const ( + parallelExportBufferCount = 512 + maxExportConcurrentRequests = 1024 + exportPageSize = 8192 +) + +func newExportCommand(dbFlags *databaseFlags) *cobra.Command { + sourceStorageEndpoint := "" + destinationStorageEndpoint := "" + bufferIdTransform := func(id string) string { return id } + filter := make(map[string]string) + cmd := &cobra.Command{ + Use: "export", + Short: "Exports the buffers from the current Tyger instance to a storage account", + DisableFlagsInUseLine: true, + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + if runId := os.Getenv("TYGER_RUN_ID"); runId != "" { + ctx = log.Ctx(ctx).With().Str("runId", runId).Logger().WithContext(ctx) + } + cred, err := createCredential() + if err != nil { + log.Ctx(cmd.Context()).Fatal().Err(err).Msg("Failed to create credentials") + } + + if hashIds, err := cmd.Flags().GetBool("hash-ids"); err != nil { + panic(err) + } else if hashIds { + bufferIdTransform = hashBufferId + } + + sourceBlobServiceClient, err := azblob.NewClient(sourceStorageEndpoint, cred, &blobClientOptions) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to create blob service client") + } + + destBlobServiceClient, err := azblob.NewClient(destinationStorageEndpoint, cred, &blobClientOptions) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to create blob service client") + } + + if err := verifyStorageAccountConnectivity(ctx, 
sourceBlobServiceClient); err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to connect to source storage account") + } + if err := verifyStorageAccountConnectivity(ctx, destBlobServiceClient); err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to connect to destination storage account") + } + + sema := semaphore.NewWeighted(maxExportConcurrentRequests) + + transferMetrics := dataplane.NewTransferMetrics(ctx) + + overallWg := sync.WaitGroup{} + + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + bufferChannel := make(chan bufferIdAndTags, parallelExportBufferCount) + for range parallelExportBufferCount { + go func() { + for bufferIdAndTags := range bufferChannel { + if err := copyBuffer(ctx, bufferIdAndTags, sourceBlobServiceClient, destBlobServiceClient, transferMetrics, sema, bufferIdTransform); err != nil { + cancel(err) + } else { + transferMetrics.Update(0, 1) + } + overallWg.Done() + } + }() + } + + for page, err := range getBufferIdsAndTags(ctx, dbFlags, filter, cred) { + if err != nil { + cancel(fmt.Errorf("failed to get buffer IDs and tags: %w", err)) + break + } + + for _, bufferIdAndTags := range page { + overallWg.Add(1) + bufferChannel <- bufferIdAndTags + } + } + + close(bufferChannel) + + doneChan := make(chan any) + go func() { + overallWg.Wait() + close(doneChan) + }() + + select { + case <-doneChan: + case <-ctx.Done(): + } + + err = context.Cause(ctx) + if err != nil { + if bloberror.HasCode(err, bloberror.AuthorizationFailure, bloberror.AuthorizationPermissionMismatch, bloberror.InvalidAuthenticationInfo) { + log.Ctx(ctx).Fatal().Err(err).Msgf("Failed to access storage account. 
Ensure %s has Storage Blob Data Contributor access on the storage account %s", getCurrentPrincipal(context.Background(), cred), destinationStorageEndpoint) + } else { + log.Ctx(ctx).Fatal().Err(err).Msg("Failed to export buffers") + } + } + + transferMetrics.Stop() + }, + } + + cmd.Flags().StringVar(&sourceStorageEndpoint, "source-storage-endpoint", "", "The storage account to export buffers from") + cmd.Flags().StringVar(&destinationStorageEndpoint, "destination-storage-endpoint", "", "The storage account to export buffers to") + cmd.Flags().StringToStringVar(&filter, "filter", filter, "key-value tags to filter the buffers to export") + cmd.Flags().Bool("hash-ids", false, "Hash the buffer IDs before exporting them") + cmd.Flags().MarkHidden("hash-ids") + cmd.MarkFlagRequired("source-storage-endpoint") + cmd.MarkFlagRequired("destination-storage-endpoint") + + return cmd +} + +func copyBuffer(ctx context.Context, + bufferIdAndTags bufferIdAndTags, + sourceBlobServiceClient *azblob.Client, + destBlobServiceClient *azblob.Client, + transferMetrics *dataplane.TransferMetrics, + sema *semaphore.Weighted, + bufferIdTransform func(string) string, +) error { + sourceContainerId := bufferIdAndTags.id + destinationContainerId := bufferIdTransform(sourceContainerId) + sourceContainerClient := sourceBlobServiceClient.ServiceClient().NewContainerClient(sourceContainerId) + destContainerClient := destBlobServiceClient.ServiceClient().NewContainerClient(destinationContainerId) + + _, err := destContainerClient.Create(ctx, nil) + if err != nil { + if bloberror.HasCode(err, bloberror.ContainerAlreadyExists) { + props, err := destContainerClient.GetProperties(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get container properties: %w", err) + } + + // Note: casing is normalized because this is coming from an HTTP header + if status, ok := props.Metadata[exportedBufferStatusKeyHttpHeaderCasing]; ok && status != nil && *status == exportedStatus { + return nil + } + + } 
else { + return fmt.Errorf("failed to create container: %w", err) + } + } + + blobPager := sourceBlobServiceClient.NewListBlobsFlatPager(sourceContainerId, nil) + + bufferWaitGoup := sync.WaitGroup{} + + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + for blobPager.More() { + blobPage, err := blobPager.NextPage(ctx) + if err != nil { + if bloberror.HasCode(err, bloberror.ContainerNotFound) { + log.Ctx(ctx).Warn().Msgf("container '%s' not found", sourceContainerId) + break + } + return fmt.Errorf("failed to get page of blobs: %w", err) + } + + for _, blob := range blobPage.Segment.BlobItems { + sourceBlobClient := sourceContainerClient.NewBlockBlobClient(*blob.Name) + destBlobClient := destContainerClient.NewBlockBlobClient(*blob.Name) + if err := sema.Acquire(ctx, 1); err != nil { + // context canceled + return err + } + + bufferWaitGoup.Add(1) + go func() { + defer bufferWaitGoup.Done() + defer sema.Release(1) + for { + _, err := destBlobClient.UploadBlobFromURL(ctx, sourceBlobClient.URL(), nil) + if err != nil { + if bloberror.HasCode(err, bloberror.ServerBusy) { + continue + } + + cancel(err) + return + } + break + } + + transferMetrics.Update(uint64(*blob.Properties.ContentLength), 0) + }() + } + } + + bufferWaitGoup.Wait() + + err = context.Cause(ctx) + if err == nil { + tags := make(map[string]*string, len(bufferIdAndTags.tags)+1) + exportedStatus := exportedStatus + tags[exportedBufferStatusKey] = &exportedStatus + for k, v := range bufferIdAndTags.tags { + tags[customTagPrefix+k] = &v + } + + _, err = destContainerClient.SetMetadata(ctx, &container.SetMetadataOptions{Metadata: tags}) + if err != nil { + return fmt.Errorf("failed to set metadata: %w", err) + } + } + + return err +} + +func getBufferIdsAndTags(ctx context.Context, dbFlags *databaseFlags, filter map[string]string, cred azcore.TokenCredential) iter.Seq2[[]bufferIdAndTags, error] { + return func(yield func([]bufferIdAndTags, error) bool) { + pool, err := 
createDatabaseConnectionPool(ctx, dbFlags, cred) + if err != nil { + yield(nil, fmt.Errorf("failed to create database connection pool: %w", err)) + return + } + defer pool.Close() + + filterTagIds := make(map[string]int) + if len(filter) > 0 { + tagNames := make([]string, 0, len(filter)) + for k := range filter { + tagNames = append(tagNames, k) + } + + keyRows, _ := pool.Query(ctx, `SELECT name, id FROM tag_keys WHERE name = ANY ($1)`, tagNames) + + var name string + var id int + _, err = pgx.ForEachRow(keyRows, []any{&name, &id}, func() error { + filterTagIds[name] = id + return nil + }) + if err != nil { + yield(nil, fmt.Errorf("failed to fetch keys: %w", err)) + return + } + + if len(filterTagIds) != len(filter) { + return + } + } + + lastCreatedAt := time.Time{} + lastBufferId := "" + pageCount := 0 + + // we fetch the results in pages because otherwise this reader could be open for many hours. + query, params := func() (string, []any) { + queryBuilder := strings.Builder{} + params := []any{lastCreatedAt, lastBufferId, exportPageSize} + paramOffset := len(params) + for k, v := range filter { + params = append(params, filterTagIds[k]) + params = append(params, v) + } + + var matchTable string + if len(filter) > 0 { + matchTable = "tags" + } else { + matchTable = "buffers" + } + + queryBuilder.WriteString(` + WITH matches AS ( + SELECT t0.created_at, t0.id + FROM `) + queryBuilder.WriteString(matchTable) + queryBuilder.WriteString(" AS t0\n") + + if len(filter) > 0 { + for i := range len(filter) - 1 { + aliasNumber := i + 1 + queryBuilder.WriteString(fmt.Sprintf("INNER JOIN tags AS t%d ON t0.created_at = t%d.created_at AND t0.id = t%d.id\n", aliasNumber, aliasNumber, aliasNumber)) + } + + queryBuilder.WriteString("WHERE\n") + + for i := range len(filter) { + if i > 0 { + queryBuilder.WriteString("AND\n") + } + queryBuilder.WriteString(fmt.Sprintf("t%d.key = $%d AND t%d.value = $%d\n", i, paramOffset+1, i, paramOffset+2)) + paramOffset += 2 + } + } + + if 
len(filter) == 0 { + queryBuilder.WriteString("WHERE\n") + } else { + queryBuilder.WriteString("AND\n") + } + + queryBuilder.WriteString(` + (t0.created_at, t0.id) > ($1, $2) + ORDER BY t0.created_at ASC, t0.id ASC + LIMIT $3 + ) + SELECT matches.created_at, matches.id, tag_keys.name, tags.value + FROM matches + LEFT JOIN tags ON + matches.created_at = tags.created_at AND matches.id = tags.id + LEFT JOIN tag_keys on tags.key = tag_keys.id + ORDER BY matches.created_at ASC, matches.id ASC`) + return queryBuilder.String(), params + }() + + for { + params[0] = lastCreatedAt + params[1] = lastBufferId + + rows, _ := pool.Query(ctx, query, params...) + + var page []bufferIdAndTags + if pageCount == 0 { + page = make([]bufferIdAndTags, 0, 1024) + } else { + page = make([]bufferIdAndTags, 0, exportPageSize) + } + pageCount++ + + current := bufferIdAndTags{} + + for rows.Next() { + var id string + var tagKey *string + var tagValue *string + err := rows.Scan(&lastCreatedAt, &id, &tagKey, &tagValue) + if err != nil { + yield(nil, fmt.Errorf("failed to read buffers from database: %w", err)) + rows.Close() + return + } + + lastBufferId = id + if id != current.id { + if current.id != "" { + page = append(page, current) + } + current = bufferIdAndTags{id: id} + if tagKey != nil { + current.tags = map[string]string{*tagKey: *tagValue} + } + } + + if tagKey != nil { + current.tags[*tagKey] = *tagValue + } + } + + if current.id != "" { + page = append(page, current) + } + + if err := rows.Err(); err != nil { + yield(nil, fmt.Errorf("failed to read buffers from database: %w", err)) + return + } + + if len(page) == 0 { + return + } + + if !yield(page, nil) { + return + } + + if len(page) < exportPageSize { + return + } + } + } +} + +func hashBufferId(id string) string { + hash := sha256.Sum256([]byte(id)) + return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:])) +} diff --git a/cli/cmd/buffer-copier/import.go b/cli/cmd/buffer-copier/import.go 
new file mode 100644 index 00000000..c8cd0337 --- /dev/null +++ b/cli/cmd/buffer-copier/import.go @@ -0,0 +1,244 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package main + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" + "github.com/dustin/go-humanize" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" +) + +const ( + listContainerPageSize = 5000 // max is 5000 + dbBatchSize = 25_000 + containerPrefixChars = "abcdefghijklmnopqrstuvwxyz0123456789" // container names must start with a lowercase letter or number +) + +func newImportCommand(dbFlags *databaseFlags) *cobra.Command { + storageEndpoint := "" + cmd := &cobra.Command{ + Use: "import", + Short: "Imports buffers in a storage account to the current Tyger instance", + DisableFlagsInUseLine: true, + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + if runId := os.Getenv("TYGER_RUN_ID"); runId != "" { + ctx = log.Ctx(ctx).With().Str("runId", runId).Logger().WithContext(ctx) + } + cred, err := createCredential() + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("Failed to create credentials") + } + + blobServiceClient, err := azblob.NewClient(storageEndpoint, cred, &blobClientOptions) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to create blob service client") + } + + if err := verifyStorageAccountConnectivity(ctx, blobServiceClient); err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to connect to source storage account") + } + + containerChannel := make(chan *service.ContainerItem, listContainerPageSize*10) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + wg := sync.WaitGroup{} + for _, r := range containerPrefixChars { + 
wg.Add(1) + go func() { + defer wg.Done() + prefix := string(r) + pageSize := int32(listContainerPageSize) + pager := blobServiceClient.NewListContainersPager(&azblob.ListContainersOptions{Include: azblob.ListContainersInclude{Metadata: true}, MaxResults: &pageSize, Prefix: &prefix}) + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + cancel(fmt.Errorf("failed to list containers: %w", err)) + return + } + + for _, container := range page.ContainerItems { + if status, ok := container.Metadata[exportedBufferStatusKey]; ok && *status == exportedStatus { + containerChannel <- container + } + } + } + }() + } + + go func() { + wg.Wait() + close(containerChannel) + }() + + err = bulkInsert(ctx, dbFlags, cred, dbBatchSize, containerChannel) + if ctxCause := context.Cause(ctx); ctxCause != nil { + err = ctxCause + } + + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("Import failed") + } + }, + } + + cmd.Flags().StringVar(&storageEndpoint, "storage-endpoint", "", "The storage account to import buffers from") + cmd.MarkFlagRequired("storage-endpoint") + + return cmd +} + +func bulkInsert(ctx context.Context, dbFlags *databaseFlags, cred azcore.TokenCredential, batchSize int, containers <-chan *service.ContainerItem) error { + pool, err := createDatabaseConnectionPool(ctx, dbFlags, cred) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Msg("failed to create database connection pool") + } + + totalCount := int64(0) + page := make([]*service.ContainerItem, 0, batchSize) + + for container := range containers { + totalCount++ + page = append(page, container) + if len(page) == batchSize { + if err := insertBatch(ctx, pool, page, totalCount); err != nil { + return fmt.Errorf("failed to insert batch: %w", err) + } + page = page[:0] + } + } + + if len(page) > 0 { + if err := insertBatch(ctx, pool, page, totalCount); err != nil { + return fmt.Errorf("failed to insert batch: %w", err) + } + } + return nil +} + +func insertBatch(ctx context.Context, pool 
*pgxpool.Pool, containerBatch []*service.ContainerItem, totalCount int64) error { + start := time.Now() + conn, err := pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("failed to acquire connection: %w", err) + } + defer conn.Release() + + tx, err := conn.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + defer tx.Rollback(ctx) + + _, err = conn.Exec(ctx, ` + CREATE TEMPORARY TABLE temp_buffers ( + id TEXT, + created_at timestamp with time zone + ) + ON COMMIT DROP; + + CREATE TEMPORARY TABLE temp_tags ( + id TEXT, + key TEXT, + value TEXT + + ) + ON COMMIT DROP; + `) + + if err != nil { + return err + } + + // insert buffers to temp table + createdAt := time.Now().UTC() + + bufferSource := pgx.CopyFromSlice(len(containerBatch), func(i int) ([]any, error) { return []any{containerBatch[i].Name, createdAt}, nil }) + if _, err := tx.CopyFrom(ctx, []string{"temp_buffers"}, []string{"id", "created_at"}, bufferSource); err != nil { + return fmt.Errorf("failed to bulk copy data: %w", err) + } + + // insert tags to temp table + tagRows := make([][]any, 0, len(containerBatch)) + for _, container := range containerBatch { + for k, v := range container.Metadata { + if strings.HasPrefix(k, customTagPrefix) { + tagRows = append(tagRows, []any{container.Name, k[len(customTagPrefix):], v}) + } + } + } + + tagSource := pgx.CopyFromRows(tagRows) + if _, err := tx.CopyFrom(ctx, []string{"temp_tags"}, []string{"id", "key", "value"}, tagSource); err != nil { + return fmt.Errorf("failed to bulk copy tags: %w", err) + } + + commandBatch := &pgx.Batch{} + commandBatch.Queue(` + INSERT INTO tag_keys (name) + SELECT DISTINCT key + FROM temp_tags + WHERE NOT EXISTS (SELECT * FROM tag_keys WHERE name = temp_tags.key) + ON CONFLICT (name) DO NOTHING + `) + + newBufferCount := 0 + commandBatch.Queue(` + WITH inserted_buffers AS ( + INSERT INTO buffers (id, created_at, etag) + SELECT id, created_at, '0' + FROM temp_buffers + ON CONFLICT (id) 
DO NOTHING + RETURNING id, created_at + ), mapped_tags AS ( + SELECT + temp_tags.id AS id, + inserted_buffers.created_at AS created_at, + tag_keys.id AS key, + temp_tags.value as value + FROM temp_tags + INNER JOIN inserted_buffers ON temp_tags.id = inserted_buffers.id + INNER JOIN tag_keys ON temp_tags.key = tag_keys.name + ), inserted_tags AS ( + INSERT INTO tags (id, created_at, key, value) + SELECT id, created_at, key, value + FROM mapped_tags + ) + SELECT COUNT(*) FROM inserted_buffers; + `).QueryRow(func(row pgx.Row) error { + return row.Scan(&newBufferCount) + }) + + batchResults := tx.SendBatch(ctx, commandBatch) + err = batchResults.Close() + if err != nil { + return fmt.Errorf("failed to insert buffers: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Info(). + Str("duration", time.Since(start).String()). + Str("newBuffers", humanize.Comma(int64(newBufferCount))). + Str("totalCount", humanize.Comma(totalCount)).Msg("Inserted batch") + + return err +} diff --git a/cli/cmd/buffer-copier/main.go b/cli/cmd/buffer-copier/main.go new file mode 100644 index 00000000..c906613f --- /dev/null +++ b/cli/cmd/buffer-copier/main.go @@ -0,0 +1,222 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +package main + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/golang-jwt/jwt/v5" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/microsoft/tyger/cli/internal/cmd" +) + +const ( + exportedBufferStatusKey = "tyger_exported_buffer_status" + exportedStatus = "exported" + customTagPrefix = "tyger_custom_tag_" +) + +var ( + exportedBufferStatusKeyHttpHeaderCasing = http.CanonicalHeaderKey(exportedBufferStatusKey) +) + +var ( + // set during build + version = "" + + blobClientOptions = azblob.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: &RoundripTransporter{ + inner: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: false, + MaxIdleConns: 10000, + MaxIdleConnsPerHost: 5000, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + Renegotiation: tls.RenegotiateFreelyAsClient, + }, + }}, + Retry: policy.RetryOptions{ + MaxRetries: 50, + }, + }, + } +) + +func main() { + dbFlags := databaseFlags{} + + rootCommand := cmd.NewCommonRootCommand(version) + rootCommand.Use = "buffer-copier" + rootCommand.Long = `Export and import buffers from one Tyger instance to another` + + // add flags for the root command based on the commonFlags struct + rootCommand.PersistentFlags().StringVar(&dbFlags.dbName, "db-name", "postgres", "The name of the database to use for exporting or importing buffers") + rootCommand.PersistentFlags().StringVar(&dbFlags.dbHost, "db-host", "", "The host of the database to use for exporting 
or importing buffers") + rootCommand.PersistentFlags().IntVar(&dbFlags.dbPort, "db-port", 5432, "The port of the database to use for exporting or importing buffers") + rootCommand.PersistentFlags().StringVar(&dbFlags.dbUser, "db-user", "", "The user of the database to use for exporting or importing buffers") + + rootCommand.MarkPersistentFlagRequired("destination-storage-endpoint") + rootCommand.MarkPersistentFlagRequired("db-host") + rootCommand.MarkPersistentFlagRequired("db-user") + + rootCommand.AddCommand(newExportCommand(&dbFlags)) + rootCommand.AddCommand(newImportCommand(&dbFlags)) + + err := rootCommand.Execute() + if err != nil { + os.Exit(1) + } +} + +// Do a quick check to see if we can reach the storage account. Do not wait for the retries to complete. +func verifyStorageAccountConnectivity(ctx context.Context, client *azblob.Client) error { + resChan := make(chan any) + go func() { + _, err := client.ServiceClient().GetAccountInfo(ctx, nil) + resChan <- err + close(resChan) + }() + + select { + case <-resChan: + return nil + case <-time.After(time.Minute): + return fmt.Errorf("failed to connect to storage endpoint %s", client.ServiceClient().URL()) + } +} + +type databaseFlags struct { + dbName string + dbHost string + dbPort int + dbUser string +} + +type bufferIdAndTags struct { + id string + tags map[string]string +} + +func createCredential() (azcore.TokenCredential, error) { + cred := make([]azcore.TokenCredential, 0) + cliCred, err := azidentity.NewAzureCLICredential(nil) + if err == nil { + cred = append(cred, cliCred) + } + + workloadCred, err := azidentity.NewWorkloadIdentityCredential(nil) + if err == nil { + cred = append(cred, workloadCred) + } + + return azidentity.NewChainedTokenCredential(cred, nil) +} + +func createDatabaseConnectionPool(ctx context.Context, commonFlags *databaseFlags, cred azcore.TokenCredential) (*pgxpool.Pool, error) { + connectionString := fmt.Sprintf("host=%s port=%d dbname=%s user=%s sslmode=verify-full", 
commonFlags.dbHost, commonFlags.dbPort, commonFlags.dbName, commonFlags.dbUser) + config, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("failed to parse database connection config: %w", err) + } + + config.BeforeConnect = func(ctx context.Context, cc *pgx.ConnConfig) error { + tokenResponse, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{ + Scopes: []string{"https://ossrdbms-aad.database.windows.net/.default"}, + }) + if err != nil { + return fmt.Errorf("failed to get database token: %w", err) + } + cc.Config.Password = tokenResponse.Token + return nil + } + + pool, err := pgxpool.NewWithConfig(ctx, config) + + if err != nil { + return nil, fmt.Errorf("failed to create database connection pool: %w", err) + } + + return pool, nil +} + +type RoundripTransporter struct { + inner http.RoundTripper +} + +func (t *RoundripTransporter) Do(req *http.Request) (*http.Response, error) { + return t.inner.RoundTrip(req) +} + +func getCurrentPrincipal(ctx context.Context, cred azcore.TokenCredential) string { + const unknownPrincipal = "unknown" + tokenResponse, err := cred.GetToken(ctx, policy.TokenRequestOptions{Scopes: []string{"https://storage.azure.com/.default"}}) + if err != nil { + return unknownPrincipal + } + + claims := jwt.MapClaims{} + _, _, err = jwt.NewParser().ParseUnverified(tokenResponse.Token, claims) + if err != nil { + return unknownPrincipal + } + + oid, _ := claims["oid"].(string) + appId, _ := claims["appid"].(string) + mi, _ := claims["xms_mirid"].(string) + idtyp, _ := claims["idtyp"].(string) + + if mi != "" { + miString := fmt.Sprintf("managed identity %s", mi) + if appId != "" { + miString += fmt.Sprintf(" (app ID %s)", appId) + } + return miString + } + + switch idtyp { + case "user": + if name, _ := claims["unique_name"].(string); name != "" { + return name + } + + if name, _ := claims["upn"].(string); name != "" { + return name + } + + if oid != "" { + return fmt.Sprintf("user %s", 
oid) + } + + case "app": + if appId != "" { + return fmt.Sprintf("app ID %s", appId) + } + + if oid != "" { + return fmt.Sprintf("app %s", oid) + } + } + + return unknownPrincipal +} diff --git a/cli/go.mod b/cli/go.mod index c1f5b4bf..d6860bcc 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -1,11 +1,11 @@ module github.com/microsoft/tyger/cli -go 1.22 +go 1.23 require ( dario.cat/mergo v1.0.0 - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/authorization/armauthorization/v2 v2.2.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 @@ -14,7 +14,8 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/postgresql/armpostgresqlflexibleservers/v4 v4.0.0-beta.4 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 github.com/IGLOU-EU/go-wildcard/v2 v2.0.2 github.com/a8m/envsubst v1.4.2 @@ -35,9 +36,11 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/ipinfo/go/v2 v2.10.0 + github.com/jackc/pgx/v5 v5.7.1 github.com/knadh/koanf/parsers/yaml v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 github.com/knadh/koanf/v2 v2.1.0 + github.com/lib/pq v1.10.9 github.com/mattn/go-ieproxy v0.0.11 github.com/mittwald/go-helm-client v0.12.9 
github.com/pkg/errors v0.9.1 @@ -45,18 +48,19 @@ require ( github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.28.0 - golang.org/x/sync v0.7.0 - golang.org/x/term v0.21.0 + golang.org/x/sync v0.8.0 + golang.org/x/term v0.24.0 helm.sh/helm/v3 v3.14.3 k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/client-go v0.29.3 + oras.land/oras-go v1.2.5 sigs.k8s.io/yaml v1.4.0 ) require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/BurntSushi/toml v1.3.2 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect @@ -115,6 +119,9 @@ require ( github.com/huandu/xstrings v1.4.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/invopop/yaml v0.1.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -122,7 +129,6 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/lib/pq v1.10.9 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -173,12 +179,12 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.starlark.net v0.0.0-20240329153429-e6e8e7ce1b7a // indirect - golang.org/x/crypto v0.24.0 // indirect + golang.org/x/crypto v0.27.0 // 
indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.26.0 // indirect + golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect @@ -196,7 +202,6 @@ require ( k8s.io/kube-openapi v0.0.0-20240322212309-b815d8309940 // indirect k8s.io/kubectl v0.29.3 // indirect k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect - oras.land/oras-go v1.2.5 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.16.0 // indirect sigs.k8s.io/kustomize/kyaml v0.16.0 // indirect @@ -218,6 +223,6 @@ require ( github.com/sergi/go-diff v1.2.0 // indirect github.com/spf13/pflag v1.0.5 github.com/thediveo/enumflag v0.10.1 - golang.org/x/sys v0.21.0 + golang.org/x/sys v0.25.0 gopkg.in/inf.v0 v0.9.1 // indirect ) diff --git a/cli/go.sum b/cli/go.sum index 3cf7d7d4..76dc1930 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -4,18 +4,20 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/azure-sdk-for-go v56.3.0+incompatible h1:DmhwMrUIvpeoTDiWRDtNHqelNUd3Og8JCkrLHQK795c= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0 h1:U2rTu3Ef+7w9FHKIAXM6ZyqF3UOWJZ12zIm8zECAFfg= 
-github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 h1:jBQA3cKT4L2rWMpgE7Yt3Hwh2aUj8KXjIGLxjHeYNNo= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0/go.mod h1:4OG6tQ9EOP/MT0NMjDlRzWoVFxfu9rN9B2X+tlSVktg= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/authorization/armauthorization/v2 v2.2.0 h1:Hp+EScFOu9HeCbeW8WU2yQPJd4gGwhMgKxWe+G6jNzw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/authorization/armauthorization/v2 v2.2.0/go.mod h1:/pz8dyNQe+Ey3yBp/XuYz7oqX8YDNWVpPB0hH3XWfbc= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0 h1:0nGmzwBv5ougvzfGPCO2ljFRHvun57KpNrVCMrlk0ns= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0/go.mod h1:gYq8wyDgv6JLhGbAU6gg8amCPgQWRE+aCvrV2gyzdfs= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0/go.mod h1:LRr2FzBTQlONPPa5HREE5+RjSCTXl7BwOvYOaWTqCaI= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v3 v3.0.0 h1:Kb8eVvjdP6kZqYnER5w/PiGCFp91yVgaxve3d7kCEpY= 
+github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v3 v3.0.0/go.mod h1:lYq15QkJyEsNegz5EhI/0SXQ6spvGfgwBH/Qyzkoc/s= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0 h1:pPvTJ1dY0sA35JOeFq6TsY2xj6Z85Yo23Pj4wCCvu4o= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0/go.mod h1:mLfWfj8v3jfWKsL9G4eoBoXVcsqcIUTapmdKy7uGOp0= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 h1:Ds0KRF8ggpEGg4Vo42oX1cIt/IfOhHWJBikksZbVxeg= @@ -30,8 +32,10 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1. github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= -github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0/go.mod h1:T5RfihdXtBDxt1Ch2wobif3TvzTdumDy29kahv6AV9A= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm 
v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= @@ -303,6 +307,14 @@ github.com/invopop/yaml v0.1.0 h1:YW3WGUoJEXYfzWBjn00zIlrw7brGVD0fUKRYDPAPhrc= github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/ipinfo/go/v2 v2.10.0 h1:v9sFjaxnVVD+JVgpWpjgwols18Tuu4SgBDaHHaw0IXo= github.com/ipinfo/go/v2 v2.10.0/go.mod h1:tRDkYfM20b1XzNqorn1Q1O6Xtg7uzw3Wn3I2R0SyJh4= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -599,8 +611,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.24.0 
h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -624,8 +636,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -637,8 +649,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -660,19 +672,19 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.3.0/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/cli/integrationtest/controlplane_test.go b/cli/integrationtest/controlplane_test.go index a0203c5c..9e8997f9 100644 --- a/cli/integrationtest/controlplane_test.go +++ b/cli/integrationtest/controlplane_test.go @@ -1649,6 +1649,44 @@ timeoutSeconds: 600`, BasicImage) assert.Error(t, err) } +func TestExport(t *testing.T) { + t.Parallel() + skipIfUsingUnixSocket(t) + + testId := uuid.NewString() + + originalBufferIds := []string{} + for i := range 10 { + id := runTygerSucceeds(t, "buffer", "create", "--tag", fmt.Sprintf("exporttestindex=%d", i), "--tag", fmt.Sprintf("exporttest=%s", testId)) + originalBufferIds = append(originalBufferIds, id) + + writeCommand := exec.Command("tyger", "buffer", "write", id) + writeCommand.Stdin = bytes.NewBufferString("hello") + + writeStdErr := &bytes.Buffer{} + writeCommand.Stderr = writeStdErr + + assert.NoError(t, writeCommand.Run()) + } + + sas := runTygerSucceeds(t, "buffer", "access", originalBufferIds[0]) + sasUrl, err := url.Parse(sas) + require.NoError(t, err) + storageEndpoint := fmt.Sprintf("https://%s", sasUrl.Host) + + runTygerSucceeds(t, "buffer", "export", 
storageEndpoint, "--tag", fmt.Sprintf("exporttest=%s", testId), "--hash-ids") + + runTygerSucceeds(t, "buffer", "import") + + jsonOutput := runTygerSucceeds(t, "buffer", "list", "--tag", fmt.Sprintf("exporttest=%s", testId)) + var buffers []model.Buffer + require.NoError(t, json.Unmarshal([]byte(jsonOutput), &buffers)) + assert.Len(t, buffers, len(originalBufferIds)*2) + for _, buffer := range buffers { + assert.Len(t, buffer.Tags, 2) + } +} + func waitForRunStarted(t *testing.T, runId string) model.Run { t.Helper() return waitForRun(t, runId, true, false) diff --git a/cli/integrationtest/dataplane_test.go b/cli/integrationtest/dataplane_test.go index 7aed5870..8734c0ad 100644 --- a/cli/integrationtest/dataplane_test.go +++ b/cli/integrationtest/dataplane_test.go @@ -363,7 +363,7 @@ func TestCancellationOnWrite(t *testing.T) { err = dataplane.Write(writeCtx, writeSasUri, inputReader, dataplane.WithWriteHttpClient(writeClient), dataplane.WithWriteMetadataEndWriteTimeout(time.Minute)) assert.ErrorIs(t, err, context.Canceled) - assert.ErrorContains(t, <-errorChan, "the buffer is in a permanently failed state") + assert.ErrorContains(t, <-errorChan, dataplane.ErrBufferFailedState.Error()) } func TestRunningFromPowershellRaisesWarning(t *testing.T) { diff --git a/cli/integrationtest/expected_openapi_spec.yaml b/cli/integrationtest/expected_openapi_spec.yaml index affe4826..881ccd14 100644 --- a/cli/integrationtest/expected_openapi_spec.yaml +++ b/cli/integrationtest/expected_openapi_spec.yaml @@ -148,6 +148,42 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorBody' + /v1/buffers/export: + post: + tags: + - tyger-server + operationId: exportBuffers + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ExportBuffersRequest' + required: true + responses: + '202': + description: Accepted + content: + application/json: + schema: + $ref: '#/components/schemas/Run' + /v1/buffers/import: + post: + tags: + - tyger-server + 
operationId: importBuffers + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ImportBuffersRequest' + required: true + responses: + '202': + description: Accepted + content: + application/json: + schema: + $ref: '#/components/schemas/Run' '/v1/codespecs/{name}': put: tags: @@ -384,8 +420,8 @@ paths: schema: type: string responses: - '200': - description: Success + '202': + description: Accepted content: application/json: schema: @@ -578,6 +614,20 @@ components: message: type: string additionalProperties: false + ExportBuffersRequest: + type: object + properties: + destinationStorageEndpoint: + type: string + filters: + type: object + additionalProperties: + type: string + nullable: true + additionalProperties: false + ImportBuffersRequest: + type: object + additionalProperties: false JobCodespec: type: object allOf: @@ -624,6 +674,10 @@ components: - job type: object properties: + kind: + enum: + - user + - system id: type: integer description: The run ID. Populated by the system. 
diff --git a/cli/integrationtest/testconnectivity/Dockerfile b/cli/integrationtest/testconnectivity/Dockerfile index 5998d920..dc73a2ab 100644 --- a/cli/integrationtest/testconnectivity/Dockerfile +++ b/cli/integrationtest/testconnectivity/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM mcr.microsoft.com/oss/go/microsoft/golang:1.22.0-bullseye as go-build +FROM --platform=$BUILDPLATFORM mcr.microsoft.com/oss/go/microsoft/golang:1.23.1-bullseye as go-build ARG TARGETARCH BUILDOS WORKDIR /go/src/app diff --git a/cli/internal/cmd/buffer.go b/cli/internal/cmd/buffer.go index 1d90c99e..f0e30137 100644 --- a/cli/internal/cmd/buffer.go +++ b/cli/internal/cmd/buffer.go @@ -24,6 +24,8 @@ import ( "github.com/microsoft/tyger/cli/internal/controlplane" "github.com/microsoft/tyger/cli/internal/controlplane/model" "github.com/microsoft/tyger/cli/internal/dataplane" + "github.com/microsoft/tyger/cli/internal/logging" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" ) @@ -49,6 +51,8 @@ func NewBufferCommand() *cobra.Command { cmd.AddCommand(newBufferShowCommand()) cmd.AddCommand(newBufferSetCommand()) cmd.AddCommand(newBufferListCommand()) + cmd.AddCommand(newBufferExportCommand()) + cmd.AddCommand(newBufferImportCommand()) return cmd } @@ -473,8 +477,73 @@ func newBufferListCommand() *cobra.Command { }, } - cmd.Flags().StringToStringVar(&tagEntries, "tag", nil, "add a key-value tag to the buffer. Can be specified multiple times.") + cmd.Flags().StringToStringVar(&tagEntries, "tag", nil, "Only include buffers with the given tag. Can be specified multiple times.") cmd.Flags().IntVarP(&limit, "limit", "l", 1000, "The maximum number of buffers to list. 
Default 1000") return cmd } + +func newBufferExportCommand() *cobra.Command { + request := model.ExportBuffersRequest{ + Filters: make(map[string]string), + } + + cmd := &cobra.Command{ + Use: "export DESTINATION_STORAGE_ENDPOINT [--tag KEY=VALUE ...]", + Short: "Export buffers to a storage account belonging to another Tyger instance. Note that the Tyger server's managed identity must have the necessary permissions to write to the destination storage account. Only supported in cloud environments.", + DisableFlagsInUseLine: true, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + request.DestinationStorageEndpoint = args[0] + run := model.Run{} + _, err := controlplane.InvokeRequest(cmd.Context(), http.MethodPost, "v1/buffers/export", request, &run) + if err != nil { + log.Fatal().Err(err).Msg("Failed to export buffers") + } + + if err := attachToRunNoBufferIO(cmd.Context(), run, true, false, getSystemRunLogSink(cmd.Context())); err != nil { + log.Fatal().Err(err).Msg("Failed to attach to run") + } + }, + } + + cmd.Flags().StringToStringVar(&request.Filters, "tag", nil, "Only include buffers with the given tag. Can be specified multiple times.") + cmd.Flags().BoolVar(&request.HashIds, "hash-ids", false, "Hash the buffer IDs.") + cmd.Flags().MarkHidden("hash-ids") + + return cmd +} + +func newBufferImportCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "import", + Short: "Import buffers into the local Tyger instance. 
This command is intended to be run after `tyger buffer export` on another Tyger instance has exported to this instance's storage accounts.", + DisableFlagsInUseLine: true, + Args: cobra.ExactArgs(0), + Run: func(cmd *cobra.Command, args []string) { + run := model.Run{} + _, err := controlplane.InvokeRequest(cmd.Context(), http.MethodPost, "v1/buffers/import", struct{}{}, &run) + if err != nil { + log.Fatal().Err(err).Msg("Failed to import buffers") + } + + if err := attachToRunNoBufferIO(cmd.Context(), run, true, false, getSystemRunLogSink(cmd.Context())); err != nil { + log.Fatal().Err(err).Msg("Failed to attach to run") + } + }, + } + + return cmd +} + +// If we are using the zerolog console writer, this returns an io.Writer that +// feeds lines (that are expected to contain JSON) to the console writer, so that the output is formatted. +func getSystemRunLogSink(ctx context.Context) io.Writer { + loggingSink := logging.GetLogSinkFromContext(ctx) + if consoleWriter, ok := loggingSink.(zerolog.ConsoleWriter); ok { + formatter := logging.NewZeroLogFormatter(consoleWriter) + return formatter + } + + return os.Stderr +} diff --git a/cli/internal/cmd/run.go b/cli/internal/cmd/run.go index c8bb4059..d0746cb4 100644 --- a/cli/internal/cmd/run.go +++ b/cli/internal/cmd/run.go @@ -141,151 +141,7 @@ func newRunExecCommand() *cobra.Command { readDop := dataplane.DefaultReadDop postCreate := func(ctx context.Context, run model.Run) error { - log.Logger = log.Logger.With().Int64("runId", run.Id).Logger() - log.Info().Msg("Run created") - var inputSasUri *url.URL - var outputSasUri *url.URL - var err error - if inputBufferParameter != "" { - bufferId := run.Job.Buffers[inputBufferParameter] - inputSasUri, err = getBufferAccessUri(ctx, bufferId, true) - if err != nil { - return err - } - } - if outputBufferParameter != "" { - bufferId := run.Job.Buffers[outputBufferParameter] - outputSasUri, err = getBufferAccessUri(ctx, bufferId, false) - if err != nil { - return err - } - } - - 
mainWg := sync.WaitGroup{} - - var stopFunc context.CancelFunc - ctx, stopFunc = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) - - go func() { - <-ctx.Done() - stopFunc() - log.Warn().Msg("Canceling...") - }() - - if inputSasUri != nil { - mainWg.Add(1) - go func() { - defer mainWg.Done() - err := dataplane.Write(ctx, inputSasUri, os.Stdin, - dataplane.WithWriteBlockSize(blockSize), - dataplane.WithWriteDop(writeDop)) - if err != nil { - if errors.Is(err, ctx.Err()) { - err = ctx.Err() - } - log.Fatal().Err(err).Msg("Failed to write input") - } - }() - } - - if outputSasUri != nil { - mainWg.Add(1) - go func() { - defer mainWg.Done() - err := dataplane.Read(ctx, outputSasUri, os.Stdout, - dataplane.WithReadDop(readDop)) - if err != nil { - if errors.Is(err, ctx.Err()) { - err = ctx.Err() - } - log.Fatal().Err(err).Msg("Failed to read output") - } - }() - } - - logsWg := sync.WaitGroup{} - if logs { - logsWg.Add(1) - go func() { - defer logsWg.Done() - err := getLogs(ctx, strconv.FormatInt(run.Id, 10), logTimestamps, -1, nil, true, os.Stderr) - if err != nil { - log.Error().Err(err).Msg("Failed to get logs") - } - }() - } - - consecutiveErrors := 0 - beginWatch: - var runFailedErr error - eventChan, errChan := watchRun(ctx, run.Id) - - for { - select { - case err := <-errChan: - log.Error().Err(err).Msg("Error while watching run") - consecutiveErrors++ - - if consecutiveErrors > 1 { - log.Fatal().Err(err).Msg("Failed to watch run") - } - - goto beginWatch - case event, ok := <-eventChan: - if !ok { - goto end - } - consecutiveErrors = 0 - - if event.Status != nil { - logEntry := log.Info().Str("status", event.Status.String()) - if event.RunningCount != nil { - logEntry = logEntry.Int("runningCount", *event.RunningCount) - } - logEntry.Msg("Run status changed") - - switch *event.Status { - case model.Succeeded: - goto end - case model.Pending: - case model.Running: - default: - msg := fmt.Sprintf("run failed with status %s", event.Status.String()) - if 
event.StatusReason != "" { - msg = fmt.Sprintf("%s (%s)", msg, event.StatusReason) - } - runFailedErr = errors.New(msg) - goto end - } - } - } - } - - end: - mainWg.Wait() - - if logs { - // The run has completed and we have received all data. We just need to wait for the logs to finish streaming, - // but we will give up after a period of time. - c := make(chan struct{}) - go func() { - defer close(c) - logsWg.Wait() - }() - - select { - case <-c: - break - case <-time.After(20 * time.Second): - log.Warn().Msg("Timed out waiting for logs to finish streaming") - } - } - - if runFailedErr != nil { - log.Fatal().Err(runFailedErr).Msg("Run failed") - } - - return nil + return attachToRun(ctx, run, inputBufferParameter, outputBufferParameter, blockSize, writeDop, readDop, logs, logTimestamps, os.Stderr) } cmd := newRunCreateCommandCore("exec", preValidate, postCreate) @@ -324,6 +180,158 @@ If the job has a single output buffer, stdout is streamed from the buffer.` return cmd } +func attachToRunNoBufferIO(ctx context.Context, run model.Run, logs bool, logTimestamps bool, logSink io.Writer) error { + return attachToRun(ctx, run, "", "", dataplane.DefaultBlockSize, dataplane.DefaultWriteDop, dataplane.DefaultReadDop, logs, logTimestamps, logSink) +} + +func attachToRun(ctx context.Context, run model.Run, inputBufferParameter, outputBufferParameter string, blockSize int, writeDop int, readDop int, logs bool, logTimestamps bool, logSink io.Writer) error { + log.Logger = log.Logger.With().Int64("runId", run.Id).Logger() + log.Info().Msg("Run created") + var inputSasUri *url.URL + var outputSasUri *url.URL + var err error + if inputBufferParameter != "" { + bufferId := run.Job.Buffers[inputBufferParameter] + inputSasUri, err = getBufferAccessUri(ctx, bufferId, true) + if err != nil { + return err + } + } + if outputBufferParameter != "" { + bufferId := run.Job.Buffers[outputBufferParameter] + outputSasUri, err = getBufferAccessUri(ctx, bufferId, false) + if err != nil { + 
return err + } + } + + mainWg := sync.WaitGroup{} + + var stopFunc context.CancelFunc + ctx, stopFunc = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + + go func() { + <-ctx.Done() + stopFunc() + log.Warn().Msg("Canceling...") + }() + + if inputSasUri != nil { + mainWg.Add(1) + go func() { + defer mainWg.Done() + err := dataplane.Write(ctx, inputSasUri, os.Stdin, + dataplane.WithWriteBlockSize(blockSize), + dataplane.WithWriteDop(writeDop)) + if err != nil { + if errors.Is(err, ctx.Err()) { + err = ctx.Err() + } + log.Fatal().Err(err).Msg("Failed to write input") + } + }() + } + + if outputSasUri != nil { + mainWg.Add(1) + go func() { + defer mainWg.Done() + err := dataplane.Read(ctx, outputSasUri, os.Stdout, + dataplane.WithReadDop(readDop)) + if err != nil { + if errors.Is(err, ctx.Err()) { + err = ctx.Err() + } + log.Fatal().Err(err).Msg("Failed to read output") + } + }() + } + + logsWg := sync.WaitGroup{} + if logs { + logsWg.Add(1) + go func() { + defer logsWg.Done() + err := getLogs(ctx, strconv.FormatInt(run.Id, 10), logTimestamps, -1, nil, true, logSink) + if err != nil { + log.Error().Err(err).Msg("Failed to get logs") + } + }() + } + + consecutiveErrors := 0 +beginWatch: + var runFailedErr error + eventChan, errChan := watchRun(ctx, run.Id) + + for { + select { + case err := <-errChan: + log.Error().Err(err).Msg("Error while watching run") + consecutiveErrors++ + + if consecutiveErrors > 1 { + log.Fatal().Err(err).Msg("Failed to watch run") + } + + goto beginWatch + case event, ok := <-eventChan: + if !ok { + goto end + } + consecutiveErrors = 0 + + if event.Status != nil { + logEntry := log.Info().Str("status", event.Status.String()) + if event.RunningCount != nil { + logEntry = logEntry.Int("runningCount", *event.RunningCount) + } + logEntry.Msg("Run status changed") + + switch *event.Status { + case model.Succeeded: + goto end + case model.Pending: + case model.Running: + default: + msg := fmt.Sprintf("run failed with status %s", 
event.Status.String()) + if event.StatusReason != "" { + msg = fmt.Sprintf("%s (%s)", msg, event.StatusReason) + } + runFailedErr = errors.New(msg) + goto end + } + } + } + } + +end: + mainWg.Wait() + + if logs { + // The run has completed and we have received all data. We just need to wait for the logs to finish streaming, + // but we will give up after a period of time. + c := make(chan struct{}) + go func() { + defer close(c) + logsWg.Wait() + }() + + select { + case <-c: + break + case <-time.After(20 * time.Second): + log.Warn().Msg("Timed out waiting for logs to finish streaming") + } + } + + if runFailedErr != nil { + log.Fatal().Err(runFailedErr).Msg("Run failed") + } + + return nil +} + func newRunCreateCommandCore( commandName string, preValidate func(context.Context, model.Run) error, @@ -924,10 +932,10 @@ func pullImages(ctx context.Context, newRun model.Run) error { } if progress.Status != "" { - logger := log.With().Str("image", imageName).Logger() + logger := log.Ctx(ctx).With().Str("image", imageName).Logger() detail := "" if progress.ProgressDetail != nil && progress.ProgressDetail.Total > 0 { - detail = fmt.Sprintf(" (%s/%s)", humanize.Bytes(progress.ProgressDetail.Current), humanize.Bytes(progress.ProgressDetail.Total)) + detail = fmt.Sprintf(" (%s/%s)", humanize.IBytes(progress.ProgressDetail.Current), humanize.IBytes(progress.ProgressDetail.Total)) } logger.Info().Msgf("%s%s", progress.Status, detail) } diff --git a/cli/internal/controlplane/model/model.go b/cli/internal/controlplane/model/model.go index 74d449bd..6d04cb2d 100644 --- a/cli/internal/controlplane/model/model.go +++ b/cli/internal/controlplane/model/model.go @@ -127,6 +127,7 @@ func (ref CodespecRef) MarshalJSON() ([]byte, error) { type Run struct { RunMetadata + Kind string `json:"kind,omitempty"` Job RunCodeTarget `json:"job,omitempty"` Worker *RunCodeTarget `json:"worker,omitempty"` Cluster string `json:"cluster,omitempty"` @@ -228,3 +229,9 @@ type Cluster struct { Location 
string `json:"location"` NodePools []NodePool `json:"nodePools"` } + +type ExportBuffersRequest struct { + DestinationStorageEndpoint string `json:"destinationStorageEndpoint"` + Filters map[string]string `json:"filters,omitempty"` + HashIds bool `json:"hashIds,omitempty"` +} diff --git a/cli/internal/dataplane/bufferblob.go b/cli/internal/dataplane/bufferblob.go index 91085702..cf097c3d 100644 --- a/cli/internal/dataplane/bufferblob.go +++ b/cli/internal/dataplane/bufferblob.go @@ -14,11 +14,13 @@ import ( "time" ) +type BufferStatus string + const ( CurrentBufferFormatVersion = "0.3.0" - BufferStatusComplete = "complete" - BufferStatusFailed = "failed" + BufferStatusComplete BufferStatus = "complete" + BufferStatusFailed BufferStatus = "failed" HashChainHeader = "x-ms-meta-cumulative_hash_chain" ContentMD5Header = "Content-MD5" @@ -51,7 +53,7 @@ type BufferStartMetadata struct { } type BufferEndMetadata struct { - Status string `json:"status"` + Status BufferStatus `json:"status"` } type Container struct { diff --git a/cli/internal/dataplane/read.go b/cli/internal/dataplane/read.go index ff755ba6..84ca0f19 100644 --- a/cli/internal/dataplane/read.go +++ b/cli/internal/dataplane/read.go @@ -35,7 +35,7 @@ const ( var ( errPastEndOfBlob = errors.New("past end of blob") ErrNotFound = errors.New("not found") - errBufferFailedState = errors.New("the buffer is in a permanently failed state") + ErrBufferFailedState = errors.New("the buffer is in a permanently failed state") ) type readOptions struct { @@ -91,7 +91,10 @@ func Read(ctx context.Context, uri *url.URL, outputWriter io.Writer, options ... ctx, cancel := context.WithCancel(ctx) defer cancel() - ctx = log.With().Str("operation", "buffer read").Logger().WithContext(ctx) + ctx = log.Ctx(ctx).With(). + Str("operation", "buffer read"). + Str("buffer", container.GetContainerName()). 
+ Logger().WithContext(ctx) if container.SupportsRelay() { return readRelay(ctx, httpClient, readOptions.connectionType, container, outputWriter) @@ -115,11 +118,7 @@ func Read(ctx context.Context, uri *url.URL, outputWriter io.Writer, options ... return err } - metrics := TransferMetrics{ - Context: ctx, - Container: container, - } - metrics.Start() + metrics := NewTransferMetrics(ctx) responseChannel := make(chan chan BufferBlob, readOptions.dop*2) var lock sync.Mutex @@ -164,7 +163,7 @@ func Read(ctx context.Context, uri *url.URL, outputWriter io.Writer, options ... errorChannel <- fmt.Errorf("error downloading blob: %w", err) return } - metrics.Update(uint64(len(respData.Data))) + metrics.Update(uint64(len(respData.Data)), 0) md5Header := respData.Header.Get(ContentMD5Header) if md5Header == "" { @@ -282,7 +281,7 @@ func pollForBufferEnd(ctx context.Context, httpClient *retryablehttp.Client, con case BufferStatusComplete: return nil case BufferStatusFailed: - return errBufferFailedState + return ErrBufferFailedState default: log.Warn().Msgf("Buffer end blob has unexpected status '%s'", bufferEndMetadata.Status) return nil diff --git a/cli/internal/dataplane/relayclient.go b/cli/internal/dataplane/relayclient.go index 169cfcd1..81254bad 100644 --- a/cli/internal/dataplane/relayclient.go +++ b/cli/internal/dataplane/relayclient.go @@ -26,10 +26,7 @@ func relayWrite(ctx context.Context, httpClient *retryablehttp.Client, connectio httpClient = client.CloneRetryableClient(httpClient) httpClient.HTTPClient.Timeout = 0 - metrics := TransferMetrics{ - Context: ctx, - Container: container, - } + metrics := NewTransferMetrics(ctx) pipeReader, pipeWriter := io.Pipe() @@ -40,12 +37,10 @@ func relayWrite(ctx context.Context, httpClient *retryablehttp.Client, connectio }() inputReader = pipeReader - inputReader = &ReaderWithMetrics{transferMetrics: &metrics, reader: inputReader} + inputReader = &ReaderWithMetrics{transferMetrics: metrics, reader: inputReader} 
partiallyBufferedReader := NewPartiallyBufferedReader(inputReader, 64*1024) - metrics.Start() - request, err := http.NewRequestWithContext(ctx, http.MethodPut, container.String(), partiallyBufferedReader) if err != nil { return fmt.Errorf("error creating request: %w", err) @@ -97,14 +92,9 @@ func readRelay(ctx context.Context, httpClient *retryablehttp.Client, connection return fmt.Errorf("error reading from relay: %s", resp.Status) } - metrics := TransferMetrics{ - Context: ctx, - Container: container, - } - - metrics.Start() + metrics := NewTransferMetrics(ctx) - _, err = io.Copy(outputWriter, &ReaderWithMetrics{transferMetrics: &metrics, reader: resp.Body}) + _, err = io.Copy(outputWriter, &ReaderWithMetrics{transferMetrics: metrics, reader: resp.Body}) if err == nil { metrics.Stop() @@ -184,7 +174,7 @@ type ReaderWithMetrics struct { func (c *ReaderWithMetrics) Read(p []byte) (n int, err error) { n, err = c.reader.Read(p) if n > 0 { - c.transferMetrics.Update(uint64(n)) + c.transferMetrics.Update(uint64(n), 0) } return n, err } diff --git a/cli/internal/dataplane/transfermetrics.go b/cli/internal/dataplane/transfermetrics.go index a0ed2790..2b0ea3a8 100644 --- a/cli/internal/dataplane/transfermetrics.go +++ b/cli/internal/dataplane/transfermetrics.go @@ -5,28 +5,42 @@ package dataplane import ( "context" + "fmt" + "strings" "sync/atomic" "time" + "github.com/dustin/go-humanize" "github.com/rs/zerolog/log" ) type TransferMetrics struct { - Context context.Context - Container *Container + ctx context.Context + totalBuffers atomic.Uint64 totalBytes uint64 - currentPeriodBytes uint64 + currentPeriodBytes atomic.Uint64 startTime time.Time ticker *time.Ticker stoppedChannel chan any reportingComplete chan any } -func (ts *TransferMetrics) Update(byteCount uint64) { - atomic.AddUint64(&ts.currentPeriodBytes, byteCount) +func NewTransferMetrics(ctx context.Context) *TransferMetrics { + return &TransferMetrics{ + ctx: ctx, + } } -func (ts *TransferMetrics) Start() { 
+func (ts *TransferMetrics) Update(byteCount uint64, completedBuffers uint64) { + newPeriodBytes := ts.currentPeriodBytes.Add(byteCount) + newCompletedBuffers := ts.totalBuffers.Add(completedBuffers) + + if (byteCount > 0 && newPeriodBytes == byteCount || completedBuffers > 0 && newCompletedBuffers == completedBuffers) && ts.startTime == (time.Time{}) { + ts.start() + } +} + +func (ts *TransferMetrics) start() { ts.stoppedChannel = make(chan any) ts.reportingComplete = make(chan any) ts.ticker = time.NewTicker(2 * time.Second) @@ -38,31 +52,63 @@ func (ts *TransferMetrics) Start() { case <-ts.stoppedChannel: ts.reportingComplete <- nil return - case t := <-ts.ticker.C: - elapsed := t.Sub(lastTime) - lastTime = t - currentBytes := atomic.SwapUint64(&ts.currentPeriodBytes, 0) - if currentBytes > 0 { - ts.totalBytes += currentBytes - // For networking throughput in Mbps, we divide by 1000 * 1000 (not 1024 * 1024) because - // networking is traditionally done in base 10 units (not base 2). - currentMbps := float32(currentBytes*8) / (1000 * 1000) / float32(elapsed.Seconds()) - log.Ctx(ts.Context).Info().Float32("throughputMbps", currentMbps).Msg("Transfer progress") + case <-ts.ticker.C: + // The logging call may block, so we measure the current time + // rather than use the time from the ticker channel + currentTime := time.Now() + elapsed := currentTime.Sub(lastTime) + lastTime = currentTime + currentBytes := ts.currentPeriodBytes.Swap(0) + ts.totalBytes += currentBytes + bytesPerSecond := uint64(float64(currentBytes) / elapsed.Seconds()) + // For networking throughput in Mbps, we divide by 1000 * 1000 (not 1024 * 1024) because + // networking is traditionally done in base 10 units (not base 2). 
+ partial := log.Ctx(ts.ctx).Info().Str("throughput", fmt.Sprintf("%sps", humanizeBytesAsBits(bytesPerSecond))) + totalBuffers := ts.totalBuffers.Load() + if totalBuffers > 0 { + partial = partial.Str("totalBuffers", humanize.Comma(int64(totalBuffers))) } + + partial = partial.Str("totalData", removeSpaces(humanize.IBytes(ts.totalBytes))) + + partial.Msg("Transfer progress") } } }() - log.Ctx(ts.Context).Info().Str("container", ts.Container.GetContainerName()).Msg("Transfer starting") + log.Ctx(ts.ctx).Info().Msg("Transfer starting") } func (ts *TransferMetrics) Stop() { - elapsed := time.Since(ts.startTime) - ts.stoppedChannel <- nil - <-ts.reportingComplete - ts.totalBytes += atomic.SwapUint64(&ts.currentPeriodBytes, 0) - log.Ctx(ts.Context).Info(). - Float32("elapsedSeconds", float32(elapsed.Seconds())). - Float32("totalGiB", float32(ts.totalBytes)/(1024*1024*1024)). - Msg("Transfer complete") + var elapsed time.Duration + if ts.startTime != (time.Time{}) { + elapsed = time.Since(ts.startTime) + ts.stoppedChannel <- nil + <-ts.reportingComplete + ts.totalBytes += ts.currentPeriodBytes.Load() + } + + bytesPerSecond := uint64(float64(ts.totalBytes) / elapsed.Seconds()) + + partial := log.Ctx(ts.ctx).Info(). + Str("elapsed", elapsed.Round(time.Second).String()). 
+ Str("avgThroughput", fmt.Sprintf("%sps", humanizeBytesAsBits(bytesPerSecond))) + + totalBuffers := ts.totalBuffers.Load() + if totalBuffers > 0 { + partial = partial.Uint64("totalBuffers", totalBuffers) + } + + partial = partial.Str("totalData", removeSpaces(humanize.IBytes(ts.totalBytes))) + partial.Msg("Transfer complete") +} + +func humanizeBytesAsBits(bytes uint64) string { + s := humanize.Bytes(bytes * 8) + return removeSpaces(strings.TrimSuffix(s, "B") + "b") +} + +// "1 MB" -> "1MB" so that log field values are not quoted in the console +func removeSpaces(s string) string { + return strings.Replace(s, " ", "", -1) } diff --git a/cli/internal/dataplane/write.go b/cli/internal/dataplane/write.go index b49deea8..adcac26f 100644 --- a/cli/internal/dataplane/write.go +++ b/cli/internal/dataplane/write.go @@ -82,7 +82,11 @@ func Write(ctx context.Context, uri *url.URL, inputReader io.Reader, options ... o(writeOptions) } - ctx = log.With().Str("operation", "buffer write").Logger().WithContext(ctx) + ctx = log.Ctx(ctx).With(). + Str("operation", "buffer write"). + Str("buffer", container.GetContainerName()). + Logger().WithContext(ctx) + if writeOptions.httpClient == nil { tygerClient, _ := controlplane.GetClientFromCache() if tygerClient != nil { @@ -121,10 +125,7 @@ func Write(ctx context.Context, uri *url.URL, inputReader io.Reader, options ... wg := sync.WaitGroup{} wg.Add(writeOptions.dop) - metrics := TransferMetrics{ - Context: ctx, - Container: container, - } + metrics := NewTransferMetrics(ctx) for i := 0; i < writeOptions.dop; i++ { go func() { @@ -158,7 +159,7 @@ func Write(ctx context.Context, uri *url.URL, inputReader io.Reader, options ... return } - metrics.Update(uint64(len(bb.Contents))) + metrics.Update(uint64(len(bb.Contents)), 0) pool.Put(bb.Contents) } @@ -177,9 +178,6 @@ func Write(ctx context.Context, uri *url.URL, inputReader io.Reader, options ... 
buffer := pool.Get(writeOptions.blockSize) bytesRead, err := io.ReadFull(inputReader, buffer) - if blobNumber == 0 { - metrics.Start() - } if bytesRead > 0 { currentHashChannel := make(chan string, 1) @@ -285,7 +283,7 @@ func writeStartMetadata(ctx context.Context, httpClient *retryablehttp.Client, c return uploadBlobWithRetry(ctx, httpClient, startMetadataUri, startBytes, encodedMD5Hash, "") } -func writeEndMetadata(ctx context.Context, httpClient *retryablehttp.Client, container *Container, status string) { +func writeEndMetadata(ctx context.Context, httpClient *retryablehttp.Client, container *Container, status BufferStatus) { bufferEndMetadata := BufferEndMetadata{Status: status} endBytes, err := json.Marshal(bufferEndMetadata) if err != nil { diff --git a/cli/internal/install/cloudinstall/helm.go b/cli/internal/install/cloudinstall/helm.go index fcd8c709..41fe9894 100644 --- a/cli/internal/install/cloudinstall/helm.go +++ b/cli/internal/install/cloudinstall/helm.go @@ -417,6 +417,7 @@ func (inst *Installer) InstallTygerHelmChart(ctx context.Context, restConfig *re Values: map[string]any{ "image": fmt.Sprintf("%s/tyger-server:%s", install.ContainerRegistry, install.ContainerImageTag), "bufferSidecarImage": fmt.Sprintf("%s/buffer-sidecar:%s", install.ContainerRegistry, install.ContainerImageTag), + "bufferCopierImage": fmt.Sprintf("%s/buffer-copier:%s", install.ContainerRegistry, install.ContainerImageTag), "workerWaiterImage": fmt.Sprintf("%s/worker-waiter:%s", install.ContainerRegistry, install.ContainerImageTag), "hostname": inst.Config.Api.DomainName, "identity": map[string]any{ diff --git a/cli/internal/install/dockerinstall/docker.go b/cli/internal/install/dockerinstall/docker.go index a378a97a..83002bc5 100644 --- a/cli/internal/install/dockerinstall/docker.go +++ b/cli/internal/install/dockerinstall/docker.go @@ -217,7 +217,8 @@ func (inst *Installer) createControlPlaneContainer(ctx context.Context, checkGpu 
fmt.Sprintf("Buffers__LocalStorage__TcpDataPlaneEndpoint=http://localhost:%d", inst.Config.DataPlanePort), "Buffers__PrimarySigningPrivateKeyPath=" + primaryPublicCertificatePath, "Buffers__SecondarySigningPrivateKeyPath=" + secondaryPublicCertificatePath, - fmt.Sprintf("Database__ConnectionString=Host=%s/database; Username=tyger-server", inst.Config.InstallationPath), + fmt.Sprintf("Database__Host=%s/database", inst.Config.InstallationPath), + "Database__Username=tyger-server", "Database__TygerServerRoleName=tyger-server", }, Healthcheck: &container.HealthConfig{ diff --git a/cli/internal/install/dockerinstall/migrations.go b/cli/internal/install/dockerinstall/migrations.go index 72c51e77..30795915 100644 --- a/cli/internal/install/dockerinstall/migrations.go +++ b/cli/internal/install/dockerinstall/migrations.go @@ -238,7 +238,8 @@ func (inst *Installer) startMigrationRunner(ctx context.Context, containerName s User: fmt.Sprintf("%d:%d", inst.Config.GetUserIdInt(), inst.Config.GetGroupIdInt()), Env: []string{ fmt.Sprintf("Urls=http://unix:%s/control-plane/tyger.sock", inst.Config.InstallationPath), - fmt.Sprintf("Database__ConnectionString=Host=%s/database; Username=tyger-server", inst.Config.InstallationPath), + fmt.Sprintf("Database__Host=%s/database", inst.Config.InstallationPath), + "Database__Username=tyger-server", "Database__AutoMigrate=true", "Database__TygerServerRoleName=tyger-server", "Compute__Docker__Enabled=true", diff --git a/cli/internal/logging/zerologformatter.go b/cli/internal/logging/zerologformatter.go new file mode 100644 index 00000000..86acfa1a --- /dev/null +++ b/cli/internal/logging/zerologformatter.go @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package logging + +import ( + "bytes" + "io" +) + +type ZerologFormatter struct { + sink io.Writer + buf []byte +} + +// Creates an io.Writer that writes lines of JSON to the given sink. 
+func NewZeroLogFormatter(sink io.Writer) *ZerologFormatter { + return &ZerologFormatter{ + sink: sink, + } +} + +func (w *ZerologFormatter) Write(p []byte) (n int, err error) { + w.buf = append(w.buf, p...) + + for { + i := bytes.IndexByte(w.buf, '\n') + if i < 0 { + break + } + + line := w.buf[:i] + w.sink.Write(line) + w.buf = w.buf[i+1:] + } + + return len(p), nil +} + +func (w *ZerologFormatter) Flush() { + if len(w.buf) > 0 { + w.sink.Write(w.buf) + } +} diff --git a/deploy/config/microsoft/cloudconfig.yml b/deploy/config/microsoft/cloudconfig.yml index fcc80cd0..26a5caed 100644 --- a/deploy/config/microsoft/cloudconfig.yml +++ b/deploy/config/microsoft/cloudconfig.yml @@ -64,4 +64,5 @@ api: values: image: ${TYGER_SERVER_IMAGE} bufferSidecarImage: ${BUFFER_SIDECAR_IMAGE} + bufferCopierImage: ${BUFFER_COPIER_IMAGE} workerWaiterImage: ${WORKER_WAITER_IMAGE} diff --git a/deploy/helm/tyger/templates/migration-runner.yaml b/deploy/helm/tyger/templates/migration-runner.yaml index baab0a1d..130027d1 100644 --- a/deploy/helm/tyger/templates/migration-runner.yaml +++ b/deploy/helm/tyger/templates/migration-runner.yaml @@ -56,10 +56,18 @@ spec: env: - name: AZURE_CLIENT_ID value: {{ .Values.identity.migrationRunner.clientId }} - - name: Database__ConnectionString - value: "Host={{ .Values.database.host }}; Database={{ .Values.database.databaseName }}; Port={{ .Values.database.port }}; Username={{ .Values.identity.migrationRunner.name }}; SslMode=VerifyFull;" + - name: Database__Host + value: {{ .Values.database.host }} + - name: Database__DatabaseName + value: {{ .Values.database.databaseName }} + - name: Database__Port + value: "{{ .Values.database.port }}" + - name: Database__Username + value: {{ .Values.identity.migrationRunner.name }} - name: Database__TygerServerRoleName value: {{ .Values.identity.tygerServer.name }} + - name: Database__TygerServerIdentity + value: {{ .Values.identity.tygerServer.name }} - name: Compute__Kubernetes__Namespace value: {{ 
.Release.Namespace }} - name: Compute__Kubernetes__JobServiceAccount diff --git a/deploy/helm/tyger/templates/server.yaml b/deploy/helm/tyger/templates/server.yaml index 0c74bf2d..02bcde3d 100644 --- a/deploy/helm/tyger/templates/server.yaml +++ b/deploy/helm/tyger/templates/server.yaml @@ -150,10 +150,18 @@ spec: - name: Compute__Kubernetes__CustomIdentities__{{ $element.name }} value: {{ $tygerName}}-custom-{{ $element.name }}-job {{- end }} - - name: Database__ConnectionString - value: "Host={{ .Values.database.host }}; Database={{ .Values.database.databaseName }}; Port={{ .Values.database.port }}; Username={{ .Values.identity.tygerServer.name }}; SslMode=VerifyFull;" + - name: Database__Host + value: {{ .Values.database.host }} + - name: Database__DatabaseName + value: {{ .Values.database.databaseName }} + - name: Database__Port + value: "{{ .Values.database.port }}" + - name: Database__Username + value: {{ .Values.identity.tygerServer.name }} - name: Database__TygerServerRoleName value: {{ .Values.identity.tygerServer.name }} + - name: Database__TygerServerIdentity + value: {{ .Values.identity.tygerServer.name }} - name: Database__AutoMigrate value: "{{ .Values.database.autoMigrate }}" {{- range $index, $element := .Values.buffers.storageAccounts }} @@ -166,6 +174,8 @@ spec: {{- end }} - name: Buffers__BufferSidecarImage value: {{ required "A value for bufferSidecarImage is required" .Values.bufferSidecarImage }} + - name: Buffers__BufferCopierImage + value: {{ required "A value for bufferCopierImage is required" .Values.bufferCopierImage }} - name: LogArchive__CloudStorage__StorageAccountEndpoint value: {{ required "A value for logArchive.storageAccountEndpoint is required" .Values.logArchive.storageAccountEndpoint }} readinessProbe: diff --git a/deploy/helm/tyger/values.yaml b/deploy/helm/tyger/values.yaml index aada759a..2eb533b2 100644 --- a/deploy/helm/tyger/values.yaml +++ b/deploy/helm/tyger/values.yaml @@ -43,6 +43,7 @@ logArchive: 
storageAccountEndpoint: bufferSidecarImage: +bufferCopierImage: workerWaiterImage: clusterConfiguration: "{}" diff --git a/docs/guides/buffers.md b/docs/guides/buffers.md index eedbdcfa..78f64844 100644 --- a/docs/guides/buffers.md +++ b/docs/guides/buffers.md @@ -203,3 +203,39 @@ tyger buffer list --tag mykey1=myvalue1 --tag missingkey=missingvalue ```json [] ``` + +## Copying buffers between Tyger instances +::: warning Note +This functionality is only supported when Tyger is running in the cloud. +::: + +Suppose you have two Tyger instances, and you want to copy all buffers including +their tags from one instance to another. This can be accomplished in two steps. + +### Export the buffers + +```bash +tyger buffer export DESTINATION_STORAGE_ENDPOINT [--tag KEY=VALUE ...] +``` + +`DESTINATION_STORAGE_ENDPOINT` should be the blob endpoint of the destination +Tyger instance's storage account. The Tyger server's managed identity needs to have +`Storage Blob Data Contributor` access on this storage account. + +To only export a subset of buffers, you can filter the buffers to be exported by +tags. + +This command starts a special [run](./runs). Logs are displayed inline, but can also be +retrieved later using [`tyger run logs ID`](./runs#viewing-logs). + +### Import the buffers
Once the export run has completed successfully, you can import these buffers +into the destination Tyger instance's database with the command: + +```bash +tyger buffer import +``` + +This starts a run that scans through the instance's storage account and imports +new buffers. Note that existing buffers are not touched and their tags will not +be updated. 
diff --git a/scripts/build-images.sh b/scripts/build-images.sh index 1112d58e..2171a094 100755 --- a/scripts/build-images.sh +++ b/scripts/build-images.sh @@ -217,6 +217,11 @@ if [[ -n "${buffer_sidecar:-}" ]]; then repo="tyger-cli" build_and_push + + target="buffer-copier" + repo="buffer-copier" + + build_and_push fi if [[ -n "${helm:-}" ]]; then diff --git a/scripts/get-config.sh b/scripts/get-config.sh index 79d99c96..06a183f8 100755 --- a/scripts/get-config.sh +++ b/scripts/get-config.sh @@ -114,6 +114,7 @@ else TYGER_SERVER_IMAGE="${repo_fqdn}/tyger-server:${EXPLICIT_IMAGE_TAG}" TYGER_DATA_PLANE_SERVER_IMAGE="${repo_fqdn}/tyger-data-plane-server:${EXPLICIT_IMAGE_TAG}" BUFFER_SIDECAR_IMAGE="${repo_fqdn}/buffer-sidecar:${EXPLICIT_IMAGE_TAG}" + BUFFER_COPIER_IMAGE="${repo_fqdn}/buffer-copier:${EXPLICIT_IMAGE_TAG}" WORKER_WAITER_IMAGE="${repo_fqdn}/worker-waiter:${EXPLICIT_IMAGE_TAG}" elif [[ "$docker" == true ]]; then arch=$(dpkg --print-architecture) @@ -126,6 +127,7 @@ else TYGER_SERVER_IMAGE="$(docker inspect "${repo_fqdn}/tyger-server:dev-${arch}" 2>/dev/null | jq -r --arg repo "${repo_fqdn}/tyger-server" '.[0].RepoDigests[] | select (startswith($repo))' 2>/dev/null || true)" BUFFER_SIDECAR_IMAGE="$(docker inspect "${repo_fqdn}/buffer-sidecar:dev-${arch}" 2>/dev/null | jq -r --arg repo "${repo_fqdn}/buffer-sidecar" '.[0].RepoDigests[] | select (startswith($repo))' 2>/dev/null || true)" + BUFFER_COPIER_IMAGE="$(docker inspect "${repo_fqdn}/buffer-copier:dev-${arch}" 2>/dev/null | jq -r --arg repo "${repo_fqdn}/buffer-copier" '.[0].RepoDigests[] | select (startswith($repo))' 2>/dev/null || true)" WORKER_WAITER_IMAGE="$(docker inspect "${repo_fqdn}/worker-waiter:dev-${arch}" 2>/dev/null | jq -r --arg repo "${repo_fqdn}/worker-waiter" '.[0].RepoDigests[] | select (startswith($repo))' 2>/dev/null || true)" fi @@ -137,6 +139,7 @@ else export GATEWAY_IMAGE else export WORKER_WAITER_IMAGE + export BUFFER_COPIER_IMAGE fi fi diff --git 
a/server/ControlPlane/Buffers/AzureBlobBufferProvider.cs b/server/ControlPlane/Buffers/AzureBlobBufferProvider.cs index b96d0b2f..43592b2f 100644 --- a/server/ControlPlane/Buffers/AzureBlobBufferProvider.cs +++ b/server/ControlPlane/Buffers/AzureBlobBufferProvider.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.ComponentModel.DataAnnotations; using Azure; using Azure.Core; using Azure.Storage.Blobs; @@ -8,6 +9,9 @@ using Azure.Storage.Sas; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Options; +using Tyger.ControlPlane.Database; +using Tyger.ControlPlane.Model; +using Tyger.ControlPlane.Runs; namespace Tyger.ControlPlane.Buffers; @@ -16,15 +20,27 @@ public sealed class AzureBlobBufferProvider : IBufferProvider, IHealthCheck, IHo private static readonly TimeSpan s_userDelegationKeyDuration = TimeSpan.FromDays(1); private readonly BlobServiceClient _serviceClient; + private readonly BufferOptions _bufferOptions; + private readonly DatabaseOptions _databaseOptions; + private readonly Lazy _runCreator; private readonly ILogger _logger; private readonly CancellationTokenSource _backgroundCancellationTokenSource = new(); private UserDelegationKey? 
_userDelegationKey; - public AzureBlobBufferProvider(TokenCredential credential, IOptions config, ILogger logger) + public AzureBlobBufferProvider( + TokenCredential credential, + IOptions storageOptions, + IOptions bufferOptions, + IOptions databaseOptions, + Lazy runCreator, + ILogger logger) { + _runCreator = runCreator; _logger = logger; - var bufferStorageAccountOptions = config.Value.StorageAccounts[0]; + var bufferStorageAccountOptions = storageOptions.Value.StorageAccounts[0]; _serviceClient = new BlobServiceClient(new Uri(bufferStorageAccountOptions.Endpoint), credential); + _bufferOptions = bufferOptions.Value; + _databaseOptions = databaseOptions.Value; } public async Task CreateBuffer(string id, CancellationToken cancellationToken) @@ -76,6 +92,110 @@ public async Task BufferExists(string id, CancellationToken cancellationTo } } + public async Task ExportBuffers(ExportBuffersRequest exportBufferRequest, CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(exportBufferRequest.DestinationStorageEndpoint)) + { + throw new ValidationException("Destination storage endpoint is required."); + } + + var args = new List + { + "export", + "--log-format", "json", + "--source-storage-endpoint", _serviceClient.Uri.ToString(), + "--destination-storage-endpoint", exportBufferRequest.DestinationStorageEndpoint.ToString(), + "--db-host", _databaseOptions.Host, + "--db-user", _databaseOptions.Username, + }; + + if (!string.IsNullOrEmpty(_databaseOptions.DatabaseName)) + { + args.Add("--db-name"); + args.Add(_databaseOptions.DatabaseName); + } + + if (_databaseOptions.Port.HasValue) + { + args.Add("--db-port"); + args.Add(_databaseOptions.Port.Value.ToString()); + } + + if (exportBufferRequest.Filters != null) + { + foreach (var filter in exportBufferRequest.Filters) + { + args.Add("--filter"); + args.Add(string.Concat(filter.Key, "=", filter.Value)); + } + } + + if (exportBufferRequest.HashIds) + { + args.Add("--hash-ids"); + } + + var newRun = new Run + { 
+ Kind = RunKind.System, + Job = new JobRunCodeTarget + { + Codespec = new JobCodespec + { + Image = _bufferOptions.BufferCopierImage, + Identity = _databaseOptions.TygerServerIdentity, + Args = args, + }, + }, + + TimeoutSeconds = (int)TimeSpan.FromDays(7).TotalSeconds, + }; + + return await _runCreator.Value.CreateRun(newRun, cancellationToken); + } + + public async Task ImportBuffers(CancellationToken cancellationToken) + { + var args = new List + { + "import", + "--log-format", "json", + "--storage-endpoint", _serviceClient.Uri.ToString(), + "--db-host", _databaseOptions.Host, + "--db-user", _databaseOptions.Username, + }; + + if (!string.IsNullOrEmpty(_databaseOptions.DatabaseName)) + { + args.Add("--db-name"); + args.Add(_databaseOptions.DatabaseName); + } + + if (_databaseOptions.Port.HasValue) + { + args.Add("--db-port"); + args.Add(_databaseOptions.Port.Value.ToString()); + } + + var newRun = new Run + { + Kind = RunKind.System, + Job = new JobRunCodeTarget + { + Codespec = new JobCodespec + { + Image = _bufferOptions.BufferCopierImage, + Identity = _databaseOptions.TygerServerIdentity, + Args = args, + }, + }, + + TimeoutSeconds = (int)TimeSpan.FromDays(7).TotalSeconds, + }; + + return await _runCreator.Value.CreateRun(newRun, cancellationToken); + } + public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken) { await _serviceClient.GetBlobContainerClient("healthcheck").ExistsAsync(cancellationToken); diff --git a/server/ControlPlane/Buffers/BufferManager.cs b/server/ControlPlane/Buffers/BufferManager.cs index 431cf2f3..ff74594b 100644 --- a/server/ControlPlane/Buffers/BufferManager.cs +++ b/server/ControlPlane/Buffers/BufferManager.cs @@ -129,6 +129,16 @@ public string GetUnqualifiedBufferId(string id) return match.Groups["BUFFERID"].Value; } + public async Task ExportBuffers(ExportBuffersRequest exportBufferRequest, CancellationToken cancellationToken) + { + return await 
_bufferProvider.ExportBuffers(exportBufferRequest, cancellationToken); + } + + public async Task ImportBuffers(CancellationToken cancellationToken) + { + return await _bufferProvider.ImportBuffers(cancellationToken); + } + [GeneratedRegex(@"^(?(run-(?\d+)-)?temp-)?(?\w+)$")] private static partial Regex BufferIdRegex(); } diff --git a/server/ControlPlane/Buffers/Buffers.cs b/server/ControlPlane/Buffers/Buffers.cs index 89983639..55a45612 100644 --- a/server/ControlPlane/Buffers/Buffers.cs +++ b/server/ControlPlane/Buffers/Buffers.cs @@ -189,6 +189,22 @@ public static void MapBuffers(this WebApplication app) .WithName("getBufferAccessString") .Produces(StatusCodes.Status201Created) .Produces(StatusCodes.Status404NotFound); + + app.MapPost("/v1/buffers/export", async (BufferManager manager, ExportBuffersRequest exportRequest, CancellationToken cancellationToken) => + { + var run = await manager.ExportBuffers(exportRequest, cancellationToken); + return Results.Json(run, statusCode: StatusCodes.Status201Created); + }) + .WithName("exportBuffers") + .Produces(StatusCodes.Status202Accepted); + + app.MapPost("/v1/buffers/import", async (BufferManager manager, ImportBuffersRequest exportRequest, CancellationToken cancellationToken) => + { + var run = await manager.ImportBuffers(cancellationToken); + return Results.Json(run, statusCode: StatusCodes.Status201Created); + }) + .WithName("importBuffers") + .Produces(StatusCodes.Status202Accepted); } } @@ -197,6 +213,8 @@ public class BufferOptions [Required] public string BufferSidecarImage { get; set; } = null!; + public string BufferCopierImage { get; set; } = null!; + public string PrimarySigningPrivateKeyPath { get; init; } = null!; public string SecondarySigningPrivateKeyPath { get; init; } = null!; } diff --git a/server/ControlPlane/Buffers/IBufferProvider.cs b/server/ControlPlane/Buffers/IBufferProvider.cs index 340ce55e..4844c1a4 100644 --- a/server/ControlPlane/Buffers/IBufferProvider.cs +++ 
b/server/ControlPlane/Buffers/IBufferProvider.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using Tyger.ControlPlane.Model; + namespace Tyger.ControlPlane.Buffers; public interface IBufferProvider @@ -9,6 +11,9 @@ public interface IBufferProvider Task BufferExists(string id, CancellationToken cancellationToken); Uri CreateBufferAccessUrl(string id, bool writeable, bool preferTcp); + + Task ExportBuffers(ExportBuffersRequest exportBufferRequest, CancellationToken cancellationToken); + Task ImportBuffers(CancellationToken cancellationToken); } public interface IEphemeralBufferProvider diff --git a/server/ControlPlane/Buffers/LocalStorageBufferProvider.cs b/server/ControlPlane/Buffers/LocalStorageBufferProvider.cs index 00e003fe..78da5af2 100644 --- a/server/ControlPlane/Buffers/LocalStorageBufferProvider.cs +++ b/server/ControlPlane/Buffers/LocalStorageBufferProvider.cs @@ -1,10 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.ComponentModel.DataAnnotations; using System.Net.Sockets; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Options; using Tyger.Common.Buffers; +using Tyger.ControlPlane.Model; namespace Tyger.ControlPlane.Buffers; @@ -39,7 +41,7 @@ public LocalStorageBufferProvider(IOptions storageOpt { ConnectCallback = async (sockHttpConnContext, ctxToken) => { - var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified); + var socket = new System.Net.Sockets.Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified); var endpoint = new UnixDomainSocketEndPoint(socketPath); await socket.ConnectAsync(endpoint, ctxToken); @@ -134,5 +136,15 @@ public Uri CreateBufferAccessUrl(string id, bool writeable, bool preferTcp) return new Uri(preferTcp ? 
_baseTcpUrl : _baseUrl, $"v1/containers/{id}{queryString}"); } + public Task ExportBuffers(ExportBuffersRequest exportBufferRequest, CancellationToken cancellationToken) + { + throw new ValidationException("Exporting buffers is not supported with local storage."); + } + + public Task ImportBuffers(CancellationToken cancellationToken) + { + throw new ValidationException("Importing buffers is not supported with local storage."); + } + public void Dispose() => _dataPlaneClient.Dispose(); } diff --git a/server/ControlPlane/Compute/Kubernetes/Kubernetes.cs b/server/ControlPlane/Compute/Kubernetes/Kubernetes.cs index 97aec808..c8a68079 100644 --- a/server/ControlPlane/Compute/Kubernetes/Kubernetes.cs +++ b/server/ControlPlane/Compute/Kubernetes/Kubernetes.cs @@ -37,6 +37,7 @@ public static void AddKubernetes(this IHostApplicationBuilder builder) builder.Services.AddOptions().BindConfiguration("compute:kubernetes").ValidateDataAnnotations().ValidateOnStart(); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => sp.GetRequiredService()); + builder.Services.AddSingleton(sp => new Lazy(() => sp.GetRequiredService())); builder.Services.AddSingleton(sp => sp.GetRequiredService()); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => sp.GetRequiredService()); diff --git a/server/ControlPlane/Compute/Kubernetes/KubernetesRunCreator.cs b/server/ControlPlane/Compute/Kubernetes/KubernetesRunCreator.cs index 544dfb78..085a532f 100644 --- a/server/ControlPlane/Compute/Kubernetes/KubernetesRunCreator.cs +++ b/server/ControlPlane/Compute/Kubernetes/KubernetesRunCreator.cs @@ -63,7 +63,7 @@ public async Task CreateRun(Run newRun, CancellationToken cancellationToken } }; - var jobPodTemplateSpec = CreatePodTemplateSpec(jobCodespec, newRun.Job, targetCluster, "Never"); + var jobPodTemplateSpec = CreatePodTemplateSpec(jobCodespec, newRun.Job, targetCluster, newRun, "Never"); V1PodTemplateSpec? workerPodTemplateSpec = null; WorkerCodespec? 
workerCodespec = null; @@ -82,7 +82,7 @@ public async Task CreateRun(Run newRun, CancellationToken cancellationToken Codespec = workerCodespec.ToCodespecRef() } }; - workerPodTemplateSpec = CreatePodTemplateSpec(workerCodespec, newRun.Worker, targetCluster, "Always"); + workerPodTemplateSpec = CreatePodTemplateSpec(workerCodespec, newRun.Worker, targetCluster, newRun, "Always"); } if (newRun.Job.Buffers == null) @@ -418,13 +418,18 @@ private ClusterOptions GetTargetCluster(Run newRun) return targetCluster; } - private V1PodTemplateSpec CreatePodTemplateSpec(Codespec codespec, RunCodeTarget codeTarget, ClusterOptions? targetCluster, string restartPolicy) + private V1PodTemplateSpec CreatePodTemplateSpec(Codespec codespec, RunCodeTarget codeTarget, ClusterOptions? targetCluster, Run run, string restartPolicy) { string? GetServiceAccount() { var identities = _k8sOptions.CustomIdentities; if (!string.IsNullOrEmpty(codespec.Identity)) { + if (run.Kind == RunKind.System) + { + return codespec.Identity; + } + if (identities?.TryGetValue(codespec.Identity, out var serviceAccount) == true) { return serviceAccount; @@ -462,7 +467,7 @@ private V1PodTemplateSpec CreatePodTemplateSpec(Codespec codespec, RunCodeTarget Image = codespec.Image, Command = codespec.Command?.ToArray(), Args = codespec.Args?.ToArray(), - Env = codespec.Env?.Select(p => new V1EnvVar(p.Key, p.Value)).ToList() + Env = [new V1EnvVar("TYGER_RUN_ID", valueFrom: new V1EnvVarSource(fieldRef: new V1ObjectFieldSelector($"metadata.labels['{RunLabel}']")))], } ], RestartPolicy = restartPolicy, @@ -470,6 +475,14 @@ private V1PodTemplateSpec CreatePodTemplateSpec(Codespec codespec, RunCodeTarget } }; + if (codespec.Env != null) + { + foreach (var (key, value) in codespec.Env) + { + podTemplateSpec.Spec.Containers[0].Env.Add(new(key, value)); + } + } + AddComputeResources(podTemplateSpec, codespec, codeTarget, targetCluster); return podTemplateSpec; diff --git a/server/ControlPlane/Database/Database.cs 
b/server/ControlPlane/Database/Database.cs index f6a1b9e1..6fd2fe89 100644 --- a/server/ControlPlane/Database/Database.cs +++ b/server/ControlPlane/Database/Database.cs @@ -48,7 +48,16 @@ public static void AddDatabase(this IHostApplicationBuilder builder) builder.Services.AddSingleton(sp => { var databaseOptions = sp.GetRequiredService>().Value; - var dataSourceBuilder = new NpgsqlDataSourceBuilder(databaseOptions.ConnectionString); + + var dataSourceBuilder = new NpgsqlDataSourceBuilder(); + dataSourceBuilder.ConnectionStringBuilder.Host = databaseOptions.Host; + dataSourceBuilder.ConnectionStringBuilder.Database = databaseOptions.DatabaseName; + if (databaseOptions.Port.HasValue) + { + dataSourceBuilder.ConnectionStringBuilder.Port = databaseOptions.Port.Value; + } + + dataSourceBuilder.ConnectionStringBuilder.Username = databaseOptions.Username; if (string.IsNullOrEmpty(databaseOptions.PasswordFile)) { @@ -58,6 +67,7 @@ public static void AddDatabase(this IHostApplicationBuilder builder) !Path.Exists(dataSourceBuilder.ConnectionStringBuilder.Host)) { // assume we are connecting to a cloud database + dataSourceBuilder.ConnectionStringBuilder.SslMode = SslMode.VerifyFull; var tokenCredential = sp.GetRequiredService(); var logger = sp.GetRequiredService().CreateLogger(typeof(Database).FullName!); dataSourceBuilder.UsePeriodicPasswordProvider( @@ -211,13 +221,22 @@ public static void MapDatabaseVersionInUse(this WebApplication app) public class DatabaseOptions { [Required] - public string ConnectionString { get; set; } = null!; + public required string Host { get; set; } + + public string? DatabaseName { get; set; } + + public int? Port { get; set; } + + [Required] + public required string Username { get; set; } public string? 
PasswordFile { get; set; } [Required] public required string TygerServerRoleName { get; set; } + public required string TygerServerIdentity { get; set; } + public bool AutoMigrate { get; set; } } diff --git a/server/ControlPlane/Model/Model.cs b/server/ControlPlane/Model/Model.cs index 325f1835..2ec5deee 100644 --- a/server/ControlPlane/Model/Model.cs +++ b/server/ControlPlane/Model/Model.cs @@ -401,8 +401,17 @@ public enum RunStatus Canceled, } +public enum RunKind +{ + User = 0, + System, +} + public record Run : ModelBase { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public RunKind Kind { get; init; } = RunKind.User; + /// /// The run ID. Populated by the system. /// @@ -486,3 +495,12 @@ public record BufferPage(IReadOnlyList Items, Uri? NextLink); public record Cluster(string Name, string Location, IReadOnlyList NodePools); public record NodePool(string Name, string VmSize); + +public record ExportBuffersRequest(string DestinationStorageEndpoint, Dictionary? Filters, [property: OpenApiExclude] bool HashIds) : ModelBase; + +public record ImportBuffersRequest() : ModelBase; + +[AttributeUsage(AttributeTargets.Property)] +public class OpenApiExcludeAttribute : Attribute +{ +} diff --git a/server/ControlPlane/OpenApi/OpenApi.cs b/server/ControlPlane/OpenApi/OpenApi.cs index c410879b..5c18875d 100644 --- a/server/ControlPlane/OpenApi/OpenApi.cs +++ b/server/ControlPlane/OpenApi/OpenApi.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. 
+using System.Reflection; using k8s.Models; using Microsoft.OpenApi.Any; using Microsoft.OpenApi.Models; @@ -30,6 +31,8 @@ public static void AddOpenApi(this IHostApplicationBuilder builder) _ => null }); + c.MapType(() => new OpenApiSchema { Enum = [new OpenApiString("user"), new OpenApiString("system")] }); + c.MapType(() => new OpenApiSchema { Type = "string" }); c.MapType(() => new OpenApiSchema { Type = "string" }); c.MapType(() => new OpenApiSchema @@ -44,23 +47,24 @@ public static void AddOpenApi(this IHostApplicationBuilder builder) { if (type == typeof(ICodespecRef)) { - return new[] { typeof(CommittedCodespecRef), typeof(Codespec) }; + return [typeof(CommittedCodespecRef), typeof(Codespec)]; } if (type == typeof(Codespec)) { - return new[] { typeof(JobCodespec), typeof(WorkerCodespec) }; + return [typeof(JobCodespec), typeof(WorkerCodespec)]; } if (type == typeof(RunCodeTarget)) { - return new[] { typeof(JobRunCodeTarget) }; + return [typeof(JobRunCodeTarget)]; } return Array.Empty(); }); c.SchemaFilter(); + c.SchemaFilter(); var filePath = Path.Combine(AppContext.BaseDirectory, "tyger-server.xml"); c.IncludeXmlComments(filePath); @@ -87,3 +91,18 @@ public void Apply(OpenApiSchema schema, SchemaFilterContext context) } } } + +internal sealed class OpenApiExcludeSchemaFilter : ISchemaFilter +{ + public void Apply(OpenApiSchema schema, SchemaFilterContext context) + { + var excludedProperties = context.Type.GetProperties().Where(p => p.GetCustomAttribute() != null); + + foreach (var excludedProperty in excludedProperties) + { + // casing does not match + var keyToRemove = schema.Properties.Keys.Single(k => k.Equals(excludedProperty.Name, StringComparison.OrdinalIgnoreCase)); + schema.Properties.Remove(keyToRemove); + } + } +} diff --git a/server/ControlPlane/Runs/Runs.cs b/server/ControlPlane/Runs/Runs.cs index 956a1754..71d1c5c5 100644 --- a/server/ControlPlane/Runs/Runs.cs +++ b/server/ControlPlane/Runs/Runs.cs @@ -25,6 +25,11 @@ public static void 
MapRuns(this WebApplication app) app.MapPost("/v1/runs", async (IRunCreator runCreator, HttpContext context) => { var run = await context.Request.ReadAndValidateJson(context.RequestAborted); + if (run.Kind == RunKind.System) + { + throw new ValidationException("System runs cannot be created directly"); + } + Run createdRun = await runCreator.CreateRun(run, context.RequestAborted); return Results.Created($"/v1/runs/{createdRun.Id}", createdRun); }) @@ -155,9 +160,9 @@ await logSource.GetLogs(parsedRunId, options, context.RequestAborted) is not Pip return Responses.NotFound(); } - return Results.Ok(run); + return Results.Accepted(value: run); }) - .Produces(StatusCodes.Status200OK) + .Produces(StatusCodes.Status202Accepted) .Produces(StatusCodes.Status404NotFound); // this endpoint is for testing purposes only, to force the background pod sweep