8
8
9
9
#include "aqo_shared.h"
10
10
11
+
12
+ typedef struct
13
+ {
14
+ int magic ;
15
+ uint32 total_size ;
16
+ uint32 delta ;
17
+ } dsm_seg_hdr ;
18
+
19
+ #define free_space (hdr ) (uint32) (temp_storage_size - sizeof(dsm_seg_hdr) - hdr->delta)
20
+ #define addr (delta ) ((char *) dsm_segment_address(seg) + sizeof(dsm_seg_hdr) + delta)
21
+
11
22
shmem_startup_hook_type prev_shmem_startup_hook = NULL ;
12
- static AQOSharedState * aqo_state = NULL ;
13
- unsigned long temp_storage_size = 1024 * 1024 ; /* Storage size, in bytes */
14
- void * temp_storage = NULL ;
23
+ AQOSharedState * aqo_state = NULL ;
24
+ HTAB * fss_htab = NULL ;
25
+ static int aqo_htab_max_items = 1000 ;
26
+ static uint32 temp_storage_size = 1024 * 1024 * 10 ; /* Storage size, in bytes */
27
+ static dsm_segment * seg = NULL ;
15
28
16
- static void
17
- attach_dsm_segment (void )
29
+
30
+ static void aqo_detach_shmem (int code , Datum arg );
31
+
32
+
33
+ void *
34
+ get_dsm_all (uint32 * size )
18
35
{
19
- dsm_segment * seg ;
36
+ dsm_seg_hdr * hdr ;
20
37
21
- LWLockAcquire ( & aqo_state -> lock , LW_EXCLUSIVE );
38
+ Assert ( LWLockHeldByMeInMode ( & aqo_state -> lock , LW_EXCLUSIVE ) );
22
39
23
- if (aqo_state -> dsm_handler != DSM_HANDLE_INVALID )
40
+ if (aqo_state -> dsm_handler == DSM_HANDLE_INVALID )
41
+ {
42
+ /* Fast path. No any cached data exists. */
43
+ * size = 0 ;
44
+ return NULL ;
45
+ }
46
+
47
+ if (!seg )
24
48
{
49
+ /* if segment exists we should connect to */
25
50
seg = dsm_attach (aqo_state -> dsm_handler );
51
+ Assert (seg );
52
+ dsm_pin_mapping (seg );
53
+ on_shmem_exit (aqo_detach_shmem , (Datum ) & aqo_state -> dsm_handler );
54
+ }
55
+
56
+ hdr = (dsm_seg_hdr * ) dsm_segment_address (seg );
57
+ * size = hdr -> delta ;
58
+ return (char * ) hdr + sizeof (dsm_seg_hdr );
59
+ }
60
+
61
+ /*
62
+ * Cleanup of DSM cache: set header into default state and zero the memory block.
63
+ * This operation can be coupled with the cache dump, so we do it under an external
64
+ * hold of the lock.
65
+ */
66
+ void
67
+ reset_dsm_cache (void )
68
+ {
69
+ dsm_seg_hdr * hdr ;
70
+ char * start ;
71
+
72
+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock , LW_EXCLUSIVE ));
73
+
74
+ if (aqo_state -> dsm_handler == DSM_HANDLE_INVALID )
75
+ /* Fast path. No any cached data exists. */
76
+ return ;
77
+
78
+ Assert (seg );
79
+
80
+ hdr = (dsm_seg_hdr * ) dsm_segment_address (seg );
81
+ start = (char * ) hdr + sizeof (dsm_seg_hdr );
82
+
83
+ /* Reset the cache */
84
+ memset (start , 0 , hdr -> delta );
85
+
86
+ hdr -> delta = 0 ;
87
+ hdr -> total_size = temp_storage_size - sizeof (dsm_seg_hdr );
88
+ }
89
+
90
+ char *
91
+ get_cache_address (void )
92
+ {
93
+ dsm_seg_hdr * hdr ;
94
+
95
+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock , LW_EXCLUSIVE ) ||
96
+ LWLockHeldByMeInMode (& aqo_state -> lock , LW_SHARED ));
97
+
98
+ if (aqo_state -> dsm_handler != DSM_HANDLE_INVALID )
99
+ {
100
+ if (!seg )
101
+ {
102
+ /* Another process created the segment yet. Just attach to. */
103
+ seg = dsm_attach (aqo_state -> dsm_handler );
104
+ dsm_pin_mapping (seg );
105
+ on_shmem_exit (aqo_detach_shmem , (Datum ) & aqo_state -> dsm_handler );
106
+ }
107
+
108
+ hdr = (dsm_seg_hdr * ) dsm_segment_address (seg );
26
109
}
27
110
else
28
111
{
112
+ /*
113
+ * First request for DSM cache in this instance.
114
+ * Create the DSM segment. Pin it to live up to instance shutdown.
115
+ * Don't forget to detach DSM segment before an exit.
116
+ */
29
117
seg = dsm_create (temp_storage_size , 0 );
118
+ dsm_pin_mapping (seg );
119
+ dsm_pin_segment (seg );
30
120
aqo_state -> dsm_handler = dsm_segment_handle (seg );
121
+ on_shmem_exit (aqo_detach_shmem , (Datum ) & aqo_state -> dsm_handler );
122
+
123
+ hdr = (dsm_seg_hdr * ) dsm_segment_address (seg );
124
+ hdr -> magic = AQO_SHARED_MAGIC ;
125
+ hdr -> delta = 0 ;
126
+ hdr -> total_size = temp_storage_size - sizeof (dsm_seg_hdr );
31
127
}
32
128
33
- temp_storage = dsm_segment_address (seg );
34
- LWLockRelease (& aqo_state -> lock );
129
+ Assert (seg );
130
+ Assert (hdr -> magic == AQO_SHARED_MAGIC && hdr -> total_size > 0 );
131
+
132
+ return (char * ) hdr + sizeof (dsm_seg_hdr );
133
+ }
134
+
135
+ uint32
136
+ get_dsm_cache_pos (uint32 size )
137
+ {
138
+ dsm_seg_hdr * hdr ;
139
+ uint32 pos ;
140
+
141
+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock , LW_EXCLUSIVE ) ||
142
+ LWLockHeldByMeInMode (& aqo_state -> lock , LW_SHARED ));
143
+
144
+ (void ) get_cache_address ();
145
+ hdr = (dsm_seg_hdr * ) dsm_segment_address (seg );
146
+
147
+ if (free_space (hdr ) < size || size == 0 )
148
+ elog (ERROR ,
149
+ "DSM cache can't allcoate a mem block. Required: %u, free: %u" ,
150
+ size , free_space (hdr ));
151
+
152
+ pos = hdr -> delta ;
153
+ hdr -> delta += size ;
154
+ Assert (free_space (hdr ) >= 0 );
155
+ return pos ;
35
156
}
36
157
37
158
static void
38
159
aqo_detach_shmem (int code , Datum arg )
39
160
{
40
- dsm_handle handler = * (dsm_handle * ) arg ;
41
- dsm_detach (dsm_find_mapping (handler ));
161
+ if (seg != NULL )
162
+ dsm_detach (seg );
163
+ seg = NULL ;
42
164
}
43
165
44
166
void
45
167
aqo_init_shmem (void )
46
168
{
47
169
bool found ;
170
+ HASHCTL info ;
171
+
172
+ aqo_state = NULL ;
173
+ fss_htab = NULL ;
48
174
49
175
LWLockAcquire (AddinShmemInitLock , LW_EXCLUSIVE );
50
176
aqo_state = ShmemInitStruct ("aqo" , sizeof (AQOSharedState ), & found );
@@ -54,8 +180,26 @@ aqo_init_shmem(void)
54
180
LWLockInitialize (& aqo_state -> lock , LWLockNewTrancheId ());
55
181
aqo_state -> dsm_handler = DSM_HANDLE_INVALID ;
56
182
}
183
+
184
+ info .keysize = sizeof (htab_key );
185
+ info .entrysize = sizeof (htab_entry );
186
+ fss_htab = ShmemInitHash ("aqo hash" ,
187
+ aqo_htab_max_items , aqo_htab_max_items ,
188
+ & info ,
189
+ HASH_ELEM | HASH_BLOBS );
190
+
57
191
LWLockRelease (AddinShmemInitLock );
58
192
59
193
LWLockRegisterTranche (aqo_state -> lock .tranche , "aqo" );
60
- on_shmem_exit (aqo_detach_shmem , (Datum ) & aqo_state -> dsm_handler );
194
+ }
195
+
196
+ Size
197
+ aqo_memsize (void )
198
+ {
199
+ Size size ;
200
+
201
+ size = MAXALIGN (sizeof (AQOSharedState ));
202
+ size = add_size (size , hash_estimate_size (aqo_htab_max_items , sizeof (htab_entry )));
203
+
204
+ return size ;
61
205
}
0 commit comments