From 2f97e752d59258036364674b502eb95675d77a7b Mon Sep 17 00:00:00 2001
From: Isaac Rodman
Date: Thu, 8 Feb 2018 21:36:52 -0700
Subject: [PATCH] cleanup (and go fmt) of old debug logging since 17.06-ce; re-labeled all changes as follows:

- support sending hostname and domainname in IPAM
- support custom volume naming when using netapp volume driver
- use dashes as separators for container names
- work-in-progress DNS domainname changes
- de-duplicate DNS search domains
- add container "host" /etc/hosts entry equal to the value of the DOCKER_HOST_EXPORT_IP environment variable, if defined
- cleanup duplicate ipvlan and macvlan network IDs during createNetwork (https://github.com/docker/libnetwork/pull/2055)
- support optional skip of IPAM pool conflict checking
- set Swarm tasks to the orphaned state if their node becomes unavailable
- allow orphaned Swarm tasks assigned to a deleted node to start elsewhere
- do not allow Swarm tasks in an orphaned state to restart
- allow Swarm tasks in a remove state to be transitioned
- prevent oldTaskTimer from allowing the slot to be prematurely updated
- allow Swarm tasks to be cleaned up if they are in a pending state and are marked for removal
- support service-level anti-affinity via the label eyz-limitActiveServiceSlotsPerNode
---
 VERSION                                       |   2 +-
 .../engine/api/types/network/network.go       |   4 +
 components/engine/container/container.go      |   9 +-
 .../cluster/executor/container/adapter.go     |   7 ++
 .../cluster/executor/container/container.go   | 100 +++++++++++++++++-
 .../engine/daemon/container_operations.go     |  82 +++++++++++++-
 .../drivers/ipvlan/ipvlan_network.go          |  21 +++-
 .../github.com/docker/libnetwork/endpoint.go  |  18 +++-
 .../github.com/docker/libnetwork/network.go   |  11 +-
 .../manager/orchestrator/replicated/tasks.go  |  93 +++++++++++++---
 .../manager/orchestrator/restart/restart.go   |  15 ++-
 .../orchestrator/taskreaper/task_reaper.go    |   9 +-
 .../swarmkit/manager/scheduler/scheduler.go   |  70 +++++++++++-
 13 files changed, 409 insertions(+), 32 deletions(-)

diff --git a/VERSION b/VERSION
index 12e33e9494c..37e2ebc6fe4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-18.01.0-ce
+18.01.0-eyz
diff --git a/components/engine/api/types/network/network.go b/components/engine/api/types/network/network.go
index 7c7dbacc855..d073e19f1c2 100644
--- a/components/engine/api/types/network/network.go
+++ b/components/engine/api/types/network/network.go
@@ -26,6 +26,10 @@ type EndpointIPAMConfig struct {
 	IPv4Address  string   `json:",omitempty"`
 	IPv6Address  string   `json:",omitempty"`
 	LinkLocalIPs []string `json:",omitempty"`
+	// eyz START: support sending hostname and domainname in IPAM
+	Hostname   string `json:",omitempty"`
+	Domainname string `json:",omitempty"`
+	// eyz END: support sending hostname and domainname in IPAM
 }
 
 // Copy makes a copy of the endpoint ipam config
diff --git a/components/engine/container/container.go b/components/engine/container/container.go
index 11814b77195..73ec4ef0078 100644
--- a/components/engine/container/container.go
+++ b/components/engine/container/container.go
@@ -709,6 +709,11 @@ func (container *Container) BuildJoinOptions(n named) ([]libnetwork.EndpointOpti
 			return nil, err
 		}
 		joinOptions = append(joinOptions, libnetwork.CreateOptionAlias(name, alias))
+		// eyz START: add network alias which includes the container FQDN (Hostname + Domainname)
+		if container.Config.Domainname != "" {
+			joinOptions = append(joinOptions, libnetwork.CreateOptionAlias(name, container.Config.Hostname+"."+container.Config.Domainname))
+		}
+		// eyz STOP: add network alias which includes the container FQDN (Hostname + Domainname)
 	}
 	for k, v := range epConfig.DriverOpts {
 		joinOptions = append(joinOptions, libnetwork.EndpointOptionGeneric(options.Generic{k: v}))
@@ -759,8 +764,10 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC
 			return nil, errors.Errorf("Invalid IPv6 address: %s)", ipam.IPv6Address)
 		}
 
+		// eyz START: support sending hostname and domainname in IPAM
 		createOptions = append(createOptions,
-			libnetwork.CreateOptionIpam(ip, ip6, ipList, nil))
+			libnetwork.CreateOptionIpam(ip, ip6, ipList, ipam.Hostname, ipam.Domainname, nil))
+		// eyz END: support sending hostname and domainname in IPAM
 
 	}
 
