8000 Cumulative commit on the 'learn on statement timeout' feature. · postgrespro/aqo@41b0bff · GitHub
[go: up one dir, main page]

Skip to content

Commit 41b0bff

Browse files
committed
Cumulative commit on the 'learn on statement timeout' feature.
Now it works quite stable, merge it into master branch.
1 parent ae557bd commit 41b0bff

File tree

8 files changed

+373
-99
lines changed

8 files changed

+373
-99
lines changed

aqo.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,7 @@ _PG_init(void)
244244
RegisterAQOPlanNodeMethods();
245245

246246
MarkGUCPrefixReserved("aqo");
247-
RequestAddinShmemSpace(MAXALIGN(sizeof(AQOSharedState)));
248-
249-
lc_init();
247+
RequestAddinShmemSpace(aqo_memsize());
250248
}
251249

252250
PG_FUNCTION_INFO_V1(invalidate_deactivated_queries_cache);

aqo_shared.c

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,43 +8,169 @@
88

99
#include "aqo_shared.h"
1010

11+
12+
typedef struct
13+
{
14+
int magic;
15+
uint32 total_size;
16+
uint32 delta;
17+
} dsm_seg_hdr;
18+
19+
#define free_space(hdr) (uint32) (temp_storage_size - sizeof(dsm_seg_hdr) - hdr->delta)
20+
#define addr(delta) ((char *) dsm_segment_address(seg) + sizeof(dsm_seg_hdr) + delta)
21+
1122
shmem_startup_hook_type prev_shmem_startup_hook = NULL;
12-
static AQOSharedState *aqo_state = NULL;
13-
unsigned long temp_storage_size = 1024 * 1024; /* Storage size, in bytes */
14-
void *temp_storage = NULL;
23+
AQOSharedState *aqo_state = NULL;
24+
HTAB *fss_htab = NULL;
25+
static int aqo_htab_max_items = 1000;
26+
static uint32 temp_storage_size = 1024 * 1024 * 10; /* Storage size, in bytes */
27+
static dsm_segment *seg = NULL;
1528

16-
static void
17-
attach_dsm_segment(void)
29+
30+
static void aqo_detach_shmem(int code, Datum arg);
31+
32+
33+
void *
34+
get_dsm_all(uint32 *size)
1835
{
19-
dsm_segment *seg;
36+
dsm_seg_hdr *hdr;
2037

21-
LWLockAcquire(&aqo_state->lock, LW_EXCLUSIVE);
38+
Assert(LWLockHeldByMeInMode(&aqo_state->lock, LW_EXCLUSIVE));
2239

23-
if (aqo_state->dsm_handler != DSM_HANDLE_INVALID)
40+
if (aqo_state->dsm_handler == DSM_HANDLE_INVALID)
41+
{
42+
/* Fast path. No any cached data exists. */
43+
*size = 0;
44+
return NULL;
45+
}
46+
47+
if (!seg)
2448
{
49+
/* if segment exists we should connect to */
2550
seg = dsm_attach(aqo_state->dsm_handler);
51+
Assert(seg);
52+
dsm_pin_mapping(seg);
53+
on_shmem_exit(aqo_detach_shmem, (Datum) &aqo_state->dsm_handler);
54+
}
55+
56+
hdr = (dsm_seg_hdr *) dsm_segment_address(seg);
57+
*size = hdr->delta;
58+
return (char *) hdr + sizeof(dsm_seg_hdr);
59+
}
60+
61+
/*
62+
* Cleanup of DSM cache: set header into default state and zero the memory block.
63+
* This operation can be coupled with the cache dump, so we do it under an external
64+
* hold of the lock.
65+
*/
66+
void
67+
reset_dsm_cache(void)
68+
{
69+
dsm_seg_hdr *hdr;
70+
char *start;
71+
72+
Assert(LWLockHeldByMeInMode(&aqo_state->lock, LW_EXCLUSIVE));
73+
74+
if (aqo_state->dsm_handler == DSM_HANDLE_INVALID)
75+
/* Fast path. No any cached data exists. */
76+
return;
77+
78+
Assert(seg);
79+
80+
hdr = (dsm_seg_hdr *) dsm_segment_address(seg);
81+
start = (char *) hdr + sizeof(dsm_seg_hdr);
82+
83+
/* Reset the cache */
84+
memset(start, 0, hdr->delta);
85+
86+
hdr->delta = 0;
87+
hdr->total_size = temp_storage_size - sizeof(dsm_seg_hdr);
88+
}
89+
90+
char *
91+
get_cache_address(void)
92+
{
93+
dsm_seg_hdr *hdr;
94+
95+
Assert(LWLockHeldByMeInMode(&aqo_state->lock, LW_EXCLUSIVE) ||
96+
LWLockHeldByMeInMode(&aqo_state->lock, LW_SHARED));
97+
98+
if (aqo_state->dsm_handler != DSM_HANDLE_INVALID)
99+
{
100+
if (!seg)
101+
{
102+
/* Another process created the segment yet. Just attach to. */
103+
seg = dsm_attach(aqo_state->dsm_handler);
104+
dsm_pin_mapping(seg);
105+
on_shmem_exit(aqo_detach_shmem, (Datum) &aqo_state->dsm_handler);
106+
}
107+
108+
hdr = (dsm_seg_hdr *) dsm_segment_address(seg);
26109
}
27110
else
28111
{
112+
/*
113+
* First request for DSM cache in this instance.
114+
* Create the DSM segment. Pin it to live up to instance shutdown.
115+
* Don't forget to detach DSM segment before an exit.
116+
*/
29117
seg = dsm_create(temp_storage_size, 0);
118+
dsm_pin_mapping(seg);
119+
dsm_pin_segment(seg);
30120
aqo_state->dsm_handler = dsm_segment_handle(seg);
121+
on_shmem_exit(aqo_detach_shmem, (Datum) &aqo_state->dsm_handler);
122+
123+
hdr = (dsm_seg_hdr *) dsm_segment_address(seg);
124+
hdr->magic = AQO_SHARED_MAGIC;
125+
hdr->delta = 0;
126+
hdr->total_size = temp_storage_size - sizeof(dsm_seg_hdr);
31127
}
32128

33-
temp_storage = dsm_segment_address(seg);
34-
LWLockRelease(&aqo_state->lock);
129+
Assert(seg);
130+
Assert(hdr->magic == AQO_SHARED_MAGIC && hdr->total_size > 0);
131+
132+
return (char *) hdr + sizeof(dsm_seg_hdr);
133+
}
134+
135+
uint32
136+
get_dsm_cache_pos(uint32 size)
137+
{
138+
dsm_seg_hdr *hdr;
139+
uint32 pos;
140+
141+
Assert(LWLockHeldByMeInMode(&aqo_state->lock, LW_EXCLUSIVE) ||
142+
LWLockHeldByMeInMode(&aqo_state->lock, LW_SHARED));
143+
144+
(void) get_cache_address();
145+
hdr = (dsm_seg_hdr *) dsm_segment_address(seg);
146+
147+
if (free_space(hdr) < size || size == 0)
148+
elog(ERROR,
149+
"DSM cache can't allcoate a mem block. Required: %u, free: %u",
150+
size, free_space(hdr));
151+
152+
pos = hdr->delta;
153+
hdr->delta += size;
154+
Assert(free_space(hdr) >= 0);
155+
return pos;
35156
}
36157

