From 08e9a80048c3349ce990d18d71ef8837ddfe06a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 5 Mar 2026 14:41:39 +0100 Subject: [PATCH 1/5] OCTRL-1081 kubectl task to create a rudimentary bridge to Kubernetes; user info is set up properly; kubectl passes arguments properly to the Kubernetes cluster for FairMQ tasks --- README.md | 1 + common/controlmode/controlmode.go | 10 + core/task/scheduler.go | 3 +- core/task/task.go | 11 +- core/task/taskclass/class.go | 19 +- docs/kubernetes_ecs.md | 70 +++++ executor/executable/kubectltask.go | 395 ++++++++++++++++++++++++ executor/executable/kubectltask_test.go | 80 +++++ executor/executable/task.go | 24 +- 9 files changed, 598 insertions(+), 15 deletions(-) create mode 100644 docs/kubernetes_ecs.md create mode 100644 executor/executable/kubectltask.go create mode 100644 executor/executable/kubectltask_test.go diff --git a/README.md b/README.md index 80bb8d7a8..2d1b3d87a 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,7 @@ There are two ways of interacting with AliECS: * Kubernetes * [Operator controller](/control-operator/README.md) * [Testing manifests](/control-operator/ecs-manifests/kubernetes-ecs.md) + * [ECS bridge to Kubernetes](/docs/kubernetes_ecs.md) * Resources * T. Mrnjavac et.
al, [AliECS: A New Experiment Control System for the ALICE Experiment](https://doi.org/10.1051/epjconf/202429502027), CHEP23 diff --git a/common/controlmode/controlmode.go b/common/controlmode/controlmode.go index 6d6dc8fdb..5cab19729 100644 --- a/common/controlmode/controlmode.go +++ b/common/controlmode/controlmode.go @@ -39,6 +39,8 @@ const ( FAIRMQ BASIC HOOK + KUBECTL_DIRECT + KUBECTL_FAIRMQ ) func (cm ControlMode) String() string { @@ -51,6 +53,10 @@ func (cm ControlMode) String() string { return "basic" case HOOK: return "hook" + case KUBECTL_DIRECT: + return "kubectl_direct" + case KUBECTL_FAIRMQ: + return "kubectl_fairmq" } return "direct" } @@ -71,6 +77,10 @@ func (cm *ControlMode) UnmarshalText(b []byte) error { *cm = BASIC case "hook": *cm = HOOK + case "kubectl_direct": + *cm = KUBECTL_DIRECT + case "kubectl_fairmq": + *cm = KUBECTL_FAIRMQ default: *cm = DIRECT } diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 5e2d60fe4..737cad73e 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -1432,7 +1432,8 @@ func makeTaskForMesosResources( cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%d", "OCC_CONTROL_PORT", controlPort)) } - if cmd.ControlMode == controlmode.FAIRMQ { + if cmd.ControlMode == controlmode.FAIRMQ || + cmd.ControlMode == controlmode.KUBECTL_FAIRMQ { cmd.Arguments = append(cmd.Arguments, "--control-port", strconv.FormatUint(controlPort, 10)) } diff --git a/core/task/task.go b/core/task/task.go index ba3c459e5..bee19a1b5 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -286,7 +286,9 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { if class.Control.Mode == controlmode.BASIC || class.Control.Mode == controlmode.HOOK || class.Control.Mode == controlmode.DIRECT || - class.Control.Mode == controlmode.FAIRMQ { + class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { var varStack map[string]string // 
First we get the full varStack from the parent role, and @@ -393,7 +395,8 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { } } - if class.Control.Mode == controlmode.FAIRMQ { + if class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { // FIXME read this from configuration // if the task class doesn't provide an id, we generate one ourselves if !utils.StringSliceContains(cmd.Arguments, "--id") { @@ -635,7 +638,9 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand // For FAIRMQ tasks, we append FairMQ channel configuration if class.Control.Mode == controlmode.FAIRMQ || - class.Control.Mode == controlmode.DIRECT { + class.Control.Mode == controlmode.DIRECT || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { for _, inbCh := range channel.MergeInbound(parent.CollectInboundChannels(), class.Bind) { // We get the FairMQ-formatted propertyMap from the inbound channel spec var chanProps controlcommands.PropertyMap diff --git a/core/task/taskclass/class.go b/core/task/taskclass/class.go index 902741c6c..4a4e6329d 100644 --- a/core/task/taskclass/class.go +++ b/core/task/taskclass/class.go @@ -123,7 +123,6 @@ func (c *Class) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { } } return - } func (c *Class) MarshalYAML() (interface{}, error) { @@ -154,13 +153,17 @@ func (c *Class) MarshalYAML() (interface{}, error) { Command: c.Command, } - if c.Control.Mode == controlmode.FAIRMQ { - aux.Control.Mode = "fairmq" - } else if c.Control.Mode == controlmode.BASIC { - aux.Control.Mode = "basic" - } else { - aux.Control.Mode = "direct" - } + // if c.Control.Mode == controlmode.FAIRMQ { + // aux.Control.Mode = "fairmq" + // } else if c.Control.Mode == controlmode.BASIC { + // aux.Control.Mode = "basic" + // } else if c.Control.Mode == controlmode.KUBECTL { + // aux.Control.Mode = "kubectl" + // } else { + // aux.Control.Mode = 
"direct" + // } + + aux.Control.Mode = c.Control.Mode.String() return aux, nil } diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md new file mode 100644 index 000000000..ce698bdcf --- /dev/null +++ b/docs/kubernetes_ecs.md @@ -0,0 +1,70 @@ +# ECS with Kubernetes + +> ⚠️ **Warning** +> All Kubernetes work done is in a stage of prototype. + +## Kubernetes Cluster + +While prototyping we used many Kubernetes clusters, namely [`kind`](https://kind.sigs.k8s.io/), [`minikube`](https://minikube.sigs.k8s.io/docs/) and [`k3s`](https://k3s.io/) +in both local and remote cluster deployment. We used Openstack for remote deployment. +Follow the guides at the individual distributions in order to create the desired cluster setup. +For now we chose `k3s` for most of the activities performed because it is lightweight +and easily installed distribution which is also [`CNCF`](https://www.cncf.io/training/certification/) certified. + +All settings of `k3s` were used as default except one: locked-in-memory size. Use `ulimit -l` to learn +what is the limit for the current user and `LimitMEMLOCK` inside the k3s systemd service config +to set it for correct value. Right now the `flp` user has unlimited size (`LimitMEMLOCK=infinity`). +This config is necessary because even if you are running PODs with the privileged security context +under user flp, Kubernetes still sets limits according to its internal settings and doesn't +respect linux settings. + +Another setup we expect at this moment to be present at the target nodes +is ability to run PODs with privileged permissions and also under user `flp`. +This means that the machine has to have `flp` user setup the same way as +if you would do the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). 
+ +## Running tasks (`KubectlTask`) + +ECS is setup to run tasks through Mesos on all required hosts baremetal with active +task management (see [`ControllableTask`](/executor/executable/controllabletask.go)) +and OCC gRPC communication. When running docker task through ECS we could easily +wrap command to be run into the docker container with proper settings +([see](/docs/running_docker.md)). This is however not possible for Kubernetes +workloads as the PODs are "hidden" inside the cluster. So we plan +to deploy our own Task Controller which will connect to and guide +OCC state machine of required tasks. Thus we need to create custom +POC way to communicate with Kubernetes cluster from Mesos executor. + +The reason why we don't call Kubernetes cluster directly from ECS core +is that ECS does a lot of heavy lifting while deploying workloads, +monitoring workloads and by generating a lot of configuration which +is not trivial to replicate manually. However, if we create some class +that would be able to deploy one task into the Kubernetes and monitor its +state we could replicate `ControllableTask` workflow and leave ECS +mostly intact for now, save a lot of work and focus on prototyping +Kubernetes operator pattern. + +Thus [`KubectlTask`](/executor/executable/kubectltask.go) was created. This class +is written as a wrapper around `kubectl` utility to manage Kubernetes cluster. +It is based on following `kubectl` commands: + +* `apply` => `kubectl apply -f manifest.yaml` - deploys resource described inside given manifest +* `delete` => `kubectl delete -f manifest.yaml` - deletes resource from cluster +* `patch` => `kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]` - changes the state of resource inside cluster +* `get` => `kubectl get -f manifest.yaml -o jsonpath='{.spec.state}'` - queries exact field of resource (`state` in the example) inside cluster. 
+ +These four commands allow us to deploy and monitor status of the deployed +resource without necessity to interact with it directly. However `KubectlTask` +expects that resource is the CRD [Task](/control-operator/api/v1alpha1/task_types.go). + +In order to activate `KubectlTask` you need to change yaml template +inside the `ControlWorkflows` directory. Namely: + +* add path to the kubectl manifest as the first argument in `.command.arguments` field +* change `.control.mode` to either `kubectl_direct` or `kubectl_fairmq` +You can find working template inside `control-operator/ecs-manifests/control-workflows/*_kube.yaml` + +Working kubectl manifests are to be found in `control-operator/ecs-manifests/kubernetes-manifests`. +You can see `*test.yaml` for concrete deployable manifests by `kubectl apply`, the rest +are the templates with variables to be filled in in a `${var}` format. `KubectlTask` +fills these variables from env vars. diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go new file mode 100644 index 000000000..4a7112595 --- /dev/null +++ b/executor/executable/kubectltask.go @@ -0,0 +1,395 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "os/user" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/AliceO2Group/Control/core/controlcommands" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + "github.com/sirupsen/logrus" +) + +const ( + KUBECTL string = "kubectl" + + APPLY string = "apply" + DELETE string = "delete" + PATCH string = "patch" + GET string = "get" + TASK string = "task" + CREATE string = "create" + + // TRANSITION_TIMEOUT = 10 * time.Second // inside controllable task +) + +type KubectlTask struct { + taskBase + rpc *executorcmd.RpcClient + configYaml string + running bool + latestStatus atomic.Value +} + +func GetUserInfo(username string) (uid, gid int64, supplemental []int64, err error) { + u, err := user.Lookup(username) + if err != nil { + return 0, 0, nil, err + } + + // Convert UID + uidInt, _ := strconv.ParseInt(u.Uid, 10, 64) + + // Convert Primary GID + gidInt, _ := strconv.ParseInt(u.Gid, 10, 64) + + // Get Supplemental Groups (e.g., wheel, pda) + groupStrings, _ := u.GroupIds() + var supplementalInts []int64 + for _, g := range groupStrings { + gInt, _ := strconv.ParseInt(g, 10, 64) + // Avoid adding the primary GID to the supplemental list + if gInt != gidInt { + supplementalInts = append(supplementalInts, gInt) + } + } + + return uidInt, gidInt, supplementalInts, nil +} + +func (task *KubectlTask) Launch() error { + if len(task.Tci.Arguments) == 0 { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }). + Error("no arguments in kubectl task. 
We need to have at least manifest location as the last argument") + return errors.New("no arguments for kubectl task. Location for kubernetes manifest needed") + } + + task.configYaml = task.Tci.Arguments[0] + + // Read the template file + content, err := os.ReadFile(task.configYaml) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "file": task.configYaml, + }).WithError(err).Error("failed to read kubectl config file") + return err + } + + // Set the AliECS environment variables in the local process + // so os.ExpandEnv can find them + for _, envVar := range task.Tci.Env { + parts := strings.SplitN(envVar, "=", 2) + if len(parts) == 2 { + os.Setenv(parts[0], parts[1]) + } + } + + // Set arguments into the KUBE_ARGUMENTS os env leaving the kubemanifest file + arguments := task.Tci.Arguments[1:] + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "args": arguments, + }).Info("setting arguments as a KUBE_ARGUMENTS env var") + + os.Setenv("KUBE_ARGUMENTS", strings.Join(arguments, " ")) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": *task.Tci.Value, + }).Info("setting command as a KUBE_COMMAND env var") + os.Setenv("KUBE_COMMAND", *task.Tci.Value) + + if uid, gid, supplementalIds, err := GetUserInfo("flp"); err == nil { + os.Setenv("FLP_UID", strconv.FormatInt(uid, 10)) + os.Setenv("FLP_GID", strconv.FormatInt(gid, 10)) + + var strIds []string + for _, id := range supplementalIds { + strIds = append(strIds, strconv.FormatInt(id, 10)) + } + supplementalString := "[" + strings.Join(strIds, ", ") + "]" + + os.Setenv("FLP_SUPPLEMENTAL_GORUPS", supplementalString) + } else { + log.Error("we cannot run kubectl task as flp user because we didn't find user details") + } + + // Replace ${VAR} placeholders with actual values + expandedYaml := os.ExpandEnv(string(content)) + + ctx, cancel := 
context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Apply via Stdin (-) + command := exec.CommandContext(ctx, KUBECTL, APPLY, "-f", "-") + command.Stdin = strings.NewReader(expandedYaml) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl apply via Stdin") + + var stdoutBuf bytes.Buffer + var stderrBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = io.MultiWriter(os.Stderr, &stderrBuf) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Errorf("kubectl apply failed stderr: %s , stdin: %s", stderrBuf.String(), stdoutBuf.String()) + return err + } + + task.latestStatus.Store("") + task.running = true + go task.eventLoop() + return nil +} + +func (task *KubectlTask) Kill() error { + task.running = false + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + command := exec.CommandContext(ctx, KUBECTL, DELETE, "-f", task.configYaml) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("kubectl delete failed") + return err + } + + task.sendStatus(task.knownEnvironmentId, mesos.TASK_FINISHED, "") + + return nil +} + +func (task *KubectlTask) Transition(transition *executorcmd.ExecutorCommand_Transition) *controlcommands.MesosCommandResponse_Transition { + // kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]' + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Extract the transition arguments (the 'Mix') to pipe them to Kubernetes + log.WithFields(logrus.Fields{ + "controlmode": 
task.Tci.ControlMode, + "name": task.ti.Name, + "args": transition.Arguments, + }).Info("Patching transition arguments to Kubernetes") + + argsJSON, err := json.Marshal(transition.Arguments) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("failed to marshal transition arguments") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + transitionJSON := fmt.Sprintf(`[ + {"op": "add", "path": "/spec/state", "value": "%s"}, + {"op": "add", "path": "/spec/arguments", "value": %s} + ]`, strings.ToLower(transition.Destination), string(argsJSON)) + + command := exec.CommandContext(ctx, KUBECTL, PATCH, "-f", task.configYaml, "--type=json", "-p", transitionJSON) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl patch") + + statusBeforeTransition := task.latestStatus.Load().(string) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl patch failed") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("kubectl patch suceeded, waiting for actual status change") + actualStatus := "" + timeout := time.After(TRANSITION_TIMEOUT) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + +loop: + for { + select { + case <-ticker.C: + actualStatus = task.latestStatus.Load().(string) + if actualStatus != statusBeforeTransition { + break loop + } + case <-timeout: + return transition.PrepareResponse(errors.New("timeout waiting for status change"), statusBeforeTransition, task.ti.TaskID.Value) + } + } + 
log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Infof("status changed from %s to %s", statusBeforeTransition, actualStatus) + + // TODO: I am not sure what PID should I put here + return transition.PrepareResponse(nil, actualStatus, task.ti.TaskID.Value) +} + +func (task *KubectlTask) getTaskStatus() (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + // command := exec.CommandContext(ctx, KUBECTL, GET, TASK, task.ti.Name, "-o", "jsonpath={.status.state}") + command := exec.CommandContext(ctx, KUBECTL, GET, "-f", task.configYaml, "-o", "jsonpath={.status.state}") + + var stdoutBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Debug("Starting kubectl get task") + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl get task failed") + return "", err + } + + // no newlines + return strings.TrimSpace(stdoutBuf.String()), nil +} + +func (task *KubectlTask) eventLoop() { + errorCount := 0 + maxErrors := 5 + for task.running { + time.Sleep(5 * time.Second) + status, err := task.getTaskStatus() + if err != nil { + errorCount += 1 + if errorCount < maxErrors { + log.WithError(err).Warnf("failed to get Task Status, retrying %d/%d", errorCount, maxErrors) + continue + } + log.WithError(err).Errorf("failed to get Task Status, sending TASK_FAILED and breaking from the eventLoop") + task.sendStatus(task.knownEnvironmentId, mesos.TASK_FAILED, "couldn't get task status via kubectl") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + break + } + + status = strings.ToUpper(status) + + if 
task.latestStatus.Load().(string) == status { + continue + } + task.latestStatus.Store(status) + + var state mesos.TaskState + switch status { + case "CONFIGURED", "RUNNING", "STANDBY": + state = mesos.TASK_RUNNING + + case "ERROR": + state = mesos.TASK_FAILED + log.WithError(err).Error("Received error from kubectl task, terminating everything and sending update") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + // + + default: + log.Errorf("received different status than expected: %s", status) + continue + } + + log.Debugf("sending new status from kubectl task %s", status) + task.sendStatus(task.knownEnvironmentId, state, "") + + } +} + +func (task *KubectlTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { + cmd = new(executorcmd.ExecutorCommand_Transition) + if task.rpc != nil { + cmd.Transitioner = task.rpc.Transitioner + } + err = json.Unmarshal(data, cmd) + if err != nil { + cmd = nil + } + return +} diff --git a/executor/executable/kubectltask_test.go b/executor/executable/kubectltask_test.go new file mode 100644 index 000000000..6dbb6d300 --- /dev/null +++ b/executor/executable/kubectltask_test.go @@ -0,0 +1,80 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "testing" + "time" + + "github.com/AliceO2Group/Control/common" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("kubectl task test", func() { + task := KubectlTask{} + task.Tci = &common.TaskCommandInfo{} + task.Tci.Arguments = []string{"exampletask.yaml"} + task.configYaml = "exampletask.yaml" + task.ti = &mesos.TaskInfo{Name: "exampletask"} + When("starting and stoping the task", func() { + It("should start and stop accordingly", func() { + err := task.Launch() + Expect(err).NotTo(HaveOccurred()) + + // just so we can see monitoring tools show something + time.Sleep(2 * time.Second) + + err = task.Kill() + Expect(err).NotTo(HaveOccurred()) + }) + }) + + When("transitioning to running", func() { + It("should transition", func() { + task.configYaml = "exampletask.yaml" + transitionMsg := &executorcmd.ExecutorCommand_Transition{} + transitionMsg.Destination = "running" + result := task.Transition(transitionMsg) + Expect(result.ErrorString).To(BeEmpty()) + }) + }) + + // this test expects there is a task with a status on cluster. 
+ When("geting current status", func() { + It("should get proper status", func() { + status, err := task.getTaskStatus() + Expect(err).NotTo(HaveOccurred()) + Expect(status).To(Equal("standby")) + }) + }) +}) + +func TestKubectlTask(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "KubectlTask suite") +} diff --git a/executor/executable/task.go b/executor/executable/task.go index e23b56394..abb92b14e 100644 --- a/executor/executable/task.go +++ b/executor/executable/task.go @@ -58,9 +58,11 @@ const ( var log = logger.New(logrus.StandardLogger(), "executor") -type SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) -type SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) -type SendMessageFunc func(message []byte) +type ( + SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) + SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) + SendMessageFunc func(message []byte) +) type Task interface { Launch() error @@ -90,6 +92,7 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE log.WithField("json", string(tciData[:])). 
Trace("received TaskCommandInfo") + log.WithField("findme", "here").Info(string(tciData)) if err := json.Unmarshal(tciData, &commandInfo); tciData != nil && err == nil { log.WithFields(logrus.Fields{ "shell": *commandInfo.Shell, @@ -177,6 +180,21 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE }, rpc: nil, } + case controlmode.KUBECTL_DIRECT: + fallthrough + case controlmode.KUBECTL_FAIRMQ: + newTask = &KubectlTask{ + taskBase: taskBase{ + ti: &taskInfo, + Tci: &commandInfo, + sendStatus: sendStatusFunc, + sendDeviceEvent: sendDeviceEventFunc, + sendMessage: sendMessageFunc, + knownEnvironmentId: envId, + knownDetector: detector, + }, + rpc: nil, + } } return newTask From a8890fdd3253a7af858a5a47e550c2ccc53d005c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Fri, 10 Apr 2026 12:41:17 +0200 Subject: [PATCH 2/5] fixed documentation --- docs/kubernetes_ecs.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md index ce698bdcf..893c4cff4 100644 --- a/docs/kubernetes_ecs.md +++ b/docs/kubernetes_ecs.md @@ -8,18 +8,18 @@ While prototyping we used many Kubernetes clusters, namely [`kind`](https://kind.sigs.k8s.io/), [`minikube`](https://minikube.sigs.k8s.io/docs/) and [`k3s`](https://k3s.io/) in both local and remote cluster deployment. We used Openstack for remote deployment. Follow the guides at the individual distributions in order to create the desired cluster setup. -For now we chose `k3s` for most of the activities performed because it is lightweight +k3s is recommended to run this prototype, as it is lightweight and easily installed distribution which is also [`CNCF`](https://www.cncf.io/training/certification/) certified. All settings of `k3s` were used as default except one: locked-in-memory size. 
Use `ulimit -l` to learn what is the limit for the current user and `LimitMEMLOCK` inside the k3s systemd service config to set it for correct value. Right now the `flp` user has unlimited size (`LimitMEMLOCK=infinity`). -This config is necessary because even if you are running PODs with the privileged security context +This config is necessary because even if you are running Pods with the privileged security context under user flp, Kubernetes still sets limits according to its internal settings and doesn't respect linux settings. Another setup we expect at this moment to be present at the target nodes -is ability to run PODs with privileged permissions and also under user `flp`. +is ability to run Pods with privileged permissions and also under user `flp`. This means that the machine has to have `flp` user setup the same way as if you would do the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). @@ -30,7 +30,7 @@ task management (see [`ControllableTask`](/executor/executable/controllabletask. and OCC gRPC communication. When running docker task through ECS we could easily wrap command to be run into the docker container with proper settings ([see](/docs/running_docker.md)). This is however not possible for Kubernetes -workloads as the PODs are "hidden" inside the cluster. So we plan +workloads as the Pods are "hidden" inside the cluster. So we plan to deploy our own Task Controller which will connect to and guide OCC state machine of required tasks. Thus we need to create custom POC way to communicate with Kubernetes cluster from Mesos executor. 
From d756d8006c554a5505a543013d0394c2bb5d5392 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 14 Apr 2026 17:39:58 +0200 Subject: [PATCH 3/5] adding kubeconfig to kubectl calls so it works even without $HOME set --- executor/executable/kubectltask.go | 44 ++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go index 4a7112595..9311129e4 100644 --- a/executor/executable/kubectltask.go +++ b/executor/executable/kubectltask.go @@ -34,6 +34,7 @@ import ( "os" "os/exec" "os/user" + "path/filepath" "strconv" "strings" "sync/atomic" @@ -58,6 +59,32 @@ const ( // TRANSITION_TIMEOUT = 10 * time.Second // inside controllable task ) +var kubeconfigDir string + +func init() { + if kc := os.Getenv("KUBECONFIG"); kc != "" { + kubeconfigDir = kc + return + } + if u, err := user.Current(); err == nil { + tempPath := filepath.Join(u.HomeDir, ".kube", "config") + if _, err = os.Stat(tempPath); err == nil { + kubeconfigDir = tempPath + } + } +} + +func kubectl(ctx context.Context, arg ...string) *exec.Cmd { + if kubeconfigDir == "" { + log.Warn(`kubectl config was not set, thus kubectl might not be able to find a cluster. +Either KUBECONFIG env var was not found, or current user was not determined, or home/.kube/config does not exist. +Using kubectl builtin defaults`) + return exec.CommandContext(ctx, KUBECTL, arg...) + } else { + return exec.CommandContext(ctx, KUBECTL, append([]string{"--kubeconfig", kubeconfigDir}, arg...)...) 
+ } +} + type KubectlTask struct { taskBase rpc *executorcmd.RpcClient @@ -164,13 +191,14 @@ func (task *KubectlTask) Launch() error { defer cancel() // Apply via Stdin (-) - command := exec.CommandContext(ctx, KUBECTL, APPLY, "-f", "-") + command := kubectl(ctx, APPLY, "-f", "-") command.Stdin = strings.NewReader(expandedYaml) log.WithFields(logrus.Fields{ - "controlmode": task.Tci.ControlMode, - "name": task.ti.Name, - "command": command, + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + "expandedYaml": expandedYaml, }).Info("Starting kubectl apply via Stdin") var stdoutBuf bytes.Buffer @@ -199,7 +227,7 @@ func (task *KubectlTask) Kill() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - command := exec.CommandContext(ctx, KUBECTL, DELETE, "-f", task.configYaml) + command := kubectl(ctx, DELETE, "-f", task.configYaml) command.Stdout = os.Stdout command.Stderr = os.Stderr @@ -245,7 +273,7 @@ func (task *KubectlTask) Transition(transition *executorcmd.ExecutorCommand_Tran {"op": "add", "path": "/spec/arguments", "value": %s} ]`, strings.ToLower(transition.Destination), string(argsJSON)) - command := exec.CommandContext(ctx, KUBECTL, PATCH, "-f", task.configYaml, "--type=json", "-p", transitionJSON) + command := kubectl(ctx, PATCH, "-f", task.configYaml, "--type=json", "-p", transitionJSON) command.Stdout = os.Stdout command.Stderr = os.Stderr @@ -303,8 +331,8 @@ loop: func (task *KubectlTask) getTaskStatus() (string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - // command := exec.CommandContext(ctx, KUBECTL, GET, TASK, task.ti.Name, "-o", "jsonpath={.status.state}") - command := exec.CommandContext(ctx, KUBECTL, GET, "-f", task.configYaml, "-o", "jsonpath={.status.state}") + // command := exec.CommandContext(ctx, KUBECTL, GET, "-f", task.configYaml, "-o", "jsonpath={.status.state}") + command := kubectl(ctx, GET, "-f", 
task.configYaml, "-o", "jsonpath={.status.state}") var stdoutBuf bytes.Buffer From da0124140c79ef8a8e4cfcbe04d3a536f13dda8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 28 Apr 2026 17:45:52 +0200 Subject: [PATCH 4/5] fixing documentation and typos --- control-operator/README.md | 29 +++++++++++++++++++++-------- docs/kubernetes_ecs.md | 8 +++++++- executor/executable/kubectltask.go | 4 ++-- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/control-operator/README.md b/control-operator/README.md index 4813880ef..852a2aa5b 100644 --- a/control-operator/README.md +++ b/control-operator/README.md @@ -1,33 +1,42 @@ # operator -// TODO(user): Add simple overview of use/purpose + +Folder with operators regarding Task and Environment deployment. ## Description -// TODO(user): An in-depth paragraph about your project and overview of use + +In order to deploy Task and Environment workflows to the k8s cluster you need controllers and operators +controlling custom CRDs defining ALICE custom workload. This Folder defines and implements all moving parts together with Makefile +to build, deploy, install CRDs and operators. ## Getting Started -You’ll need a Kubernetes cluster to run against. You can use [KIND](https://sigs.k8s.io/kind) to get a local cluster for testing, or run against a remote cluster. -**Note:** Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster `kubectl cluster-info` shows). + +You’ll need a Kubernetes cluster to run against. You can use [KIND](https://sigs.k8s.io/kind) to get a local cluster for testing, or run against a remote cluster. Author had the most success with K3s [see](/docs/kubernetes_ecs.md). +**Note:** Your controller will automatically use the current context in your kubeconfig (usually ~/.kube/config) file (i.e. whatever cluster `kubectl cluster-info` shows). ### Running on the cluster + +Following commands show basic use of Makefile. 
However this isn't exhaustive list. + 1. Install Instances of Custom Resources: ```sh kubectl apply -f config/samples/ ``` -2. Build and push your image to the location specified by `IMG`: +1. Build and push your image to the location specified by `IMG`: ```sh make docker-build docker-push IMG=/operator:tag ``` -3. Deploy the controller to the cluster with the image specified by `IMG`: +1. Deploy the controller to the cluster with the image specified by `IMG`: ```sh make deploy IMG=/operator:tag ``` ### Uninstall CRDs + To delete the CRDs from the cluster: ```sh @@ -35,6 +44,7 @@ make uninstall ``` ### Undeploy controller + UnDeploy the controller from the cluster: ```sh @@ -42,22 +52,25 @@ make undeploy ``` ## Contributing + // TODO(user): Add detailed information on how you would like others to contribute to this project ### How it works + This project aims to follow the Kubernetes [Operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/). It uses [Controllers](https://kubernetes.io/docs/concepts/architecture/controller/), which provide a reconcile function responsible for synchronizing resources until the desired state is reached on the cluster. ### Test It Out + 1. Install the CRDs into the cluster: ```sh make install ``` -2. Run your controller (this will run in the foreground, so switch to a new terminal if you want to leave it running): +1. Run your controller (this will run in the foreground, so switch to a new terminal if you want to leave it running): ```sh make run @@ -66,6 +79,7 @@ make run **NOTE:** You can also run this in one step by running: `make install run` ### Modifying the API definitions + If you are editing the API definitions, generate the manifests such as CRs or CRDs using: ```sh @@ -91,4 +105,3 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. - diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md index 893c4cff4..dfe89e03d 100644 --- a/docs/kubernetes_ecs.md +++ b/docs/kubernetes_ecs.md @@ -23,6 +23,12 @@ is ability to run Pods with privileged permissions and also under user `flp`. This means that the machine has to have `flp` user setup the same way as if you would do the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). +## Task Controller + +Following text assumes that there is a Task Controller from `control-operator` running +at your K8s cluster and Task CRD installed at your cluster. +You can find the details about the usage in the [documentation](/control-operator/README.md). + ## Running tasks (`KubectlTask`) ECS is setup to run tasks through Mesos on all required hosts baremetal with active @@ -62,7 +68,7 @@ inside the `ControlWorkflows` directory. Namely: * add path to the kubectl manifest as the first argument in `.command.arguments` field * change `.control.mode` to either `kubectl_direct` or `kubectl_fairmq` -You can find working template inside `control-operator/ecs-manifests/control-workflows/*_kube.yaml` +You can find working template inside `control-operator/ecs-manifests/control-workflows/*-kube.yaml` Working kubectl manifests are to be found in `control-operator/ecs-manifests/kubernetes-manifests`. 
You can see `*test.yaml` for concrete deployable manifests by `kubectl apply`, the rest diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go index 9311129e4..dc685d3c8 100644 --- a/executor/executable/kubectltask.go +++ b/executor/executable/kubectltask.go @@ -179,7 +179,7 @@ func (task *KubectlTask) Launch() error { } supplementalString := "[" + strings.Join(strIds, ", ") + "]" - os.Setenv("FLP_SUPPLEMENTAL_GORUPS", supplementalString) + os.Setenv("FLP_SUPPLEMENTAL_GROUPS", supplementalString) } else { log.Error("we cannot run kubectl task as flp user because we didn't find user details") } @@ -212,7 +212,7 @@ func (task *KubectlTask) Launch() error { log.WithFields(logrus.Fields{ "controlmode": task.Tci.ControlMode, "name": task.ti.Name, - }).WithError(err).Errorf("kubectl apply failed stderr: %s , stdin: %s", stderrBuf.String(), stdoutBuf.String()) + }).WithError(err).Errorf("kubectl apply failed stderr: %s , stdout: %s", stderrBuf.String(), stdoutBuf.String()) return err } From d36fdf7f36cc0e04b5442848a25de55c0b91be95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Mon, 4 May 2026 16:54:34 +0200 Subject: [PATCH 5/5] strip quotes in kubectl_task.go --- executor/executable/kubectltask.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go index dc685d3c8..7a764390a 100644 --- a/executor/executable/kubectltask.go +++ b/executor/executable/kubectltask.go @@ -147,7 +147,11 @@ func (task *KubectlTask) Launch() error { for _, envVar := range task.Tci.Env { parts := strings.SplitN(envVar, "=", 2) if len(parts) == 2 { - os.Setenv(parts[0], parts[1]) + value := parts[1] + if n := len(value); n >= 2 && ((value[0] == '"' && value[n-1] == '"') || (value[0] == '\'' && value[n-1] == '\'')) { + value = value[1 : n-1] + } + os.Setenv(parts[0], value) } }