Skip to content

Commit

Permalink
feat: pre-pull images and sideload them into the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte committed Oct 31, 2024
1 parent 2cde3ab commit c7b251b
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 22 deletions.
29 changes: 16 additions & 13 deletions internal/cmd/images/manifest_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"strings"

"github.com/airbytehq/abctl/internal/cmd/local/helm"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/common"
"github.com/airbytehq/abctl/internal/trace"
helmlib "github.com/mittwald/go-helm-client"
"helm.sh/helm/v3/pkg/repo"

"github.com/airbytehq/abctl/internal/common"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -27,16 +26,11 @@ type ManifestCmd struct {
Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."`
}

func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
func (c *ManifestCmd) Run(ctx context.Context) error {
ctx, span := trace.NewSpan(ctx, "images manifest")
defer span.End()

client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace)
if err != nil {
return err
}

images, err := c.findAirbyteImages(ctx, client)
images, err := c.findAirbyteImages(ctx)
if err != nil {
return err
}
Expand All @@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
return nil
}

func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) {
func (c *ManifestCmd) findAirbyteImages(ctx context.Context) ([]string, error) {
valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{
ValuesFile: c.Values,
})
Expand All @@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client)
}

airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart)
return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion)
return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion)
}

func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) {
err := client.AddOrUpdateChartRepo(repo.Entry{
func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) {

// sharing a helm client with the install code causes some weird issues,
// and templating the chart doesn't need details about the k8s provider,
// we create a throwaway helm client here.
client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace))
if err != nil {
return nil, err
}

err = client.AddOrUpdateChartRepo(repo.Entry{
Name: common.AirbyteRepoName,
URL: common.AirbyteRepoURL,
})
Expand Down
22 changes: 13 additions & 9 deletions internal/cmd/local/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ type Client interface {
TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error)
}

func ClientOptions(namespace string) *helmclient.Options {
logger := helmLogger{}
return &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
}
}

// New returns the default helm client
func New(kubecfg, kubectx, namespace string) (Client, error) {
k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
Expand All @@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace string) (Client, error) {
return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err)
}

logger := helmLogger{}
helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{
Options: &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
},
Options: ClientOptions(namespace),
RestConfig: restCfg,
})
if err != nil {
Expand Down
74 changes: 74 additions & 0 deletions internal/cmd/local/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"sync"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
Expand All @@ -13,7 +16,9 @@ import (
"github.com/pterm/pterm"
"gopkg.in/yaml.v3"
"sigs.k8s.io/kind/pkg/cluster"
"sigs.k8s.io/kind/pkg/cluster/nodeutils"
kindExec "sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/fs"
)

// ExtraVolumeMount defines a host volume mount for the Kind cluster
Expand All @@ -30,6 +35,7 @@ type Cluster interface {
Delete(ctx context.Context) error
// Exists returns true if the cluster exists, false otherwise.
Exists(ctx context.Context) bool
LoadImages(ctx context.Context, images []string)
}

// interface sanity check
Expand Down Expand Up @@ -110,6 +116,74 @@ func (k *kindCluster) Exists(ctx context.Context) bool {
return false
}

// LoadImages pulls images from Docker Hub, and loads them into the kind cluster.
// This is a best-effort optimization, which is why it doesn't an error;
// it's possible that only some images will be loaded.
// TODO this should probably take a context, and handle cancellation.
func (k *kindCluster) LoadImages(ctx context.Context, images []string) {
err := k.loadImages(ctx, images)
pterm.Debug.Printfln("failed to load images: %s", err)
}

func (k *kindCluster) loadImages(ctx context.Context, images []string) error {
// Get a list of Kind nodes.
nodes, err := k.p.ListNodes(k.clusterName)
if err != nil {
return fmt.Errorf("listing nodes: %w", err)
}

// Setup the tar path where the images will be saved.
dir, err := fs.TempDir("", "images-tar-")
if err != nil {
return err
}
defer os.RemoveAll(dir)

// Pull all the images via "docker pull", in parallel.
var wg sync.WaitGroup
wg.Add(len(images))
for _, img := range images {
pterm.Debug.Printfln("pulling image %s", img)

go func(img string) {
defer wg.Done()
out, err := exec.CommandContext(ctx, "docker", "pull", img).CombinedOutput()
if err != nil {
pterm.Debug.Printfln("error pulling image %s", out)
// don't return the error here, because other image pulls might succeed.
}
}(img)
}
wg.Wait()

// The context could be canceled by now. If so, return early.
if ctx.Err() != nil {
return ctx.Err()
}

// Save all the images to an archive, images.tar
imagesTarPath := filepath.Join(dir, "images.tar")
pterm.Debug.Printfln("saving image archive to %s", imagesTarPath)

out, err := exec.CommandContext(ctx, "docker", append([]string{"save", "-o", imagesTarPath}, images...)...).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to run 'docker save': %s", out)
}

// Load the image archive into the Kind nodes.
f, err := os.Open(imagesTarPath)
if err != nil {
return err
}
defer f.Close()

for _, n := range nodes {
pterm.Debug.Printfln("loading image archive into kind node %s", n)
nodeutils.LoadImageArchive(n, f)
}
return nil
}

func formatKindErr(err error) error {
var kindErr *kindExec.RunError
if errors.As(err, &kindErr) {
Expand Down
13 changes: 13 additions & 0 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/airbytehq/abctl/internal/cmd/images"
"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/cmd/local/helm"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
Expand Down Expand Up @@ -151,6 +152,18 @@ func (c *Command) persistentVolumeClaim(ctx context.Context, namespace, name, vo
return nil
}

// PrepImages determines the docker images needed by the chart, pulls them, and loads them into the cluster.
// This is best effort, so errors are dropped here.
func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts) {
manifest, err := images.FindImagesFromChart(opts.HelmValuesYaml, opts.AirbyteChartLoc, opts.HelmChartVersion)
if err != nil {
pterm.Debug.Printfln("error building image manifest: %s", err)
return
}

cluster.LoadImages(ctx, manifest)
}

// Install handles the installation of Airbyte
func (c *Command) Install(ctx context.Context, opts *InstallOpts) error {
ctx, span := trace.NewSpan(ctx, "command.Install")
Expand Down
2 changes: 2 additions & 0 deletions internal/cmd/local/local_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t
return fmt.Errorf("unable to initialize local command: %w", err)
}

lc.PrepImages(ctx, cluster, opts)

if err := lc.Install(ctx, opts); err != nil {
spinner.Fail("Unable to install Airbyte locally")
return err
Expand Down

0 comments on commit c7b251b

Please sign in to comment.