37158
static void
38159
aqo_detach_shmem(int code, Datum arg)
39160
{
40-
dsm_handle handler = *(dsm_handle *) arg;
41-
dsm_detach(dsm_find_mapping(handler));
161+
if (seg != NULL)
162+
dsm_detach(seg);
163+
seg = NULL;
42164
}
43165

44166
void
45167
aqo_init_shmem(void)
46168
{
47169
bool found;
170+
HASHCTL info;
171+
172+
aqo_state = NULL;
173+
fss_htab = NULL;
48174

49175
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
50176
aqo_state = ShmemInitStruct("aqo", sizeof(AQOSharedState), &found);
@@ -54,8 +180,26 @@ aqo_init_shmem(void)
54180
LWLockInitialize(&aqo_state->lock, LWLockNewTrancheId());
55181
aqo_state->dsm_handler = DSM_HANDLE_INVALID;
56182
}
183+
184+
info.keysize = sizeof(htab_key);
185+
info.entrysize = sizeof(htab_entry);
186+
fss_htab = ShmemInitHash("aqo hash",
187+
aqo_htab_max_items, aqo_htab_max_items,
188+
&info,
189+
HASH_ELEM | HASH_BLOBS);
190+
57191
LWLockRelease(AddinShmemInitLock);
58192

59193
LWLockRegisterTranche(aqo_state->lock.tranche, "aqo");
60-
on_shmem_exit(aqo_detach_shmem, (Datum) &aqo_state->dsm_handler);
194+
}
195+
196+
Size
197+
aqo_memsize(void)
198+
{
199+
Size size;
200+
201+
size = MAXALIGN(sizeof(AQOSharedState));
202+
size = add_size(size, hash_estimate_size(aqo_htab_max_items, sizeof(htab_entry)));
203+
204+
return size;
61205
}

aqo_shared.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@
66
#include "storage/ipc.h"
77
#include "storage/lwlock.h"
88

9+
#define AQO_SHARED_MAGIC 0x053163
10+
11+
typedef struct
12+
{
13+
/* XXX we assume this struct contains no padding bytes */
14+
uint64 fs;
15+
int64 fss;
16+
} htab_key;
17+
18+
typedef struct
19+
{
20+
htab_key key;
21+
uint32 hdr_off; /* offset of data in DSM cache */
22+
} htab_entry;
923

1024
typedef struct AQOSharedState
1125
{
@@ -15,8 +29,15 @@ typedef struct AQOSharedState
1529

1630

1731
extern shmem_startup_hook_type prev_shmem_startup_hook;
32+
extern AQOSharedState *aqo_state;
33+
extern HTAB *fss_htab;
1834

1935

36+
extern Size aqo_memsize(void);
37+
extern void reset_dsm_cache(void);
38+
extern void *get_dsm_all(uint32 *size);
39+
extern char *get_cache_address(void);
40+
extern uint32 get_dsm_cache_pos(uint32 size);
2041
extern void aqo_init_shmem(void);
2142

2243
#endif /* AQO_SHARED_H */

0 commit comments

Comments
 (0)
0