vm_trace.c: workqueue as thread-safe version of postponed_job · github/ruby@eb38fb6 · GitHub

Commit eb38fb6

Authored and committed by normal
vm_trace.c: workqueue as thread-safe version of postponed_job
postponed_job is safe to use in signal handlers, but is not thread-safe for MJIT. Implement a workqueue for MJIT thread-safety. [Bug #15316]

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@66100 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
1 parent d7e4e50 commit eb38fb6
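
The idea behind the change can be modeled outside the VM. The sketch below is a minimal standalone illustration, not the Ruby source: a native thread mallocs a job node and appends it to a mutex-protected list, and the main thread later splices the whole list off under the lock and runs the jobs with the lock released. POSIX threads and a hand-rolled singly-linked list stand in for rb_nativethread_lock_t and ccan/list, the wq_pending flag stands in for RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(), and every name here (workqueue_register, workqueue_flush, and so on) is invented for the example.

/* workqueue_sketch.c -- cc -pthread workqueue_sketch.c && ./a.out */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef void (*job_func_t)(void *);

struct wq_job {
    struct wq_job *next;
    job_func_t func;
    void *data;
};

/* shared state: a mutex-protected singly-linked queue plus a "flush me" flag */
static pthread_mutex_t wq_lock = PTHREAD_MUTEX_INITIALIZER;
static struct wq_job *wq_head, **wq_tail = &wq_head;
static volatile int wq_pending;

/* producer side: callable from any native thread, but NOT from a signal
 * handler, since it takes a lock and calls malloc */
static int
workqueue_register(job_func_t func, void *data)
{
    struct wq_job *job = malloc(sizeof(*job));

    if (!job) return 0;
    job->next = NULL;
    job->func = func;
    job->data = data;

    pthread_mutex_lock(&wq_lock);
    *wq_tail = job;
    wq_tail = &job->next;
    pthread_mutex_unlock(&wq_lock);

    wq_pending = 1;
    return 1;
}

/* consumer side (the "main" thread): detach the whole queue under the lock,
 * then run the jobs without holding it */
static void
workqueue_flush(void)
{
    struct wq_job *job;

    pthread_mutex_lock(&wq_lock);
    job = wq_head;
    wq_head = NULL;
    wq_tail = &wq_head;
    wq_pending = 0;
    pthread_mutex_unlock(&wq_lock);

    while (job) {
        struct wq_job *next = job->next;

        job->func(job->data);
        free(job);
        job = next;
    }
}

static void
print_job(void *data)
{
    printf("job %ld ran on the main thread\n", (long)data);
}

static void *
producer(void *arg)
{
    long i;

    (void)arg;
    for (i = 0; i < 3; i++)
        workqueue_register(print_job, (void *)i);
    return NULL;
}

int
main(void)
{
    pthread_t th;

    pthread_create(&th, NULL, producer, NULL);
    pthread_join(th, NULL);

    if (wq_pending)
        workqueue_flush();
    return 0;
}

This split is also why both mechanisms coexist after the commit: a signal handler cannot take a lock or call malloc, so it keeps using the lock-free postponed_job buffer, while the MJIT worker thread gets the locked workqueue; accordingly, the comment on postponed_job_register in vm_trace.c below is downgraded from "thread-safe against MJIT worker thread" to just "Async-signal-safe".
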

5 files changed: +67 additions, -20 deletions


mjit.c

Lines changed: 0 additions & 14 deletions
@@ -106,20 +106,6 @@ mjit_gc_finish_hook(void)
     CRITICAL_SECTION_FINISH(4, "mjit_gc_finish_hook");
 }
 
-/* Wrap critical section to prevent [Bug #15316] */
-void
-mjit_postponed_job_register_start_hook(void)
-{
-    CRITICAL_SECTION_START(4, "mjit_postponed_job_register_start_hook");
-}
-
-/* Unwrap critical section of mjit_postponed_job_register_start_hook() */
-void
-mjit_postponed_job_register_finish_hook(void)
-{
-    CRITICAL_SECTION_FINISH(4, "mjit_postponed_job_register_finish_hook");
-}
-
 /* Iseqs can be garbage collected. This function should call when it
    happens. It removes iseq from the unit. */
 void

mjit_worker.c

Lines changed: 4 additions & 1 deletion
@@ -1133,6 +1133,9 @@ static mjit_copy_job_t mjit_copy_job;
 
 static void mjit_copy_job_handler(void *data);
 
+/* vm_trace.c */
+int rb_workqueue_register(unsigned flags, rb_postponed_job_func_t , void *);
+
 /* We're lazily copying cache values from main thread because these cache values
    could be different between ones on enqueue timing and ones on dequeue timing.
    Return TRUE if copy succeeds. */
@@ -1148,7 +1151,7 @@ copy_cache_from_main_thread(mjit_copy_job_t *job)
         return job->finish_p;
     }
 
-    if (!rb_postponed_job_register(0, mjit_copy_job_handler, (void *)job))
+    if (!rb_workqueue_register(0, mjit_copy_job_handler, (void *)job))
         return FALSE;
     CRITICAL_SECTION_START(3, "in MJIT copy job wait");
     /* checking `stop_worker_p` too because `RUBY_VM_CHECK_INTS(ec)` may not

thread.c

Lines changed: 3 additions & 0 deletions
@@ -419,6 +419,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm)
     if (0) {
         /* may be held by running threads */
         rb_native_mutex_destroy(&vm->waitpid_lock);
+        rb_native_mutex_destroy(&vm->workqueue_lock);
     }
 }
 
@@ -4422,6 +4423,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
 
     /* may be held by MJIT threads in parent */
     rb_native_mutex_initialize(&vm->waitpid_lock);
+    rb_native_mutex_initialize(&vm->workqueue_lock);
 
     /* may be held by any thread in parent */
     rb_native_mutex_initialize(&th->interrupt_lock);
@@ -5183,6 +5185,7 @@ Init_Thread(void)
         gvl_init(th->vm);
         gvl_acquire(th->vm, th);
         rb_native_mutex_initialize(&th->vm->waitpid_lock);
+        rb_native_mutex_initialize(&th->vm->workqueue_lock);
         rb_native_mutex_initialize(&th->interrupt_lock);
 
         th->pending_interrupt_queue = rb_ary_tmp_new(0);

vm_core.h

Lines changed: 6 additions & 1 deletion
@@ -638,12 +638,16 @@ typedef struct rb_vm_struct {
     /* relation table of ensure - rollback for callcc */
     struct st_table *ensure_rollback_table;
 
-    /* postponed_job */
+    /* postponed_job (async-signal-safe, NOT thread-safe) */
     struct rb_postponed_job_struct *postponed_job_buffer;
     int postponed_job_index;
 
     int src_encoding_index;
 
+    /* workqueue (thread-safe, NOT async-signal-safe) */
+    struct list_head workqueue; /* <=> rb_workqueue_job.jnode */
+    rb_nativethread_lock_t workqueue_lock;
+
     VALUE verbose, debug, orig_progname, progname;
     VALUE coverages;
     int coverage_mode;
@@ -1628,6 +1632,7 @@ rb_vm_living_threads_init(rb_vm_t *vm)
 {
     list_head_init(&vm->waiting_fds);
     list_head_init(&vm->waiting_pids);
+    list_head_init(&vm->workqueue);
     list_head_init(&vm->waiting_grps);
     list_head_init(&vm->living_threads);
     vm->living_thread_num = 0;

vm_trace.c

Lines changed: 54 additions & 4 deletions
@@ -1752,12 +1752,18 @@ typedef struct rb_postponed_job_struct {
 #define MAX_POSTPONED_JOB               1000
 #define MAX_POSTPONED_JOB_SPECIAL_ADDITION 24
 
+struct rb_workqueue_job {
+    struct list_node jnode; /* <=> vm->workqueue */
+    rb_postponed_job_t job;
+};
+
 void
 Init_vm_postponed_job(void)
 {
     rb_vm_t *vm = GET_VM();
     vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB);
     vm->postponed_job_index = 0;
+    /* workqueue is initialized when VM locks are initialized */
 }
 
 enum postponed_job_register_result {
@@ -1766,21 +1772,19 @@ enum postponed_job_register_result {
     PJRR_INTERRUPTED = 2
 };
 
-/* Async-signal-safe, thread-safe against MJIT worker thread */
+/* Async-signal-safe */
 static enum postponed_job_register_result
 postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
                        unsigned int flags, rb_postponed_job_func_t func, void *data, int max, int expected_index)
 {
     rb_postponed_job_t *pjob;
 
     if (expected_index >= max) return PJRR_FULL; /* failed */
-    if (mjit_enabled) mjit_postponed_job_register_start_hook();
 
     if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) {
         pjob = &vm->postponed_job_buffer[expected_index];
     }
     else {
-        if (mjit_enabled) mjit_postponed_job_register_finish_hook();
         return PJRR_INTERRUPTED;
     }
 
@@ -1789,7 +1793,6 @@ postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
     pjob->data = data;
 
     RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
-    if (mjit_enabled) mjit_postponed_job_register_finish_hook();
 
     return PJRR_SUCCESS;
 }
@@ -1842,13 +1845,43 @@ rb_postponed_job_register_one(unsigned int flags, rb_postponed_job_func_t func,
     }
 }
 
+/*
+ * thread-safe and called from non-Ruby thread
+ * returns FALSE on failure (ENOMEM), TRUE otherwise
+ */
+int
+rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data)
+{
+    struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job));
+    rb_vm_t *vm = GET_VM();
+
+    if (!wq_job) return FALSE;
+    wq_job->job.func = func;
+    wq_job->job.data = data;
+
+    rb_nativethread_lock_lock(&vm->workqueue_lock);
+    list_add_tail(&vm->workqueue, &wq_job->jnode);
+    rb_nativethread_lock_unlock(&vm->workqueue_lock);
+
+    RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
+
+    return TRUE;
+}
+
 void
 rb_postponed_job_flush(rb_vm_t *vm)
 {
     rb_execution_context_t *ec = GET_EC();
     const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK;
     volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask;
     VALUE volatile saved_errno = ec->errinfo;
+    struct list_head tmp;
+
+    list_head_init(&tmp);
+
+    rb_nativethread_lock_lock(&vm->workqueue_lock);
+    list_append_list(&tmp, &vm->workqueue);
+    rb_nativethread_lock_unlock(&vm->workqueue_lock);
 
     ec->errinfo = Qnil;
     /* mask POSTPONED_JOB dispatch */
@@ -1857,16 +1890,33 @@ rb_postponed_job_flush(rb_vm_t *vm)
         EC_PUSH_TAG(ec);
         if (EC_EXEC_TAG() == TAG_NONE) {
             int index;
+            struct rb_workqueue_job *wq_job;
+
             while ((index = vm->postponed_job_index) > 0) {
                 if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) {
                     rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1];
                     (*pjob->func)(pjob->data);
                 }
            }
+            while ((wq_job = list_pop(&tmp, struct rb_workqueue_job, jnode))) {
+                rb_postponed_job_t pjob = wq_job->job;
+
+                free(wq_job);
+                (pjob.func)(pjob.data);
+            }
        }
        EC_POP_TAG();
    }
    /* restore POSTPONED_JOB mask */
    ec->interrupt_mask &= ~(saved_mask ^ block_mask);
    ec->errinfo = saved_errno;
+
+    /* don't leak memory if a job threw an exception */
+    if (!list_empty(&tmp)) {
+        rb_nativethread_lock_lock(&vm->workqueue_lock);
+        list_prepend_list(&vm->workqueue, &tmp);
+        rb_nativethread_lock_unlock(&vm->workqueue_lock);
+
+        RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
+    }
 }
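
The tmp list and the trailing list_prepend_list() block above exist for the case where a dequeued job raises: the flush drains everything into a local list first, unlinks and frees each node before running its function, and pushes whatever was left un-run back onto vm->workqueue so a later flush retries it. Below is a rough single-threaded sketch of just that recovery step, with setjmp/longjmp standing in for the EC_PUSH_TAG()/EC_EXEC_TAG() machinery and plain pointers in place of ccan/list; the lock and the interrupt flag are omitted, and names such as flush_jobs and noisy_job are made up for the example.

/* requeue_sketch.c -- cc requeue_sketch.c && ./a.out */
#include <setjmp.h>
#include <stdio.h>
#include <stdlib.h>

typedef void (*job_func_t)(void *);

struct wq_job {
    struct wq_job *next;
    job_func_t func;
    void *data;
};

static struct wq_job *workqueue;   /* shared queue; the real code guards it with workqueue_lock */
static jmp_buf flush_tag;          /* stand-in for EC_PUSH_TAG()/EC_EXEC_TAG() */

static void
enqueue(job_func_t func, void *data)
{
    struct wq_job *job = malloc(sizeof(*job));

    if (!job) abort();
    job->func = func;
    job->data = data;
    job->next = workqueue;         /* LIFO order is fine for this sketch */
    workqueue = job;
}

static void
flush_jobs(void)
{
    /* detach everything up front, like rb_postponed_job_flush() does with `tmp`;
     * volatile because it changes between setjmp() and longjmp() */
    struct wq_job *volatile tmp = workqueue;

    workqueue = NULL;

    if (setjmp(flush_tag) == 0) {
        while (tmp) {
            struct wq_job *job = tmp;
            job_func_t func = job->func;
            void *data = job->data;

            tmp = job->next;       /* unlink and free the node first ...          */
            free(job);             /* ... mirroring list_pop() + free() ...        */
            func(data);            /* ... so after a "raise", tmp holds un-run jobs */
        }
    }

    /* don't leak the un-run remainder if a job bailed out; the real code
     * re-queues it with list_prepend_list() under the lock and re-sets the
     * postponed-job interrupt so a later flush picks it up */
    if (tmp)
        workqueue = tmp;
}

static void
noisy_job(void *data)
{
    long id = (long)data;

    printf("running job %ld\n", id);
    if (id == 1)
        longjmp(flush_tag, 1);     /* model an exception escaping the job */
}

int
main(void)
{
    enqueue(noisy_job, (void *)2L);
    enqueue(noisy_job, (void *)1L);
    enqueue(noisy_job, (void *)0L);

    flush_jobs();   /* runs job 0; job 1 jumps out; job 2 stays queued */
    flush_jobs();   /* picks up the leftover job 2 */
    return 0;
}
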
