1
1
#include "ruby/ruby.h"
2
+ #include <assert.h>
3
+ #include "gc.h"
2
4
#include <stdio.h>
3
5
#include <pthread.h>
4
6
11
13
#define DEQUE_FULL 0
12
14
#define DEQUE_EMPTY -1
13
15
16
+ #define GC_THREADING_DEBUG 1
17
+
18
+ #ifdef GC_THREADING_DEBUG
19
+ #define debug_print (...) \
20
+ printf(__VA_ARGS__)
21
+ #else
22
+ #define debug_print (...) \
23
+ //noop
24
+ #endif
25
+
26
+ /* A mod function that always gives positive results */
27
+ #define POS_MOD (a , b ) \
28
+ (((a) % (b) + b) % b)
29
+
30
+ #define MIN (a ,b ) \
31
+ ((a) < (b) ? (a) : (b))
32
+
33
+
14
34
extern void gc_do_mark (void * objspace , VALUE ptr );
15
35
extern void gc_start_mark (void * objspace );
16
-
36
+ pthread_key_t thread_id_k ;
17
37
/**
18
- * Dequeue
38
+ * Deque
19
39
*/
20
40
21
41
typedef struct deque_struct {
@@ -33,7 +53,7 @@ static void deque_init(deque_t* deque, int max_length) {
33
53
deque -> buffer = buffer ;
34
54
deque -> max_length = max_length ;
35
55
deque -> length = 0 ;
36
- deque -> head = deque -> tail = -1 ;
56
+ deque -> head = deque -> tail = 0 ;
37
57
}
38
58
39
59
static void deque_destroy (deque_t * deque ) {
@@ -52,14 +72,14 @@ static int deque_full_p(deque_t* deque) {
52
72
return deque -> length == deque -> max_length ;
53
73
}
54
74
75
+
55
76
static int deque_push (deque_t * deque , VALUE val ) {
56
77
if (deque_full_p (deque ))
57
78
return 0 ;
58
79
59
- if (deque_empty_p (deque ))
60
- deque -> head = 0 ;
80
+ if (! deque_empty_p (deque ))
81
+ deque -> tail = POS_MOD ( deque -> tail + 1 , deque -> max_length ) ;
61
82
62
- deque -> tail = (deque -> tail + 1 ) % deque -> max_length ;
63
83
deque -> buffer [deque -> tail ] = val ;
64
84
deque -> length ++ ;
65
85
return 1 ;
@@ -69,33 +89,26 @@ static VALUE deque_pop(deque_t* deque) {
69
89
VALUE rtn ;
70
90
if (deque_empty_p (deque ))
71
91
return DEQUE_EMPTY ;
92
+ assert (deque -> tail >= 0 );
93
+ rtn = deque -> buffer [deque -> tail ];
94
+
95
+ deque -> tail = POS_MOD (deque -> tail - 1 , deque -> max_length );
72
96
73
- rtn = deque -> buffer [deque -> tail ];
74
- if (deque -> length - 1 == 0 ) {
75
- //Reset head and tail to beginning
76
- deque -> head = deque -> tail = -1 ;
77
- }
78
- else {
79
- deque -> tail = (deque -> tail - 1 ) % deque -> max_length ;
80
- }
81
97
deque -> length -- ;
82
98
return rtn ;
83
99
}
84
100
85
101
static VALUE deque_pop_back (deque_t * deque ) {
86
102
VALUE rtn ;
87
-
103
+ int index ;
88
104
if (deque_empty_p (deque ))
89
105
return DEQUE_EMPTY ;
106
+ index = deque -> head ;
107
+ assert (index >= 0 );
108
+ rtn = deque -> buffer [index ];
109
+
110
+ deque -> head = POS_MOD (deque -> head - 1 , deque -> max_length );
90
111
91
- rtn = deque -> buffer [deque -> head ];
92
- if (deque -> length - 1 == 0 ) {
93
- //Reset head and tail to beginning if this call empties the deque
94
- deque -> head = deque -> tail = -1 ;
95
- }
96
- else {
97
- deque -> head = (deque -> head - 1 ) % deque -> max_length ;
98
- }
99
112
deque -> length -- ;
100
113
return rtn ;
101
114
}
@@ -106,16 +119,16 @@ static VALUE deque_pop_back(deque_t* deque) {
106
119
107
120
typedef struct global_queue_struct {
108
121
unsigned int waiters ;
109
- unsigned int count ;
110
122
deque_t deque ;
111
123
pthread_mutex_t lock ;
112
124
pthread_cond_t wait_condition ;
113
125
unsigned int complete ;
114
126
} global_queue_t ;
115
127
128
+ #define global_queue_count global_queue->deque.length
129
+
116
130
static void global_queue_init (global_queue_t * global_queue ) {
117
131
global_queue -> waiters = 0 ;
118
- global_queue -> count = 0 ;
119
132
deque_init (& (global_queue -> deque ), GLOBAL_QUEUE_SIZE );
120
133
pthread_mutex_init (& global_queue -> lock , NULL );
121
134
pthread_cond_init (& global_queue -> wait_condition , NULL );
@@ -129,52 +142,61 @@ static void global_queue_destroy(global_queue_t* global_queue) {
129
142
}
130
143
131
144
static void global_queue_pop_work (unsigned long thread_id , global_queue_t * global_queue , deque_t * local_queue ) {
132
- int i ;
145
+ int i , work_to_grab ;
133
146
134
- printf ("Thread %lu aquiring global queue lock\n" , thread_id );
147
+ debug_print ("Thread %lu aquiring global queue lock\n" , thread_id );
135
148
pthread_mutex_lock (& global_queue -> lock );
136
- printf ("Thread %lu aquired global queue lock\n" , thread_id );
137
- while (global_queue -> count == 0 && !global_queue -> complete ) {
149
+ debug_print ("Thread %lu aquired global queue lock\n" , thread_id );
150
+ while (global_queue_count == 0 && !global_queue -> complete ) {
138
151
global_queue -> waiters ++ ;
139
- printf ("Thread %lu checking wait condition. Waiters: %d NTHREADS: %d\n" , thread_id , global_queue -> waiters , NTHREADS );
152
+ debug_print ("Thread %lu checking wait condition. Waiters: %d NTHREADS: %d\n" , thread_id , global_queue -> waiters , NTHREADS );
140
153
if (global_queue -> waiters == NTHREADS ) {
141
- printf ("Marking complete + waking threads\n" );
154
+ debug_print ("Marking complete + waking threads\n" );
142
155
global_queue -> complete = 1 ;
143
156
pthread_cond_broadcast (& global_queue -> wait_condition );
144
157
} else {
145
158
// Release the lock and go to sleep until someone signals
146
- printf ("Thread %lu waiting. Waiters: %d\n" , thread_id , global_queue -> waiters );
159
+ debug_print ("Thread %lu waiting. Waiters: %d\n" , thread_id , global_queue -> waiters );
147
160
pthread_cond_wait (& global_queue -> wait_condition , & global_queue -> lock );
148
- printf ("Thread %lu awoken\n" , thread_id );
161
+ debug_print ("Thread %lu awoken\n" , thread_id );
149
162
}
150
163
global_queue -> waiters -- ;
151
164
}
152
-
153
- for (i = 0 ; i < MAX_WORK_TO_GRAB ; i ++ ) {
154
- if (deque_empty_p (& global_queue -> deque ))
155
- break ;
165
+ work_to_grab = MIN (global_queue_count , MAX_WORK_TO_GRAB );
166
+ for (i = 0 ; i < work_to_grab ; i ++ ) {
156
167
deque_push (local_queue , deque_pop (& (global_queue -> deque )));
157
168
}
169
+ debug_print ("Thread %lu took %d items from global\n" , thread_id , work_to_grab );
158
170
159
171
pthread_mutex_unlock (& global_queue -> lock );
160
172
}
161
173
162
174
static void global_queue_offer_work (global_queue_t * global_queue , deque_t * local_queue ) {
163
175
int i ;
164
176
int localqueuesize = local_queue -> length ;
177
+ int items_to_offer , free_slots ;
178
+
165
179
if ((global_queue -> waiters && localqueuesize > 2 ) ||
166
- (global_queue -> count < GLOBAL_QUEUE_SIZE_MIN &&
180
+ (global_queue_count < GLOBAL_QUEUE_SIZE_MIN &&
167
181
localqueuesize > LOCAL_QUEUE_SIZE / 2 )) {
168
- if (pthread_mutex_trylock (& global_queue -> lock )) {
169
- //Offer to global
170
- for (i = 0 ; i < localqueuesize / 2 ; i ++ ) {
171
- deque_push (& (global_queue -> deque ), deque_pop_back (local_queue ));
172
- }
173
- if (global_queue -> waiters ) {
174
- pthread_cond_broadcast (& global_queue -> wait_condition );
175
- }
176
- pthread_mutex_unlock (& global_queue -> lock );
182
+
183
+ pthread_mutex_lock (& global_queue -> lock );
184
+
185
+ free_slots = GLOBAL_QUEUE_SIZE - global_queue_count ;
186
+ items_to_offer = MIN (localqueuesize / 2 , free_slots );
187
+ //Offer to global
188
+ debug_print ("Thread %lu offering %d items to global\n" ,
189
+ * ((long * )pthread_getspecific (thread_id_k )),
190
+ items_to_offer );
191
+
192
+ for (i = 0 ; i < items_to_offer ; i ++ ) {
193
+ assert (deque_push (& (global_queue -> deque ), deque_pop_back (local_queue )));
194
+ }
195
+ if (global_queue -> waiters ) {
196
+ pthread_cond_broadcast (& global_queue -> wait_condition );
177
197
}
198
+ pthread_mutex_unlock (& global_queue -> lock );
199
+
178
200
}
179
201
}
180
202
@@ -185,27 +207,32 @@ static void global_queue_offer_work(global_queue_t* global_queue, deque_t* local
185
207
void * active_objspace ;
186
208
global_queue_t * global_queue ;
187
209
pthread_key_t thread_local_deque_k ;
210
+
188
211
static void * mark_run_loop (void * arg ) {
189
212
long thread_id = (long ) arg ;
190
- printf ("Thread %lu started\n" , thread_id );
191
213
deque_t deque ;
214
+ VALUE v ;
215
+ debug_print ("Thread %lu started\n" , thread_id );
216
+
192
217
deque_init (& deque , LOCAL_QUEUE_SIZE );
193
218
pthread_setspecific (thread_local_deque_k , & deque );
219
+ pthread_setspecific (thread_id_k , & thread_id );
194
220
if (thread_id == 0 ) {
195
- printf ("Thread 0 running start_mark\n" );
221
+ debug_print ("Thread 0 running start_mark\n" );
196
222
gc_start_mark (active_objspace );
223
+ debug_print ("Thread 0 finished start_mark\n" );
197
224
}
198
225
while (1 ) {
199
226
global_queue_offer_work (global_queue , & deque );
200
227
if (deque_empty_p (& deque )) {
201
- printf ("Thread %lu taking work from the master thread \n" , thread_id );
228
+ debug_print ("Thread %lu taking work from the global queue \n" , thread_id );
202
229
global_queue_pop_work (thread_id , global_queue , & deque );
203
230
}
204
231
if (global_queue -> complete ) {
205
232
break ;
206
233
}
207
- VALUE v = deque_pop (& deque );
208
- // printf ("Thread %lu marking %lu\n", thread_id, v);
234
+ v = deque_pop (& deque );
235
+ // debug_print ("Thread %lu marking %lu\n", thread_id, v);
209
236
gc_do_mark (active_objspace , v );
210
237
}
211
238
return NULL ;
@@ -223,6 +250,7 @@ void gc_mark_parallel(void* objspace) {
223
250
global_queue_init (global_queue );
224
251
225
252
pthread_key_create (& thread_local_deque_k , deque_destroy_callback );
253
+ pthread_key_create (& thread_id_k , NULL );
226
254
227
255
pthread_attr_init (& attr );
228
256
pthread_attr_setdetachstate (& attr , PTHREAD_CREATE_JOINABLE );
@@ -242,6 +270,7 @@ void gc_mark_parallel(void* objspace) {
242
270
pthread_join (threads [t ], & status );
243
271
//TODO: handle error codes
244
272
}
273
+
245
274
global_queue_destroy (global_queue );
246
275
}
247
276
0 commit comments