@@ -9,15 +9,42 @@ import (
9
9
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/ch/values"
10
10
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/util"
11
11
"github.com/cloudquery/plugin-sdk/v4/message"
12
+ "github.com/cloudquery/plugin-sdk/v4/schema"
12
13
)
13
14
15
+ func (c * Client ) DeleteStale (ctx context.Context , messages message.WriteDeleteStales ) error {
16
+ if len (messages ) == 0 {
17
+ return nil
18
+ }
19
+
20
+ for _ , msg := range messages {
21
+ if err := c .conn .Exec (ctx , generateDeleteForDeleteStale (msg ), msg .SourceName , msg .SyncTime ); err != nil {
22
+ return err
23
+ }
24
+ }
25
+
26
+ return nil
27
+ }
28
+
29
+ func generateDeleteForDeleteStale (msg * message.WriteDeleteStale ) string {
30
+ var sb strings.Builder
31
+ sb .WriteString ("DELETE FROM " )
32
+ sb .WriteString (util .SanitizeID (msg .TableName ))
33
+ sb .WriteString (" WHERE " )
34
+ sb .WriteString (util .SanitizeID (schema .CqSourceNameColumn .Name ))
35
+ sb .WriteString (" = $1 AND toTimeZone(" )
36
+ sb .WriteString (util .SanitizeID (schema .CqSyncTimeColumn .Name ))
37
+ sb .WriteString (", 'UTC') < $2" )
38
+ return sb .String ()
39
+ }
40
+
14
41
func (c * Client ) DeleteRecord (ctx context.Context , messages message.WriteDeleteRecords ) error {
15
42
if len (messages ) == 0 {
16
43
return nil
17
44
}
18
45
19
46
for _ , msg := range messages {
20
- sql := generateDelete (msg .DeleteRecord )
47
+ sql := generateDeleteForDeleteRecord (msg .DeleteRecord )
21
48
params , err := extractPredicateValues (msg .DeleteRecord .WhereClause )
22
49
if err != nil {
23
50
return err
@@ -62,7 +89,7 @@ func unpackArray(s any) []any {
62
89
return r
63
90
}
64
91
65
- func generateDelete (msg message.DeleteRecord ) string {
92
+ func generateDeleteForDeleteRecord (msg message.DeleteRecord ) string {
66
93
var sb strings.Builder
67
94
68
95
sb .WriteString ("DELETE FROM " )
0 commit comments