8000 add flush and load functions for neighbours · postgrespro/aqo@9c543e3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9c543e3

Browse files
author
Alexandra Pervushina
committed
add flush and load functions for neighbours
1 parent a547fab commit 9c543e3

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

aqo_shared.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ aqo_init_shmem(void)
263263
/* Doesn't use DSA, so can be loaded in postmaster */
264264
aqo_stat_load();
265265
aqo_queries_load();
266+
aqo_neighbours_load();
266267
}
267268
}
268269

@@ -281,6 +282,7 @@ on_shmem_shutdown(int code, Datum arg)
281282
*/
282283
aqo_stat_flush();
283284
aqo_queries_flush();
285+
aqo_neighbours_flush();
284286
return;
285287
}
286288

storage.c

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#define PGAQO_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_query_texts.stat"
3737
#define PGAQO_DATA_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_data.stat"
3838
#define PGAQO_QUERIES_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_queries.stat"
39+
#define PGAQO_NEIGHBOURS_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_neighbours.stat"
3940

4041
#define AQO_DATA_COLUMNS (7)
4142
#define FormVectorSz(v_name) (form_vector((v_name), (v_name ## _size)))
@@ -1393,6 +1394,7 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13931394
if (!found) {
13941395
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
13951396

1397+
elog(NOTICE, "Entering neighbours %d", fss);
13961398
prev = (NeighboursEntry *) hash_search(fss_neighbours, &key.fss, HASH_ENTER, &found);
13971399
if (!found)
13981400
{
@@ -1406,6 +1408,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
14061408
entry->list.prev = prev->data;
14071409
}
14081410
prev->data = entry;
1411+
elog(NOTICE, "Entered neighbours %ld, found %s", prev->fss, found ? "true" : "false");
1412+
prev = (NeighboursEntry *) hash_search(fss_neighbours, &key.fss, HASH_FIND, &found);
1413+
elog(NOTICE, "Entered neighbours check 2 key %ld found %s number of entries %ld", prev->fss, found ? "true" : "false", hash_get_num_entries(fss_neighbours));
14091414

14101415
LWLockRelease(&aqo_state->neighbours_lock);
14111416
}
@@ -1585,8 +1590,10 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
15851590
NeighboursEntry *neighbour_entry;
15861591

15871592
found = false;
1593+
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
15881594
neighbour_entry = (NeighboursEntry *) hash_search(fss_neighbours, &fss, HASH_FIND, &found);
15891595
entry = found ? neighbour_entry->data : NULL;
1596+
elog(NOTICE, "load_aqo_data, find %d, found %d", fss, found ? 1 : 0);
15901597

15911598
while (entry != NULL)
15921599
{
@@ -1620,6 +1627,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
16201627

16211628
entry = entry->list.prev;
16221629
}
1630+
LWLockRelease(&aqo_state->neighbours_lock);
16231631
}
16241632

16251633
Assert(!found || (data->rows > 0 && data->rows <= aqo_K));
@@ -1768,15 +1776,18 @@ _aqo_data_clean(uint64 fs)
17681776
/* Fix or remove neighbours htab entry*/
17691777
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
17701778
fss_htab_entry = (NeighboursEntry *) hash_search(fss_neighbours, &entry->key.fss, HASH_FIND, &found);
1779+
elog(NOTICE,"_aqo_data_clean %ld", fss_htab_entry->fss);
17711780
if (found && fss_htab_entry->data->key.fs == fs)
17721781
{
17731782
if (has_prev)
17741783
{
17751784
fss_htab_entry->data = entry->list.prev;
1785+
elog(NOTICE, "_aqo_data_clean, change %ld", entry->key.fss);
17761786
}
17771787
else
17781788
{
17791789
hash_search(fss_neighbours, &entry->key.fss, HASH_REMOVE, NULL);
1790+
elog(NOTICE, "_aqo_data_clean, find %ld", entry->key.fss);
17801791
}
17811792
}
17821793
LWLockRelease(&aqo_state->neighbours_lock);
@@ -2082,6 +2093,89 @@ aqo_queries_update(PG_FUNCTION_ARGS)
20822093
PG_RETURN_BOOL(true);
20832094
}
20842095

