Skip to content

Commit

Permalink
feat(desktop): synchronized file share integration (#11614)
Browse files Browse the repository at this point in the history
  • Loading branch information
milas authored Mar 20, 2024
1 parent 1b5fa3b commit db4ed89
Show file tree
Hide file tree
Showing 17 changed files with 858 additions and 131 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0-rc6
github.com/otiai10/copy v1.14.0
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -169,6 +170,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/api v0.26.7 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
Expand Down Expand Up @@ -562,6 +564,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -672,6 +675,8 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/cenkalti/backoff.v2 v2.2.1 h1:eJ9UAg01/HIHG987TwxvnzK2MgxXq97YY6rYDpY9aII=
gopkg.in/cenkalti/backoff.v2 v2.2.1/go.mod h1:S0QdOvT2AlerfSBkp0O+dk+bbIMaNbEmVk876gPCthU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
173 changes: 173 additions & 0 deletions internal/desktop/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package desktop

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"

"github.com/docker/compose/v2/internal/memnet"
"github.com/r3labs/sse"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

Expand Down Expand Up @@ -119,6 +123,175 @@ func (c *Client) FeatureFlags(ctx context.Context) (FeatureFlagResponse, error)
return ret, nil
}

type CreateFileShareRequest struct {
HostPath string `json:"hostPath"`
Labels map[string]string `json:"labels,omitempty"`
}

type CreateFileShareResponse struct {
FileShareID string `json:"fileShareID"`
}

func (c *Client) CreateFileShare(ctx context.Context, r CreateFileShareRequest) (*CreateFileShareResponse, error) {
rawBody, _ := json.Marshal(r)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, backendURL("/mutagen/file-shares"), bytes.NewReader(rawBody))
req.Header.Set("Content-Type", "application/json")
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
errBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(errBody))
}
var ret CreateFileShareResponse
if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
return nil, err
}
return &ret, nil
}

type FileShareReceiverState struct {
TotalReceivedSize uint64 `json:"totalReceivedSize"`
}

type FileShareEndpoint struct {
Path string `json:"path"`
TotalFileSize uint64 `json:"totalFileSize,omitempty"`
StagingProgress *FileShareReceiverState `json:"stagingProgress"`
}

type FileShareSession struct {
SessionID string `json:"identifier"`
Alpha FileShareEndpoint `json:"alpha"`
Beta FileShareEndpoint `json:"beta"`
Labels map[string]string `json:"labels"`
Status string `json:"status"`
}

func (c *Client) ListFileShares(ctx context.Context) ([]FileShareSession, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares"), http.NoBody)
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return nil, newHTTPStatusCodeError(resp)
}

var ret []FileShareSession
if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
return nil, err
}
return ret, nil
}

func (c *Client) DeleteFileShare(ctx context.Context, id string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, backendURL("/mutagen/file-shares/"+id), http.NoBody)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return newHTTPStatusCodeError(resp)
}
return nil
}

type EventMessage[T any] struct {
Value T
Error error
}

func newHTTPStatusCodeError(resp *http.Response) error {
r := io.LimitReader(resp.Body, 2048)
body, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("http status code %d", resp.StatusCode)
}
return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
}

func (c *Client) StreamFileShares(ctx context.Context) (<-chan EventMessage[[]FileShareSession], error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares/stream"), http.NoBody)
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() {
_ = resp.Body.Close()
}()
return nil, newHTTPStatusCodeError(resp)
}

events := make(chan EventMessage[[]FileShareSession])
go func(ctx context.Context) {
defer func() {
_ = resp.Body.Close()
for range events {
// drain the channel
}
close(events)
}()
if err := readEvents(ctx, resp.Body, events); err != nil {
select {
case <-ctx.Done():
case events <- EventMessage[[]FileShareSession]{Error: err}:
}
}
}(ctx)
return events, nil
}

func readEvents[T any](ctx context.Context, r io.Reader, events chan<- EventMessage[T]) error {
eventReader := sse.NewEventStreamReader(r)
for {
msg, err := eventReader.ReadEvent()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return fmt.Errorf("reading events: %w", err)
}
msg = bytes.TrimPrefix(msg, []byte("data: "))

var event T
if err := json.Unmarshal(msg, &event); err != nil {
return err
}
select {
case <-ctx.Done():
return context.Cause(ctx)
case events <- EventMessage[T]{Value: event}:
// event was sent to channel, read next
}
}
}

// backendURL generates a URL for the given API path.
//
// NOTE: Custom transport handles communication. The host is to create a valid
Expand Down
52 changes: 52 additions & 0 deletions internal/desktop/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2024 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package desktop

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestClientPing(t *testing.T) {
if testing.Short() {
t.Skip("Skipped in short mode - test connects to Docker Desktop")
}
desktopEndpoint := os.Getenv("COMPOSE_TEST_DESKTOP_ENDPOINT")
if desktopEndpoint == "" {
t.Skip("Skipping - COMPOSE_TEST_DESKTOP_ENDPOINT not defined")
}

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

client := NewClient(desktopEndpoint)
t.Cleanup(func() {
_ = client.Close()
})

now := time.Now()

ret, err := client.Ping(ctx)
require.NoError(t, err)

serverTime := time.Unix(0, ret.ServerTime)
require.True(t, now.Before(serverTime))
}
Loading

0 comments on commit db4ed89

Please sign in to comment.