allow for adding ports to a running cluster via the loadbalancer

pull/670/head
iwilltry42 3 years ago
parent c6ee295deb
commit 2516cad12e
No known key found for this signature in database
GPG Key ID: 7BA57AD1CFF16110
  1. 1
      cmd/cluster/cluster.go
  2. 8
      cmd/cluster/clusterEdit.go
  3. 75
      pkg/client/cluster.go
  4. 40
      pkg/client/loadbalancer.go
  5. 4
      pkg/client/node.go
  6. 127
      pkg/client/ports.go
  7. 117
      pkg/config/transform.go

@ -48,6 +48,7 @@ func NewCmdCluster() *cobra.Command {
cmd.AddCommand(NewCmdClusterStop())
cmd.AddCommand(NewCmdClusterDelete())
cmd.AddCommand(NewCmdClusterList())
cmd.AddCommand(NewCmdClusterEdit())
// add flags

@ -32,8 +32,8 @@ import (
"github.com/spf13/cobra"
)
// NewCmdNodeEdit returns a new cobra command
func NewCmdNodeEdit() *cobra.Command {
// NewCmdClusterEdit returns a new cobra command
func NewCmdClusterEdit() *cobra.Command {
// create new cobra command
cmd := &cobra.Command{
@ -49,6 +49,10 @@ func NewCmdNodeEdit() *cobra.Command {
log.Debugf("===== Current =====\n%+v\n===== Changeset =====\n%+v\n", existingCluster, changeset)
if err := client.ClusterEditChangesetSimple(cmd.Context(), runtimes.SelectedRuntime, existingCluster, changeset); err != nil {
log.Fatalf("Failed to update the cluster: %v", err)
}
log.Infof("Successfully updated %s", existingCluster.Name)
},

@ -35,8 +35,8 @@ import (
"github.com/docker/go-connections/nat"
"github.com/imdario/mergo"
copystruct "github.com/mitchellh/copystructure"
"github.com/rancher/k3d/v4/pkg/actions"
conftypes "github.com/rancher/k3d/v4/pkg/config/types"
config "github.com/rancher/k3d/v4/pkg/config/v1alpha3"
k3drt "github.com/rancher/k3d/v4/pkg/runtimes"
"github.com/rancher/k3d/v4/pkg/runtimes/docker"
@ -1033,47 +1033,78 @@ func prepCreateLocalRegistryHostingConfigMap(ctx context.Context, runtime k3drt.
// ClusterEditChangesetSimple modifies an existing cluster with a given SimpleConfig changeset
func ClusterEditChangesetSimple(ctx context.Context, runtime k3drt.Runtime, cluster *k3d.Cluster, changeset *config.SimpleConfig) error {
nodeCount := len(cluster.Nodes)
// nodeCount := len(cluster.Nodes)
nodeList := cluster.Nodes
// === Ports ===
existingLB := cluster.ServerLoadBalancer
lbChangeset :=
lbChangeset := &k3d.Loadbalancer{}
if len(changeset.Ports) > 0 {
for _, portWithNodeFilters := range changeset.Ports {
log.Tracef("inspecting port mapping for %s with nodefilters %s", portWithNodeFilters.Port, portWithNodeFilters.NodeFilters)
if len(portWithNodeFilters.NodeFilters) == 0 && nodeCount > 1 {
log.Infof("portmapping '%s' lacks a nodefilter, but there's more than one node: defaulting to %s", portWithNodeFilters.Port, conftypes.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = conftypes.DefaultTargetsNodefiltersPortMappings
}
// copy existing loadbalancer
lbChangesetNode, err := CopyNode(ctx, existingLB.Node, CopyNodeOpts{keepState: false})
if err != nil {
return fmt.Errorf("error copying existing loadbalancer: %w", err)
}
for _, f := range portWithNodeFilters.NodeFilters {
if strings.HasPrefix(f, "loadbalancer") {
log.Infof("portmapping '%s' targets the loadbalancer: defaulting to %s", portWithNodeFilters.Port, conftypes.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = conftypes.DefaultTargetsNodefiltersPortMappings
break
}
}
lbChangeset.Node = lbChangesetNode
// copy config from existing loadbalancer
lbChangesetConfig, err := copystruct.Copy(existingLB.Config)
if err != nil {
return fmt.Errorf("error copying config from existing loadbalancer: %w", err)
}
lbChangeset.Config = lbChangesetConfig.(*k3d.LoadbalancerConfig)
// loop over ports
if len(changeset.Ports) > 0 {
// 1. ensure that there are only supported suffices in the node filters // TODO: overly complex right now, needs simplification
for _, portWithNodeFilters := range changeset.Ports {
filteredNodes, err := util.FilterNodesWithSuffix(nodeList, portWithNodeFilters.NodeFilters)
if err != nil {
return err
}
for suffix, nodes := range filteredNodes {
for suffix := range filteredNodes {
switch suffix {
case "proxy", util.NodeFilterSuffixNone:
break
case util.NodeFilterMapKeyAll:
break
case "proxy", util.NodeFilterSuffixNone, util.NodeFilterMapKeyAll:
continue
default:
return fmt.Errorf("error: 'cluster edit' does not (yet) support the '%s' opt/suffix for adding ports", suffix)
}
}
}
// 2. transform
cluster.ServerLoadBalancer = lbChangeset // we're working with pointers, so let's point to the changeset here to not update the original that we keep as a reference
if err := TransformPorts(ctx, runtime, cluster, changeset.Ports); err != nil {
return fmt.Errorf("error transforming port config %s: %w", changeset.Ports, err)
}
}
log.Debugf("ORIGINAL:\n> Ports: %+v\n> Config: %+v\nCHANGESET:\n> Ports: %+v\n> Config: %+v", existingLB.Node.Ports, existingLB.Config, lbChangeset.Node.Ports, lbChangeset.Config)
// prepare to write config to lb container
configyaml, err := yaml.Marshal(lbChangeset.Config)
if err != nil {
return err
}
writeLbConfigAction := k3d.NodeHook{
Stage: k3d.LifecycleStagePreStart,
Action: actions.WriteFileAction{
Runtime: runtime,
Dest: k3d.DefaultLoadbalancerConfigPath,
Mode: 0744,
Content: configyaml,
},
}
if lbChangeset.Node.HookActions == nil {
lbChangeset.Node.HookActions = []k3d.NodeHook{}
}
lbChangeset.Node.HookActions = append(lbChangeset.Node.HookActions, writeLbConfigAction)
NodeReplace(ctx, runtime, existingLB.Node, lbChangeset.Node)
return nil
}

@ -39,8 +39,9 @@ import (
)
var (
LBConfigErrHostNotFound = errors.New("lbconfig: host not found")
LBConfigErrFailedTest = errors.New("lbconfig: failed to test")
ErrLBConfigHostNotFound error = errors.New("lbconfig: host not found")
ErrLBConfigFailedTest error = errors.New("lbconfig: failed to test")
ErrLBConfigEntryExists error = errors.New("lbconfig: entry exists in config")
)
// UpdateLoadbalancerConfig updates the loadbalancer config with an updated list of servers belonging to that cluster
@ -91,14 +92,14 @@ func UpdateLoadbalancerConfig(ctx context.Context, runtime runtimes.Runtime, clu
err = NodeWaitForLogMessage(failureCtx, runtime, cluster.ServerLoadBalancer.Node, "host not found in upstream", startTime)
if err != nil {
log.Warnf("Failed to check if the loadbalancer was configured correctly or if it broke. Please check it manually or try again: %v", err)
return LBConfigErrFailedTest
return ErrLBConfigFailedTest
} else {
log.Warnln("Failed to configure loadbalancer because one of the nodes seems to be down! Run `k3d node list` to see which one it could be.")
return LBConfigErrHostNotFound
return ErrLBConfigHostNotFound
}
} else {
log.Warnf("Failed to ensure that loadbalancer was configured correctly. Please check it manually or try again: %v", err)
return LBConfigErrFailedTest
return ErrLBConfigFailedTest
}
}
log.Infof("Successfully configured loadbalancer %s!", cluster.ServerLoadBalancer.Node.Name)
@ -206,3 +207,32 @@ func LoadbalancerPrepare(ctx context.Context, runtime runtimes.Runtime, cluster
return lbNode, nil
}
func loadbalancerAddPortConfigs(loadbalancer *k3d.Loadbalancer, portmapping nat.PortMapping, targetNodes []*k3d.Node) error {
portconfig := fmt.Sprintf("%s.%s", portmapping.Port.Port(), portmapping.Port.Proto())
nodenames := []string{}
for _, node := range targetNodes {
if node.Role == k3d.LoadBalancerRole {
return fmt.Errorf("error adding port config to loadbalancer: cannot add port config referencing the loadbalancer itself (loop)")
}
nodenames = append(nodenames, node.Name)
}
// entry for that port doesn't exist yet, so we simply create it with the list of node names
if _, ok := loadbalancer.Config.Ports[portconfig]; !ok {
loadbalancer.Config.Ports[portconfig] = nodenames
return nil
}
nodenameLoop:
for _, nodename := range nodenames {
for _, existingNames := range loadbalancer.Config.Ports[portconfig] {
if nodename == existingNames {
continue nodenameLoop
}
loadbalancer.Config.Ports[portconfig] = append(loadbalancer.Config.Ports[portconfig], nodename)
}
}
return nil
}

@ -209,7 +209,7 @@ func NodeAddToCluster(ctx context.Context, runtime runtimes.Runtime, node *k3d.N
if node.Role == k3d.ServerRole {
log.Infoln("Updating loadbalancer config to include new server node(s)")
if err := UpdateLoadbalancerConfig(ctx, runtime, cluster); err != nil {
if !errors.Is(err, LBConfigErrHostNotFound) {
if !errors.Is(err, ErrLBConfigHostNotFound) {
return fmt.Errorf("error updating loadbalancer: %w", err)
}
}
@ -473,7 +473,7 @@ func NodeDelete(ctx context.Context, runtime runtimes.Runtime, node *k3d.Node, o
// if it's a server node, then update the loadbalancer configuration
if node.Role == k3d.ServerRole {
if err := UpdateLoadbalancerConfig(ctx, runtime, cluster); err != nil {
if !errors.Is(err, LBConfigErrHostNotFound) {
if !errors.Is(err, ErrLBConfigHostNotFound) {
return fmt.Errorf("Failed to update cluster loadbalancer: %w", err)
}
}

@ -0,0 +1,127 @@
/*
Copyright © 2020-2021 The k3d Author(s)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package client
import (
"context"
"errors"
"fmt"
"strings"
"github.com/docker/go-connections/nat"
"github.com/rancher/k3d/v4/pkg/config/types"
config "github.com/rancher/k3d/v4/pkg/config/v1alpha3"
"github.com/rancher/k3d/v4/pkg/runtimes"
k3d "github.com/rancher/k3d/v4/pkg/types"
"github.com/rancher/k3d/v4/pkg/util"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
var (
ErrNodeAddPortsExists error = errors.New("port exists on target")
)
func TransformPorts(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, portsWithNodeFilters []config.PortWithNodeFilters) error {
nodeCount := len(cluster.Nodes)
nodeList := cluster.Nodes
for _, portWithNodeFilters := range portsWithNodeFilters {
log.Tracef("inspecting port mapping for %s with nodefilters %s", portWithNodeFilters.Port, portWithNodeFilters.NodeFilters)
if len(portWithNodeFilters.NodeFilters) == 0 && nodeCount > 1 {
log.Infof("portmapping '%s' lacks a nodefilter, but there's more than one node: defaulting to %s", portWithNodeFilters.Port, types.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = types.DefaultTargetsNodefiltersPortMappings
}
for _, f := range portWithNodeFilters.NodeFilters {
if strings.HasPrefix(f, "loadbalancer") {
log.Infof("portmapping '%s' targets the loadbalancer: defaulting to %s", portWithNodeFilters.Port, types.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = types.DefaultTargetsNodefiltersPortMappings
break
}
}
filteredNodes, err := util.FilterNodesWithSuffix(nodeList, portWithNodeFilters.NodeFilters)
if err != nil {
return err
}
for suffix, nodes := range filteredNodes {
portmappings, err := nat.ParsePortSpec(portWithNodeFilters.Port)
if err != nil {
return fmt.Errorf("error parsing port spec '%s': %+v", portWithNodeFilters.Port, err)
}
if suffix == "proxy" || suffix == util.NodeFilterSuffixNone { // proxy is the default suffix for port mappings
if cluster.ServerLoadBalancer == nil {
return fmt.Errorf("port-mapping of type 'proxy' specified, but loadbalancer is disabled")
}
if err := addPortMappings(cluster.ServerLoadBalancer.Node, portmappings); err != nil {
return err
}
for _, pm := range portmappings {
if err := loadbalancerAddPortConfigs(cluster.ServerLoadBalancer, pm, nodes); err != nil {
return err
}
}
} else if suffix == "direct" {
if len(nodes) > 1 {
return fmt.Errorf("error: cannot apply a direct port-mapping (%s) to more than one node", portmappings)
}
for _, node := range nodes {
if err := addPortMappings(node, portmappings); err != nil {
return err
}
}
} else if suffix != util.NodeFilterMapKeyAll {
return fmt.Errorf("error adding port mappings: unknown suffix %s", suffix)
}
}
}
// print generated loadbalancer config
if log.GetLevel() >= log.DebugLevel {
yamlized, err := yaml.Marshal(cluster.ServerLoadBalancer.Config)
if err != nil {
log.Errorf("error printing loadbalancer config: %v", err)
} else {
log.Debugf("generated loadbalancer config:\n%s", string(yamlized))
}
}
return nil
}
func addPortMappings(node *k3d.Node, portmappings []nat.PortMapping) error {
if node.Ports == nil {
node.Ports = nat.PortMap{}
}
for _, pm := range portmappings {
if _, exists := node.Ports[pm.Port]; exists {
node.Ports[pm.Port] = append(node.Ports[pm.Port], pm.Binding)
} else {
node.Ports[pm.Port] = []nat.PortBinding{pm.Binding}
}
}
return nil
}

@ -32,7 +32,6 @@ import (
"github.com/docker/go-connections/nat"
cliutil "github.com/rancher/k3d/v4/cmd/util" // TODO: move parseapiport to pkg
"github.com/rancher/k3d/v4/pkg/client"
"github.com/rancher/k3d/v4/pkg/config/types"
conf "github.com/rancher/k3d/v4/pkg/config/v1alpha3"
"github.com/rancher/k3d/v4/pkg/runtimes"
k3d "github.com/rancher/k3d/v4/pkg/types"
@ -175,7 +174,7 @@ func TransformSimpleToClusterConfig(ctx context.Context, runtime runtimes.Runtim
}
// -> PORTS
if err := TransformPorts(ctx, runtime, &newCluster, simpleConfig.Ports); err != nil {
if err := client.TransformPorts(ctx, runtime, &newCluster, simpleConfig.Ports); err != nil {
return nil, err
}
@ -347,117 +346,3 @@ func TransformSimpleToClusterConfig(ctx context.Context, runtime runtimes.Runtim
return clusterConfig, nil
}
func addPortMappings(node *k3d.Node, portmappings []nat.PortMapping) error {
if node.Ports == nil {
node.Ports = nat.PortMap{}
}
for _, pm := range portmappings {
if _, exists := node.Ports[pm.Port]; exists {
node.Ports[pm.Port] = append(node.Ports[pm.Port], pm.Binding)
} else {
node.Ports[pm.Port] = []nat.PortBinding{pm.Binding}
}
}
return nil
}
func loadbalancerAddPortConfigs(loadbalancer *k3d.Loadbalancer, portmapping nat.PortMapping, targetNodes []*k3d.Node) error {
portconfig := fmt.Sprintf("%s.%s", portmapping.Port.Port(), portmapping.Port.Proto())
nodenames := []string{}
for _, node := range targetNodes {
if node.Role == k3d.LoadBalancerRole {
return fmt.Errorf("error adding port config to loadbalancer: cannot add port config referencing the loadbalancer itself (loop)")
}
nodenames = append(nodenames, node.Name)
}
// entry for that port doesn't exist yet, so we simply create it with the list of node names
if _, ok := loadbalancer.Config.Ports[portconfig]; !ok {
loadbalancer.Config.Ports[portconfig] = nodenames
return nil
}
nodenameLoop:
for _, nodename := range nodenames {
for _, existingNames := range loadbalancer.Config.Ports[portconfig] {
if nodename == existingNames {
continue nodenameLoop
}
loadbalancer.Config.Ports[portconfig] = append(loadbalancer.Config.Ports[portconfig], nodename)
}
}
return nil
}
func TransformPorts(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, portsWithNodeFilters []conf.PortWithNodeFilters) error {
nodeCount := len(cluster.Nodes)
nodeList := cluster.Nodes
for _, portWithNodeFilters := range portsWithNodeFilters {
log.Tracef("inspecting port mapping for %s with nodefilters %s", portWithNodeFilters.Port, portWithNodeFilters.NodeFilters)
if len(portWithNodeFilters.NodeFilters) == 0 && nodeCount > 1 {
log.Infof("portmapping '%s' lacks a nodefilter, but there's more than one node: defaulting to %s", portWithNodeFilters.Port, types.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = types.DefaultTargetsNodefiltersPortMappings
}
for _, f := range portWithNodeFilters.NodeFilters {
if strings.HasPrefix(f, "loadbalancer") {
log.Infof("portmapping '%s' targets the loadbalancer: defaulting to %s", portWithNodeFilters.Port, types.DefaultTargetsNodefiltersPortMappings)
portWithNodeFilters.NodeFilters = types.DefaultTargetsNodefiltersPortMappings
break
}
}
filteredNodes, err := util.FilterNodesWithSuffix(nodeList, portWithNodeFilters.NodeFilters)
if err != nil {
return err
}
for suffix, nodes := range filteredNodes {
portmappings, err := nat.ParsePortSpec(portWithNodeFilters.Port)
if err != nil {
return fmt.Errorf("error parsing port spec '%s': %+v", portWithNodeFilters.Port, err)
}
if suffix == "proxy" || suffix == util.NodeFilterSuffixNone { // proxy is the default suffix for port mappings
if cluster.ServerLoadBalancer == nil {
return fmt.Errorf("port-mapping of type 'proxy' specified, but loadbalancer is disabled")
}
if err := addPortMappings(cluster.ServerLoadBalancer.Node, portmappings); err != nil {
return err
}
for _, pm := range portmappings {
if err := loadbalancerAddPortConfigs(cluster.ServerLoadBalancer, pm, nodes); err != nil {
return err
}
}
} else if suffix == "direct" {
if len(nodes) > 1 {
return fmt.Errorf("error: cannot apply a direct port-mapping (%s) to more than one node", portmappings)
}
for _, node := range nodes {
if err := addPortMappings(node, portmappings); err != nil {
return err
}
}
} else if suffix != util.NodeFilterMapKeyAll {
return fmt.Errorf("error adding port mappings: unknown suffix %s", suffix)
}
}
}
// print generated loadbalancer config
if log.GetLevel() >= log.DebugLevel {
yamlized, err := yaml.Marshal(cluster.ServerLoadBalancer.Config)
if err != nil {
log.Errorf("error printing loadbalancer config: %v", err)
} else {
log.Debugf("generated loadbalancer config:\n%s", string(yamlized))
}
}
return nil
}

Loading…
Cancel
Save