Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions control-operator/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2026 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package client

import (
"context"
"fmt"

"github.com/AliceO2Group/Control/operator/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
crClient "sigs.k8s.io/controller-runtime/pkg/client"
)

type Client struct {
client crClient.WithWatch
namespace string
}

func New(kubeconfigPath, namespace string) (*Client, error) {
config, err := buildConfig(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("building kubeconfig: %w", err)
}
return NewFromConfig(config, namespace)
}

func NewFromConfig(config *rest.Config, namespace string) (*Client, error) {
scheme := runtime.NewScheme()
if err := v1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("registering v1alpha1 scheme: %w", err)
}

c, err := crClient.NewWithWatch(config, crClient.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("creating kubernetes client: %w", err)
}

return &Client{client: c, namespace: namespace}, nil
}

func buildConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}
if config, err := rest.InClusterConfig(); err == nil {
return config, nil
}
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil).ClientConfig()
}

func (c *Client) CreateTask(ctx context.Context, task *v1alpha1.Task) error {
task.Namespace = c.namespace
return c.client.Create(ctx, task)
}

func (c *Client) GetTask(ctx context.Context, name string) (*v1alpha1.Task, error) {
task := &v1alpha1.Task{}
err := c.client.Get(ctx, types.NamespacedName{Name: name, Namespace: c.namespace}, task)
return task, err
}

func (c *Client) UpdateTask(ctx context.Context, task *v1alpha1.Task) error {
task.Namespace = c.namespace
return c.client.Update(ctx, task)
}

func (c *Client) DeleteTask(ctx context.Context, name string) error {
return c.client.Delete(ctx, &v1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: c.namespace},
})
}

// WatchTasks returns a watcher for all Task resources in the namespace.
// Each event on ResultChan() carries a *v1alpha1.Task as event.Object.
func (c *Client) WatchTasks(ctx context.Context) (watch.Interface, error) {
return c.client.Watch(ctx, &v1alpha1.TaskList{}, crClient.InNamespace(c.namespace))
}

func (c *Client) CreateEnvironment(ctx context.Context, env *v1alpha1.Environment) error {
env.Namespace = c.namespace
return c.client.Create(ctx, env)
}

func (c *Client) GetEnvironment(ctx context.Context, name string) (*v1alpha1.Environment, error) {
env := &v1alpha1.Environment{}
err := c.client.Get(ctx, types.NamespacedName{Name: name, Namespace: c.namespace}, env)
return env, err
}

func (c *Client) UpdateEnvironment(ctx context.Context, env *v1alpha1.Environment) error {
env.Namespace = c.namespace
return c.client.Update(ctx, env)
}

func (c *Client) DeleteEnvironment(ctx context.Context, name string) error {
return c.client.Delete(ctx, &v1alpha1.Environment{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: c.namespace},
})
}

// WatchEnvironments returns a watcher for all Environment resources in the namespace.
// Each event on ResultChan() carries a *v1alpha1.Environment as event.Object.
func (c *Client) WatchEnvironments(ctx context.Context) (watch.Interface, error) {
return c.client.Watch(ctx, &v1alpha1.EnvironmentList{}, crClient.InNamespace(c.namespace))
}
188 changes: 188 additions & 0 deletions control-operator/pkg/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2026 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package client_test

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8swatch "k8s.io/apimachinery/pkg/watch"

aliecsv1alpha1 "github.com/AliceO2Group/Control/operator/api/v1alpha1"
"github.com/AliceO2Group/Control/operator/pkg/client"
)

