drop queue · gigapi/gigapi@d9d5199 · GitHub
[go: up one dir, main page]

Skip to content

Commit d9d5199

Browse files
committed
drop queue
1 parent f3fdaac commit d9d5199

File tree

4 files changed

+82
-9
lines changed

4 files changed

+82
-9
lines changed

merge/index/json_index.go

+60
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type JSONIndex struct {
3939
stop context.CancelFunc
4040
lastId uint32
4141

42+
dropQueue []string
4243
parquetSizeBytes int64
4344
rowCount int64
4445
minTime int64
@@ -74,6 +75,48 @@ func NewJSONIndexForPartition(t *shared.Table, values [][2]string) (shared.Index
7475
return res, err
7576
}
7677

78+
func (J *JSONIndex) AddToDropQueue(files []string) utils.Promise[int32] {
79+
J.m.Lock()
80+
defer J.m.Unlock()
81+
82+
J.dropQueue = append(J.dropQueue, files...)
83+
p := utils.New[int32]()
84+
J.promises = append(J.promises, p)
85+
J.doUpdate()
86+
return p
87+
}
88+
89+
func (J *JSONIndex) RmFromDropQueue(files []string) utils.Promise[int32] {
90+
J.m.Lock()
91+
defer J.m.Unlock()
92+
93+
updated := false
94+
for i := len(J.dropQueue) - 1; i >= 0; i-- {
95+
for _, file := range files {
96+
if J.dropQueue[i] != file {
97+
continue
98+
}
99+
J.dropQueue[i] = J.dropQueue[len(J.dropQueue)-1]
100+
J.dropQueue = J.dropQueue[:len(J.dropQueue)-1]
101+
updated = true
102+
break
103+
}
104+
}
105+
106+
if !updated {
107+
return utils.Fulfilled[int32](nil, 0)
108+
}
109+
110+
p := utils.New[int32]()
111+
J.promises = append(J.promises, p)
112+
J.doUpdate()
113+
return p
114+
}
115+
116+
func (J *JSONIndex) GetDropQueue() []string {
117+
return J.dropQueue
118+
}
119+
77120
func (J *JSONIndex) populate() error {
78121
if _, err := os.Stat(path.Join(J.idxPath, "metadata.json")); os.IsNotExist(err) {
79122
return nil
@@ -88,6 +131,11 @@ func (J *JSONIndex) populate() error {
88131
iter := jsoniter.Parse(jsoniter.ConfigDefault, f, 4096)
89132
iter.ReadMapCB(func(iterator *jsoniter.Iterator, s string) bool {
90133
switch s {
134+
case "drop_queue":
135+
for iterator.ReadArray() {
136+
dropQueueEntry := iterator.ReadString()
137+
J.dropQueue = append(J.dropQueue, dropQueueEntry)
138+
}
91139
case "type":
92140
iterator.Skip()
93141
case "parquet_size_bytes":
@@ -265,6 +313,7 @@ func (J *JSONIndex) flush() {
265313
J.m.Lock()
266314
J.updateCtx, J.doUpdate = context.WithCancel(context.Background())
267315
var entries []string
316+
dropQueue := J.dropQueue
268317
parquetSizeBytes := J.parquetSizeBytes
269318
promises := J.promises
270319
J.promises = nil
@@ -318,6 +367,17 @@ func (J *JSONIndex) flush() {
318367
stream.WriteObjectField("wal_sequence")
319368
stream.WriteInt64(0)
320369

370+
stream.WriteMore()
57AE
371+
stream.WriteObjectField("drop_queue")
372+
stream.WriteArrayStart()
373+
for i, d := range dropQueue {
374+
if i > 0 {
375+
stream.WriteMore()
376+
}
377+
stream.WriteString(d)
378+
}
379+
stream.WriteArrayEnd()
380+
321381
stream.WriteMore()
322382
stream.WriteObjectField("files")
323383
stream.WriteArrayStart()

merge/service/hive_partition.go

+8
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ func NewPartition(values [][2]string, tmpPath, dataPath string, t *shared.Table)
4141
if err != nil {
4242
return nil, err
4343
}
44+
dropQueue := res.index.GetDropQueue()
45+
go func() {
46+
time.Sleep(time.Second * 10)
47+
for _, file := range dropQueue {
48+
os.Remove(filepath.Join(res.dataPath, file))
49+
}
50+
res.index.RmFromDropQueue(dropQueue)
51+
}()
4452
}
4553
err := res.initServices(tmpPath, dataPath, t)
4654
return res, err

merge/service/merge_service_fs.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,22 @@ func (f *fsMergeService) mergeFirstIteration(p PlanMerge) error {
233233
}
234234
}
235235

236+
f.cleanup(p)
237+
238+
return nil
239+
}
240+
241+
func (f *fsMergeService) cleanup(p PlanMerge) {
236242
for _, file := range p.From {
237243
_file := file
238244
go func() {
239245
<-time.After(time.Second * 30)
240246
os.Remove(_file)
247+
if f.index != nil {
248+
f.index.RmFromDropQueue([]string{_file})
249+
}
241250
}()
242251
}
243-
return nil
244252
}
245253

246254
func (f *fsMergeService) mergeMany(p PlanMerge, tmpFilePath, finalFilePath string) error {
@@ -303,14 +311,7 @@ func (f *fsMergeService) merge(p PlanMerge) error {
303311
}
304312
}
305313

306-
for _, file := range p.From {
307-
_file := file
308-
go func() {
309-
<-time.After(time.Second * 30)
310-
os.Remove(_file)
311-
}()
312-
}
313-
314+
f.cleanup(p)
314315
return nil
315316
}
316317

@@ -352,6 +353,7 @@ func (f *fsMergeService) updateIndex(merge PlanMerge) error {
352353
Max: _max,
353354
}
354355
prom := f.index.Batch([]*shared.IndexEntry{newIdx}, toDelete)
356+
f.index.AddToDropQueue(merge.From)
355357
_, err = prom.Get()
356358
return err
357359
}

merge/shared/table.go

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Index interface {
2424
Get(path string) *IndexEntry
2525
Run()
2626
Stop()
27+
AddToDropQueue(files []string) utils.Promise[int32]
28+
RmFromDropQueue(files []string) utils.Promise[int32]
29+
GetDropQueue() []string
2730
}
2831

2932
type Table struct {

0 commit comments

Comments
 (0)
0