clusterStart: only run necessary actions

- e.g. we can only wait for specific log messages if the nodes have
actually been restarted, not if they were already running

fixes #847
pull/871/head
iwilltry42 3 years ago
parent d78ef48932
commit 23ddbf67b1
No known key found for this signature in database
GPG Key ID: 7BA57AD1CFF16110
  1. 238
      pkg/client/cluster.go

@ -848,16 +848,20 @@ func ClusterStart(ctx context.Context, runtime k3drt.Runtime, cluster *k3d.Clust
var agents []*k3d.Node
var aux []*k3d.Node
for _, n := range cluster.Nodes {
if n.Role == k3d.ServerRole {
if n.ServerOpts.IsInit {
initNode = n
continue
if !n.State.Running {
if n.Role == k3d.ServerRole {
if n.ServerOpts.IsInit {
initNode = n
continue
}
servers = append(servers, n)
} else if n.Role == k3d.AgentRole {
agents = append(agents, n)
} else {
aux = append(aux, n)
}
servers = append(servers, n)
} else if n.Role == k3d.AgentRole {
agents = append(agents, n)
} else {
aux = append(aux, n)
l.Log().Tracef("Node %s already running.", n.Name)
}
}
@ -884,146 +888,162 @@ func ClusterStart(ctx context.Context, runtime k3drt.Runtime, cluster *k3d.Clust
/*
* Server Nodes
*/
l.Log().Infoln("Starting servers...")
for _, serverNode := range servers {
if err := NodeStart(ctx, runtime, serverNode, &k3d.NodeStartOpts{
Wait: true,
NodeHooks: append(clusterStartOpts.NodeHooks, serverNode.HookActions...),
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
}); err != nil {
return fmt.Errorf("Failed to start server %s: %+v", serverNode.Name, err)
if len(servers) > 0 {
l.Log().Infoln("Starting servers...")
for _, serverNode := range servers {
if err := NodeStart(ctx, runtime, serverNode, &k3d.NodeStartOpts{
Wait: true,
NodeHooks: append(clusterStartOpts.NodeHooks, serverNode.HookActions...),
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
}); err != nil {
return fmt.Errorf("Failed to start server %s: %+v", serverNode.Name, err)
}
}
} else {
l.Log().Infoln("All servers already running.")
}
/*
* Agent Nodes
*/
agentWG, aCtx := errgroup.WithContext(ctx)
l.Log().Infoln("Starting agents...")
for _, agentNode := range agents {
currentAgentNode := agentNode
agentWG.Go(func() error {
return NodeStart(aCtx, runtime, currentAgentNode, &k3d.NodeStartOpts{
Wait: true,
NodeHooks: clusterStartOpts.NodeHooks,
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
if len(agents) > 0 {
agentWG, aCtx := errgroup.WithContext(ctx)
l.Log().Infoln("Starting agents...")
for _, agentNode := range agents {
currentAgentNode := agentNode
agentWG.Go(func() error {
return NodeStart(aCtx, runtime, currentAgentNode, &k3d.NodeStartOpts{
Wait: true,
NodeHooks: clusterStartOpts.NodeHooks,
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
})
})
})
}
if err := agentWG.Wait(); err != nil {
return fmt.Errorf("Failed to add one or more agents: %w", err)
}
if err := agentWG.Wait(); err != nil {
return fmt.Errorf("Failed to add one or more agents: %w", err)
}
} else {
l.Log().Infoln("All agents already running.")
}
/*
* Auxiliary/Helper Nodes
*/
helperWG, hCtx := errgroup.WithContext(ctx)
l.Log().Infoln("Starting helpers...")
for _, helperNode := range aux {
currentHelperNode := helperNode
if len(aux) > 0 {
helperWG, hCtx := errgroup.WithContext(ctx)
l.Log().Infoln("Starting helpers...")
for _, helperNode := range aux {
currentHelperNode := helperNode
helperWG.Go(func() error {
nodeStartOpts := &k3d.NodeStartOpts{
NodeHooks: currentHelperNode.HookActions,
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
}
if currentHelperNode.Role == k3d.LoadBalancerRole {
nodeStartOpts.Wait = true
}
helperWG.Go(func() error {
nodeStartOpts := &k3d.NodeStartOpts{
NodeHooks: currentHelperNode.HookActions,
EnvironmentInfo: clusterStartOpts.EnvironmentInfo,
}
if currentHelperNode.Role == k3d.LoadBalancerRole {
nodeStartOpts.Wait = true
}
return NodeStart(hCtx, runtime, currentHelperNode, nodeStartOpts)
})
}
return NodeStart(hCtx, runtime, currentHelperNode, nodeStartOpts)
})
}
if err := helperWG.Wait(); err != nil {
return fmt.Errorf("Failed to add one or more helper nodes: %w", err)
if err := helperWG.Wait(); err != nil {
return fmt.Errorf("Failed to add one or more helper nodes: %w", err)
}
} else {
l.Log().Infoln("All helpers already running.")
}
/*
* Additional Cluster Preparation (post start)
*/
postStartErrgrp, postStartErrgrpCtx := errgroup.WithContext(ctx)
if len(servers) > 0 || len(agents) > 0 { // TODO: make checks for required cluster start actions cleaner
/*** DNS ***/
postStartErrgrp, postStartErrgrpCtx := errgroup.WithContext(ctx)
// add host.k3d.internal record to /etc/hosts in all nodes
postStartErrgrp.Go(func() error {
return prepInjectHostIP(postStartErrgrpCtx, runtime, cluster, &clusterStartOpts)
})
/*** DNS ***/
postStartErrgrp.Go(func() error {
// add host.k3d.internal record to /etc/hosts in all nodes
postStartErrgrp.Go(func() error {
return prepInjectHostIP(postStartErrgrpCtx, runtime, cluster, &clusterStartOpts)
})
if cluster.Network.Name == "host" {
l.Log().Debugf("Not injecting host.k3d.internal into CoreDNS as clusternetwork is 'host'")
return nil
}
if len(servers) > 0 {
postStartErrgrp.Go(func() error {
hosts := fmt.Sprintf("%s %s\n", clusterStartOpts.EnvironmentInfo.HostGateway.String(), k3d.DefaultK3dInternalHostRecord)
if cluster.Network.Name == "host" {
l.Log().Debugf("Not injecting host.k3d.internal into CoreDNS as clusternetwork is 'host'")
return nil
}
net, err := runtime.GetNetwork(ctx, &cluster.Network)
if err != nil {
return fmt.Errorf("failed to get cluster network %s to inject host records into CoreDNS: %w", cluster.Network.Name, err)
}
for _, member := range net.Members {
hosts += fmt.Sprintf("%s %s\n", member.IP.String(), member.Name)
}
hosts := fmt.Sprintf("%s %s\n", clusterStartOpts.EnvironmentInfo.HostGateway.String(), k3d.DefaultK3dInternalHostRecord)
l.Log().Infof("Injecting records for host.k3d.internal and for %d network members into CoreDNS configmap...", len(net.Members))
act := actions.RewriteFileAction{
Runtime: runtime,
Path: "/var/lib/rancher/k3s/server/manifests/coredns.yaml",
Mode: 0744,
RewriteFunc: func(input []byte) ([]byte, error) {
split, err := util.SplitYAML(input)
net, err := runtime.GetNetwork(ctx, &cluster.Network)
if err != nil {
return nil, fmt.Errorf("error splitting yaml: %w", err)
return fmt.Errorf("failed to get cluster network %s to inject host records into CoreDNS: %w", cluster.Network.Name, err)
}
for _, member := range net.Members {
hosts += fmt.Sprintf("%s %s\n", member.IP.String(), member.Name)
}
var outputBuf bytes.Buffer
outputEncoder := yaml.NewEncoder(&outputBuf)
l.Log().Infof("Injecting records for host.k3d.internal and for %d network members into CoreDNS configmap...", len(net.Members))
act := actions.RewriteFileAction{
Runtime: runtime,
Path: "/var/lib/rancher/k3s/server/manifests/coredns.yaml",
Mode: 0744,
RewriteFunc: func(input []byte) ([]byte, error) {
split, err := util.SplitYAML(input)
if err != nil {
return nil, fmt.Errorf("error splitting yaml: %w", err)
}
for _, d := range split {
var doc map[string]interface{}
if err := yaml.Unmarshal(d, &doc); err != nil {
return nil, err
}
if kind, ok := doc["kind"]; ok {
if strings.ToLower(kind.(string)) == "configmap" {
configmapData := doc["data"].(map[interface{}]interface{})
configmapData["NodeHosts"] = hosts
var outputBuf bytes.Buffer
outputEncoder := yaml.NewEncoder(&outputBuf)
for _, d := range split {
var doc map[string]interface{}
if err := yaml.Unmarshal(d, &doc); err != nil {
return nil, err
}
if kind, ok := doc["kind"]; ok {
if strings.ToLower(kind.(string)) == "configmap" {
configmapData := doc["data"].(map[interface{}]interface{})
configmapData["NodeHosts"] = hosts
}
}
if err := outputEncoder.Encode(doc); err != nil {
return nil, err
}
}
}
if err := outputEncoder.Encode(doc); err != nil {
return nil, err
}
outputEncoder.Close()
return outputBuf.Bytes(), nil
},
}
outputEncoder.Close()
return outputBuf.Bytes(), nil
},
}
// get the first server in the list and run action on it once it's ready for it
for _, n := range cluster.Nodes {
if n.Role == k3d.ServerRole {
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z", n.State.Started)
if err != nil {
return err
}
if err := NodeWaitForLogMessage(ctx, runtime, n, "Cluster dns configmap", ts.Truncate(time.Second)); err != nil {
return err
// get the first server in the list and run action on it once it's ready for it
for _, n := range cluster.Nodes {
if n.Role == k3d.ServerRole {
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z", n.State.Started)
if err != nil {
return err
}
if err := NodeWaitForLogMessage(ctx, runtime, n, "Cluster dns configmap", ts.Truncate(time.Second)); err != nil {
return err
}
return act.Run(ctx, n)
}
}
return act.Run(ctx, n)
}
return nil
})
}
return nil
})
if err := postStartErrgrp.Wait(); err != nil {
return fmt.Errorf("error during post-start cluster preparation: %w", err)
if err := postStartErrgrp.Wait(); err != nil {
return fmt.Errorf("error during post-start cluster preparation: %w", err)
}
}
return nil

Loading…
Cancel
Save