var _ = Describe("Client", func() {
var (
ctx context.Context
c *client.Client
)

BeforeEach(func() {
ctx = context.Background()
var err error
c, err = client.NewFromConfig(cfg, "default")
Expect(err).NotTo(HaveOccurred())
})

Describe("Task Create, Read, Update, Delete", func() {
var task *aliecsv1alpha1.Task

BeforeEach(func() {
task = &aliecsv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{Name: "test-task"},
Spec: aliecsv1alpha1.TaskSpec{State: "standby", Pod: v1.PodSpec{Containers: []v1.Container{}}},
}
Expect(c.CreateTask(ctx, task)).To(Succeed())
})

AfterEach(func() {
_ = c.DeleteTask(ctx, task.Name)
})

It("gets a created task", func() {
got, err := c.GetTask(ctx, "test-task")
Expect(err).NotTo(HaveOccurred())
Expect(got.Name).To(Equal("test-task"))
Expect(got.Spec.State).To(Equal("standby"))
})

It("updates a task", func() {
got, err := c.GetTask(ctx, "test-task")
Expect(err).NotTo(HaveOccurred())

got.Spec.State = "running"
Expect(c.UpdateTask(ctx, got)).To(Succeed())

updated, err := c.GetTask(ctx, "test-task")
Expect(err).NotTo(HaveOccurred())
Expect(updated.Spec.State).To(Equal("running"))
})

It("deletes a task", func() {
Expect(c.DeleteTask(ctx, "test-task")).To(Succeed())

_, err := c.GetTask(ctx, "test-task")
Expect(err).To(HaveOccurred())
})
})

Describe("Task Watch", func() {
It("receives events for task changes", func() {
watcher, err := c.WatchTasks(ctx)
Expect(err).NotTo(HaveOccurred())
defer watcher.Stop()

task := &aliecsv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{Name: "watch-task"},
Spec: aliecsv1alpha1.TaskSpec{State: "standby", Pod: v1.PodSpec{Containers: []v1.Container{}}},
}
Expect(c.CreateTask(ctx, task)).To(Succeed())
defer c.DeleteTask(ctx, task.Name)

Eventually(watcher.ResultChan()).Should(Receive(Satisfy(func(e k8swatch.Event) bool {
t, ok := e.Object.(*aliecsv1alpha1.Task)
return ok && t.Name == "watch-task" && e.Type == k8swatch.Added
})))
})
})

Describe("Environment Create, Read, Update, Delete", func() {
var env *aliecsv1alpha1.Environment

BeforeEach(func() {
env = &aliecsv1alpha1.Environment{
ObjectMeta: metav1.ObjectMeta{Name: "test-env"},
Spec: aliecsv1alpha1.EnvironmentSpec{
State: "standby",
Tasks: map[string][]aliecsv1alpha1.TaskDefinition{},
},
TaskTemplates: aliecsv1alpha1.TemplateSpecification{
Tasks: map[string][]aliecsv1alpha1.TaskReference{},
},
}
Expect(c.CreateEnvironment(ctx, env)).To(Succeed())
})

AfterEach(func() {
_ = c.DeleteEnvironment(ctx, env.Name)
})

It("gets a created environment", func() {
got, err := c.GetEnvironment(ctx, "test-env")
Expect(err).NotTo(HaveOccurred())
Expect(got.Name).To(Equal("test-env"))
Expect(got.Spec.State).To(Equal("standby"))
})

It("updates an environment", func() {
got, err := c.GetEnvironment(ctx, "test-env")
Expect(err).NotTo(HaveOccurred())

got.Spec.State = "running"
Expect(c.UpdateEnvironment(ctx, got)).To(Succeed())

updated, err := c.GetEnvironment(ctx, "test-env")
Expect(err).NotTo(HaveOccurred())
Expect(updated.Spec.State).To(Equal("running"))
})

It("deletes an environment", func() {
Expect(c.DeleteEnvironment(ctx, "test-env")).To(Succeed())

_, err := c.GetEnvironment(ctx, "test-env")
Expect(err).To(HaveOccurred())
})
})

Describe("Environment Watch", func() {
It("receives events for environment changes", func() {
watcher, err := c.WatchEnvironments(ctx)
Expect(err).NotTo(HaveOccurred())
defer watcher.Stop()

env := &aliecsv1alpha1.Environment{
ObjectMeta: metav1.ObjectMeta{Name: "watch-env"},
Spec: aliecsv1alpha1.EnvironmentSpec{
State: "standby",
Tasks: map[string][]aliecsv1alpha1.TaskDefinition{},
},
TaskTemplates: aliecsv1alpha1.TemplateSpecification{
Tasks: map[string][]aliecsv1alpha1.TaskReference{},
},
}
Expect(c.CreateEnvironment(ctx, env)).To(Succeed())
defer c.DeleteEnvironment(ctx, env.Name)

Eventually(watcher.ResultChan()).Should(Receive(Satisfy(func(e k8swatch.Event) bool {
ev, ok := e.Object.(*aliecsv1alpha1.Environment)
return ok && ev.Name == "watch-env" && e.Type == k8swatch.Added
})))
})
})
})
68 changes: 68 additions & 0 deletions control-operator/pkg/client/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2026 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package client_test

import (
"path/filepath"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var (
cfg *rest.Config
testEnv *envtest.Environment
)

func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}

var err error
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
})

var _ = AfterSuite(func() {
By("tearing down the test environment")
Expect(testEnv.Stop()).To(Succeed())
})
Loading