@ -25,6 +25,8 @@ package client
import (
import (
"context"
"context"
"fmt"
"fmt"
"golang.org/x/sync/errgroup"
"io"
"os"
"os"
"path"
"path"
"strings"
"strings"
@ -39,6 +41,13 @@ import (
// ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export
// ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export
// images from the runtime to import them into the nodes of the selected cluster
// images from the runtime to import them into the nodes of the selected cluster
func ImageImportIntoClusterMulti ( ctx context . Context , runtime runtimes . Runtime , images [ ] string , cluster * k3d . Cluster , opts k3d . ImageImportOpts ) error {
func ImageImportIntoClusterMulti ( ctx context . Context , runtime runtimes . Runtime , images [ ] string , cluster * k3d . Cluster , opts k3d . ImageImportOpts ) error {
// stdin case
if len ( images ) == 1 && images [ 0 ] == "-" {
err := loadImageFromStream ( ctx , runtime , os . Stdin , cluster )
return fmt . Errorf ( "failed to load image to cluster from stdin: %v" , err )
}
imagesFromRuntime , imagesFromTar , err := findImages ( ctx , runtime , images )
imagesFromRuntime , imagesFromTar , err := findImages ( ctx , runtime , images )
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to find images: %w" , err )
return fmt . Errorf ( "failed to find images: %w" , err )
@ -46,13 +55,25 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
// no images found to load -> exit early
// no images found to load -> exit early
if len ( imagesFromRuntime ) + len ( imagesFromTar ) == 0 {
if len ( imagesFromRuntime ) + len ( imagesFromTar ) == 0 {
return fmt . Errorf ( "N o valid images specified" )
return fmt . Errorf ( "n o valid images specified" )
}
}
// create tools node to export images
loadWithToolsNode := false
toolsNode , err := EnsureToolsNode ( ctx , runtime , cluster )
switch opts . Mode {
case k3d . ImportModeAutoDetect :
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to ensure that tools node is running: %w" , err )
return fmt . Errorf ( "failed to retrieve container runtime information: %w" , err )
}
runtimeHost := runtime . GetHost ( )
if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" {
l . Log ( ) . Infof ( "Auto-detected a remote docker daemon, using tools node for loading images" )
loadWithToolsNode = true
}
case k3d . ImportModeToolsNode :
loadWithToolsNode = true
case k3d . ImportModeDirect :
loadWithToolsNode = false
}
}
/ * TODO :
/ * TODO :
@ -63,6 +84,26 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
* 3. From stdin : save to tar - > import
* 3. From stdin : save to tar - > import
* Note : temporary storage location is always the shared image volume and actions are always executed by the tools node
* Note : temporary storage location is always the shared image volume and actions are always executed by the tools node
* /
* /
if loadWithToolsNode {
err = importWithToolsNode ( ctx , runtime , cluster , imagesFromRuntime , imagesFromTar , opts )
} else {
err = importWithStream ( ctx , runtime , cluster , imagesFromRuntime , imagesFromTar )
}
if err != nil {
return err
}
l . Log ( ) . Infoln ( "Successfully imported image(s)" )
return nil
}
func importWithToolsNode ( ctx context . Context , runtime runtimes . Runtime , cluster * k3d . Cluster , imagesFromRuntime [ ] string , imagesFromTar [ ] string , opts k3d . ImageImportOpts ) error {
// create tools node to export images
toolsNode , err := EnsureToolsNode ( ctx , runtime , cluster )
if err != nil {
return fmt . Errorf ( "failed to ensure that tools node is running: %w" , err )
}
var importTarNames [ ] string
var importTarNames [ ] string
if len ( imagesFromRuntime ) > 0 {
if len ( imagesFromRuntime ) > 0 {
@ -123,11 +164,109 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
l . Log ( ) . Errorf ( "failed to delete tools node '%s' (try to delete it manually): %v" , toolsNode . Name , err )
l . Log ( ) . Errorf ( "failed to delete tools node '%s' (try to delete it manually): %v" , toolsNode . Name , err )
}
}
}
}
return nil
}
l . Log ( ) . Infoln ( "Successfully imported image(s)" )
func importWithStream ( ctx context . Context , runtime runtimes . Runtime , cluster * k3d . Cluster , imagesFromRuntime [ ] string , imagesFromTar [ ] string ) error {
if len ( imagesFromRuntime ) > 0 {
l . Log ( ) . Infof ( "Loading %d image(s) from runtime into nodes..." , len ( imagesFromRuntime ) )
// open a stream to all given images
stream , err := runtime . GetImageStream ( ctx , imagesFromRuntime )
if err != nil {
return fmt . Errorf ( "could not open image stream for given images %s: %w" , imagesFromRuntime , err )
}
err = loadImageFromStream ( ctx , runtime , stream , cluster )
if err != nil {
return fmt . Errorf ( "could not load image to cluster from stream %s: %w" , imagesFromRuntime , err )
}
// load the images directly into the nodes
}
if len ( imagesFromTar ) > 0 {
// copy tarfiles to shared volume
l . Log ( ) . Infof ( "Importing images from %d tarball(s)..." , len ( imagesFromTar ) )
files := make ( [ ] * os . File , len ( imagesFromTar ) )
readers := make ( [ ] io . Reader , len ( imagesFromTar ) )
failedFiles := 0
for i , fileName := range imagesFromTar {
file , err := os . Open ( fileName )
if err != nil {
l . Log ( ) . Errorf ( "failed to read file '%s', skipping. Error below:\n%+v" , fileName , err )
failedFiles ++
continue
}
files [ i ] = file
readers [ i ] = file
}
multiReader := io . MultiReader ( readers ... )
err := loadImageFromStream ( ctx , runtime , io . NopCloser ( multiReader ) , cluster )
if err != nil {
return fmt . Errorf ( "could not load image to cluster from stream %s: %w" , imagesFromTar , err )
}
for _ , file := range files {
err := file . Close ( )
if err != nil {
l . Log ( ) . Errorf ( "Failed to close file '%s' after reading. Error below:\n%+v" , file . Name ( ) , err )
}
}
}
return nil
}
func loadImageFromStream ( ctx context . Context , runtime runtimes . Runtime , stream io . ReadCloser , cluster * k3d . Cluster ) error {
var errorGroup errgroup . Group
numNodes := 0
for _ , node := range cluster . Nodes {
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node . Role == k3d . ServerRole || node . Role == k3d . AgentRole {
numNodes ++
}
}
// multiplex the stream so we can write to multiple nodes
pipeReaders := make ( [ ] * io . PipeReader , numNodes )
pipeWriters := make ( [ ] io . Writer , numNodes )
for i := 0 ; i < numNodes ; i ++ {
reader , writer := io . Pipe ( )
pipeReaders [ i ] = reader
pipeWriters [ i ] = writer
}
errorGroup . Go ( func ( ) error {
_ , err := io . Copy ( io . MultiWriter ( pipeWriters ... ) , stream )
if err != nil {
return fmt . Errorf ( "failed to copy read stream. %v" , err )
}
err = stream . Close ( )
if err != nil {
return fmt . Errorf ( "failed to close stream. %v" , err )
}
return nil
return nil
} )
pipeId := 0
for _ , n := range cluster . Nodes {
node := n
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node . Role == k3d . ServerRole || node . Role == k3d . AgentRole {
pipeReader := pipeReaders [ pipeId ]
errorGroup . Go ( func ( ) error {
l . Log ( ) . Infof ( "Importing images into node '%s'..." , node . Name )
if err := runtime . ExecInNodeWithStdin ( ctx , node , [ ] string { "ctr" , "image" , "import" , "-" } , pipeReader ) ; err != nil {
return fmt . Errorf ( "failed to import images in node '%s': %v" , node . Name , err )
}
return nil
} )
pipeId ++
}
}
err := errorGroup . Wait ( )
if err != nil {
return fmt . Errorf ( "error loading image to cluster, first error: %v" , err )
}
return nil
}
}
type runtimeImageGetter interface {
type runtimeImageGetter interface {