diff --git a/components/engine/daemon/cluster/executor/container/adapter.go b/components/engine/daemon/cluster/executor/container/adapter.go
index 81e740af137..7448d7c7f0f 100644
--- a/components/engine/daemon/cluster/executor/container/adapter.go
+++ b/components/engine/daemon/cluster/executor/container/adapter.go
@@ -382,6 +382,13 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
 			continue
 		}
 
+		// eyz START: support custom volume naming when using netapp volume driver
+		// Isaac (20171022): do not create netapp volumes here; we currently let them get created by the container.go code instead
+		if mount.VolumeOptions.DriverConfig.Name == "netapp" {
+			continue
+		}
+		// eyz END: support custom volume naming when using netapp volume driver
+
 		req := c.container.volumeCreateRequest(&mount)
 
 		// Check if this volume exists on the engine
diff --git a/components/engine/daemon/cluster/executor/container/container.go b/components/engine/daemon/cluster/executor/container/container.go
index 4f41fb3e230..de7eaf8ddbd 100644
--- a/components/engine/daemon/cluster/executor/container/container.go
+++ b/components/engine/daemon/cluster/executor/container/container.go
@@ -4,6 +4,9 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	// eyz START: support custom volume naming when using netapp volume driver
+	"regexp"
+	// eyz STOP: support custom volume naming when using netapp volume driver
 	"strconv"
 	"strings"
 	"time"
@@ -131,7 +134,16 @@ func (c *containerConfig) name() string {
 	}
 
 	// fallback to service.slot.id.
-	return fmt.Sprintf("%s.%s.%s", c.task.ServiceAnnotations.Name, slot, c.task.ID)
+	// eyz START: support custom volume naming when using netapp volume driver, use dashes as separators for container names
+	//return fmt.Sprintf("%s.%s.%s", c.task.ServiceAnnotations.Name, slot, c.task.ID)
+	// only override the container name if we are using a netapp mount volume driver
+	if c.hasMountDriverName("netapp") {
+		return fmt.Sprintf("%s-%s", c.task.ServiceAnnotations.Name, slot)
+	} else {
+		return fmt.Sprintf("%s-%s-%s", c.task.ServiceAnnotations.Name, slot, c.task.ID)
+	}
+	// eyz END: support custom volume naming when using netapp volume driver, use dashes as separators for container names
+
 }
 
 func (c *containerConfig) image() string {
@@ -208,6 +220,40 @@
 		Healthcheck: c.healthcheck(),
 	}
 
