36
36
#define PGAQO_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_query_texts.stat"
37
37
#define PGAQO_DATA_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_data.stat"
38
38
#define PGAQO_QUERIES_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_queries.stat"
39
+ #define PGAQO_NEIGHBOURS_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_neighbours.stat"
39
40
40
41
#define AQO_DATA_COLUMNS (7)
41
42
#define FormVectorSz (v_name ) (form_vector((v_name), (v_name ## _size)))
@@ -1393,6 +1394,7 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
1393
1394
if (!found ) {
1394
1395
LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
1395
1396
1397
+ elog (NOTICE , "Entering neighbours %d" , fss );
1396
1398
prev = (NeighboursEntry * ) hash_search (fss_neighbours , & key .fss , HASH_ENTER , & found );
1397
1399
if (!found )
1398
1400
{
@@ -1406,6 +1408,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
1406
1408
entry -> list .prev = prev -> data ;
1407
1409
}
1408
1410
prev -> data = entry ;
1411
+ elog (NOTICE , "Entered neighbours %ld, found %s" , prev -> fss , found ? "true" : "false" );
1412
+ prev = (NeighboursEntry * ) hash_search (fss_neighbours , & key .fss , HASH_FIND , & found );
1413
+ elog (NOTICE , "Entered neighbours check 2 key %ld found %s number of entries %ld" , prev -> fss , found ? "true" : "false" , hash_get_num_entries (fss_neighbours ));
1409
1414
1410
1415
LWLockRelease (& aqo_state -> neighbours_lock );
1411
1416
}
@@ -1585,8 +1590,10 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
1585
1590
NeighboursEntry * neighbour_entry ;
1586
1591
1587
1592
found = false;
1593
+ LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
1588
1594
neighbour_entry = (NeighboursEntry * ) hash_search (fss_neighbours , & fss , HASH_FIND , & found );
1589
1595
entry = found ? neighbour_entry -> data : NULL ;
1596
+ elog (NOTICE , "load_aqo_data, find %d, found %d" , fss , found ? 1 : 0 );
1590
1597
1591
1598
while (entry != NULL )
1592
1599
{
@@ -1620,6 +1627,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
1620
1627
1621
1628
entry = entry -> list .prev ;
1622
1629
}
1630
+ LWLockRelease (& aqo_state -> neighbours_lock );
1623
1631
}
1624
1632
1625
1633
Assert (!found || (data -> rows > 0 && data -> rows <= aqo_K ));
@@ -1768,15 +1776,18 @@ _aqo_data_clean(uint64 fs)
1768
1776
/* Fix or remove neighbours htab entry*/
1769
1777
LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
1770
1778
fss_htab_entry = (NeighboursEntry * ) hash_search (fss_neighbours , & entry -> key .fss , HASH_FIND , & found );
1779
+ elog (NOTICE ,"_aqo_data_clean %ld" , fss_htab_entry -> fss );
1771
1780
if (found && fss_htab_entry -> data -> key .fs == fs )
1772
1781
{
1773
1782
if (has_prev )
1774
1783
{
1775
1784
fss_htab_entry -> data = entry -> list .prev ;
1785
+ elog (NOTICE , "_aqo_data_clean, change %ld" , entry -> key .fss );
1776
1786
}
1777
1787
else
1778
1788
{
1779
1789
hash_search (fss_neighbours , & entry -> key .fss , HASH_REMOVE , NULL );
1790
+ elog (NOTICE , "_aqo_data_clean, find %ld" , entry -> key .fss );
1780
1791
}
1781
1792
}
1782
1793
LWLockRelease (& aqo_state -> neighbours_lock );
@@ -2082,6 +2093,89 @@ aqo_queries_update(PG_FUNCTION_ARGS)
2082
2093
PG_RETURN_BOOL (true);
2083
2094
}
2084
2095
2096
+ static bool
2097
+ _deform_neighbours_record_cb (void * data , size_t size )
2098
+ {
2099
+ bool found ;
2100
+ NeighboursEntry * entry ;
2101
+ int64 fss ;
2102
+
2103
+ Assert (LWLockHeldByMeInMode (& aqo_state -> neighbours_lock , LW_EXCLUSIVE ));
2104
+ Assert (size == sizeof (NeighboursEntry ));
2105
+
2106
+ fss = ((NeighboursEntry * ) data )-> fss ;
2107
+ entry = (NeighboursEntry * ) hash_search (fss_neighbours , & fss , HASH_ENTER , & found );
2108
+ Assert (!found );
2109
+ memcpy (entry , data , sizeof (NeighboursEntry ));
2110
+ return true;
2111
+ }
2112
+
2113
+ void
2114
+ aqo_neighbours_load (void )
2115
+ {
2116
+ elog (NOTICE , "Loading aqo_neighbours" );
2117
+ Assert (!LWLockHeldByMe (& aqo_state -> neighbours_lock ));
2118
+
2119
+ LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
2120
+
2121
+ if (hash_get_num_entries (fss_neighbours ) != 0 )
2122
+ {
2123
+ /* Someone have done it concurrently. */
2124
+ elog (LOG , "[AQO] Another backend have loaded neighbours data concurrently." );
2125
+ LWLockRelease (& aqo_state -> neighbours_lock );
2126
+ return ;
2127
+ }
2128
+
2129
+ data_load (PGAQO_NEIGHBOURS_FILE , _deform_neighbours_record_cb , NULL );
2130
+
2131
+ aqo_state -> neighbours_changed = false; /* mem data is consistent with disk */
2132
+ LWLockRelease (& aqo_state -> neighbours_lock );
2133
+ }
2134
+
2135
+ static void *
2136
+ _form_neighbours_record_cb (void * ctx , size_t * size )
2137
+ {
2138
+ HASH_SEQ_STATUS * hash_seq = (HASH_SEQ_STATUS * ) ctx ;
2139
+ NeighboursEntry * entry ;
2140
+
2141
+ * size = sizeof (NeighboursEntry );
2142
+ entry = hash_seq_search (hash_seq );
2143
+ if (entry == NULL )
2144
+ return NULL ;
2145
+
2146
+ return memcpy (palloc (* size ), entry , * size );
2147
+ }
2148
+
2149
+
2150
+ void
2151
+ aqo_neighbours_flush (void )
2152
+ {
2153
+ HASH_SEQ_STATUS hash_seq ;
2154
+ int ret ;
2155
+ long entries ;
2156
+
2157
+ elog (NOTICE , "Flushing aqo_neighbours" );
2158
+
2159
+ LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
2160
+
2161
+ if (!aqo_state -> neighbours_changed )
2162
+ goto end ;
2163
+
2164
+ entries = hash_get_num_entries (fss_neighbours );
2165
+ hash_seq_init (& hash_seq , fss_neighbours );
2166
+ ret = data_store (PGAQO_NEIGHBOURS_FILE , _form_neighbours_record_cb , entries ,
2167
+ (void * ) & hash_seq );
2168
+ if (ret != 0 )
2169
+ hash_seq_term (& hash_seq );
2170
+ else
2171
+ /* Hash table and disk storage are now consistent */
2172
+ aqo_state -> neighbours_changed = false;
2173
+
2174
+ end :
2175
+ LWLockRelease (& aqo_state -> neighbours_lock );
2176
+ }
2177
+
2178
+
2085
2179
static long
2086
2180
aqo_neighbours_reset (void )
2087
2181
{
@@ -2093,8 +2187,11 @@ aqo_neighbours_reset(void)
2093
2187
LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
2094
2188
num_entries = hash_get_num_entries (fss_neighbours );
2095
2189
hash_seq_init (& hash_seq , fss_neighbours );
2190
+ elog (NOTICE , "fss_neighbours num entries: %ld" , num_entries );
2096
2191
while ((entry = hash_seq_search (& hash_seq )) != NULL )
2097
2192
{
2193
+ elog (NOTICE , "delete %ld" , entry -> fss );
2194
+
2098
2195
if (hash_search (fss_neighbours , & entry -> fss , HASH_REMOVE , NULL ) == NULL )
2099
2196
elog (ERROR , "[AQO] hash table corrupted" );
2100
2197
num_remove ++ ;
@@ -2105,8 +2202,10 @@ aqo_neighbours_reset(void)
2105
2202
2106
2203
LWLockRelease (& aqo_state -> neighbours_lock );
2107
2204
2108
- if (num_remove != num_entries )
2109
- elog (ERROR , "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected." );
2205
+ // if (num_remove != num_entries)
2206
+ // elog(ERROR, "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2207
+
2208
+ aqo_neighbours_flush ();
2110
2209
2111
2210
return num_remove ;
2112
2211
}
@@ -2251,15 +2350,18 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
2251
2350
/* Fix or remove neighbours htab entry*/
2252
2351
LWLockAcquire (& aqo_state -> neighbours_lock , LW_EXCLUSIVE );
2253
2352
fss_htab_entry = (NeighboursEntry * ) hash_search (fss_neighbours , & key .fss , HASH_FIND , & found );
2353
+ elog (NOTICE , "aqo_cleanup, find %ld" , key .fss );
2254
2354
if (found && fss_htab_entry -> data -> key .fs == key .fs )
2255
2355
{
2256
2356
if (has_prev )
2257
2357
{
2258
2358
fss_htab_entry -> data = entry -> list .prev ;
2359
+ elog (NOTICE , "aqo_cleanup, change %ld" , key .fss );
2259
2360
}
2260
2361
else
2261
2362
{
2262
2363
hash_search (fss_neighbours , & key .fss , HASH_REMOVE , NULL );
2364
+ elog (NOTICE , "aqo_cleanup, remove %ld" , key .fss );
2263
2365
}
2264
2366
}
2265
2367
LWLockRelease (& aqo_state -> neighbours_lock );
@@ -2294,6 +2396,7 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
2294
2396
aqo_data_flush ();
2295
2397
aqo_qtexts_flush ();
2296
2398
aqo_queries_flush ();
2399
+ aqo_neighbours_flush ();
2297
2400
}
2298
2401
2299
2402
Datum
@@ -2376,6 +2479,7 @@ aqo_drop_class(PG_FUNCTION_ARGS)
2376
2479
aqo_data_flush ();
2377
2480
aqo_qtexts_flush ();
2378
2481
aqo_queries_flush ();
2482
+ aqo_neighbours_flush ();
2379
2483
2380
2484
PG_RETURN_INT32 (cnt );
2381
2485
}
0 commit comments