From d0898bf9d8ce0ffbd35534c56d54163b2e772e60 Mon Sep 17 00:00:00 2001 From: Simon Baier Date: Tue, 16 Nov 2021 12:14:30 +0000 Subject: [PATCH] [Enhancement]: improve image loading performance (#826) - use `k3d-tools` (classic) image importing for remote docker hosts - use direct streaming into the nodes (node exec with stdin) on local docker connections --- cmd/image/imageImport.go | 32 +++-- docs/usage/.pages | 1 + docs/usage/commands/k3d_image_import.md | 1 + docs/usage/importing_images.md | 18 +++ pkg/client/tools.go | 151 +++++++++++++++++++++++- pkg/runtimes/docker/node.go | 49 +++++++- pkg/runtimes/runtime.go | 2 + pkg/types/types.go | 17 +++ 8 files changed, 252 insertions(+), 19 deletions(-) create mode 100644 docs/usage/importing_images.md diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index e3b847f5..694bae24 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -59,16 +59,28 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { images, clusters := parseLoadImageCmd(cmd, args) + + loadModeStr, err := cmd.Flags().GetString("mode") + if err != nil { + l.Log().Errorln("No load-mode specified") + l.Log().Fatalln(err) + } + if mode, ok := k3d.ImportModes[loadModeStr]; !ok { + l.Log().Fatalf("Unknown image loading mode '%s'\n", loadModeStr) + } else { + loadImageOpts.Mode = mode + } + l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters) - errOccured := false + errOccurred := false for _, cluster := range clusters { l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name) - if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, &cluster, loadImageOpts); err != nil { + if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, cluster, loadImageOpts); err != nil { l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err) - errOccured = true + errOccurred = true } } - if errOccured { + if errOccurred { l.Log().Warnln("At least one error occured while trying to import the image(s) into the selected cluster(s)") os.Exit(1) } @@ -86,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") - + cmd.Flags().StringP("mode", "m", string(k3d.ImportModeAutoDetect), "Which method to use to import images into the cluster [auto, direct, tools]. See https://k3d.io/usage/guides/importing_images/") /* Subcommands */ // done @@ -94,16 +106,20 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of } // parseLoadImageCmd parses the command input into variables required to create a cluster -func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []k3d.Cluster) { +func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Cluster) { // --cluster clusterNames, err := cmd.Flags().GetStringArray("cluster") if err != nil { l.Log().Fatalln(err) } - clusters := []k3d.Cluster{} + clusters := []*k3d.Cluster{} for _, clusterName := range clusterNames { - clusters = append(clusters, k3d.Cluster{Name: clusterName}) + cluster, err := client.ClusterGet(cmd.Context(), runtimes.SelectedRuntime, &k3d.Cluster{Name: clusterName}) + if err != nil { + l.Log().Fatalf("failed to get cluster %s: %v", clusterName, err) + } + clusters = append(clusters, cluster) } // images diff --git a/docs/usage/.pages b/docs/usage/.pages index 3ed625f3..c95855c0 100644 --- a/docs/usage/.pages +++ b/docs/usage/.pages @@ -5,6 +5,7 @@ nav: - multiserver.md - registries.md - exposing_services.md + - importing_images.md - k3s.md - advanced - commands diff --git a/docs/usage/commands/k3d_image_import.md b/docs/usage/commands/k3d_image_import.md index c11dada6..04c147e9 100644 --- a/docs/usage/commands/k3d_image_import.md +++ b/docs/usage/commands/k3d_image_import.md @@ -29,6 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] -h, --help help for import -k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume -t, --keep-tools Do not delete the tools node after import + -m, --mode string Which method to use to import images into the cluster [auto, direct, tools]. (default "auto") ``` ### Options inherited from parent commands diff --git a/docs/usage/importing_images.md b/docs/usage/importing_images.md new file mode 100644 index 00000000..06ab1918 --- /dev/null +++ b/docs/usage/importing_images.md @@ -0,0 +1,18 @@ +# Importing modes + +## Auto + +Auto-determine whether to use `direct` or `tools-node`. + +For remote container runtimes, `tools-node` is faster due to less network overhead, thus it is automatically selected for remote runtimes. + +Otherwise direct is used. + +## Direct + +Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written. + +## Tools Node + +Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there. + diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 8e1dd503..bb3348c7 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -25,6 +25,8 @@ package client import ( "context" "fmt" + "golang.org/x/sync/errgroup" + "io" "os" "path" "strings" @@ -39,6 +41,13 @@ import ( // ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export // images from the runtime to import them into the nodes of the selected cluster func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, images []string, cluster *k3d.Cluster, opts k3d.ImageImportOpts) error { + + // stdin case + if len(images) == 1 && images[0] == "-" { + err := loadImageFromStream(ctx, runtime, os.Stdin, cluster) + return fmt.Errorf("failed to load image to cluster from stdin: %v", err) + } + imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images) if err != nil { return fmt.Errorf("failed to find images: %w", err) @@ -46,13 +55,25 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, // no images found to load -> exit early if len(imagesFromRuntime)+len(imagesFromTar) == 0 { - return fmt.Errorf("No valid images specified") + return fmt.Errorf("no valid images specified") } - // create tools node to export images - toolsNode, err := EnsureToolsNode(ctx, runtime, cluster) - if err != nil { - return fmt.Errorf("failed to ensure that tools node is running: %w", err) + loadWithToolsNode := false + + switch opts.Mode { + case k3d.ImportModeAutoDetect: + if err != nil { + return fmt.Errorf("failed to retrieve container runtime information: %w", err) + } + runtimeHost := runtime.GetHost() + if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" { + l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") + loadWithToolsNode = true + } + case k3d.ImportModeToolsNode: + loadWithToolsNode = true + case k3d.ImportModeDirect: + loadWithToolsNode = false } /* TODO: @@ -63,6 +84,26 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, * 3. From stdin: save to tar -> import * Note: temporary storage location is always the shared image volume and actions are always executed by the tools node */ + if loadWithToolsNode { + err = importWithToolsNode(ctx, runtime, cluster, imagesFromRuntime, imagesFromTar, opts) + } else { + err = importWithStream(ctx, runtime, cluster, imagesFromRuntime, imagesFromTar) + } + if err != nil { + return err + } + + l.Log().Infoln("Successfully imported image(s)") + return nil +} + +func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error { + // create tools node to export images + toolsNode, err := EnsureToolsNode(ctx, runtime, cluster) + if err != nil { + return fmt.Errorf("failed to ensure that tools node is running: %w", err) + } + var importTarNames []string if len(imagesFromRuntime) > 0 { @@ -123,11 +164,109 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err) } } + return nil +} - l.Log().Infoln("Successfully imported image(s)") +func importWithStream(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string) error { + if len(imagesFromRuntime) > 0 { + l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) + // open a stream to all given images + stream, err := runtime.GetImageStream(ctx, imagesFromRuntime) + if err != nil { + return fmt.Errorf("could not open image stream for given images %s: %w", imagesFromRuntime, err) + } + err = loadImageFromStream(ctx, runtime, stream, cluster) + if err != nil { + return fmt.Errorf("could not load image to cluster from stream %s: %w", imagesFromRuntime, err) + } + // load the images directly into the nodes + } + + if len(imagesFromTar) > 0 { + // copy tarfiles to shared volume + l.Log().Infof("Importing images from %d tarball(s)...", len(imagesFromTar)) + + files := make([]*os.File, len(imagesFromTar)) + readers := make([]io.Reader, len(imagesFromTar)) + failedFiles := 0 + for i, fileName := range imagesFromTar { + file, err := os.Open(fileName) + if err != nil { + l.Log().Errorf("failed to read file '%s', skipping. Error below:\n%+v", fileName, err) + failedFiles++ + continue + } + files[i] = file + readers[i] = file + } + multiReader := io.MultiReader(readers...) + err := loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster) + if err != nil { + return fmt.Errorf("could not load image to cluster from stream %s: %w", imagesFromTar, err) + } + for _, file := range files { + err := file.Close() + if err != nil { + l.Log().Errorf("Failed to close file '%s' after reading. Error below:\n%+v", file.Name(), err) + } + } + } return nil +} +func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) error { + var errorGroup errgroup.Group + + numNodes := 0 + for _, node := range cluster.Nodes { + // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) + if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { + numNodes++ + } + } + // multiplex the stream so we can write to multiple nodes + pipeReaders := make([]*io.PipeReader, numNodes) + pipeWriters := make([]io.Writer, numNodes) + for i := 0; i < numNodes; i++ { + reader, writer := io.Pipe() + pipeReaders[i] = reader + pipeWriters[i] = writer + } + + errorGroup.Go(func() error { + _, err := io.Copy(io.MultiWriter(pipeWriters...), stream) + if err != nil { + return fmt.Errorf("failed to copy read stream. %v", err) + } + err = stream.Close() + if err != nil { + return fmt.Errorf("failed to close stream. %v", err) + } + return nil + }) + + pipeId := 0 + for _, n := range cluster.Nodes { + node := n + // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) + if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { + pipeReader := pipeReaders[pipeId] + errorGroup.Go(func() error { + l.Log().Infof("Importing images into node '%s'...", node.Name) + if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, pipeReader); err != nil { + return fmt.Errorf("failed to import images in node '%s': %v", node.Name, err) + } + return nil + }) + pipeId++ + } + } + err := errorGroup.Wait() + if err != nil { + return fmt.Errorf("error loading image to cluster, first error: %v", err) + } + return nil } type runtimeImageGetter interface { diff --git a/pkg/runtimes/docker/node.go b/pkg/runtimes/docker/node.go index f5d5539f..820ddc58 100644 --- a/pkg/runtimes/docker/node.go +++ b/pkg/runtimes/docker/node.go @@ -309,7 +309,7 @@ func (d Docker) GetNodeLogs(ctx context.Context, node *k3d.Node, since time.Time // ExecInNodeGetLogs executes a command inside a node and returns the logs to the caller, e.g. to parse them func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []string) (*bufio.Reader, error) { - resp, err := executeInNode(ctx, node, cmd) + resp, err := executeInNode(ctx, node, cmd, nil) if resp != nil { defer resp.Close() } @@ -322,9 +322,23 @@ func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []str return resp.Reader, nil } +// GetImageStream creates a tar stream for the given images, to be read (and closed) by the caller +func (d Docker) GetImageStream(ctx context.Context, image []string) (io.ReadCloser, error) { + docker, err := GetDockerClient() + if err != nil { + return nil, err + } + reader, err := docker.ImageSave(ctx, image) + return reader, err +} + // ExecInNode execs a command inside a node func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) error { - execConnection, err := executeInNode(ctx, node, cmd) + return execInNode(ctx, node, cmd, nil) +} + +func execInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error { + execConnection, err := executeInNode(ctx, node, cmd, stdin) if execConnection != nil { defer execConnection.Close() } @@ -340,7 +354,11 @@ func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) er return err } -func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.HijackedResponse, error) { +func (d Docker) ExecInNodeWithStdin(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error { + return execInNode(ctx, node, cmd, stdin) +} + +func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) (*types.HijackedResponse, error) { l.Log().Debugf("Executing command '%+v' in node '%s'", cmd, node.Name) @@ -357,12 +375,19 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi } defer docker.Close() + attachStdin := false + if stdin != nil { + attachStdin = true + } + // exec exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{ - Privileged: true, - Tty: true, + Privileged: true, + // Don't use tty true when piping stdin. + Tty: !attachStdin, AttachStderr: true, AttachStdout: true, + AttachStdin: attachStdin, Cmd: cmd, }) if err != nil { @@ -380,6 +405,20 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi return nil, fmt.Errorf("docker failed to start exec process in node '%s': %w", node.Name, err) } + // If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin + if stdin != nil { + go func() { + _, err := io.Copy(execConnection.Conn, stdin) + if err != nil { + l.Log().Errorf("Failed to copy read stream. %v", err) + } + err = stdin.Close() + if err != nil { + l.Log().Errorf("Failed to close stdin stream. %v", err) + } + }() + } + for { // get info about exec process inside container execInfo, err := docker.ContainerExecInspect(ctx, exec.ID) diff --git a/pkg/runtimes/runtime.go b/pkg/runtimes/runtime.go index 3e925e6a..0234be4c 100644 --- a/pkg/runtimes/runtime.go +++ b/pkg/runtimes/runtime.go @@ -65,8 +65,10 @@ type Runtime interface { CreateVolume(context.Context, string, map[string]string) error DeleteVolume(context.Context, string) error GetVolume(string) (string, error) + GetImageStream(context.Context, []string) (io.ReadCloser, error) GetRuntimePath() string // returns e.g. '/var/run/docker.sock' for a default docker setup ExecInNode(context.Context, *k3d.Node, []string) error + ExecInNodeWithStdin(context.Context, *k3d.Node, []string, io.ReadCloser) error ExecInNodeGetLogs(context.Context, *k3d.Node, []string) (*bufio.Reader, error) GetNodeLogs(context.Context, *k3d.Node, time.Time, *runtimeTypes.NodeLogsOpts) (io.ReadCloser, error) GetImages(context.Context) ([]string, error) diff --git a/pkg/types/types.go b/pkg/types/types.go index b4239218..7c877c1c 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -180,10 +180,27 @@ type NodeHookAction interface { Info() string // returns a description of what this action does } +// LoadMode describes how images are loaded into the cluster +type ImportMode string + +const ( + ImportModeAutoDetect ImportMode = "auto" + ImportModeDirect ImportMode = "direct" + ImportModeToolsNode ImportMode = "tools-node" +) + +// ImportModes defines the loading methods for image loading +var ImportModes = map[string]ImportMode{ + string(ImportModeAutoDetect): ImportModeAutoDetect, + string(ImportModeDirect): ImportModeDirect, + string(ImportModeToolsNode): ImportModeToolsNode, +} + // ImageImportOpts describes a set of options one can set for loading image(s) into cluster(s) type ImageImportOpts struct { KeepTar bool KeepToolsNode bool + Mode ImportMode } type IPAM struct {