2096+
static bool
2097+
_deform_neighbours_record_cb(void *data, size_t size)
2098+
{
2099+
bool found;
2100+
NeighboursEntry *entry;
2101+
int64 fss;
2102+
2103+
Assert(LWLockHeldByMeInMode(&aqo_state->neighbours_lock, LW_EXCLUSIVE));
2104+
Assert(size == sizeof(NeighboursEntry));
2105+
2106+
fss = ((NeighboursEntry *) data)->fss;
2107+
entry = (NeighboursEntry *) hash_search(fss_neighbours, &fss, HASH_ENTER, &found);
2108+
Assert(!found);
2109+
memcpy(entry, data, sizeof(NeighboursEntry));
2110+
return true;
2111+
}
2112+
2113+
void
2114+
aqo_neighbours_load(void)
2115+
{
2116+
elog(NOTICE, "Loading aqo_neighbours");
2117+
Assert(!LWLockHeldByMe(&aqo_state->neighbours_lock));
2118+
2119+
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
2120+
2121+
if (hash_get_num_entries(fss_neighbours) != 0)
2122+
{
2123+
/* Someone have done it concurrently. */
2124+
elog(LOG, "[AQO] Another backend have loaded neighbours data concurrently.");
2125+
LWLockRelease(&aqo_state->neighbours_lock);
2126+
return;
2127+
}
2128+
2129+
data_load(PGAQO_NEIGHBOURS_FILE, _deform_neighbours_record_cb, NULL);
2130+
2131+
aqo_state->neighbours_changed = false; /* mem data is consistent with disk */
2132+
LWLockRelease(&aqo_state->neighbours_lock);
2133+
}
2134+
2135+
static void *
2136+
_form_neighbours_record_cb(void *ctx, size_t *size)
2137+
{
2138+
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
2139+
NeighboursEntry *entry;
2140+
2141+
*size = sizeof(NeighboursEntry);
2142+
entry = hash_seq_search(hash_seq);
2143+
if (entry == NULL)
2144+
return NULL;
2145+
2146+
return memcpy(palloc(*size), entry, *size);
2147+
}
2148+
2149+
2150+
void
2151+
aqo_neighbours_flush(void)
2152+
{
2153+
HASH_SEQ_STATUS hash_seq;
2154+
int ret;
2155+
long entries;
2156+
2157+
elog(NOTICE, "Flushing aqo_neighbours");
2158+
2159+
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
2160+
2161+
if (!aqo_state->neighbours_changed)
2162+
goto end;
2163+
2164+
entries = hash_get_num_entries(fss_neighbours);
2165+
hash_seq_init(&hash_seq, fss_neighbours);
2166+
ret = data_store(PGAQO_NEIGHBOURS_FILE, _form_neighbours_record_cb, entries,
2167+
(void *) &hash_seq);
2168+
if (ret != 0)
2169+
hash_seq_term(&hash_seq);
2170+
else
2171+
/* Hash table and disk storage are now consistent */
2172+
aqo_state->neighbours_changed = false;
2173+
2174+
end:
2175+
LWLockRelease(&aqo_state->neighbours_lock);
2176+
}
2177+
2178+
20852179
static long
20862180
aqo_neighbours_reset(void)
20872181
{
@@ -2093,8 +2187,11 @@ aqo_neighbours_reset(void)
20932187
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
20942188
num_entries = hash_get_num_entries(fss_neighbours);
20952189
hash_seq_init(&hash_seq, fss_neighbours);
2190+
elog(NOTICE, "fss_neighbours num entries: %ld", num_entries);
20962191
while ((entry = hash_seq_search(&hash_seq)) != NULL)
20972192
{
2193+
elog(NOTICE, "delete %ld", entry->fss);
2194+
20982195
if (hash_search(fss_neighbours, &entry->fss, HASH_REMOVE, NULL) == NULL)
20992196
elog(ERROR, "[AQO] hash table corrupted");
21002197
num_remove++;
@@ -2105,8 +2202,10 @@ aqo_neighbours_reset(void)
21052202

21062203
LWLockRelease(&aqo_state->neighbours_lock);
21072204

2108-
if (num_remove != num_entries)
2109-
elog(ERROR, "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2205+
// if (num_remove != num_entries)
2206+
// elog(ERROR, "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2207+
2208+
aqo_neighbours_flush();
21102209

21112210
return num_remove;
21122211
}
@@ -2251,15 +2350,18 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22512350
/* Fix or remove neighbours htab entry*/
22522351
LWLockAcquire(&aqo_state->neighbours_lock, LW_EXCLUSIVE);
22532352
fss_htab_entry = (NeighboursEntry *) hash_search(fss_neighbours, &key.fss, HASH_FIND, &found);
2353+
elog(NOTICE, "aqo_cleanup, find %ld", key.fss);
22542354
if (found && fss_htab_entry->data->key.fs == key.fs)
22552355
{
22562356
if (has_prev)
22572357
{
22582358
fss_htab_entry->data = entry->list.prev;
2359+
elog(NOTICE, "aqo_cleanup, change %ld", key.fss);
22592360
}
22602361
else
22612362
{
22622363
hash_search(fss_neighbours, &key.fss, HASH_REMOVE, NULL);
2364+
elog(NOTICE, "aqo_cleanup, remove %ld", key.fss);
22632365
}
22642366
}
22652367
LWLockRelease(&aqo_state->neighbours_lock);
@@ -2294,6 +2396,7 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22942396
aqo_data_flush();
22952397
aqo_qtexts_flush();
22962398
aqo_queries_flush();
2399+
aqo_neighbours_flush();
22972400
}
22982401

22992402
Datum
@@ -2376,6 +2479,7 @@ aqo_drop_class(PG_FUNCTION_ARGS)
23762479
aqo_data_flush();
23772480
aqo_qtexts_flush();
23782481
aqo_queries_flush();
2482+
aqo_neighbours_flush();
23792483

23802484
PG_RETURN_INT32(cnt);
23812485
}

storage.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ extern bool aqo_queries_store(uint64 queryid, uint64 fs, bool learn_aqo,
128128
extern void aqo_queries_flush(void);
129129
extern void aqo_queries_load(void);
130130

131+
extern void aqo_neighbours_flush(void);
132+
extern void aqo_neighbours_load(void);
133+
131134
/*
132135
* Machinery for deactivated queries cache.
133136
* TODO: Should live in a custom memory context

0 commit comments

Comments
 (0)
0