From 9abe016562e3ffa3c45b4757eb18cae1e98081ae Mon Sep 17 00:00:00 2001 From: Ilyes Ben Dlala Date: Tue, 6 Jun 2023 17:26:30 +0200 Subject: [PATCH 1/5] #275 Added a concurrency policy option for scheduledScan CRD Signed-off-by: Ilyes Ben Dlala --- .../apis/execution/v1/scheduledscan_types.go | 28 +++++++++++++++ ...ution.securecodebox.io_scheduledscans.yaml | 11 ++++++ .../execution/scheduledscan_controller.go | 35 +++++++++++++++++++ 3 files changed, 74 insertions(+) diff --git a/operator/apis/execution/v1/scheduledscan_types.go b/operator/apis/execution/v1/scheduledscan_types.go index 09b06365a3..052d2048aa 100644 --- a/operator/apis/execution/v1/scheduledscan_types.go +++ b/operator/apis/execution/v1/scheduledscan_types.go @@ -29,11 +29,20 @@ type ScheduledScanSpec struct { // +kubebuilder:validation:Optional // +kubebuilder:validation:Minimum=0 SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"` + // FailedJobsHistoryLimit determines how many failed past Scans will be kept until the oldest one will be deleted, defaults to 3. When set to 0, Scans will be deleted directly after failure // +kubebuilder:validation:Optional // +kubebuilder:validation:Minimum=0 FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"` + // Specifies how to treat concurrent executions of a Job. + // Valid values are: + // - "Allow" (default): allows CronJobs to run concurrently; + // - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet; + // - "Replace": cancels currently running job and replaces it with a new one + // +optional + ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"` + // ScanSpec describes the scan which should be started regularly ScanSpec *ScanSpec `json:"scanSpec"` @@ -43,6 +52,25 @@ type ScheduledScanSpec struct { RetriggerOnScanTypeChange bool `json:"retriggerOnScanTypeChange,omitempty"` } +// ConcurrencyPolicy describes how the job will be handled. +// Only one of the following concurrent policies may be specified. +// If none of the following policies is specified, the default one +// is AllowConcurrent. +// +kubebuilder:validation:Enum=Allow;Forbid;Replace +type ConcurrencyPolicy string + +const ( + // AllowConcurrent allows CronJobs to run concurrently. + AllowConcurrent ConcurrencyPolicy = "Allow" + + // ForbidConcurrent forbids concurrent runs, skipping next run if previous + // hasn't finished yet. + ForbidConcurrent ConcurrencyPolicy = "Forbid" + + // ReplaceConcurrent cancels currently running job and replaces it with a new one. + ReplaceConcurrent ConcurrencyPolicy = "Replace" +) + // ScheduledScanStatus defines the observed state of ScheduledScan type ScheduledScanStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster diff --git a/operator/config/crd/bases/execution.securecodebox.io_scheduledscans.yaml b/operator/config/crd/bases/execution.securecodebox.io_scheduledscans.yaml index 71a7430485..bc12fbce86 100644 --- a/operator/config/crd/bases/execution.securecodebox.io_scheduledscans.yaml +++ b/operator/config/crd/bases/execution.securecodebox.io_scheduledscans.yaml @@ -62,6 +62,17 @@ spec: spec: description: ScheduledScanSpec defines the desired state of ScheduledScan properties: + concurrencyPolicy: + description: 'Specifies how to treat concurrent executions of a Job. + Valid values are: - "Allow" (default): allows CronJobs to run concurrently; + - "Forbid": forbids concurrent runs, skipping next run if previous + run hasn''t finished yet; - "Replace": cancels currently running + job and replaces it with a new one' + enum: + - Allow + - Forbid + - Replace + type: string failedJobsHistoryLimit: description: FailedJobsHistoryLimit determines how many failed past Scans will be kept until the oldest one will be deleted, defaults diff --git a/operator/controllers/execution/scheduledscan_controller.go b/operator/controllers/execution/scheduledscan_controller.go index 2b70c2897b..37cf10109a 100644 --- a/operator/controllers/execution/scheduledscan_controller.go +++ b/operator/controllers/execution/scheduledscan_controller.go @@ -112,8 +112,27 @@ func (r *ScheduledScanReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } + InProgressScans := getScansInProgress(childScans.Items) + // check if it is time to start the next Scan if !time.Now().Before(nextSchedule) { + // check concurrency policy + if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ForbidConcurrent && len(InProgressScans) > 0 { + log.V(8).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(InProgressScans)) + return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil + } + + // ...or instruct us to replace existing ones... + if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ReplaceConcurrent { + for _, scan := range InProgressScans { + // we don't care if the job was already deleted + if err := r.Delete(context.Background(), &scan, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil { + log.Error(err, "unable to delete active job", "job", scan) + return ctrl.Result{}, err + } + } + } + if scheduledScan.Spec.RetriggerOnScanTypeChange == true { // generate hash for current state of the configured ScanType var scanType executionv1.ScanType @@ -237,6 +256,22 @@ func getScansWithState(scans []executionv1.Scan, state string) []executionv1.Sca return newScans } +// Returns a sorted list of scans in progress +func getScansInProgress(scans []executionv1.Scan) []executionv1.Scan { + // Get a sorted list of scans. + var newScans []executionv1.Scan + for _, scan := range scans { + if scan.Status.State != "Done" && scan.Status.State != "Errored" { + newScans = append(newScans, scan) + } + } + sort.Slice(newScans, func(i, j int) bool { + return newScans[i].ObjectMeta.CreationTimestamp.Before(&newScans[j].ObjectMeta.CreationTimestamp) + }) + + return newScans +} + // DeleteOldScans when exceeding the history limit func (r *ScheduledScanReconciler) deleteOldScans(scans []executionv1.Scan, maxCount int32) error { for i, scan := range scans { From 91686b8afc61e41dc7a785769c3340bb8221dfef Mon Sep 17 00:00:00 2001 From: Ilyes Ben Dlala Date: Tue, 13 Jun 2023 10:48:43 +0200 Subject: [PATCH 2/5] #275 Added tests that test the ScheduledScan ConcurrencyPolicy Signed-off-by: Ilyes Ben Dlala --- .../execution/scantype_controller_test.go | 6 +- .../scheduledscan_controller_test.go | 71 ++++++++++++++++++- .../controllers/execution/test_utils_test.go | 5 +- 3 files changed, 76 insertions(+), 6 deletions(-) diff --git a/operator/controllers/execution/scantype_controller_test.go b/operator/controllers/execution/scantype_controller_test.go index 27e5137ff5..2beb22f96f 100644 --- a/operator/controllers/execution/scantype_controller_test.go +++ b/operator/controllers/execution/scantype_controller_test.go @@ -36,7 +36,7 @@ var _ = Describe("ScanType controller", func() { createNamespace(ctx, namespace) createScanType(ctx, namespace) - scheduledScan := createScheduledScanWithInterval(ctx, namespace, true) + scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent) // ensure that the ScheduledScan has been triggered waitForScheduledScanToBeTriggered(ctx, namespace, timeout) @@ -77,7 +77,7 @@ var _ = Describe("ScanType controller", func() { createNamespace(ctx, namespace) createScanType(ctx, namespace) - scheduledScan := createScheduledScanWithInterval(ctx, namespace, true) + scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent) // ensure that the ScheduledScan has been triggered waitForScheduledScanToBeTriggered(ctx, namespace, timeout) @@ -107,7 +107,7 @@ var _ = Describe("ScanType controller", func() { createNamespace(ctx, namespace) createScanType(ctx, namespace) - scheduledScan := createScheduledScanWithInterval(ctx, namespace, false) + scheduledScan := createScheduledScanWithInterval(ctx, namespace, false, 42*time.Hour, executionv1.ForbidConcurrent) // ensure that the ScheduledScan has been triggered waitForScheduledScanToBeTriggered(ctx, namespace, timeout) diff --git a/operator/controllers/execution/scheduledscan_controller_test.go b/operator/controllers/execution/scheduledscan_controller_test.go index 0e4e4ef923..37b9e6c787 100644 --- a/operator/controllers/execution/scheduledscan_controller_test.go +++ b/operator/controllers/execution/scheduledscan_controller_test.go @@ -9,6 +9,7 @@ package controllers import ( "context" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -75,7 +76,7 @@ var _ = Describe("ScheduledScan controller", func() { createNamespace(ctx, namespace) createScanType(ctx, namespace) - scheduledScan := createScheduledScanWithInterval(ctx, namespace, true) + scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent) var scanlist executionv1.ScanList // ensure that the ScheduledScan has been triggered @@ -108,4 +109,72 @@ var _ = Describe("ScheduledScan controller", func() { Expect(scheduledScan.Status.Findings.FindingCategories).Should(Equal(map[string]uint64{"Open Port": 42})) }) }) + + Context("A Scan is triggred due to a Scheduled Scan with a ConcurrencyPolicy", func() { + It("A second scheduled scan should not start before the first one is finished if the concurency policy is set to ForbidConcurrent", func() { + + ctx := context.Background() + namespace := "scheduled-scan-triggerd-concurrency-forbid-test" + createNamespace(ctx, namespace) + createScanType(ctx, namespace) + createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.ForbidConcurrent) + + var scanlist executionv1.ScanList + // ensure that the ScheduledScan has been triggered + waitForScheduledScanToBeTriggered(ctx, namespace) + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + + Expect(scanlist.Items).Should(HaveLen(1)) + time.Sleep(2 * time.Second) + // make sure that no second scan has been triggered + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + Expect(scanlist.Items).Should(HaveLen(1)) + + }) + + It("A second scheduled scan should start before the first one is finished if the concurency policy is set to AllowConcurrent", func() { + + ctx := context.Background() + namespace := "scheduled-scan-triggerd-concurrency-allow-test" + createNamespace(ctx, namespace) + createScanType(ctx, namespace) + createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.AllowConcurrent) + + var scanlist executionv1.ScanList + // ensure that the ScheduledScan has been triggered + waitForScheduledScanToBeTriggered(ctx, namespace) + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + Expect(scanlist.Items).ShouldNot(BeEmpty()) + + time.Sleep(2 * time.Second) + + // make sure more than one scan has been triggered + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + Expect(scanlist.Items).ShouldNot(HaveLen(1)) + }) + + It("A second scheduled scan should replace the first one, before the first one is finished if the concurency policy is set to ReplaceConcurrent", func() { + + ctx := context.Background() + namespace := "scheduled-scan-triggerd-concurrency-replace-test" + createNamespace(ctx, namespace) + createScanType(ctx, namespace) + createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.ReplaceConcurrent) + + var scanlist executionv1.ScanList + // ensure that the ScheduledScan has been triggered + waitForScheduledScanToBeTriggered(ctx, namespace) + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + Expect(scanlist.Items).Should(HaveLen(1)) + firstScanName := scanlist.Items[0].Name + + time.Sleep(2 * time.Second) + + // make sure the first scan has been replaced + k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) + secondScanName := scanlist.Items[0].Name + Expect(scanlist.Items).Should(HaveLen(1)) + Expect(firstScanName).ShouldNot(Equal(secondScanName)) + }) + }) }) diff --git a/operator/controllers/execution/test_utils_test.go b/operator/controllers/execution/test_utils_test.go index a28d58c848..56b70e3bea 100644 --- a/operator/controllers/execution/test_utils_test.go +++ b/operator/controllers/execution/test_utils_test.go @@ -66,7 +66,7 @@ func createScanType(ctx context.Context, namespace string) { Expect(k8sClient.Create(ctx, scanType)).Should(Succeed()) } -func createScheduledScanWithInterval(ctx context.Context, namespace string, retriggerOnScanTypeChange bool) executionv1.ScheduledScan { +func createScheduledScanWithInterval(ctx context.Context, namespace string, retriggerOnScanTypeChange bool, interval time.Duration, concurrencyPolicy executionv1.ConcurrencyPolicy) executionv1.ScheduledScan { namespaceLocalResourceMode := executionv1.NamespaceLocal scheduledScan := executionv1.ScheduledScan{ @@ -75,8 +75,9 @@ func createScheduledScanWithInterval(ctx context.Context, namespace string, retr Namespace: namespace, }, Spec: executionv1.ScheduledScanSpec{ - Interval: metav1.Duration{Duration: 42 * time.Hour}, + Interval: metav1.Duration{Duration: interval}, RetriggerOnScanTypeChange: retriggerOnScanTypeChange, + ConcurrencyPolicy: concurrencyPolicy, ScanSpec: &executionv1.ScanSpec{ ScanType: "nmap", ResourceMode: &namespaceLocalResourceMode, From 0ce3d7d9081fd08f3479acfbc699b05019f7a1bf Mon Sep 17 00:00:00 2001 From: Ilyes Ben Dlala Date: Fri, 14 Jul 2023 16:48:33 +0200 Subject: [PATCH 3/5] #275 Added k8s events to the handling ConcurencyPolicy of scheduledScan Signed-off-by: Ilyes Ben Dlala --- operator/controllers/execution/scheduledscan_controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/operator/controllers/execution/scheduledscan_controller.go b/operator/controllers/execution/scheduledscan_controller.go index 37cf10109a..a9e8926e08 100644 --- a/operator/controllers/execution/scheduledscan_controller.go +++ b/operator/controllers/execution/scheduledscan_controller.go @@ -119,6 +119,7 @@ func (r *ScheduledScanReconciler) Reconcile(ctx context.Context, req ctrl.Reques // check concurrency policy if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ForbidConcurrent && len(InProgressScans) > 0 { log.V(8).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(InProgressScans)) + r.Recorder.Event(&scheduledScan, "Normal", "ConcurrencyPolicyBlocks", "Concurrency policy blocks concurrent runs, skipping") return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil } @@ -128,8 +129,10 @@ func (r *ScheduledScanReconciler) Reconcile(ctx context.Context, req ctrl.Reques // we don't care if the job was already deleted if err := r.Delete(context.Background(), &scan, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil { log.Error(err, "unable to delete active job", "job", scan) + r.Recorder.Event(&scheduledScan, "Warning", "JobDeletionFailed", fmt.Sprintf("Unable to delete active job: %s, error: %v", scan.Name, err)) return ctrl.Result{}, err } + r.Recorder.Event(&scheduledScan, "Normal", "JobReplaced", fmt.Sprintf("Active job %s replaced", scan.Name)) } } From 4e7c8dc6c1007b5a5c21880c5358293596bff721 Mon Sep 17 00:00:00 2001 From: Ilyes Ben Dlala Date: Tue, 18 Jul 2023 09:29:06 +0200 Subject: [PATCH 4/5] #275 Changed the requeue time from 5min to 1min This is done as a compromise between speed and performance see discussion: https://github.com/secureCodeBox/secureCodeBox/pull/1749#discussion_r1258115319 Signed-off-by: Ilyes Ben Dlala --- operator/controllers/execution/scheduledscan_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator/controllers/execution/scheduledscan_controller.go b/operator/controllers/execution/scheduledscan_controller.go index a9e8926e08..3ade791a02 100644 --- a/operator/controllers/execution/scheduledscan_controller.go +++ b/operator/controllers/execution/scheduledscan_controller.go @@ -120,7 +120,7 @@ func (r *ScheduledScanReconciler) Reconcile(ctx context.Context, req ctrl.Reques if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ForbidConcurrent && len(InProgressScans) > 0 { log.V(8).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(InProgressScans)) r.Recorder.Event(&scheduledScan, "Normal", "ConcurrencyPolicyBlocks", "Concurrency policy blocks concurrent runs, skipping") - return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil + return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil } // ...or instruct us to replace existing ones... From 0eaa51f8e686f268fca781c03d8d43fbe4a34f3f Mon Sep 17 00:00:00 2001 From: Ilyes Ben Dlala Date: Tue, 18 Jul 2023 11:12:07 +0200 Subject: [PATCH 5/5] #275 Renamed ScheduledScan creation function to fit the curret state after rebase Signed-off-by: Ilyes Ben Dlala --- .../execution/scheduledscan_controller_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/operator/controllers/execution/scheduledscan_controller_test.go b/operator/controllers/execution/scheduledscan_controller_test.go index 37b9e6c787..3362b2fe7a 100644 --- a/operator/controllers/execution/scheduledscan_controller_test.go +++ b/operator/controllers/execution/scheduledscan_controller_test.go @@ -117,11 +117,11 @@ var _ = Describe("ScheduledScan controller", func() { namespace := "scheduled-scan-triggerd-concurrency-forbid-test" createNamespace(ctx, namespace) createScanType(ctx, namespace) - createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.ForbidConcurrent) + createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.ForbidConcurrent) var scanlist executionv1.ScanList // ensure that the ScheduledScan has been triggered - waitForScheduledScanToBeTriggered(ctx, namespace) + waitForScheduledScanToBeTriggered(ctx, namespace, timeout) k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) Expect(scanlist.Items).Should(HaveLen(1)) @@ -138,11 +138,11 @@ var _ = Describe("ScheduledScan controller", func() { namespace := "scheduled-scan-triggerd-concurrency-allow-test" createNamespace(ctx, namespace) createScanType(ctx, namespace) - createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.AllowConcurrent) + createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.AllowConcurrent) var scanlist executionv1.ScanList // ensure that the ScheduledScan has been triggered - waitForScheduledScanToBeTriggered(ctx, namespace) + waitForScheduledScanToBeTriggered(ctx, namespace, timeout) k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) Expect(scanlist.Items).ShouldNot(BeEmpty()) @@ -159,11 +159,11 @@ var _ = Describe("ScheduledScan controller", func() { namespace := "scheduled-scan-triggerd-concurrency-replace-test" createNamespace(ctx, namespace) createScanType(ctx, namespace) - createScheduledScan(ctx, namespace, true, 1*time.Second, executionv1.ReplaceConcurrent) + createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.ReplaceConcurrent) var scanlist executionv1.ScanList // ensure that the ScheduledScan has been triggered - waitForScheduledScanToBeTriggered(ctx, namespace) + waitForScheduledScanToBeTriggered(ctx, namespace, timeout) k8sClient.List(ctx, &scanlist, client.InNamespace(namespace)) Expect(scanlist.Items).Should(HaveLen(1)) firstScanName := scanlist.Items[0].Name