+	// eyz START: support custom volume naming when using netapp volume driver, use dashes as separators for container names, work in-progress DNS domainname changes
+	// If we have a defined service name and slot ...
+	if c.task != nil && c.task.ServiceAnnotations.Name != "" && c.task.Slot > 0 {
+
+		slot := fmt.Sprint(c.task.Slot)
+		if slot == "" || c.task.Slot == 0 {
+			slot = c.task.NodeID
+		}
+
+		if c.hasMountDriverName("netapp") {
+			// ... and we're using a netapp mount volume driver, then set the Hostname to "SERVICENAME-SLOT"
+			config.Hostname = fmt.Sprintf("%s-%s", c.task.ServiceAnnotations.Name, slot)
+		} else {
+			// ... otherwise, set the Hostname to "SERVICENAME-SLOT-TASKID"
+			config.Hostname = fmt.Sprintf("%s-%s-%s", c.task.ServiceAnnotations.Name, slot, c.task.ID)
+		}
+
+		///// testing START
+		stackNamespace, stackNamespaceExists := c.task.ServiceAnnotations.Labels["com.docker.stack.namespace"]
+		if stackNamespaceExists {
+			//config.Domainname = fmt.Sprintf("%s.swarm.DOMAIN.COM", stackNamespace)
+			// doesn't work
+			// config.Domainname = stackNamespace + "-test"
+			// config.Domainname = stackNamespace + ".swarm"
+			// works -
+			config.Domainname = stackNamespace
+		} else {
+			config.Domainname = "nostack"
+		}
+		logrus.Debugf("* engine/daemon/cluster/executor/container config() set config.Domainname to: %s", config.Domainname)
+		///// testing STOP
+	}
+	// eyz STOP: support custom volume naming when using netapp volume driver, use dashes as separators for container names, work in-progress DNS domainname changes
+
 	if len(c.spec().Command) > 0 {
 		// If Command is provided, we replace the whole invocation with Command
 		// by replacing Entrypoint and specifying Cmd. Args is ignored in this
@@ -255,14 +301,51 @@ func (c *containerConfig) labels() map[string]string {
 	return labels
 }
 
+// eyz START: support custom volume naming when using netapp volume driver
+// helper function to report whether any of the task's mounts use the given volume driver name
+func (c *containerConfig) hasMountDriverName(mountDriverName string) bool {
+	for _, mount := range c.spec().Mounts {
+		m := convertMount(mount)
+		if m.VolumeOptions != nil && m.VolumeOptions.DriverConfig != nil && m.VolumeOptions.DriverConfig.Name == mountDriverName {
+			return true
+		}
+	}
+	return false
+}
+
+// eyz END: support custom volume naming when using netapp volume driver
+
+// eyz START: support custom volume naming when using netapp volume driver
 func (c *containerConfig) mounts() []enginemount.Mount {
 	var r []enginemount.Mount
+	nonWordRegex := regexp.MustCompile("[^a-zA-Z0-9]+")
 	for _, mount := range c.spec().Mounts {
-		r = append(r, convertMount(mount))
+		m := convertMount(mount)
+		if m.VolumeOptions != nil && m.VolumeOptions.DriverConfig != nil && m.VolumeOptions.DriverConfig.Name == "netapp" && c.task != nil && c.task.ServiceAnnotations.Name != "" && c.task.Slot > 0 {
+			slot := fmt.Sprint(c.task.Slot)
+			if slot == "" || c.task.Slot == 0 {
+				slot = c.task.NodeID
+			}
+
+			var newSourceRaw string
+			// if m.Source already starts with c.task.ServiceAnnotations.Name, don't prepend the service name to the new volume Source; use only m.Source and the slot, as the service name would be redundant in this case
+			if strings.HasPrefix(m.Source, c.task.ServiceAnnotations.Name) {
+				newSourceRaw = fmt.Sprintf("%s_%s", m.Source, slot)
+			} else {
+				newSourceRaw = fmt.Sprintf("%s_%s_%s", c.task.ServiceAnnotations.Name, m.Source, slot)
+			}
+			// for compatibility with the NetApp Docker Volume Plugin, ensure all contiguous non-word characters are replaced with underscores
+			m.Source = nonWordRegex.ReplaceAllString(newSourceRaw, "_")
+		}
+		r = append(r, m)
+		//r = append(r, convertMount(mount))
+
 	}
 	return r
 }
 
+// eyz END: support custom volume naming when using netapp volume driver
+
 func convertMount(m api.Mount) enginemount.Mount {
 	mount := enginemount.Mount{
 		Source: m.Source,
@@ -363,6 +446,19 @@ func (c *containerConfig) hostConfig() *enginecontainer.HostConfig {
 		hc.DNSOptions = c.spec().DNSConfig.Options
 	}
 
+	// eyz START: work in-progress DNS domainname changes
+	stackNamespace, stackNamespaceExists := c.task.ServiceAnnotations.Labels["com.docker.stack.namespace"]
+	if stackNamespaceExists {
+		//stackNamespaceSearchDomainPrepend := []string{fmt.Sprintf("%s.swarm.DOMAIN.COM", stackNamespace)}
+		stackNamespaceSearchDomainPrepend := []string{stackNamespace}
+		if len(hc.DNSSearch) > 0 {
+			hc.DNSSearch = append(stackNamespaceSearchDomainPrepend, hc.DNSSearch...)
+		} else {
+			hc.DNSSearch = stackNamespaceSearchDomainPrepend
+		}
+	}
+	// eyz STOP: work in-progress DNS domainname changes
+
 	c.applyPrivileges(hc)
 
 	// The format of extra hosts on swarmkit is specified in:
diff --git a/components/engine/daemon/container_operations.go b/components/engine/daemon/container_operations.go
index fc08f8263d7..ea09e145ef5 100644
--- a/components/engine/daemon/container_operations.go
+++ b/components/engine/daemon/container_operations.go
@@ -32,18 +32,43 @@ var (
 	getPortMapInfo = container.GetSandboxPortMapInfo
 )
 
-func (daemon *Daemon) getDNSSearchSettings(container *container.Container) []string {
-	if len(container.HostConfig.DNSSearch) > 0 {
-		return container.HostConfig.DNSSearch
+// eyz START: de-duplicate DNS search domains
+func removeDuplicates(elements []string) []string {
+	seen := map[string]bool{}
+	result := []string{}
+
+	for _, element := range elements {
+		if !seen[element] {
+			seen[element] = true
+			result = append(result, element)
+		}
 	}
+	return result
+}
+
+// eyz STOP: de-duplicate DNS search domains
+
+// eyz START: de-duplicate DNS search domains
+func (daemon *Daemon) getDNSSearchSettings(container *container.Container) []string {
+	/*
+		if len(container.HostConfig.DNSSearch) > 0 {
+			return container.HostConfig.DNSSearch
+		}
+
+		if len(daemon.configStore.DNSSearch) > 0 {
+			return daemon.configStore.DNSSearch
+		}
+	*/
 
-	if len(daemon.configStore.DNSSearch) > 0 {
-		return daemon.configStore.DNSSearch
+	if len(container.HostConfig.DNSSearch) > 0 || len(daemon.configStore.DNSSearch) > 0 {
+		return removeDuplicates(append(container.HostConfig.DNSSearch, daemon.configStore.DNSSearch...))
 	}
 
 	return nil
 }
 
+// eyz STOP: de-duplicate DNS search domains
+
 func (daemon *Daemon) buildSandboxOptions(container *container.Container) ([]libnetwork.SandboxOption, error) {
 	var (
 		sboxOptions []libnetwork.SandboxOption
@@ -116,15 +141,51 @@
 		}
 	}
 
+	// eyz START: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+	// NOTE: if env var DOCKER_HOST_EXPORT_IP is set on the host, then
+	// 1) we intend to create a hosts entry of "host" with the value of DOCKER_HOST_EXPORT_IP -- dockerHostExportIpAppendHost will be set: true
+	// 2) we intend to persist this environment variable into each launched container
+	dockerHostExportIpVal, dockerHostExportIpEnvExported := os.LookupEnv("DOCKER_HOST_EXPORT_IP")
+	// dockerHostExportIpAppendHost defaults to true if dockerHostExportIpEnvExported
+	dockerHostExportIpAppendHost := dockerHostExportIpEnvExported
+	// eyz STOP: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+
 	for _, extraHost := range container.HostConfig.ExtraHosts {
 		// allow IPv6 addresses in extra hosts; only split on first ":"
 		if _, err := opts.ValidateExtraHost(extraHost); err != nil {
 			return nil, err
 		}
 		parts := strings.SplitN(extraHost, ":", 2)
+
+		// eyz START: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+		// NOTE: if an extraHost entry for "host" (case-insensitive) has been specified, then we will not append the DOCKER_HOST_EXPORT_IP -- letting this extraHost of "host" override DOCKER_HOST_EXPORT_IP even if it is set
+		if strings.ToLower(parts[0]) == "host" {
+			dockerHostExportIpAppendHost = false
+		}
+		// eyz STOP: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+
 		sboxOptions = append(sboxOptions, libnetwork.OptionExtraHost(parts[0], parts[1]))
 	}
 
+	// eyz START: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+	// NOTE: if env var DOCKER_HOST_EXPORT_IP is set on the host, then create a hosts entry of "host" with the value of DOCKER_HOST_EXPORT_IP
+	if dockerHostExportIpAppendHost {
+		sboxOptions = append(sboxOptions, libnetwork.OptionExtraHost("host", dockerHostExportIpVal))
+	}
+
+	if dockerHostExportIpEnvExported {
+		dockerHostExportIpEnvInConfig := false
+		for _, envValue := range container.Config.Env {
+			if strings.HasPrefix(strings.ToUpper(envValue), "DOCKER_HOST_EXPORT_IP=") {
+				dockerHostExportIpEnvInConfig = true
+			}
+		}
+		if !dockerHostExportIpEnvInConfig {
+			container.Config.Env = append(container.Config.Env, fmt.Sprintf("DOCKER_HOST_EXPORT_IP=%s", dockerHostExportIpVal))
+		}
+	}
+	// eyz STOP: add container "host" /etc/hosts entry equal to value of DOCKER_HOST_EXPORT_IP environment variable if defined
+
 	if container.HostConfig.PortBindings != nil {
 		for p, b := range container.HostConfig.PortBindings {
 			bindings[p] = []nat.PortBinding{}
@@ -719,6 +780,17 @@ func (daemon *Daemon) connectToNetwork(container *container.Container, idOrName
 		}
 	}
 
+	// eyz START: support sending hostname and domainname in IPAM
+	if endpointConfig.IPAMConfig != nil {
+		if container.Config.Hostname != "" {
+			endpointConfig.IPAMConfig.Hostname = container.Config.Hostname
+		}
+		if container.Config.Domainname != "" {
+			endpointConfig.IPAMConfig.Domainname = container.Config.Domainname
+		}
+	}
+	// eyz END: support sending hostname and domainname in IPAM
+
 	err = daemon.updateNetworkConfig(container, n, endpointConfig, updateSettings)
 	if err != nil {
 		return err
diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers/ipvlan/ipvlan_network.go b/components/engine/vendor/github.com/docker/libnetwork/drivers/ipvlan/ipvlan_network.go
index a9544b5a951..46146c0a423 100644
--- a/components/engine/vendor/github.com/docker/libnetwork/drivers/ipvlan/ipvlan_network.go
+++ b/components/engine/vendor/github.com/docker/libnetwork/drivers/ipvlan/ipvlan_network.go
@@ -79,10 +79,27 @@ func (d *driver) CreateNetwork(nid string, option map[string]interface{}, nInfo
 func (d *driver) createNetwork(config *configuration) error {
 	networkList := d.getNetworks()
 	for _, nw := range networkList {
+		// eyz START: cleanup duplicate ipvlan and macvlan network IDs during createNetwork (https://github.com/docker/libnetwork/pull/2055)
+		// check if the new network uses the same interface as an existing network, as different networks of this type cannot share the same interface
 		if config.Parent == nw.config.Parent {
-			return fmt.Errorf("network %s is already using parent interface %s",
-				getDummyName(stringid.TruncateID(nw.config.ID)), config.Parent)
+			// if the new network ID does not match an existing network ID then we must return with an error, as the new network is not the same and cannot co-exist with the existing
+			if config.ID != nw.config.ID {
+				return fmt.Errorf("network %s is already using parent interface %s",
+					getDummyName(stringid.TruncateID(nw.config.ID)), config.Parent)
+			}
+			// attempt to delete the old duplicate network, and return an error if unsuccessful, or log info if successful
+			logrus.Infof("attempting to remove duplicate network ID %s on parent interface %s before re-creating",
+				getDummyName(stringid.TruncateID(config.ID)), config.Parent)
+			err := d.DeleteNetwork(config.ID)
+			if err != nil {
+				return fmt.Errorf("network %s is already using parent interface %s, and could not delete old duplicate network ID",
+					getDummyName(stringid.TruncateID(config.ID)), config.Parent)
+			}
+			// duplicate network has been removed; it is now safe to proceed
+			logrus.Infof("successfully removed duplicate network ID %s",
+				getDummyName(stringid.TruncateID(config.ID)))
 		}
+		// eyz STOP: cleanup duplicate ipvlan and macvlan network IDs during createNetwork (https://github.com/docker/libnetwork/pull/2055)
 	}
 	if !parentExists(config.Parent) {
 		// if the --internal flag is set, create a dummy link
diff --git a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go
index a2d1dbc4c6c..e37220bdb10 100644
--- a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go
+++ b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go
@@ -927,7 +927,8 @@ var (
 )
 
 // CreateOptionIpam function returns an option setter for the ipam configuration for this endpoint
-func CreateOptionIpam(ipV4, ipV6 net.IP, llIPs []net.IP, ipamOptions map[string]string) EndpointOption {
+// eyz START: support sending hostname and domainname in IPAM
+func CreateOptionIpam(ipV4, ipV6 net.IP, llIPs []net.IP, hostname, domainname string, ipamOptions map[string]string) EndpointOption {
 	return func(ep *endpoint) {
 		ep.prefAddress = ipV4
 		ep.prefAddressV6 = ipV6
@@ -941,9 +942,21 @@ func CreateOptionIpam(ipV4, ipV6 net.IP, llIPs []net.IP, ipamOptions map[string]
 			}
 		}
 		ep.ipamOptions = ipamOptions
+
+		if ep.ipamOptions == nil && (hostname != "" || domainname != "") {
+			ep.ipamOptions = make(map[string]string)
+		}
+		if hostname != "" {
+			ep.ipamOptions["Hostname"] = hostname
+		}
+		if domainname != "" {
+			ep.ipamOptions["Domainname"] = domainname
+		}
 	}
 }
 
+// eyz END: support sending hostname and domainname in IPAM
+
 // CreateOptionExposedPorts function returns an option setter for the container exposed
 // ports option to be passed to network.CreateEndpoint() method.
 func CreateOptionExposedPorts(exposedPorts []types.TransportPort) EndpointOption {
@@ -999,6 +1012,9 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
 			ep.aliases = make(map[string]string)
 		}
 		ep.aliases[alias] = name
+		// eyz START: add network alias which includes the container FQDN (Hostname + Domainname)
+		logrus.Infof("libnetwork/endpoint.go CreateOptionAlias added ep.aliases[\"%s\"] = \"%s\"", alias, name)
+		// eyz STOP: add network alias which includes the container FQDN (Hostname + Domainname)
 	}
 }
 
diff --git a/components/engine/vendor/github.com/docker/libnetwork/network.go b/components/engine/vendor/github.com/docker/libnetwork/network.go
index 70c55844f12..a0e244c3517 100644
--- a/components/engine/vendor/github.com/docker/libnetwork/network.go
+++ b/components/engine/vendor/github.com/docker/libnetwork/network.go
@@ -1499,6 +1499,13 @@ func (n *network) requestPoolHelper(ipam ipamapi.Ipam, addressSpace, preferredPo
 			return poolID, pool, meta, nil
 		}
 
+		// eyz START: support optional skip of IPAM pool conflict checking
+		// if conflictCheck == "skip" in the IPAM pool metadata response, then we don't need to check for conflicts (in FindAvailableNetwork), and we use the network that was given
+		if conflictCheck, ok := meta["conflictCheck"]; ok && conflictCheck == "skip" {
+			return poolID, pool, meta, nil
+		}
+		// eyz STOP: support optional skip of IPAM pool conflict checking
+
 		// Check for overlap and if none found, we have found the right pool.
 		if _, err := netutils.FindAvailableNetwork([]*net.IPNet{pool}); err == nil {
 			return poolID, pool, meta, nil
@@ -2074,10 +2081,12 @@
 	}()
 
 	endpointName := n.name + "-endpoint"
+	// eyz START: support sending hostname and domainname in IPAM
 	epOptions := []EndpointOption{
-		CreateOptionIpam(n.loadBalancerIP, nil, nil, nil),
+		CreateOptionIpam(n.loadBalancerIP, nil, nil, "", "", nil),
 		CreateOptionLoadBalancer(),
 	}
+	// eyz STOP: support sending hostname and domainname in IPAM
 	ep, err := n.createEndpoint(endpointName, epOptions...)
 	if err != nil {
 		return err
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go
index 66000e5d867..ca071bbd4af 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go
@@ -1,12 +1,15 @@
 package replicated
 
 import (
+	"time"
+
 	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/manager/orchestrator"
 	"github.com/docker/swarmkit/manager/orchestrator/taskinit"
 	"github.com/docker/swarmkit/manager/state/store"
+	"github.com/docker/swarmkit/protobuf/ptypes"
 	"golang.org/x/net/context"
 )
 
@@ -23,6 +26,9 @@ func (r *Orchestrator) handleTaskEvent(ctx context.Context, event events.Event)
 	switch v := event.(type) {
 	case api.EventDeleteNode:
 		r.restartTasksByNodeID(ctx, v.Node.ID)
+		// eyz START: allow orphaned Swarm tasks assigned to a deleted node to start elsewhere
+		r.restartOrphanedTasksByNodeID(ctx, v.Node.ID)
+		// eyz STOP: allow orphaned Swarm tasks assigned to a deleted node to start elsewhere
 	case api.EventCreateNode:
 		r.handleNodeChange(ctx, v.Node)
 	case api.EventUpdateNode:
@@ -82,30 +88,87 @@ func (r *Orchestrator) tickTasks(ctx context.Context) {
 	}
 }
 
+// eyz START: set Swarm tasks to orphaned state if node becomes unavailable
 func (r *Orchestrator) restartTasksByNodeID(ctx context.Context, nodeID string) {
-	var err error
-	r.store.View(func(tx store.ReadTx) {
-		var tasks []*api.Task
-		tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
-		if err != nil {
-			return
-		}
+	err := r.store.Batch(func(batch *store.Batch) error {
+		return batch.Update(func(tx store.Tx) error {
+			var tasks []*api.Task
+			var err error
+			tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
+			if err != nil {
+				return err
+			}
+
+			// Testing (Isaac, 20171106)
+			var n *api.Node
+			n = store.GetNode(tx, nodeID)
+			nBlockRestartNodeDown := (n != nil && n.Spec.Availability == api.NodeAvailabilityActive && n.Status.State == api.NodeStatus_DOWN)
+
+			for _, t := range tasks {
+				if t.DesiredState > api.TaskStateRunning {
+					continue
+				}
+
+				// Testing (Isaac, 20171106)
+				//if nBlockRestartNodeDown && t != nil && t.Status.State >= api.TaskStateAssigned && t.Status.State <= api.TaskStateRunning {
+				if nBlockRestartNodeDown && t.Status.State >= api.TaskStateAssigned {
+					//t.DesiredState = api.TaskStateOrphaned
+					t.Status.State = api.TaskStateOrphaned
+					t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
+					store.UpdateTask(tx, t)
+					continue
+				}
+
+				service := store.GetService(tx, t.ServiceID)
+				if orchestrator.IsReplicatedService(service) {
+					r.restartTasks[t.ID] = struct{}{}
+				}
+			}
+
+			return nil
+		})
+	})
+	if err != nil {
+		log.G(ctx).WithError(err).Errorf("failed to list and/or update tasks to restart")
+	}
+}
 
-		for _, t := range tasks {
-			if t.DesiredState > api.TaskStateRunning {
-				continue
+// eyz STOP: set Swarm tasks to orphaned state if node becomes unavailable
+
+// eyz START: allow orphaned Swarm tasks assigned to a deleted node to start elsewhere
+func (r *Orchestrator) restartOrphanedTasksByNodeID(ctx context.Context, nodeID string) {
+	err := r.store.Batch(func(batch *store.Batch) error {
+		return batch.Update(func(tx store.Tx) error {
+			var tasks []*api.Task
+			var err error
+			tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
+			if err != nil {
+				return err
 			}
-			service := store.GetService(tx, t.ServiceID)
-			if orchestrator.IsReplicatedService(service) {
-				r.restartTasks[t.ID] = struct{}{}
+
+			for _, t := range tasks {
+				if t.Status.State == api.TaskStateOrphaned {
+					service := store.GetService(tx, t.ServiceID)
+					if orchestrator.IsReplicatedService(service) {
+						t.DesiredState = api.TaskStateRunning
+						t.Status.State = api.TaskStateShutdown
+						t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
+						store.UpdateTask(tx, t)
+						r.restartTasks[t.ID] = struct{}{}
+					}
+				}
 			}
-		}
+
+			return nil
+		})
 	})
 	if err != nil {
-		log.G(ctx).WithError(err).Errorf("failed to list tasks to remove")
+		log.G(ctx).WithError(err).Errorf("failed to list tasks and/or restart orphans")
 	}
 }
 
+// eyz STOP: allow orphaned Swarm tasks assigned to a deleted node to start elsewhere
+
 func (r *Orchestrator) handleNodeChange(ctx context.Context, n *api.Node) {
 	if !orchestrator.InvalidNode(n) {
 		return
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go
index 6af44b734c5..9438dcae8bb 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go
@@ -201,6 +201,12 @@ func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *ap
 		return false
 	}
 
+	// eyz START: Swarm tasks in an orphaned state should not be allowed to restart
+	if t.Status.State == api.TaskStateOrphaned {
+		return false
+	}
+	// eyz STOP: Swarm tasks in an orphaned state should not be allowed to restart
+
 	if t.Spec.Restart == nil || t.Spec.Restart.MaxAttempts == 0 {
 		return true
 	}
@@ -306,9 +312,12 @@ func (r *Supervisor) UpdatableTasksInSlot(ctx context.Context, slot orchestrator
 	var updatable orchestrator.Slot
 
 	for _, t := range slot {
-		if t.DesiredState <= api.TaskStateRunning {
+		// eyz START: allow Swarm tasks in a remove state to be transitioned
+		// if t.DesiredState <= api.TaskStateRunning {
+		if t.DesiredState <= api.TaskStateRunning || t.DesiredState >= api.TaskStateRemove {
 			updatable = append(updatable, t)
 		}
+		// eyz STOP: allow Swarm tasks in a remove state to be transitioned
 	}
 	if len(updatable) > 0 {
 		return updatable
@@ -462,7 +471,9 @@ func (r *Supervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask *api.Ta
 	if waitForTask {
 		select {
 		case <-watch:
-		case <-oldTaskTimer.C:
+		// eyz START: prevent oldTaskTimer from allowing the slot to be prematurely updated
+		//case <-oldTaskTimer.C:
+		// eyz STOP: prevent oldTaskTimer from allowing the slot to be prematurely updated
 		case <-ctx.Done():
 			return
 		}
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go
index d7027838330..f3aa58baca7 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go
@@ -141,9 +141,16 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 			// add tasks that have progressed beyond COMPLETE and have desired state REMOVE. These
 			// tasks are associated with slots that were removed as part of a service scale down
 			// or service removal.
-			if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateCompleted {
+
+			// eyz START: allow Swarm tasks to be cleaned up if they are in a pending state and are marked for removal
+			// found when tasks were still in api.TaskStatePending from a bad service scale (typically when more replicas are requested than there are nodes suitable for placement)
+			// NOTE: if we don't reap the api.TaskStatePending state tasks here, then we need to wait for an update to the service first
+			// RATIONALE: it should be safe to clean up tasks which were never assigned and started, and which we want to remove
+			// if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateCompleted {
+			if t.DesiredState == api.TaskStateRemove && (t.Status.State == api.TaskStatePending || t.Status.State >= api.TaskStateCompleted) {
 				tr.cleanup = append(tr.cleanup, t.ID)
 			}
+			// eyz STOP: allow Swarm tasks to be cleaned up if they are in a pending state and are marked for removal
 		case api.EventUpdateCluster:
 			tr.taskHistory = v.Cluster.Spec.Orchestration.TaskHistoryRetentionLimit
 		}
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go b/components/engine/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go
index 6d5b4e551bd..9920c3e1207 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go
@@ -1,6 +1,10 @@
 package scheduler
 
 import (
+	// eyz START: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+	"sort"
+	"strconv"
+	// eyz STOP: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
 	"time"
 
 	"github.com/docker/swarmkit/api"
@@ -632,12 +636,52 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
 	return tasksScheduled
 }
 
+// eyz START: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+type TypeSlotSorter struct {
+	Keys []string
+	Vals []*api.Task
+}
+
+func NewTypeSlotSorter(m map[string]*api.Task) *TypeSlotSorter {
+	tss := &TypeSlotSorter{
+		Keys: make([]string, 0, len(m)),
+		Vals: make([]*api.Task, 0, len(m)),
+	}
+	for k, v := range m {
+		tss.Keys = append(tss.Keys, k)
+		tss.Vals = append(tss.Vals, v)
+	}
+	return tss
+}
+
+func (tss TypeSlotSorter) Len() int           { return len(tss.Vals) }
+func (tss TypeSlotSorter) Less(i, j int) bool { return tss.Vals[i].Slot < tss.Vals[j].Slot }
+func (tss TypeSlotSorter) Swap(i, j int) {
+	tss.Vals[i], tss.Vals[j] = tss.Vals[j], tss.Vals[i]
+	tss.Keys[i], tss.Keys[j] = tss.Keys[j], tss.Keys[i]
+}
+func (tss *TypeSlotSorter) Sort() {
+	sort.Sort(tss)
+}
+
+// eyz STOP: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+
 func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
 	tasksScheduled := 0
 	failedConstraints := make(map[int]bool) // key is index in nodes slice
 	nodeIter := 0
 	nodeCount := len(nodes)
-	for taskID, t := range taskGroup {
+
+	// eyz START: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+	//for taskID, t := range taskGroup {
+	tss := NewTypeSlotSorter(taskGroup)
+	tss.Sort()
+
+	for index := range tss.Keys {
+		taskID := tss.Keys[index]
+		t := tss.Vals[index]
+		// eyz STOP: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+
 		// Skip tasks which were already scheduled because they ended
 		// up in two groups at once.
 		if _, exists := schedulingDecisions[taskID]; exists {
@@ -646,6 +690,30 @@
 
 		node := &nodes[nodeIter%nodeCount]
 
+		// eyz START: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+		// if t.Spec.Runtime is a *api.TaskSpec_Container, then we can check for container-related fields; if not, then we safely skip this custom code block
+		if candidateTaskSpecContainer, isTaskSpecContainer := t.Spec.Runtime.(*api.TaskSpec_Container); isTaskSpecContainer {
+			// default eyz policy; can be overridden by the container label "eyz-limitActiveServiceSlotsPerNode" being set to a valid boolean false
+			limitActiveServiceSlotsPerNode := true
+
+			// if there is a Labels object, then check if "eyz-limitActiveServiceSlotsPerNode" is set
+			if candidateTaskSpecContainer.Container.Labels != nil {
+				if labelStringValue, labelExists := candidateTaskSpecContainer.Container.Labels["eyz-limitActiveServiceSlotsPerNode"]; labelExists {
+					// if the label's string value parses to a boolean, then use that policy instead of the default for limitActiveServiceSlotsPerNode
+					if overrideLimitActiveServiceSlotsPerNode, booleanParseErr := strconv.ParseBool(labelStringValue); booleanParseErr == nil {
+						limitActiveServiceSlotsPerNode = overrideLimitActiveServiceSlotsPerNode
+					}
+				}
+			}
+
+			nodeInfoCheck, nodeInfoErr := s.nodeSet.nodeInfo(node.ID)
+			if nodeInfoErr == nil && limitActiveServiceSlotsPerNode && nodeInfoCheck.ActiveTasksCountByService[t.ServiceID] > 0 {
+				log.G(ctx).WithField("task.id", t.ID).Debugf("not assigning to node %s, because this node already has %v of service ID %v active", node.ID, nodeInfoCheck.ActiveTasksCountByService[t.ServiceID], t.ServiceID)
+				continue
+			}
+		}
+		// eyz STOP: support service level anti-affinity via label eyz-limitActiveServiceSlotsPerNode
+
 		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
 		newT := *t
 		newT.NodeID = node.ID