10000 Merge branch 'main' into renovate/github.com-cloudquery-plugin-sdk-v4… · cloudquery/cloudquery@755722c · GitHub
[go: up one dir, main page]

Skip to content

Commit 755722c

Browse files
authored
Merge branch 'main' into renovate/github.com-cloudquery-plugin-sdk-v4-4.x
2 parents 193bfc5 + 9e1fc3c commit 755722c

File tree

3 files changed

+30
-5
lines changed

3 files changed

+30
-5
lines changed

plugins/destination/clickhouse/client/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type Client struct {
2222
logger zerolog.Logger
2323
writer *batchwriter.BatchWriter
2424
plugin.UnimplementedSource
25-
batchwriter.UnimplementedDeleteStale
2625
}
2726

2827
var _ plugin.Client = (*Client)(nil)

plugins/destination/clickhouse/client/client_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func TestPlugin(t *testing.T) {
4141
plugin.TestWriterSuiteRunner(t,
4242
p,
4343
plugin.WriterTestSuiteTests{
44-
SkipUpsert: true,
45-
SkipDeleteStale: true,
44+
SkipUpsert: true,
4645
SafeMigrations: plugin.SafeMigrations{
4746
AddColumn: true,
4847
RemoveColumn: true,

plugins/destination/clickhouse/client/delete.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,42 @@ import (
99
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/ch/values"
1010
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/util"
1111
"github.com/cloudquery/plugin-sdk/v4/message"
12+
"github.com/cloudquery/plugin-sdk/v4/schema"
1213
)
1314

15+
func (c *Client) DeleteStale(ctx context.Context, messages message.WriteDeleteStales) error {
16+
if len(messages) == 0 {
17+
return nil
18+
}
19+
20+
for _, msg := range messages {
21+
if err := c.conn.Exec(ctx, generateDeleteForDeleteStale(msg), msg.SourceName, msg.SyncTime); err != nil {
22+
return err
23+
}
24+
}
25+
26+
return nil
27+
}
28+
29+
func generateDeleteForDeleteStale(msg *message.WriteDeleteStale) string {
30+
var sb strings.Builder
31+
sb.WriteString("DELETE FROM ")
32+
sb.WriteString(util.SanitizeID(msg.TableName))
33+
sb.WriteString(" WHERE ")
34+
sb.WriteString(util.SanitizeID(schema.CqSourceNameColumn.Name))
35+
sb.WriteString(" = $1 AND toTimeZone(")
36+
sb.WriteString(util.SanitizeID(schema.CqSyncTimeColumn.Name))
37+
sb.WriteString(", 'UTC') < $2")
38+
return sb.String()
39+
}
40+
1441
func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteRecords) error {
1542
if len(messages) == 0 {
1643
return nil
1744
}
1845

1946
for _, msg := range messages {
20-
sql := generateDelete(msg.DeleteRecord)
47+
sql := generateDeleteForDeleteRecord(msg.DeleteRecord)
2148
params, err := extractPredicateValues(msg.DeleteRecord.WhereClause)
2249
if err != nil {
2350
return err
@@ -62,7 +89,7 @@ func unpackArray(s any) []any {
6289
return r
6390
}
6491

65-
func generateDelete(msg message.DeleteRecord) string {
92+
func generateDeleteForDeleteRecord(msg message.DeleteRecord) string {
6693
var sb strings.Builder
6794

6895
sb.WriteString("DELETE FROM ")

0 commit comments

Comments
 (0)
0