8000 [Feature] [Platform] Shutdown migration to CE by ajanikow · Pull Request #1776 · arangodb/kube-arangodb · GitHub
[go: up one dir, main page]

Skip to content

[Feature] [Platform] Shutdown migration to CE #1776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- (Feature) (Platform) Platform Requirements support
- (Improvement) Drop slash requirement from ArangoRoute
- (Feature) (Networking) Pass through Server Header
- (Feature) (Platform) Shutdown migration to CE

## [1.2.43](https://github.com/arangodb/kube-arangodb/tree/1.2.43) (2024-10-14)
- (Feature) ArangoRoute CRD
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ Flags:
--kubernetes.max-batch-size int Size of batch during objects read (default 256)
--kubernetes.qps float32 Number of queries per second for k8s API (default 15)
--log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty")
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
--log.sampling If true, operator will try to minimize duplication of logging events (default true)
--memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing
--metrics.excluded-prefixes stringArray List of the excluded metrics prefixes
Expand Down
2 changes: 1 addition & 1 deletion docs/cli/arangodb_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Flags:
--kubernetes.max-batch-size int Size of batch during objects read (default 256)
--kubernetes.qps float32 Number of queries per second for k8s API (default 15)
--log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty")
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
--log.sampling If true, operator will try to minimize duplication of logging events (default true)
--memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing
--metrics.excluded-prefixes stringArray List of the excluded metrics prefixes
Expand Down
170 changes: 170 additions & 0 deletions pkg/handlers/platform/shutdown/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package shutdown

import (
"context"
"fmt"
"strconv"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

pbSharedV1 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition"
pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition"
"github.com/arangodb/kube-arangodb/pkg/logging"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
)

var logger = logging.Global().RegisterAndGetLogger("platform-pod-shutdown", logging.Info)

type handler struct {
kubeClient kubernetes.Interface

eventRecorder event.RecorderInstance

operator operator.Operator
}

func (h *handler) Name() string {
return Kind()
}

func (h *handler) Handle(ctx context.Context, item operation.Item) error {
pod, err := util.WithKubernetesContextTimeoutP2A2(ctx, h.kubeClient.CoreV1().Pods(item.Namespace).Get, item.Name, meta.GetOptions{})
if err != nil {
if apiErrors.IsNotFound(err) {
return nil
}

return err
}

// If not annotated, stop execution
if _, ok := pod.Annotations[constants.AnnotationShutdownManagedContainer]; !ok {
return nil
}

for _, container := range pod.Status.ContainerStatuses {
v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownCoreContainer, container.Name)]
if !ok {
continue
}

switch v {
case constants.AnnotationShutdownCoreContainerModeWait:
if container.State.Terminated == nil {
// Container is not yet stopped, skip shutdown
return nil
}
}
}

// All containers, which are expected to shutdown, are down

for _, container := range pod.Status.ContainerStatuses {
v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownContainer, container.Name)]
if !ok {
continue
}

// We did not reach running state, nothing to do
if container.State.Running == nil {
continue
}

port, ok := h.getContainerPort(pod.Spec.Containers, container.Name, v)
if !ok {
// We did not find port, continue
continue
}

if port.ContainerPort == 0 {
continue
}

if pod.Status.PodIP == "" {
continue
}

if err := util.WithKubernetesContextTimeoutP1A1(ctx, h.invokeShutdown, fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)); err != nil {
logger.WrapObj(item).Err(err).Str("container", container.Name).Debug("Unable to send shutdown request")
}

logger.WrapObj(item).Str("container", container.Name).Debug("Shutdown request sent")
}

// Always return nil
return nil
}

func (h *handler) CanBeHandled(item operation.Item) bool {
return item.Group == Group() &&
item.Version == Version() &&
item.Kind == Kind()
}

func (h *handler) getContainerPort(containers []core.Container, container, port string) (core.ContainerPort, bool) {
if v, err := strconv.Atoi(port); err == nil {
return core.ContainerPort{
ContainerPort: int32(v),
}, true
}

for _, c := range containers {
if c.Name != container {
continue
}

for _, p := range c.Ports {
if p.Name == port {
return p, true
}
}
}

return core.ContainerPort{}, false
}

func (h *handler) invokeShutdown(ctx context.Context, addr string) error {
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

defer conn.Close()

client := pbShutdownV1.NewShutdownV1Client(conn)

if _, err := client.Shutdown(ctx, &pbSharedV1.Empty{}); err != nil {
return err
}

return nil
}
33 changes: 33 additions & 0 deletions pkg/handlers/platform/shutdown/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package shutdown

func Kind() string {
return "Pod"
}

func Group() string {
return ""
}

func Version() string {
return "v1"
}
62 changes: 62 additions & 0 deletions pkg/handlers/platform/shutdown/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package shutdown

import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
)

// RegisterInformer into operator
func RegisterInformer(operator operator.Operator, recorder event.Recorder, kubeClient kubernetes.Interface, informer informers.SharedInformerFactory) error {
if err := operator.RegisterInformer(informer.Core().V1().Pods().Informer(),
Group(),
Version(),
Kind(), func(obj meta.Object) bool {
if anns := obj.GetAnnotations(); len(anns) != 0 {
if _, ok := anns[constants.AnnotationShutdownManagedContainer]; ok {
return true
}
}
return false
}); err != nil {
return err
}

h := &handler{
kubeClient: kubeClient,

eventRecorder: recorder.NewInstance(Group(), Version(), Kind()),

operator: operator,
}

if err := operator.RegisterHandler(h); err != nil {
return err
}

return nil
}
9 changes: 7 additions & 2 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/handlers/job"
"github.com/arangodb/kube-arangodb/pkg/handlers/networking/route"
platformChart "github.com/arangodb/kube-arangodb/pkg/handlers/platform/chart"
platformShutdown "github.com/arangodb/kube-arangodb/pkg/handlers/platform/shutdown"
platformStorage "github.com/arangodb/kube-arangodb/pkg/handlers/platform/storage"
"github.com/arangodb/kube-arangodb/pkg/handlers/policy"
schedulerBatchJobHandler "github.com/arangodb/kube-arangodb/pkg/handlers/scheduler/batchjob"
Expand Down Expand Up @@ -350,7 +351,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st
o.onStartOperatorV2Networking(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
o.Dependencies.NetworkingProbe.SetReady()
case platformOperator:
o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer)
o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
o.Dependencies.PlatformProbe.SetReady()
case schedulerOperator:
o.onStartOperatorV2Scheduler(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
Expand Down Expand Up @@ -398,7 +399,7 @@ func (o *Operator) onStartOperatorV2Networking(operator operatorV2.Operator, rec
}
}

func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) {
func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {
checkFn := func() error {
_, err := o.Client.Arango().PlatformV1alpha1().ArangoPlatformStorages(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
Expand All @@ -412,6 +413,10 @@ func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recor
if err := platformChart.RegisterInformer(operator, recorder, client, kubeClient, informer); err != nil {
panic(err)
}

if err := platformShutdown.RegisterInformer(operator, recorder, kubeClient, kubeInformer); err != nil {
panic(err)
}
}

func (o *Operator) onStartOperatorV2Scheduler(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {
Expand Down
0