Ruby 1.9.3p327(2012-11-10revision37606)
|
00001 /********************************************************************** 00002 00003 thread.c - 00004 00005 $Author: kosaki $ 00006 00007 Copyright (C) 2004-2007 Koichi Sasada 00008 00009 **********************************************************************/ 00010 00011 /* 00012 YARV Thread Design 00013 00014 model 1: Userlevel Thread 00015 Same as traditional ruby thread. 00016 00017 model 2: Native Thread with Global VM lock 00018 Using pthread (or Windows thread) and Ruby threads run concurrent. 00019 00020 model 3: Native Thread with fine grain lock 00021 Using pthread and Ruby threads run concurrent or parallel. 00022 00023 ------------------------------------------------------------------------ 00024 00025 model 2: 00026 A thread has mutex (GVL: Global VM Lock or Giant VM Lock) can run. 00027 When thread scheduling, running thread release GVL. If running thread 00028 try blocking operation, this thread must release GVL and another 00029 thread can continue this flow. After blocking operation, thread 00030 must check interrupt (RUBY_VM_CHECK_INTS). 00031 00032 Every VM can run parallel. 00033 00034 Ruby threads are scheduled by OS thread scheduler. 00035 00036 ------------------------------------------------------------------------ 00037 00038 model 3: 00039 Every threads run concurrent or parallel and to access shared object 00040 exclusive access control is needed. For example, to access String 00041 object or Array object, fine grain lock must be locked every time. 00042 */ 00043 00044 00045 /* for model 2 */ 00046 00047 #include "eval_intern.h" 00048 #include "gc.h" 00049 #include "internal.h" 00050 #include "ruby/io.h" 00051 00052 #ifndef USE_NATIVE_THREAD_PRIORITY 00053 #define USE_NATIVE_THREAD_PRIORITY 0 00054 #define RUBY_THREAD_PRIORITY_MAX 3 00055 #define RUBY_THREAD_PRIORITY_MIN -3 00056 #endif 00057 00058 #ifndef THREAD_DEBUG 00059 #define THREAD_DEBUG 0 00060 #endif 00061 00062 VALUE rb_cMutex; 00063 VALUE rb_cBarrier; 00064 00065 static void sleep_timeval(rb_thread_t *th, struct timeval time); 00066 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec); 00067 static void sleep_forever(rb_thread_t *th, int nodeadlock); 00068 static double timeofday(void); 00069 static int rb_threadptr_dead(rb_thread_t *th); 00070 00071 static void rb_check_deadlock(rb_vm_t *vm); 00072 00073 #define eKillSignal INT2FIX(0) 00074 #define eTerminateSignal INT2FIX(1) 00075 static volatile int system_working = 1; 00076 00077 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] 00078 00079 inline static void 00080 st_delete_wrap(st_table *table, st_data_t key) 00081 { 00082 st_delete(table, &key, 0); 00083 } 00084 00085 /********************************************************************************/ 00086 00087 #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION 00088 00089 struct rb_blocking_region_buffer { 00090 enum rb_thread_status prev_status; 00091 struct rb_unblock_callback oldubf; 00092 }; 00093 00094 static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00095 struct rb_unblock_callback *old); 00096 static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); 00097 00098 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); 00099 00100 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \ 00101 do { \ 00102 rb_gc_save_machine_context(th); \ 00103 SET_MACHINE_STACK_END(&(th)->machine_stack_end); \ 00104 } while (0) 00105 00106 #define GVL_UNLOCK_BEGIN() do { \ 00107 rb_thread_t *_th_stored = GET_THREAD(); \ 00108 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ 00109 gvl_release(_th_stored->vm); 00110 00111 #define GVL_UNLOCK_END() \ 00112 gvl_acquire(_th_stored->vm, _th_stored); \ 00113 rb_thread_set_current(_th_stored); \ 00114 } while(0) 00115 00116 #define blocking_region_begin(th, region, func, arg) \ 00117 do { \ 00118 (region)->prev_status = (th)->status; \ 00119 set_unblock_function((th), (func), (arg), &(region)->oldubf); \ 00120 (th)->blocking_region_buffer = (region); \ 00121 (th)->status = THREAD_STOPPED; \ 00122 thread_debug("enter blocking region (%p)\n", (void *)(th)); \ 00123 RB_GC_SAVE_MACHINE_CONTEXT(th); \ 00124 gvl_release((th)->vm); \ 00125 } while (0) 00126 00127 #define BLOCKING_REGION(exec, ubf, ubfarg) do { \ 00128 rb_thread_t *__th = GET_THREAD(); \ 00129 struct rb_blocking_region_buffer __region; \ 00130 blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \ 00131 exec; \ 00132 blocking_region_end(__th, &__region); \ 00133 RUBY_VM_CHECK_INTS(); \ 00134 } while(0) 00135 00136 #if THREAD_DEBUG 00137 #ifdef HAVE_VA_ARGS_MACRO 00138 void rb_thread_debug(const char *file, int line, const char *fmt, ...); 00139 #define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) 00140 #define POSITION_FORMAT "%s:%d:" 00141 #define POSITION_ARGS ,file, line 00142 #else 00143 void rb_thread_debug(const char *fmt, ...); 00144 #define thread_debug rb_thread_debug 00145 #define POSITION_FORMAT 00146 #define POSITION_ARGS 00147 #endif 00148 00149 # if THREAD_DEBUG < 0 00150 static int rb_thread_debug_enabled; 00151 00152 /* 00153 * call-seq: 00154 * Thread.DEBUG -> num 00155 * 00156 * Returns the thread debug level. Available only if compiled with 00157 * THREAD_DEBUG=-1. 00158 */ 00159 00160 static VALUE 00161 rb_thread_s_debug(void) 00162 { 00163 return INT2NUM(rb_thread_debug_enabled); 00164 } 00165 00166 /* 00167 * call-seq: 00168 * Thread.DEBUG = num 00169 * 00170 * Sets the thread debug level. Available only if compiled with 00171 * THREAD_DEBUG=-1. 00172 */ 00173 00174 static VALUE 00175 rb_thread_s_debug_set(VALUE self, VALUE val) 00176 { 00177 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0; 00178 return val; 00179 } 00180 # else 00181 # define rb_thread_debug_enabled THREAD_DEBUG 00182 # endif 00183 #else 00184 #define thread_debug if(0)printf 00185 #endif 00186 00187 #ifndef __ia64 00188 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) 00189 #endif 00190 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, 00191 VALUE *register_stack_start)); 00192 static void timer_thread_function(void *); 00193 00194 #if defined(_WIN32) 00195 #include "thread_win32.c" 00196 00197 #define DEBUG_OUT() \ 00198 WaitForSingleObject(&debug_mutex, INFINITE); \ 00199 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ 00200 fflush(stdout); \ 00201 ReleaseMutex(&debug_mutex); 00202 00203 #elif defined(HAVE_PTHREAD_H) 00204 #include "thread_pthread.c" 00205 00206 #define DEBUG_OUT() \ 00207 pthread_mutex_lock(&debug_mutex); \ 00208 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \ 00209 fflush(stdout); \ 00210 pthread_mutex_unlock(&debug_mutex); 00211 00212 #else 00213 #error "unsupported thread type" 00214 #endif 00215 00216 #if THREAD_DEBUG 00217 static int debug_mutex_initialized = 1; 00218 static rb_thread_lock_t debug_mutex; 00219 00220 void 00221 rb_thread_debug( 00222 #ifdef HAVE_VA_ARGS_MACRO 00223 const char *file, int line, 00224 #endif 00225 const char *fmt, ...) 00226 { 00227 va_list args; 00228 char buf[BUFSIZ]; 00229 00230 if (!rb_thread_debug_enabled) return; 00231 00232 if (debug_mutex_initialized == 1) { 00233 debug_mutex_initialized = 0; 00234 native_mutex_initialize(&debug_mutex); 00235 } 00236 00237 va_start(args, fmt); 00238 vsnprintf(buf, BUFSIZ, fmt, args); 00239 va_end(args); 00240 00241 DEBUG_OUT(); 00242 } 00243 #endif 00244 00245 void 00246 rb_vm_gvl_destroy(rb_vm_t *vm) 00247 { 00248 gvl_release(vm); 00249 gvl_destroy(vm); 00250 } 00251 00252 void 00253 rb_thread_lock_unlock(rb_thread_lock_t *lock) 00254 { 00255 native_mutex_unlock(lock); 00256 } 00257 00258 void 00259 rb_thread_lock_destroy(rb_thread_lock_t *lock) 00260 { 00261 native_mutex_destroy(lock); 00262 } 00263 00264 static void 00265 set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00266 struct rb_unblock_callback *old) 00267 { 00268 check_ints: 00269 RUBY_VM_CHECK_INTS(); /* check signal or so */ 00270 native_mutex_lock(&th->interrupt_lock); 00271 if (th->interrupt_flag) { 00272 native_mutex_unlock(&th->interrupt_lock); 00273 goto check_ints; 00274 } 00275 else { 00276 if (old) *old = th->unblock; 00277 th->unblock.func = func; 00278 th->unblock.arg = arg; 00279 } 00280 native_mutex_unlock(&th->interrupt_lock); 00281 } 00282 00283 static void 00284 reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) 00285 { 00286 native_mutex_lock(&th->interrupt_lock); 00287 th->unblock = *old; 00288 native_mutex_unlock(&th->interrupt_lock); 00289 } 00290 00291 void 00292 rb_threadptr_interrupt(rb_thread_t *th) 00293 { 00294 native_mutex_lock(&th->interrupt_lock); 00295 RUBY_VM_SET_INTERRUPT(th); 00296 if (th->unblock.func) { 00297 (th->unblock.func)(th->unblock.arg); 00298 } 00299 else { 00300 /* none */ 00301 } 00302 native_mutex_unlock(&th->interrupt_lock); 00303 } 00304 00305 00306 static int 00307 terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) 00308 { 00309 VALUE thval = key; 00310 rb_thread_t *th; 00311 GetThreadPtr(thval, th); 00312 00313 if (th != main_thread) { 00314 thread_debug("terminate_i: %p\n", (void *)th); 00315 rb_threadptr_interrupt(th); 00316 th->thrown_errinfo = eTerminateSignal; 00317 th->status = THREAD_TO_KILL; 00318 } 00319 else { 00320 thread_debug("terminate_i: main thread (%p)\n", (void *)th); 00321 } 00322 return ST_CONTINUE; 00323 } 00324 00325 typedef struct rb_mutex_struct 00326 { 00327 rb_thread_lock_t lock; 00328 rb_thread_cond_t cond; 00329 struct rb_thread_struct volatile *th; 00330 int cond_waiting; 00331 struct rb_mutex_struct *next_mutex; 00332 } rb_mutex_t; 00333 00334 static void rb_mutex_abandon_all(rb_mutex_t *mutexes); 00335 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 00336 00337 void 00338 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) 00339 { 00340 const char *err; 00341 rb_mutex_t *mutex; 00342 rb_mutex_t *mutexes = th->keeping_mutexes; 00343 00344 while (mutexes) { 00345 mutex = mutexes; 00346 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", 00347 mutexes); */ 00348 mutexes = mutex->next_mutex; 00349 err = rb_mutex_unlock_th(mutex, th); 00350 if (err) rb_bug("invalid keeping_mutexes: %s", err); 00351 } 00352 } 00353 00354 void 00355 rb_thread_terminate_all(void) 00356 { 00357 rb_thread_t *th = GET_THREAD(); /* main thread */ 00358 rb_vm_t *vm = th->vm; 00359 00360 if (vm->main_thread != th) { 00361 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", 00362 (void *)vm->main_thread, (void *)th); 00363 } 00364 00365 /* unlock all locking mutexes */ 00366 rb_threadptr_unlock_all_locking_mutexes(th); 00367 00368 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); 00369 st_foreach(vm->living_threads, terminate_i, (st_data_t)th); 00370 vm->inhibit_thread_creation = 1; 00371 00372 while (!rb_thread_alone()) { 00373 PUSH_TAG(); 00374 if (EXEC_TAG() == 0) { 00375 rb_thread_schedule(); 00376 } 00377 else { 00378 /* ignore exception */ 00379 } 00380 POP_TAG(); 00381 } 00382 } 00383 00384 static void 00385 thread_cleanup_func_before_exec(void *th_ptr) 00386 { 00387 rb_thread_t *th = th_ptr; 00388 th->status = THREAD_KILLED; 00389 th->machine_stack_start = th->machine_stack_end = 0; 00390 #ifdef __ia64 00391 th->machine_register_stack_start = th->machine_register_stack_end = 0; 00392 #endif 00393 } 00394 00395 static void 00396 thread_cleanup_func(void *th_ptr, int atfork) 00397 { 00398 rb_thread_t *th = th_ptr; 00399 00400 th->locking_mutex = Qfalse; 00401 thread_cleanup_func_before_exec(th_ptr); 00402 00403 /* 00404 * Unfortunately, we can't release native threading resource at fork 00405 * because libc may have unstable locking state therefore touching 00406 * a threading resource may cause a deadlock. 00407 */ 00408 if (atfork) 00409 return; 00410 00411 native_mutex_destroy(&th->interrupt_lock); 00412 native_thread_destroy(th); 00413 } 00414 00415 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); 00416 00417 void 00418 ruby_thread_init_stack(rb_thread_t *th) 00419 { 00420 native_thread_init_stack(th); 00421 } 00422 00423 static int 00424 thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) 00425 { 00426 int state; 00427 VALUE args = th->first_args; 00428 rb_proc_t *proc; 00429 rb_thread_t *join_th; 00430 rb_thread_t *main_th; 00431 VALUE errinfo = Qnil; 00432 # ifdef USE_SIGALTSTACK 00433 void rb_register_sigaltstack(rb_thread_t *th); 00434 00435 rb_register_sigaltstack(th); 00436 # endif 00437 00438 ruby_thread_set_native(th); 00439 00440 th->machine_stack_start = stack_start; 00441 #ifdef __ia64 00442 th->machine_register_stack_start = register_stack_start; 00443 #endif 00444 thread_debug("thread start: %p\n", (void *)th); 00445 00446 gvl_acquire(th->vm, th); 00447 { 00448 thread_debug("thread start (get lock): %p\n", (void *)th); 00449 rb_thread_set_current(th); 00450 00451 TH_PUSH_TAG(th); 00452 if ((state = EXEC_TAG()) == 0) { 00453 SAVE_ROOT_JMPBUF(th, { 00454 if (!th->first_func) { 00455 GetProcPtr(th->first_proc, proc); 00456 th->errinfo = Qnil; 00457 th->local_lfp = proc->block.lfp; 00458 th->local_svar = Qnil; 00459 th->value = rb_vm_invoke_proc(th, proc, proc->block.self, 00460 (int)RARRAY_LEN(args), RARRAY_PTR(args), 0); 00461 } 00462 else { 00463 th->value = (*th->first_func)((void *)args); 00464 } 00465 }); 00466 } 00467 else { 00468 errinfo = th->errinfo; 00469 if (NIL_P(errinfo)) errinfo = rb_errinfo(); 00470 if (state == TAG_FATAL) { 00471 /* fatal error within this thread, need to stop whole script */ 00472 } 00473 else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { 00474 if (th->safe_level >= 4) { 00475 th->errinfo = rb_exc_new3(rb_eSecurityError, 00476 rb_sprintf("Insecure exit at level %d", th->safe_level)); 00477 errinfo = Qnil; 00478 } 00479 } 00480 else if (th->safe_level < 4 && 00481 (th->vm->thread_abort_on_exception || 00482 th->abort_on_exception || RTEST(ruby_debug))) { 00483 /* exit on main_thread */ 00484 } 00485 else { 00486 errinfo = Qnil; 00487 } 00488 th->value = Qnil; 00489 } 00490 00491 th->status = THREAD_KILLED; 00492 thread_debug("thread end: %p\n", (void *)th); 00493 00494 main_th = th->vm->main_thread; 00495 if (th != main_th) { 00496 if (TYPE(errinfo) == T_OBJECT) { 00497 /* treat with normal error object */ 00498 rb_threadptr_raise(main_th, 1, &errinfo); 00499 } 00500 } 00501 TH_POP_TAG(); 00502 00503 /* locking_mutex must be Qfalse */ 00504 if (th->locking_mutex != Qfalse) { 00505 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", 00506 (void *)th, th->locking_mutex); 00507 } 00508 00509 /* delete self other than main thread from living_threads */ 00510 if (th != main_th) { 00511 st_delete_wrap(th->vm->living_threads, th->self); 00512 } 00513 00514 /* wake up joining threads */ 00515 join_th = th->join_list_head; 00516 while (join_th) { 00517 if (join_th == main_th) errinfo = Qnil; 00518 rb_threadptr_interrupt(join_th); 00519 switch (join_th->status) { 00520 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: 00521 join_th->status = THREAD_RUNNABLE; 00522 default: break; 00523 } 00524 join_th = join_th->join_list_next; 00525 } 00526 00527 rb_threadptr_unlock_all_locking_mutexes(th); 00528 if (th != main_th) rb_check_deadlock(th->vm); 00529 00530 if (!th->root_fiber) { 00531 rb_thread_recycle_stack_release(th->stack); 00532 th->stack = 0; 00533 } 00534 } 00535 if (th->vm->main_thread == th) { 00536 ruby_cleanup(state); 00537 } 00538 else { 00539 thread_cleanup_func(th, FALSE); 00540 gvl_release(th->vm); 00541 } 00542 00543 return 0; 00544 } 00545 00546 static VALUE 00547 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) 00548 { 00549 rb_thread_t *th; 00550 int err; 00551 00552 if (OBJ_FROZEN(GET_THREAD()->thgroup)) { 00553 rb_raise(rb_eThreadError, 00554 "can't start a new thread (frozen ThreadGroup)"); 00555 } 00556 GetThreadPtr(thval, th); 00557 00558 /* setup thread environment */ 00559 th->first_func = fn; 00560 th->first_proc = fn ? Qfalse : rb_block_proc(); 00561 th->first_args = args; /* GC: shouldn't put before above line */ 00562 00563 th->priority = GET_THREAD()->priority; 00564 th->thgroup = GET_THREAD()->thgroup; 00565 00566 native_mutex_initialize(&th->interrupt_lock); 00567 if (GET_VM()->event_hooks != NULL) 00568 th->event_flags |= RUBY_EVENT_VM; 00569 00570 /* kick thread */ 00571 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id); 00572 err = native_thread_create(th); 00573 if (err) { 00574 st_delete_wrap(th->vm->living_threads, th->self); 00575 th->status = THREAD_KILLED; 00576 rb_raise(rb_eThreadError, "can't create Thread (%d)", err); 00577 } 00578 return thval; 00579 } 00580 00581 /* :nodoc: */ 00582 static VALUE 00583 thread_s_new(int argc, VALUE *argv, VALUE klass) 00584 { 00585 rb_thread_t *th; 00586 VALUE thread = rb_thread_alloc(klass); 00587 00588 if (GET_VM()->inhibit_thread_creation) 00589 rb_raise(rb_eThreadError, "can't alloc thread"); 00590 00591 rb_obj_call_init(thread, argc, argv); 00592 GetThreadPtr(thread, th); 00593 if (!th->first_args) { 00594 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'", 00595 rb_class2name(klass)); 00596 } 00597 return thread; 00598 } 00599 00600 /* 00601 * call-seq: 00602 * Thread.start([args]*) {|args| block } -> thread 00603 * Thread.fork([args]*) {|args| block } -> thread 00604 * 00605 * Basically the same as <code>Thread::new</code>. However, if class 00606 * <code>Thread</code> is subclassed, then calling <code>start</code> in that 00607 * subclass will not invoke the subclass's <code>initialize</code> method. 00608 */ 00609 00610 static VALUE 00611 thread_start(VALUE klass, VALUE args) 00612 { 00613 return thread_create_core(rb_thread_alloc(klass), args, 0); 00614 } 00615 00616 /* :nodoc: */ 00617 static VALUE 00618 thread_initialize(VALUE thread, VALUE args) 00619 { 00620 rb_thread_t *th; 00621 if (!rb_block_given_p()) { 00622 rb_raise(rb_eThreadError, "must be called with a block"); 00623 } 00624 GetThreadPtr(thread, th); 00625 if (th->first_args) { 00626 VALUE proc = th->first_proc, line, loc; 00627 const char *file; 00628 if (!proc || !RTEST(loc = rb_proc_location(proc))) { 00629 rb_raise(rb_eThreadError, "already initialized thread"); 00630 } 00631 file = RSTRING_PTR(RARRAY_PTR(loc)[0]); 00632 if (NIL_P(line = RARRAY_PTR(loc)[1])) { 00633 rb_raise(rb_eThreadError, "already initialized thread - %s", 00634 file); 00635 } 00636 rb_raise(rb_eThreadError, "already initialized thread - %s:%d", 00637 file, NUM2INT(line)); 00638 } 00639 return thread_create_core(thread, args, 0); 00640 } 00641 00642 VALUE 00643 rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) 00644 { 00645 return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); 00646 } 00647 00648 00649 /* +infty, for this purpose */ 00650 #define DELAY_INFTY 1E30 00651 00652 struct join_arg { 00653 rb_thread_t *target, *waiting; 00654 double limit; 00655 int forever; 00656 }; 00657 00658 static VALUE 00659 remove_from_join_list(VALUE arg) 00660 { 00661 struct join_arg *p = (struct join_arg *)arg; 00662 rb_thread_t *target_th = p->target, *th = p->waiting; 00663 00664 if (target_th->status != THREAD_KILLED) { 00665 rb_thread_t **pth = &target_th->join_list_head; 00666 00667 while (*pth) { 00668 if (*pth == th) { 00669 *pth = th->join_list_next; 00670 break; 00671 } 00672 pth = &(*pth)->join_list_next; 00673 } 00674 } 00675 00676 return Qnil; 00677 } 00678 00679 static VALUE 00680 thread_join_sleep(VALUE arg) 00681 { 00682 struct join_arg *p = (struct join_arg *)arg; 00683 rb_thread_t *target_th = p->target, *th = p->waiting; 00684 double now, limit = p->limit; 00685 00686 while (target_th->status != THREAD_KILLED) { 00687 if (p->forever) { 00688 sleep_forever(th, 1); 00689 } 00690 else { 00691 now = timeofday(); 00692 if (now > limit) { 00693 thread_debug("thread_join: timeout (thid: %p)\n", 00694 (void *)target_th->thread_id); 00695 return Qfalse; 00696 } 00697 sleep_wait_for_interrupt(th, limit - now); 00698 } 00699 thread_debug("thread_join: interrupted (thid: %p)\n", 00700 (void *)target_th->thread_id); 00701 } 00702 return Qtrue; 00703 } 00704 00705 static VALUE 00706 thread_join(rb_thread_t *target_th, double delay) 00707 { 00708 rb_thread_t *th = GET_THREAD(); 00709 struct join_arg arg; 00710 00711 arg.target = target_th; 00712 arg.waiting = th; 00713 arg.limit = timeofday() + delay; 00714 arg.forever = delay == DELAY_INFTY; 00715 00716 thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id); 00717 00718 if (target_th->status != THREAD_KILLED) { 00719 th->join_list_next = target_th->join_list_head; 00720 target_th->join_list_head = th; 00721 if (!rb_ensure(thread_join_sleep, (VALUE)&arg, 00722 remove_from_join_list, (VALUE)&arg)) { 00723 return Qnil; 00724 } 00725 } 00726 00727 thread_debug("thread_join: success (thid: %p)\n", 00728 (void *)target_th->thread_id); 00729 00730 if (target_th->errinfo != Qnil) { 00731 VALUE err = target_th->errinfo; 00732 00733 if (FIXNUM_P(err)) { 00734 /* */ 00735 } 00736 else if (TYPE(target_th->errinfo) == T_NODE) { 00737 rb_exc_raise(rb_vm_make_jump_tag_but_local_jump( 00738 GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err))); 00739 } 00740 else { 00741 /* normal exception */ 00742 rb_exc_raise(err); 00743 } 00744 } 00745 return target_th->self; 00746 } 00747 00748 /* 00749 * call-seq: 00750 * thr.join -> thr 00751 * thr.join(limit) -> thr 00752 * 00753 * The calling thread will suspend execution and run <i>thr</i>. Does not 00754 * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. If 00755 * the time limit expires, <code>nil</code> will be returned, otherwise 00756 * <i>thr</i> is returned. 00757 * 00758 * Any threads not joined will be killed when the main program exits. If 00759 * <i>thr</i> had previously raised an exception and the 00760 * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set 00761 * (so the exception has not yet been processed) it will be processed at this 00762 * time. 00763 * 00764 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" } 00765 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } 00766 * x.join # Let x thread finish, a will be killed on exit. 00767 * 00768 * <em>produces:</em> 00769 * 00770 * axyz 00771 * 00772 * The following example illustrates the <i>limit</i> parameter. 00773 * 00774 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} 00775 * puts "Waiting" until y.join(0.15) 00776 * 00777 * <em>produces:</em> 00778 * 00779 * tick... 00780 * Waiting 00781 * tick... 00782 * Waitingtick... 00783 * 00784 * 00785 * tick... 00786 */ 00787 00788 static VALUE 00789 thread_join_m(int argc, VALUE *argv, VALUE self) 00790 { 00791 rb_thread_t *target_th; 00792 double delay = DELAY_INFTY; 00793 VALUE limit; 00794 00795 GetThreadPtr(self, target_th); 00796 00797 rb_scan_args(argc, argv, "01", &limit); 00798 if (!NIL_P(limit)) { 00799 delay = rb_num2dbl(limit); 00800 } 00801 00802 return thread_join(target_th, delay); 00803 } 00804 00805 /* 00806 * call-seq: 00807 * thr.value -> obj 00808 * 00809 * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns 00810 * its value. 00811 * 00812 * a = Thread.new { 2 + 2 } 00813 * a.value #=> 4 00814 */ 00815 00816 static VALUE 00817 thread_value(VALUE self) 00818 { 00819 rb_thread_t *th; 00820 GetThreadPtr(self, th); 00821 thread_join(th, DELAY_INFTY); 00822 return th->value; 00823 } 00824 00825 /* 00826 * Thread Scheduling 00827 */ 00828 00829 static struct timeval 00830 double2timeval(double d) 00831 { 00832 struct timeval time; 00833 00834 time.tv_sec = (int)d; 00835 time.tv_usec = (int)((d - (int)d) * 1e6); 00836 if (time.tv_usec < 0) { 00837 time.tv_usec += (int)1e6; 00838 time.tv_sec -= 1; 00839 } 00840 return time; 00841 } 00842 00843 static void 00844 sleep_forever(rb_thread_t *th, int deadlockable) 00845 { 00846 enum rb_thread_status prev_status = th->status; 00847 enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; 00848 00849 th->status = status; 00850 do { 00851 if (deadlockable) { 00852 th->vm->sleeper++; 00853 rb_check_deadlock(th->vm); 00854 } 00855 native_sleep(th, 0); 00856 if (deadlockable) { 00857 th->vm->sleeper--; 00858 } 00859 RUBY_VM_CHECK_INTS(); 00860 } while (th->status == status); 00861 th->status = prev_status; 00862 } 00863 00864 static void 00865 getclockofday(struct timeval *tp) 00866 { 00867 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00868 struct timespec ts; 00869 00870 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { 00871 tp->tv_sec = ts.tv_sec; 00872 tp->tv_usec = ts.tv_nsec / 1000; 00873 } else 00874 #endif 00875 { 00876 gettimeofday(tp, NULL); 00877 } 00878 } 00879 00880 static void 00881 sleep_timeval(rb_thread_t *th, struct timeval tv) 00882 { 00883 struct timeval to, tvn; 00884 enum rb_thread_status prev_status = th->status; 00885 00886 getclockofday(&to); 00887 to.tv_sec += tv.tv_sec; 00888 if ((to.tv_usec += tv.tv_usec) >= 1000000) { 00889 to.tv_sec++; 00890 to.tv_usec -= 1000000; 00891 } 00892 00893 th->status = THREAD_STOPPED; 00894 do { 00895 native_sleep(th, &tv); 00896 RUBY_VM_CHECK_INTS(); 00897 getclockofday(&tvn); 00898 if (to.tv_sec < tvn.tv_sec) break; 00899 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; 00900 thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", 00901 (long)to.tv_sec, (long)to.tv_usec, 00902 (long)tvn.tv_sec, (long)tvn.tv_usec); 00903 tv.tv_sec = to.tv_sec - tvn.tv_sec; 00904 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { 00905 --tv.tv_sec; 00906 tv.tv_usec += 1000000; 00907 } 00908 } while (th->status == THREAD_STOPPED); 00909 th->status = prev_status; 00910 } 00911 00912 void 00913 rb_thread_sleep_forever(void) 00914 { 00915 thread_debug("rb_thread_sleep_forever\n"); 00916 sleep_forever(GET_THREAD(), 0); 00917 } 00918 00919 static void 00920 rb_thread_sleep_deadly(void) 00921 { 00922 thread_debug("rb_thread_sleep_deadly\n"); 00923 sleep_forever(GET_THREAD(), 1); 00924 } 00925 00926 static double 00927 timeofday(void) 00928 { 00929 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00930 struct timespec tp; 00931 00932 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { 00933 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; 00934 } else 00935 #endif 00936 { 00937 struct timeval tv; 00938 gettimeofday(&tv, NULL); 00939 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; 00940 } 00941 } 00942 00943 static void 00944 sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec) 00945 { 00946 sleep_timeval(th, double2timeval(sleepsec)); 00947 } 00948 00949 static void 00950 sleep_for_polling(rb_thread_t *th) 00951 { 00952 struct timeval time; 00953 time.tv_sec = 0; 00954 time.tv_usec = 100 * 1000; /* 0.1 sec */ 00955 sleep_timeval(th, time); 00956 } 00957 00958 void 00959 rb_thread_wait_for(struct timeval time) 00960 { 00961 rb_thread_t *th = GET_THREAD(); 00962 sleep_timeval(th, time); 00963 } 00964 00965 void 00966 rb_thread_polling(void) 00967 { 00968 RUBY_VM_CHECK_INTS(); 00969 if (!rb_thread_alone()) { 00970 rb_thread_t *th = GET_THREAD(); 00971 sleep_for_polling(th); 00972 } 00973 } 00974 00975 /* 00976 * CAUTION: This function causes thread switching. 00977 * rb_thread_check_ints() check ruby's interrupts. 00978 * some interrupt needs thread switching/invoke handlers, 00979 * and so on. 00980 */ 00981 00982 void 00983 rb_thread_check_ints(void) 00984 { 00985 RUBY_VM_CHECK_INTS(); 00986 } 00987 00988 /* 00989 * Hidden API for tcl/tk wrapper. 00990 * There is no guarantee to perpetuate it. 00991 */ 00992 int 00993 rb_thread_check_trap_pending(void) 00994 { 00995 return rb_signal_buff_size() != 0; 00996 } 00997 00998 /* This function can be called in blocking region. */ 00999 int 01000 rb_thread_interrupted(VALUE thval) 01001 { 01002 rb_thread_t *th; 01003 GetThreadPtr(thval, th); 01004 return RUBY_VM_INTERRUPTED(th); 01005 } 01006 01007 void 01008 rb_thread_sleep(int sec) 01009 { 01010 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); 01011 } 01012 01013 static void rb_threadptr_execute_interrupts_common(rb_thread_t *); 01014 01015 static void 01016 rb_thread_schedule_limits(unsigned long limits_us) 01017 { 01018 thread_debug("rb_thread_schedule\n"); 01019 if (!rb_thread_alone()) { 01020 rb_thread_t *th = GET_THREAD(); 01021 01022 if (th->running_time_us >= limits_us) { 01023 thread_debug("rb_thread_schedule/switch start\n"); 01024 RB_GC_SAVE_MACHINE_CONTEXT(th); 01025 gvl_yield(th->vm, th); 01026 rb_thread_set_current(th); 01027 thread_debug("rb_thread_schedule/switch done\n"); 01028 } 01029 } 01030 } 01031 01032 void 01033 rb_thread_schedule(void) 01034 { 01035 rb_thread_schedule_limits(0); 01036 01037 if (UNLIKELY(GET_THREAD()->interrupt_flag)) { 01038 rb_threadptr_execute_interrupts_common(GET_THREAD()); 01039 } 01040 } 01041 01042 /* blocking region */ 01043 01044 static inline void 01045 blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) 01046 { 01047 gvl_acquire(th->vm, th); 01048 rb_thread_set_current(th); 01049 thread_debug("leave blocking region (%p)\n", (void *)th); 01050 remove_signal_thread_list(th); 01051 th->blocking_region_buffer = 0; 01052 reset_unblock_function(th, ®ion->oldubf); 01053 if (th->status == THREAD_STOPPED) { 01054 th->status = region->prev_status; 01055 } 01056 } 01057 01058 struct rb_blocking_region_buffer * 01059 rb_thread_blocking_region_begin(void) 01060 { 01061 rb_thread_t *th = GET_THREAD(); 01062 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); 01063 blocking_region_begin(th, region, ubf_select, th); 01064 return region; 01065 } 01066 01067 void 01068 rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) 01069 { 01070 int saved_errno = errno; 01071 rb_thread_t *th = GET_THREAD(); 01072 blocking_region_end(th, region); 01073 xfree(region); 01074 RUBY_VM_CHECK_INTS(); 01075 errno = saved_errno; 01076 } 01077 01078 /* 01079 * rb_thread_blocking_region - permit concurrent/parallel execution. 01080 * 01081 * This function does: 01082 * (1) release GVL. 01083 * Other Ruby threads may run in parallel. 01084 * (2) call func with data1. 01085 * (3) acquire GVL. 01086 * Other Ruby threads can not run in parallel any more. 01087 * 01088 * If another thread interrupts this thread (Thread#kill, signal delivery, 01089 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means 01090 * "un-blocking function"). `ubf()' should interrupt `func()' execution. 01091 * 01092 * There are built-in ubfs and you can specify these ubfs. 01093 * However, we can not guarantee our built-in ubfs interrupt 01094 * your `func()' correctly. Be careful to use rb_thread_blocking_region(). 01095 * 01096 * * RUBY_UBF_IO: ubf for IO operation 01097 * * RUBY_UBF_PROCESS: ubf for process operation 01098 * 01099 * NOTE: You can not execute most of Ruby C API and touch Ruby 01100 * objects in `func()' and `ubf()', including raising an 01101 * exception, because current thread doesn't acquire GVL 01102 * (cause synchronization problem). If you need to do it, 01103 * read source code of C APIs and confirm by yourself. 01104 * 01105 * NOTE: In short, this API is difficult to use safely. I recommend you 01106 * use other ways if you have. We lack experiences to use this API. 01107 * Please report your problem related on it. 01108 * 01109 * Safe C API: 01110 * * rb_thread_interrupted() - check interrupt flag 01111 * * ruby_xalloc(), ruby_xrealloc(), ruby_xfree() - 01112 * if they called without GVL, acquire GVL automatically. 01113 */ 01114 VALUE 01115 rb_thread_blocking_region( 01116 rb_blocking_function_t *func, void *data1, 01117 rb_unblock_function_t *ubf, void *data2) 01118 { 01119 VALUE val; 01120 rb_thread_t *th = GET_THREAD(); 01121 int saved_errno = 0; 01122 01123 th->waiting_fd = -1; 01124 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { 01125 ubf = ubf_select; 01126 data2 = th; 01127 } 01128 01129 BLOCKING_REGION({ 01130 val = func(data1); 01131 saved_errno = errno; 01132 }, ubf, data2); 01133 errno = saved_errno; 01134 01135 return val; 01136 } 01137 01138 VALUE 01139 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) 01140 { 01141 VALUE val; 01142 rb_thread_t *th = GET_THREAD(); 01143 int saved_errno = 0; 01144 01145 th->waiting_fd = fd; 01146 BLOCKING_REGION({ 01147 val = func(data1); 01148 saved_errno = errno; 01149 }, ubf_select, th); 01150 th->waiting_fd = -1; 01151 errno = saved_errno; 01152 01153 return val; 01154 } 01155 01156 /* alias of rb_thread_blocking_region() */ 01157 01158 VALUE 01159 rb_thread_call_without_gvl( 01160 rb_blocking_function_t *func, void *data1, 01161 rb_unblock_function_t *ubf, void *data2) 01162 { 01163 return rb_thread_blocking_region(func, data1, ubf, data2); 01164 } 01165 01166 /* 01167 * rb_thread_call_with_gvl - re-enter into Ruby world while releasing GVL. 01168 * 01169 *** 01170 *** This API is EXPERIMENTAL! 01171 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01172 *** 01173 * 01174 * While releasing GVL using rb_thread_blocking_region() or 01175 * rb_thread_call_without_gvl(), you can not access Ruby values or invoke methods. 01176 * If you need to access it, you must use this function rb_thread_call_with_gvl(). 01177 * 01178 * This function rb_thread_call_with_gvl() does: 01179 * (1) acquire GVL. 01180 * (2) call passed function `func'. 01181 * (3) release GVL. 01182 * (4) return a value which is returned at (2). 01183 * 01184 * NOTE: You should not return Ruby object at (2) because such Object 01185 * will not marked. 01186 * 01187 * NOTE: If an exception is raised in `func', this function "DOES NOT" 01188 * protect (catch) the exception. If you have any resources 01189 * which should free before throwing exception, you need use 01190 * rb_protect() in `func' and return a value which represents 01191 * exception is raised. 01192 * 01193 * NOTE: This functions should not be called by a thread which 01194 * is not created as Ruby thread (created by Thread.new or so). 01195 * In other words, this function *DOES NOT* associate 01196 * NON-Ruby thread to Ruby thread. 01197 */ 01198 void * 01199 rb_thread_call_with_gvl(void *(*func)(void *), void *data1) 01200 { 01201 rb_thread_t *th = ruby_thread_from_native(); 01202 struct rb_blocking_region_buffer *brb; 01203 struct rb_unblock_callback prev_unblock; 01204 void *r; 01205 01206 if (th == 0) { 01207 /* Error is occurred, but we can't use rb_bug() 01208 * because this thread is not Ruby's thread. 01209 * What should we do? 01210 */ 01211 01212 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); 01213 exit(EXIT_FAILURE); 01214 } 01215 01216 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; 01217 prev_unblock = th->unblock; 01218 01219 if (brb == 0) { 01220 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); 01221 } 01222 01223 blocking_region_end(th, brb); 01224 /* enter to Ruby world: You can access Ruby values, methods and so on. */ 01225 r = (*func)(data1); 01226 /* leave from Ruby world: You can not access Ruby values, etc. */ 01227 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg); 01228 return r; 01229 } 01230 01231 /* 01232 * ruby_thread_has_gvl_p - check if current native thread has GVL. 01233 * 01234 *** 01235 *** This API is EXPERIMENTAL! 01236 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01237 *** 01238 */ 01239 01240 int 01241 ruby_thread_has_gvl_p(void) 01242 { 01243 rb_thread_t *th = ruby_thread_from_native(); 01244 01245 if (th && th->blocking_region_buffer == 0) { 01246 return 1; 01247 } 01248 else { 01249 return 0; 01250 } 01251 } 01252 01253 /* 01254 * call-seq: 01255 * Thread.pass -> nil 01256 * 01257 * Give the thread scheduler a hint to pass execution to another thread. 01258 * A running thread may or may not switch, it depends on OS and processor. 01259 */ 01260 01261 static VALUE 01262 thread_s_pass(VALUE klass) 01263 { 01264 rb_thread_schedule(); 01265 return Qnil; 01266 } 01267 01268 /* 01269 * 01270 */ 01271 01272 static void 01273 rb_threadptr_execute_interrupts_common(rb_thread_t *th) 01274 { 01275 rb_atomic_t interrupt; 01276 01277 if (th->raised_flag) return; 01278 01279 while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) { 01280 enum rb_thread_status status = th->status; 01281 int timer_interrupt = interrupt & 0x01; 01282 int finalizer_interrupt = interrupt & 0x04; 01283 int sig; 01284 01285 th->status = THREAD_RUNNABLE; 01286 01287 /* signal handling */ 01288 if (th == th->vm->main_thread) { 01289 while ((sig = rb_get_next_signal()) != 0) { 01290 rb_signal_exec(th, sig); 01291 } 01292 } 01293 01294 /* exception from another thread */ 01295 if (th->thrown_errinfo) { 01296 VALUE err = th->thrown_errinfo; 01297 th->thrown_errinfo = 0; 01298 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); 01299 01300 if (err == eKillSignal || err == eTerminateSignal) { 01301 th->errinfo = INT2FIX(TAG_FATAL); 01302 TH_JUMP_TAG(th, TAG_FATAL); 01303 } 01304 else { 01305 rb_exc_raise(err); 01306 } 01307 } 01308 th->status = status; 01309 01310 if (finalizer_interrupt) { 01311 rb_gc_finalize_deferred(); 01312 } 01313 01314 if (timer_interrupt) { 01315 unsigned long limits_us = 250 * 1000; 01316 01317 if (th->priority > 0) 01318 limits_us <<= th->priority; 01319 else 01320 limits_us >>= -th->priority; 01321 01322 if (status == THREAD_RUNNABLE) 01323 th->running_time_us += TIME_QUANTUM_USEC; 01324 01325 EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); 01326 01327 rb_thread_schedule_limits(limits_us); 01328 } 01329 } 01330 } 01331 01332 void 01333 rb_threadptr_execute_interrupts(rb_thread_t *th) 01334 { 01335 rb_threadptr_execute_interrupts_common(th); 01336 } 01337 01338 void 01339 rb_thread_execute_interrupts(VALUE thval) 01340 { 01341 rb_thread_t *th; 01342 GetThreadPtr(thval, th); 01343 rb_threadptr_execute_interrupts_common(th); 01344 } 01345 01346 void 01347 rb_gc_mark_threads(void) 01348 { 01349 rb_bug("deprecated function rb_gc_mark_threads is called"); 01350 } 01351 01352 /*****************************************************/ 01353 01354 static void 01355 rb_threadptr_ready(rb_thread_t *th) 01356 { 01357 rb_threadptr_interrupt(th); 01358 } 01359 01360 static VALUE 01361 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) 01362 { 01363 VALUE exc; 01364 01365 again: 01366 if (rb_threadptr_dead(th)) { 01367 return Qnil; 01368 } 01369 01370 if (th->thrown_errinfo != 0 || th->raised_flag) { 01371 rb_thread_schedule(); 01372 goto again; 01373 } 01374 01375 exc = rb_make_exception(argc, argv); 01376 th->thrown_errinfo = exc; 01377 rb_threadptr_ready(th); 01378 return Qnil; 01379 } 01380 01381 void 01382 rb_threadptr_signal_raise(rb_thread_t *th, int sig) 01383 { 01384 VALUE argv[2]; 01385 01386 argv[0] = rb_eSignal; 01387 argv[1] = INT2FIX(sig); 01388 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01389 } 01390 01391 void 01392 rb_threadptr_signal_exit(rb_thread_t *th) 01393 { 01394 VALUE argv[2]; 01395 01396 argv[0] = rb_eSystemExit; 01397 argv[1] = rb_str_new2("exit"); 01398 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01399 } 01400 01401 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) 01402 #define USE_SIGALTSTACK 01403 #endif 01404 01405 void 01406 ruby_thread_stack_overflow(rb_thread_t *th) 01407 { 01408 th->raised_flag = 0; 01409 #ifdef USE_SIGALTSTACK 01410 rb_exc_raise(sysstack_error); 01411 #else 01412 th->errinfo = sysstack_error; 01413 TH_JUMP_TAG(th, TAG_RAISE); 01414 #endif 01415 } 01416 01417 int 01418 rb_threadptr_set_raised(rb_thread_t *th) 01419 { 01420 if (th->raised_flag & RAISED_EXCEPTION) { 01421 return 1; 01422 } 01423 th->raised_flag |= RAISED_EXCEPTION; 01424 return 0; 01425 } 01426 01427 int 01428 rb_threadptr_reset_raised(rb_thread_t *th) 01429 { 01430 if (!(th->raised_flag & RAISED_EXCEPTION)) { 01431 return 0; 01432 } 01433 th->raised_flag &= ~RAISED_EXCEPTION; 01434 return 1; 01435 } 01436 01437 #define THREAD_IO_WAITING_P(th) ( \ 01438 ((th)->status == THREAD_STOPPED || \ 01439 (th)->status == THREAD_STOPPED_FOREVER) && \ 01440 (th)->blocking_region_buffer && \ 01441 (th)->unblock.func == ubf_select && \ 01442 1) 01443 01444 static int 01445 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) 01446 { 01447 int fd = (int)data; 01448 rb_thread_t *th; 01449 GetThreadPtr((VALUE)key, th); 01450 01451 if (THREAD_IO_WAITING_P(th)) { 01452 native_mutex_lock(&th->interrupt_lock); 01453 if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) { 01454 th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream]; 01455 RUBY_VM_SET_INTERRUPT(th); 01456 (th->unblock.func)(th->unblock.arg); 01457 } 01458 native_mutex_unlock(&th->interrupt_lock); 01459 } 01460 return ST_CONTINUE; 01461 } 01462 01463 void 01464 rb_thread_fd_close(int fd) 01465 { 01466 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); 01467 } 01468 01469 /* 01470 * call-seq: 01471 * thr.raise 01472 * thr.raise(string) 01473 * thr.raise(exception [, string [, array]]) 01474 * 01475 * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The 01476 * caller does not have to be <i>thr</i>. 01477 * 01478 * Thread.abort_on_exception = true 01479 * a = Thread.new { sleep(200) } 01480 * a.raise("Gotcha") 01481 * 01482 * <em>produces:</em> 01483 * 01484 * prog.rb:3: Gotcha (RuntimeError) 01485 * from prog.rb:2:in `initialize' 01486 * from prog.rb:2:in `new' 01487 * from prog.rb:2 01488 */ 01489 01490 static VALUE 01491 thread_raise_m(int argc, VALUE *argv, VALUE self) 01492 { 01493 rb_thread_t *th; 01494 GetThreadPtr(self, th); 01495 rb_threadptr_raise(th, argc, argv); 01496 return Qnil; 01497 } 01498 01499 01500 /* 01501 * call-seq: 01502 * thr.exit -> thr or nil 01503 * thr.kill -> thr or nil 01504 * thr.terminate -> thr or nil 01505 * 01506 * Terminates <i>thr</i> and schedules another thread to be run. If this thread 01507 * is already marked to be killed, <code>exit</code> returns the 01508 * <code>Thread</code>. If this is the main thread, or the last thread, exits 01509 * the process. 01510 */ 01511 01512 VALUE 01513 rb_thread_kill(VALUE thread) 01514 { 01515 rb_thread_t *th; 01516 01517 GetThreadPtr(thread, th); 01518 01519 if (th != GET_THREAD() && th->safe_level < 4) { 01520 rb_secure(4); 01521 } 01522 if (th->status == THREAD_TO_KILL || th->status == THREAD_KILLED) { 01523 return thread; 01524 } 01525 if (th == th->vm->main_thread) { 01526 rb_exit(EXIT_SUCCESS); 01527 } 01528 01529 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); 01530 01531 rb_threadptr_interrupt(th); 01532 th->thrown_errinfo = eKillSignal; 01533 th->status = THREAD_TO_KILL; 01534 01535 return thread; 01536 } 01537 01538 01539 /* 01540 * call-seq: 01541 * Thread.kill(thread) -> thread 01542 * 01543 * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>). 01544 * 01545 * count = 0 01546 * a = Thread.new { loop { count += 1 } } 01547 * sleep(0.1) #=> 0 01548 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead> 01549 * count #=> 93947 01550 * a.alive? #=> false 01551 */ 01552 01553 static VALUE 01554 rb_thread_s_kill(VALUE obj, VALUE th) 01555 { 01556 return rb_thread_kill(th); 01557 } 01558 01559 01560 /* 01561 * call-seq: 01562 * Thread.exit -> thread 01563 * 01564 * Terminates the currently running thread and schedules another thread to be 01565 * run. If this thread is already marked to be killed, <code>exit</code> 01566 * returns the <code>Thread</code>. If this is the main thread, or the last 01567 * thread, exit the process. 01568 */ 01569 01570 static VALUE 01571 rb_thread_exit(void) 01572 { 01573 return rb_thread_kill(GET_THREAD()->self); 01574 } 01575 01576 01577 /* 01578 * call-seq: 01579 * thr.wakeup -> thr 01580 * 01581 * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on 01582 * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>). 01583 * 01584 * c = Thread.new { Thread.stop; puts "hey!" } 01585 * sleep 0.1 while c.status!='sleep' 01586 * c.wakeup 01587 * c.join 01588 * 01589 * <em>produces:</em> 01590 * 01591 * hey! 01592 */ 01593 01594 VALUE 01595 rb_thread_wakeup(VALUE thread) 01596 { 01597 if (!RTEST(rb_thread_wakeup_alive(thread))) { 01598 rb_raise(rb_eThreadError, "killed thread"); 01599 } 01600 return thread; 01601 } 01602 01603 VALUE 01604 rb_thread_wakeup_alive(VALUE thread) 01605 { 01606 rb_thread_t *th; 01607 GetThreadPtr(thread, th); 01608 01609 if (th->status == THREAD_KILLED) { 01610 return Qnil; 01611 } 01612 rb_threadptr_ready(th); 01613 if (th->status != THREAD_TO_KILL) { 01614 th->status = THREAD_RUNNABLE; 01615 } 01616 return thread; 01617 } 01618 01619 01620 /* 01621 * call-seq: 01622 * thr.run -> thr 01623 * 01624 * Wakes up <i>thr</i>, making it eligible for scheduling. 01625 * 01626 * a = Thread.new { puts "a"; Thread.stop; puts "c" } 01627 * sleep 0.1 while a.status!='sleep' 01628 * puts "Got here" 01629 * a.run 01630 * a.join 01631 * 01632 * <em>produces:</em> 01633 * 01634 * a 01635 * Got here 01636 * c 01637 */ 01638 01639 VALUE 01640 rb_thread_run(VALUE thread) 01641 { 01642 rb_thread_wakeup(thread); 01643 rb_thread_schedule(); 01644 return thread; 01645 } 01646 01647 01648 /* 01649 * call-seq: 01650 * Thread.stop -> nil 01651 * 01652 * Stops execution of the current thread, putting it into a ``sleep'' state, 01653 * and schedules execution of another thread. 01654 * 01655 * a = Thread.new { print "a"; Thread.stop; print "c" } 01656 * sleep 0.1 while a.status!='sleep' 01657 * print "b" 01658 * a.run 01659 * a.join 01660 * 01661 * <em>produces:</em> 01662 * 01663 * abc 01664 */ 01665 01666 VALUE 01667 rb_thread_stop(void) 01668 { 01669 if (rb_thread_alone()) { 01670 rb_raise(rb_eThreadError, 01671 "stopping only thread\n\tnote: use sleep to stop forever"); 01672 } 01673 rb_thread_sleep_deadly(); 01674 return Qnil; 01675 } 01676 01677 static int 01678 thread_list_i(st_data_t key, st_data_t val, void *data) 01679 { 01680 VALUE ary = (VALUE)data; 01681 rb_thread_t *th; 01682 GetThreadPtr((VALUE)key, th); 01683 01684 switch (th->status) { 01685 case THREAD_RUNNABLE: 01686 case THREAD_STOPPED: 01687 case THREAD_STOPPED_FOREVER: 01688 case THREAD_TO_KILL: 01689 rb_ary_push(ary, th->self); 01690 default: 01691 break; 01692 } 01693 return ST_CONTINUE; 01694 } 01695 01696 /********************************************************************/ 01697 01698 /* 01699 * call-seq: 01700 * Thread.list -> array 01701 * 01702 * Returns an array of <code>Thread</code> objects for all threads that are 01703 * either runnable or stopped. 01704 * 01705 * Thread.new { sleep(200) } 01706 * Thread.new { 1000000.times {|i| i*i } } 01707 * Thread.new { Thread.stop } 01708 * Thread.list.each {|t| p t} 01709 * 01710 * <em>produces:</em> 01711 * 01712 * #<Thread:0x401b3e84 sleep> 01713 * #<Thread:0x401b3f38 run> 01714 * #<Thread:0x401b3fb0 sleep> 01715 * #<Thread:0x401bdf4c run> 01716 */ 01717 01718 VALUE 01719 rb_thread_list(void) 01720 { 01721 VALUE ary = rb_ary_new(); 01722 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary); 01723 return ary; 01724 } 01725 01726 VALUE 01727 rb_thread_current(void) 01728 { 01729 return GET_THREAD()->self; 01730 } 01731 01732 /* 01733 * call-seq: 01734 * Thread.current -> thread 01735 * 01736 * Returns the currently executing thread. 01737 * 01738 * Thread.current #=> #<Thread:0x401bdf4c run> 01739 */ 01740 01741 static VALUE 01742 thread_s_current(VALUE klass) 01743 { 01744 return rb_thread_current(); 01745 } 01746 01747 VALUE 01748 rb_thread_main(void) 01749 { 01750 return GET_THREAD()->vm->main_thread->self; 01751 } 01752 01753 /* 01754 * call-seq: 01755 * Thread.main -> thread 01756 * 01757 * Returns the main thread. 01758 */ 01759 01760 static VALUE 01761 rb_thread_s_main(VALUE klass) 01762 { 01763 return rb_thread_main(); 01764 } 01765 01766 01767 /* 01768 * call-seq: 01769 * Thread.abort_on_exception -> true or false 01770 * 01771 * Returns the status of the global ``abort on exception'' condition. The 01772 * default is <code>false</code>. When set to <code>true</code>, or if the 01773 * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the 01774 * command line option <code>-d</code> was specified) all threads will abort 01775 * (the process will <code>exit(0)</code>) if an exception is raised in any 01776 * thread. See also <code>Thread::abort_on_exception=</code>. 01777 */ 01778 01779 static VALUE 01780 rb_thread_s_abort_exc(void) 01781 { 01782 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; 01783 } 01784 01785 01786 /* 01787 * call-seq: 01788 * Thread.abort_on_exception= boolean -> true or false 01789 * 01790 * When set to <code>true</code>, all threads will abort if an exception is 01791 * raised. Returns the new state. 01792 * 01793 * Thread.abort_on_exception = true 01794 * t1 = Thread.new do 01795 * puts "In new thread" 01796 * raise "Exception from thread" 01797 * end 01798 * sleep(1) 01799 * puts "not reached" 01800 * 01801 * <em>produces:</em> 01802 * 01803 * In new thread 01804 * prog.rb:4: Exception from thread (RuntimeError) 01805 * from prog.rb:2:in `initialize' 01806 * from prog.rb:2:in `new' 01807 * from prog.rb:2 01808 */ 01809 01810 static VALUE 01811 rb_thread_s_abort_exc_set(VALUE self, VALUE val) 01812 { 01813 rb_secure(4); 01814 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); 01815 return val; 01816 } 01817 01818 01819 /* 01820 * call-seq: 01821 * thr.abort_on_exception -> true or false 01822 * 01823 * Returns the status of the thread-local ``abort on exception'' condition for 01824 * <i>thr</i>. The default is <code>false</code>. See also 01825 * <code>Thread::abort_on_exception=</code>. 01826 */ 01827 01828 static VALUE 01829 rb_thread_abort_exc(VALUE thread) 01830 { 01831 rb_thread_t *th; 01832 GetThreadPtr(thread, th); 01833 return th->abort_on_exception ? Qtrue : Qfalse; 01834 } 01835 01836 01837 /* 01838 * call-seq: 01839 * thr.abort_on_exception= boolean -> true or false 01840 * 01841 * When set to <code>true</code>, causes all threads (including the main 01842 * program) to abort if an exception is raised in <i>thr</i>. The process will 01843 * effectively <code>exit(0)</code>. 01844 */ 01845 01846 static VALUE 01847 rb_thread_abort_exc_set(VALUE thread, VALUE val) 01848 { 01849 rb_thread_t *th; 01850 rb_secure(4); 01851 01852 GetThreadPtr(thread, th); 01853 th->abort_on_exception = RTEST(val); 01854 return val; 01855 } 01856 01857 01858 /* 01859 * call-seq: 01860 * thr.group -> thgrp or nil 01861 * 01862 * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if 01863 * the thread is not a member of any group. 01864 * 01865 * Thread.main.group #=> #<ThreadGroup:0x4029d914> 01866 */ 01867 01868 VALUE 01869 rb_thread_group(VALUE thread) 01870 { 01871 rb_thread_t *th; 01872 VALUE group; 01873 GetThreadPtr(thread, th); 01874 group = th->thgroup; 01875 01876 if (!group) { 01877 group = Qnil; 01878 } 01879 return group; 01880 } 01881 01882 static const char * 01883 thread_status_name(enum rb_thread_status status) 01884 { 01885 switch (status) { 01886 case THREAD_RUNNABLE: 01887 return "run"; 01888 case THREAD_STOPPED: 01889 case THREAD_STOPPED_FOREVER: 01890 return "sleep"; 01891 case THREAD_TO_KILL: 01892 return "aborting"; 01893 case THREAD_KILLED: 01894 return "dead"; 01895 default: 01896 return "unknown"; 01897 } 01898 } 01899 01900 static int 01901 rb_threadptr_dead(rb_thread_t *th) 01902 { 01903 return th->status == THREAD_KILLED; 01904 } 01905 01906 01907 /* 01908 * call-seq: 01909 * thr.status -> string, false or nil 01910 * 01911 * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is 01912 * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing, 01913 * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if 01914 * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i> 01915 * terminated with an exception. 01916 * 01917 * a = Thread.new { raise("die now") } 01918 * b = Thread.new { Thread.stop } 01919 * c = Thread.new { Thread.exit } 01920 * d = Thread.new { sleep } 01921 * d.kill #=> #<Thread:0x401b3678 aborting> 01922 * a.status #=> nil 01923 * b.status #=> "sleep" 01924 * c.status #=> false 01925 * d.status #=> "aborting" 01926 * Thread.current.status #=> "run" 01927 */ 01928 01929 static VALUE 01930 rb_thread_status(VALUE thread) 01931 { 01932 rb_thread_t *th; 01933 GetThreadPtr(thread, th); 01934 01935 if (rb_threadptr_dead(th)) { 01936 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) 01937 /* TODO */ ) { 01938 return Qnil; 01939 } 01940 return Qfalse; 01941 } 01942 return rb_str_new2(thread_status_name(th->status)); 01943 } 01944 01945 01946 /* 01947 * call-seq: 01948 * thr.alive? -> true or false 01949 * 01950 * Returns <code>true</code> if <i>thr</i> is running or sleeping. 01951 * 01952 * thr = Thread.new { } 01953 * thr.join #=> #<Thread:0x401b3fb0 dead> 01954 * Thread.current.alive? #=> true 01955 * thr.alive? #=> false 01956 */ 01957 01958 static VALUE 01959 rb_thread_alive_p(VALUE thread) 01960 { 01961 rb_thread_t *th; 01962 GetThreadPtr(thread, th); 01963 01964 if (rb_threadptr_dead(th)) 01965 return Qfalse; 01966 return Qtrue; 01967 } 01968 01969 /* 01970 * call-seq: 01971 * thr.stop? -> true or false 01972 * 01973 * Returns <code>true</code> if <i>thr</i> is dead or sleeping. 01974 * 01975 * a = Thread.new { Thread.stop } 01976 * b = Thread.current 01977 * a.stop? #=> true 01978 * b.stop? #=> false 01979 */ 01980 01981 static VALUE 01982 rb_thread_stop_p(VALUE thread) 01983 { 01984 rb_thread_t *th; 01985 GetThreadPtr(thread, th); 01986 01987 if (rb_threadptr_dead(th)) 01988 return Qtrue; 01989 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 01990 return Qtrue; 01991 return Qfalse; 01992 } 01993 01994 /* 01995 * call-seq: 01996 * thr.safe_level -> integer 01997 * 01998 * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe 01999 * levels can help when implementing sandboxes which run insecure code. 02000 * 02001 * thr = Thread.new { $SAFE = 3; sleep } 02002 * Thread.current.safe_level #=> 0 02003 * thr.safe_level #=> 3 02004 */ 02005 02006 static VALUE 02007 rb_thread_safe_level(VALUE thread) 02008 { 02009 rb_thread_t *th; 02010 GetThreadPtr(thread, th); 02011 02012 return INT2NUM(th->safe_level); 02013 } 02014 02015 /* 02016 * call-seq: 02017 * thr.inspect -> string 02018 * 02019 * Dump the name, id, and status of _thr_ to a string. 02020 */ 02021 02022 static VALUE 02023 rb_thread_inspect(VALUE thread) 02024 { 02025 const char *cname = rb_obj_classname(thread); 02026 rb_thread_t *th; 02027 const char *status; 02028 VALUE str; 02029 02030 GetThreadPtr(thread, th); 02031 status = thread_status_name(th->status); 02032 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status); 02033 OBJ_INFECT(str, thread); 02034 02035 return str; 02036 } 02037 02038 VALUE 02039 rb_thread_local_aref(VALUE thread, ID id) 02040 { 02041 rb_thread_t *th; 02042 st_data_t val; 02043 02044 GetThreadPtr(thread, th); 02045 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02046 rb_raise(rb_eSecurityError, "Insecure: thread locals"); 02047 } 02048 if (!th->local_storage) { 02049 return Qnil; 02050 } 02051 if (st_lookup(th->local_storage, id, &val)) { 02052 return (VALUE)val; 02053 } 02054 return Qnil; 02055 } 02056 02057 /* 02058 * call-seq: 02059 * thr[sym] -> obj or nil 02060 * 02061 * Attribute Reference---Returns the value of a thread-local variable, using 02062 * either a symbol or a string name. If the specified variable does not exist, 02063 * returns <code>nil</code>. 02064 * 02065 * [ 02066 * Thread.new { Thread.current["name"] = "A" }, 02067 * Thread.new { Thread.current[:name] = "B" }, 02068 * Thread.new { Thread.current["name"] = "C" } 02069 * ].each do |th| 02070 * th.join 02071 * puts "#{th.inspect}: #{th[:name]}" 02072 * end 02073 * 02074 * <em>produces:</em> 02075 * 02076 * #<Thread:0x00000002a54220 dead>: A 02077 * #<Thread:0x00000002a541a8 dead>: B 02078 * #<Thread:0x00000002a54130 dead>: C 02079 */ 02080 02081 static VALUE 02082 rb_thread_aref(VALUE thread, VALUE id) 02083 { 02084 return rb_thread_local_aref(thread, rb_to_id(id)); 02085 } 02086 02087 VALUE 02088 rb_thread_local_aset(VALUE thread, ID id, VALUE val) 02089 { 02090 rb_thread_t *th; 02091 GetThreadPtr(thread, th); 02092 02093 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02094 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02095 } 02096 if (OBJ_FROZEN(thread)) { 02097 rb_error_frozen("thread locals"); 02098 } 02099 if (!th->local_storage) { 02100 th->local_storage = st_init_numtable(); 02101 } 02102 if (NIL_P(val)) { 02103 st_delete_wrap(th->local_storage, id); 02104 return Qnil; 02105 } 02106 st_insert(th->local_storage, id, val); 02107 return val; 02108 } 02109 02110 /* 02111 * call-seq: 02112 * thr[sym] = obj -> obj 02113 * 02114 * Attribute Assignment---Sets or creates the value of a thread-local variable, 02115 * using either a symbol or a string. See also <code>Thread#[]</code>. 02116 */ 02117 02118 static VALUE 02119 rb_thread_aset(VALUE self, VALUE id, VALUE val) 02120 { 02121 return rb_thread_local_aset(self, rb_to_id(id), val); 02122 } 02123 02124 /* 02125 * call-seq: 02126 * thr.key?(sym) -> true or false 02127 * 02128 * Returns <code>true</code> if the given string (or symbol) exists as a 02129 * thread-local variable. 02130 * 02131 * me = Thread.current 02132 * me[:oliver] = "a" 02133 * me.key?(:oliver) #=> true 02134 * me.key?(:stanley) #=> false 02135 */ 02136 02137 static VALUE 02138 rb_thread_key_p(VALUE self, VALUE key) 02139 { 02140 rb_thread_t *th; 02141 ID id = rb_to_id(key); 02142 02143 GetThreadPtr(self, th); 02144 02145 if (!th->local_storage) { 02146 return Qfalse; 02147 } 02148 if (st_lookup(th->local_storage, id, 0)) { 02149 return Qtrue; 02150 } 02151 return Qfalse; 02152 } 02153 02154 static int 02155 thread_keys_i(ID key, VALUE value, VALUE ary) 02156 { 02157 rb_ary_push(ary, ID2SYM(key)); 02158 return ST_CONTINUE; 02159 } 02160 02161 static int 02162 vm_living_thread_num(rb_vm_t *vm) 02163 { 02164 return vm->living_threads->num_entries; 02165 } 02166 02167 int 02168 rb_thread_alone(void) 02169 { 02170 int num = 1; 02171 if (GET_THREAD()->vm->living_threads) { 02172 num = vm_living_thread_num(GET_THREAD()->vm); 02173 thread_debug("rb_thread_alone: %d\n", num); 02174 } 02175 return num == 1; 02176 } 02177 02178 /* 02179 * call-seq: 02180 * thr.keys -> array 02181 * 02182 * Returns an an array of the names of the thread-local variables (as Symbols). 02183 * 02184 * thr = Thread.new do 02185 * Thread.current[:cat] = 'meow' 02186 * Thread.current["dog"] = 'woof' 02187 * end 02188 * thr.join #=> #<Thread:0x401b3f10 dead> 02189 * thr.keys #=> [:dog, :cat] 02190 */ 02191 02192 static VALUE 02193 rb_thread_keys(VALUE self) 02194 { 02195 rb_thread_t *th; 02196 VALUE ary = rb_ary_new(); 02197 GetThreadPtr(self, th); 02198 02199 if (th->local_storage) { 02200 st_foreach(th->local_storage, thread_keys_i, ary); 02201 } 02202 return ary; 02203 } 02204 02205 /* 02206 * call-seq: 02207 * thr.priority -> integer 02208 * 02209 * Returns the priority of <i>thr</i>. Default is inherited from the 02210 * current thread which creating the new thread, or zero for the 02211 * initial main thread; higher-priority thread will run more frequently 02212 * than lower-priority threads (but lower-priority threads can also run). 02213 * 02214 * This is just hint for Ruby thread scheduler. It may be ignored on some 02215 * platform. 02216 * 02217 * Thread.current.priority #=> 0 02218 */ 02219 02220 static VALUE 02221 rb_thread_priority(VALUE thread) 02222 { 02223 rb_thread_t *th; 02224 GetThreadPtr(thread, th); 02225 return INT2NUM(th->priority); 02226 } 02227 02228 02229 /* 02230 * call-seq: 02231 * thr.priority= integer -> thr 02232 * 02233 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads 02234 * will run more frequently than lower-priority threads (but lower-priority 02235 * threads can also run). 02236 * 02237 * This is just hint for Ruby thread scheduler. It may be ignored on some 02238 * platform. 02239 * 02240 * count1 = count2 = 0 02241 * a = Thread.new do 02242 * loop { count1 += 1 } 02243 * end 02244 * a.priority = -1 02245 * 02246 * b = Thread.new do 02247 * loop { count2 += 1 } 02248 * end 02249 * b.priority = -2 02250 * sleep 1 #=> 1 02251 * count1 #=> 622504 02252 * count2 #=> 5832 02253 */ 02254 02255 static VALUE 02256 rb_thread_priority_set(VALUE thread, VALUE prio) 02257 { 02258 rb_thread_t *th; 02259 int priority; 02260 GetThreadPtr(thread, th); 02261 02262 rb_secure(4); 02263 02264 #if USE_NATIVE_THREAD_PRIORITY 02265 th->priority = NUM2INT(prio); 02266 native_thread_apply_priority(th); 02267 #else 02268 priority = NUM2INT(prio); 02269 if (priority > RUBY_THREAD_PRIORITY_MAX) { 02270 priority = RUBY_THREAD_PRIORITY_MAX; 02271 } 02272 else if (priority < RUBY_THREAD_PRIORITY_MIN) { 02273 priority = RUBY_THREAD_PRIORITY_MIN; 02274 } 02275 th->priority = priority; 02276 #endif 02277 return INT2NUM(th->priority); 02278 } 02279 02280 /* for IO */ 02281 02282 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) 02283 02284 /* 02285 * several Unix platforms support file descriptors bigger than FD_SETSIZE 02286 * in select(2) system call. 02287 * 02288 * - Linux 2.2.12 (?) 02289 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25) 02290 * select(2) documents how to allocate fd_set dynamically. 02291 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0 02292 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19) 02293 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) 02294 * select(2) documents how to allocate fd_set dynamically. 02295 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 02296 * - HP-UX documents how to allocate fd_set dynamically. 02297 * http://docs.hp.com/en/B2355-60105/select.2.html 02298 * - Solaris 8 has select_large_fdset 02299 * 02300 * When fd_set is not big enough to hold big file descriptors, 02301 * it should be allocated dynamically. 02302 * Note that this assumes fd_set is structured as bitmap. 02303 * 02304 * rb_fd_init allocates the memory. 02305 * rb_fd_term free the memory. 02306 * rb_fd_set may re-allocates bitmap. 02307 * 02308 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE. 02309 */ 02310 02311 void 02312 rb_fd_init(rb_fdset_t *fds) 02313 { 02314 fds->maxfd = 0; 02315 fds->fdset = ALLOC(fd_set); 02316 FD_ZERO(fds->fdset); 02317 } 02318 02319 void 02320 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02321 { 02322 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02323 02324 if (size < sizeof(fd_set)) 02325 size = sizeof(fd_set); 02326 dst->maxfd = src->maxfd; 02327 dst->fdset = xmalloc(size); 02328 memcpy(dst->fdset, src->fdset, size); 02329 } 02330 02331 void 02332 rb_fd_term(rb_fdset_t *fds) 02333 { 02334 if (fds->fdset) xfree(fds->fdset); 02335 fds->maxfd = 0; 02336 fds->fdset = 0; 02337 } 02338 02339 void 02340 rb_fd_zero(rb_fdset_t *fds) 02341 { 02342 if (fds->fdset) 02343 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); 02344 } 02345 02346 static void 02347 rb_fd_resize(int n, rb_fdset_t *fds) 02348 { 02349 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); 02350 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); 02351 02352 if (m < sizeof(fd_set)) m = sizeof(fd_set); 02353 if (o < sizeof(fd_set)) o = sizeof(fd_set); 02354 02355 if (m > o) { 02356 fds->fdset = xrealloc(fds->fdset, m); 02357 memset((char *)fds->fdset + o, 0, m - o); 02358 } 02359 if (n >= fds->maxfd) fds->maxfd = n + 1; 02360 } 02361 02362 void 02363 rb_fd_set(int n, rb_fdset_t *fds) 02364 { 02365 rb_fd_resize(n, fds); 02366 FD_SET(n, fds->fdset); 02367 } 02368 02369 void 02370 rb_fd_clr(int n, rb_fdset_t *fds) 02371 { 02372 if (n >= fds->maxfd) return; 02373 FD_CLR(n, fds->fdset); 02374 } 02375 02376 int 02377 rb_fd_isset(int n, const rb_fdset_t *fds) 02378 { 02379 if (n >= fds->maxfd) return 0; 02380 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */ 02381 } 02382 02383 void 02384 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) 02385 { 02386 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); 02387 02388 if (size < sizeof(fd_set)) size = sizeof(fd_set); 02389 dst->maxfd = max; 02390 dst->fdset = xrealloc(dst->fdset, size); 02391 memcpy(dst->fdset, src, size); 02392 } 02393 02394 static void 02395 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02396 { 02397 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02398 02399 if (size > sizeof(fd_set)) { 02400 rb_raise(rb_eArgError, "too large fdsets"); 02401 } 02402 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set)); 02403 } 02404 02405 void 02406 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) 02407 { 02408 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02409 02410 if (size < sizeof(fd_set)) 02411 size = sizeof(fd_set); 02412 dst->maxfd = src->maxfd; 02413 dst->fdset = xrealloc(dst->fdset, size); 02414 memcpy(dst->fdset, src->fdset, size); 02415 } 02416 02417 int 02418 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) 02419 { 02420 fd_set *r = NULL, *w = NULL, *e = NULL; 02421 if (readfds) { 02422 rb_fd_resize(n - 1, readfds); 02423 r = rb_fd_ptr(readfds); 02424 } 02425 if (writefds) { 02426 rb_fd_resize(n - 1, writefds); 02427 w = rb_fd_ptr(writefds); 02428 } 02429 if (exceptfds) { 02430 rb_fd_resize(n - 1, exceptfds); 02431 e = rb_fd_ptr(exceptfds); 02432 } 02433 return select(n, r, w, e, timeout); 02434 } 02435 02436 #undef FD_ZERO 02437 #undef FD_SET 02438 #undef FD_CLR 02439 #undef FD_ISSET 02440 02441 #define FD_ZERO(f) rb_fd_zero(f) 02442 #define FD_SET(i, f) rb_fd_set((i), (f)) 02443 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02444 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02445 02446 #elif defined(_WIN32) 02447 02448 void 02449 rb_fd_init(rb_fdset_t *set) 02450 { 02451 set->capa = FD_SETSIZE; 02452 set->fdset = ALLOC(fd_set); 02453 FD_ZERO(set->fdset); 02454 } 02455 02456 void 02457 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02458 { 02459 rb_fd_init(dst); 02460 rb_fd_dup(dst, src); 02461 } 02462 02463 static void 02464 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02465 { 02466 int max = rb_fd_max(src); 02467 02468 /* we assume src is the result of select() with dst, so dst should be 02469 * larger or equal than src. */ 02470 if (max > FD_SETSIZE || max > dst->fd_count) { 02471 rb_raise(rb_eArgError, "too large fdsets"); 02472 } 02473 02474 memcpy(dst->fd_array, src->fdset->fd_array, max); 02475 dst->fd_count = max; 02476 } 02477 02478 void 02479 rb_fd_term(rb_fdset_t *set) 02480 { 02481 xfree(set->fdset); 02482 set->fdset = NULL; 02483 set->capa = 0; 02484 } 02485 02486 void 02487 rb_fd_set(int fd, rb_fdset_t *set) 02488 { 02489 unsigned int i; 02490 SOCKET s = rb_w32_get_osfhandle(fd); 02491 02492 for (i = 0; i < set->fdset->fd_count; i++) { 02493 if (set->fdset->fd_array[i] == s) { 02494 return; 02495 } 02496 } 02497 if (set->fdset->fd_count >= (unsigned)set->capa) { 02498 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; 02499 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); 02500 } 02501 set->fdset->fd_array[set->fdset->fd_count++] = s; 02502 } 02503 02504 #undef FD_ZERO 02505 #undef FD_SET 02506 #undef FD_CLR 02507 #undef FD_ISSET 02508 02509 #define FD_ZERO(f) rb_fd_zero(f) 02510 #define FD_SET(i, f) rb_fd_set((i), (f)) 02511 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02512 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02513 02514 #else 02515 #define rb_fd_rcopy(d, s) (*(d) = *(s)) 02516 #endif 02517 02518 #if defined(__CYGWIN__) 02519 static long 02520 cmp_tv(const struct timeval *a, const struct timeval *b) 02521 { 02522 long d = (a->tv_sec - b->tv_sec); 02523 return (d != 0) ? d : (a->tv_usec - b->tv_usec); 02524 } 02525 02526 static int 02527 subtract_tv(struct timeval *rest, const struct timeval *wait) 02528 { 02529 if (rest->tv_sec < wait->tv_sec) { 02530 return 0; 02531 } 02532 while (rest->tv_usec < wait->tv_usec) { 02533 if (rest->tv_sec <= wait->tv_sec) { 02534 return 0; 02535 } 02536 rest->tv_sec -= 1; 02537 rest->tv_usec += 1000 * 1000; 02538 } 02539 rest->tv_sec -= wait->tv_sec; 02540 rest->tv_usec -= wait->tv_usec; 02541 return rest->tv_sec != 0 || rest->tv_usec != 0; 02542 } 02543 #endif 02544 02545 static int 02546 do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, 02547 struct timeval *timeout) 02548 { 02549 int result, lerrno; 02550 rb_fdset_t UNINITIALIZED_VAR(orig_read); 02551 rb_fdset_t UNINITIALIZED_VAR(orig_write); 02552 rb_fdset_t UNINITIALIZED_VAR(orig_except); 02553 double limit = 0; 02554 struct timeval wait_rest; 02555 # if defined(__CYGWIN__) 02556 struct timeval start_time; 02557 # endif 02558 02559 if (timeout) { 02560 # if defined(__CYGWIN__) 02561 gettimeofday(&start_time, NULL); 02562 limit = (double)start_time.tv_sec + (double)start_time.tv_usec*1e-6; 02563 # else 02564 limit = timeofday(); 02565 # endif 02566 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; 02567 wait_rest = *timeout; 02568 timeout = &wait_rest; 02569 } 02570 02571 if (read) 02572 rb_fd_init_copy(&orig_read, read); 02573 if (write) 02574 rb_fd_init_copy(&orig_write, write); 02575 if (except) 02576 rb_fd_init_copy(&orig_except, except); 02577 02578 retry: 02579 lerrno = 0; 02580 02581 #if defined(__CYGWIN__) 02582 { 02583 int finish = 0; 02584 /* polling duration: 100ms */ 02585 struct timeval wait_100ms, *wait; 02586 wait_100ms.tv_sec = 0; 02587 wait_100ms.tv_usec = 100 * 1000; /* 100 ms */ 02588 02589 do { 02590 wait = (timeout == 0 || cmp_tv(&wait_100ms, timeout) < 0) ? &wait_100ms : timeout; 02591 BLOCKING_REGION({ 02592 do { 02593 result = rb_fd_select(n, read, write, except, wait); 02594 if (result < 0) lerrno = errno; 02595 if (result != 0) break; 02596 02597 if (read) 02598 rb_fd_dup(read, &orig_read); 02599 if (write) 02600 rb_fd_dup(write, &orig_write); 02601 if (except) 02602 rb_fd_dup(except, &orig_except); 02603 if (timeout) { 02604 struct timeval elapsed; 02605 gettimeofday(&elapsed, NULL); 02606 subtract_tv(&elapsed, &start_time); 02607 gettimeofday(&start_time, NULL); 02608 if (!subtract_tv(timeout, &elapsed)) { 02609 finish = 1; 02610 break; 02611 } 02612 if (cmp_tv(&wait_100ms, timeout) > 0) wait = timeout; 02613 } 02614 } while (__th->interrupt_flag == 0); 02615 }, 0, 0); 02616 } while (result == 0 && !finish); 02617 } 02618 #elif defined(_WIN32) 02619 { 02620 rb_thread_t *th = GET_THREAD(); 02621 BLOCKING_REGION({ 02622 result = native_fd_select(n, read, write, except, timeout, th); 02623 if (result < 0) lerrno = errno; 02624 }, ubf_select, th); 02625 } 02626 #else 02627 BLOCKING_REGION({ 02628 result = rb_fd_select(n, read, write, except, timeout); 02629 if (result < 0) lerrno = errno; 02630 }, ubf_select, GET_THREAD()); 02631 #endif 02632 02633 errno = lerrno; 02634 02635 if (result < 0) { 02636 switch (errno) { 02637 case EINTR: 02638 #ifdef ERESTART 02639 case ERESTART: 02640 #endif 02641 if (read) 02642 rb_fd_dup(read, &orig_read); 02643 if (write) 02644 rb_fd_dup(write, &orig_write); 02645 if (except) 02646 rb_fd_dup(except, &orig_except); 02647 02648 if (timeout) { 02649 double d = limit - timeofday(); 02650 02651 wait_rest.tv_sec = (unsigned int)d; 02652 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6); 02653 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0; 02654 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0; 02655 } 02656 02657 goto retry; 02658 default: 02659 break; 02660 } 02661 } 02662 02663 if (read) 02664 rb_fd_term(&orig_read); 02665 if (write) 02666 rb_fd_term(&orig_write); 02667 if (except) 02668 rb_fd_term(&orig_except); 02669 02670 return result; 02671 } 02672 02673 static void 02674 rb_thread_wait_fd_rw(int fd, int read) 02675 { 02676 int result = 0; 02677 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; 02678 02679 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); 02680 02681 if (fd < 0) { 02682 rb_raise(rb_eIOError, "closed stream"); 02683 } 02684 if (rb_thread_alone()) return; 02685 while (result <= 0) { 02686 result = rb_wait_for_single_fd(fd, events, NULL); 02687 02688 if (result < 0) { 02689 rb_sys_fail(0); 02690 } 02691 } 02692 02693 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); 02694 } 02695 02696 void 02697 rb_thread_wait_fd(int fd) 02698 { 02699 rb_thread_wait_fd_rw(fd, 1); 02700 } 02701 02702 int 02703 rb_thread_fd_writable(int fd) 02704 { 02705 rb_thread_wait_fd_rw(fd, 0); 02706 return TRUE; 02707 } 02708 02709 int 02710 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except, 02711 struct timeval *timeout) 02712 { 02713 rb_fdset_t fdsets[3]; 02714 rb_fdset_t *rfds = NULL; 02715 rb_fdset_t *wfds = NULL; 02716 rb_fdset_t *efds = NULL; 02717 int retval; 02718 02719 if (read) { 02720 rfds = &fdsets[0]; 02721 rb_fd_init(rfds); 02722 rb_fd_copy(rfds, read, max); 02723 } 02724 if (write) { 02725 wfds = &fdsets[1]; 02726 rb_fd_init(wfds); 02727 rb_fd_copy(wfds, write, max); 02728 } 02729 if (except) { 02730 efds = &fdsets[2]; 02731 rb_fd_init(efds); 02732 rb_fd_copy(efds, except, max); 02733 } 02734 02735 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout); 02736 02737 if (rfds) { 02738 rb_fd_rcopy(read, rfds); 02739 rb_fd_term(rfds); 02740 } 02741 if (wfds) { 02742 rb_fd_rcopy(write, wfds); 02743 rb_fd_term(wfds); 02744 } 02745 if (efds) { 02746 rb_fd_rcopy(except, efds); 02747 rb_fd_term(efds); 02748 } 02749 02750 return retval; 02751 } 02752 02753 int 02754 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, 02755 struct timeval *timeout) 02756 { 02757 if (!read && !write && !except) { 02758 if (!timeout) { 02759 rb_thread_sleep_forever(); 02760 return 0; 02761 } 02762 rb_thread_wait_for(*timeout); 02763 return 0; 02764 } 02765 02766 if (read) { 02767 rb_fd_resize(max - 1, read); 02768 } 02769 if (write) { 02770 rb_fd_resize(max - 1, write); 02771 } 02772 if (except) { 02773 rb_fd_resize(max - 1, except); 02774 } 02775 return do_select(max, read, write, except, timeout); 02776 } 02777 02778 /* 02779 * poll() is supported by many OSes, but so far Linux is the only 02780 * one we know of that supports using poll() in all places select() 02781 * would work. 02782 */ 02783 #if defined(HAVE_POLL) && defined(linux) 02784 # define USE_POLL 02785 #endif 02786 02787 #ifdef USE_POLL 02788 02789 /* The same with linux kernel. TODO: make platform independent definition. */ 02790 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 02791 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 02792 #define POLLEX_SET (POLLPRI) 02793 02794 #define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) 02795 #define TIMET_MIN (~(time_t)0 <= 0 ? (time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0) 02796 02797 #ifndef HAVE_PPOLL 02798 /* TODO: don't ignore sigmask */ 02799 int ppoll(struct pollfd *fds, nfds_t nfds, 02800 const struct timespec *ts, const sigset_t *sigmask) 02801 { 02802 int timeout_ms; 02803 02804 if (ts) { 02805 int tmp, tmp2; 02806 02807 if (ts->tv_sec > TIMET_MAX/1000) 02808 timeout_ms = -1; 02809 else { 02810 tmp = ts->tv_sec * 1000; 02811 tmp2 = ts->tv_nsec / (1000 * 1000); 02812 if (TIMET_MAX - tmp < tmp2) 02813 timeout_ms = -1; 02814 else 02815 timeout_ms = tmp + tmp2; 02816 } 02817 } else 02818 timeout_ms = -1; 02819 02820 return poll(fds, nfds, timeout_ms); 02821 } 02822 #endif 02823 02824 /* 02825 * returns a mask of events 02826 */ 02827 int 02828 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02829 { 02830 struct pollfd fds; 02831 int result, lerrno; 02832 double limit = 0; 02833 struct timespec ts; 02834 struct timespec *timeout = NULL; 02835 02836 if (tv) { 02837 ts.tv_sec = tv->tv_sec; 02838 ts.tv_nsec = tv->tv_usec * 1000; 02839 limit = timeofday(); 02840 limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; 02841 timeout = &ts; 02842 } 02843 02844 fds.fd = fd; 02845 fds.events = (short)events; 02846 02847 retry: 02848 lerrno = 0; 02849 BLOCKING_REGION({ 02850 result = ppoll(&fds, 1, timeout, NULL); 02851 if (result < 0) lerrno = errno; 02852 }, ubf_select, GET_THREAD()); 02853 02854 if (result < 0) { 02855 errno = lerrno; 02856 switch (errno) { 02857 case EINTR: 02858 #ifdef ERESTART 02859 case ERESTART: 02860 #endif 02861 if (timeout) { 02862 double d = limit - timeofday(); 02863 02864 ts.tv_sec = (long)d; 02865 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9); 02866 if (ts.tv_sec < 0) 02867 ts.tv_sec = 0; 02868 if (ts.tv_nsec < 0) 02869 ts.tv_nsec = 0; 02870 } 02871 goto retry; 02872 } 02873 return -1; 02874 } 02875 02876 if (fds.revents & POLLNVAL) { 02877 errno = EBADF; 02878 return -1; 02879 } 02880 02881 /* 02882 * POLLIN, POLLOUT have a different meanings from select(2)'s read/write bit. 02883 * Therefore we need fix it up. 02884 */ 02885 result = 0; 02886 if (fds.revents & POLLIN_SET) 02887 result |= RB_WAITFD_IN; 02888 if (fds.revents & POLLOUT_SET) 02889 result |= RB_WAITFD_OUT; 02890 if (fds.revents & POLLEX_SET) 02891 result |= RB_WAITFD_PRI; 02892 02893 return result; 02894 } 02895 #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ 02896 static rb_fdset_t *init_set_fd(int fd, rb_fdset_t *fds) 02897 { 02898 rb_fd_init(fds); 02899 rb_fd_set(fd, fds); 02900 02901 return fds; 02902 } 02903 02904 struct select_args { 02905 union { 02906 int fd; 02907 int error; 02908 } as; 02909 rb_fdset_t *read; 02910 rb_fdset_t *write; 02911 rb_fdset_t *except; 02912 struct timeval *tv; 02913 }; 02914 02915 static VALUE 02916 select_single(VALUE ptr) 02917 { 02918 struct select_args *args = (struct select_args *)ptr; 02919 int r; 02920 02921 r = rb_thread_fd_select(args->as.fd + 1, 02922 args->read, args->write, args->except, args->tv); 02923 if (r == -1) 02924 args->as.error = errno; 02925 if (r > 0) { 02926 r = 0; 02927 if (args->read && rb_fd_isset(args->as.fd, args->read)) 02928 r |= RB_WAITFD_IN; 02929 if (args->write && rb_fd_isset(args->as.fd, args->write)) 02930 r |= RB_WAITFD_OUT; 02931 if (args->except && rb_fd_isset(args->as.fd, args->except)) 02932 r |= RB_WAITFD_PRI; 02933 } 02934 return (VALUE)r; 02935 } 02936 02937 static VALUE 02938 select_single_cleanup(VALUE ptr) 02939 { 02940 struct select_args *args = (struct select_args *)ptr; 02941 02942 if (args->read) rb_fd_term(args->read); 02943 if (args->write) rb_fd_term(args->write); 02944 if (args->except) rb_fd_term(args->except); 02945 02946 return (VALUE)-1; 02947 } 02948 02949 int 02950 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02951 { 02952 rb_fdset_t rfds, wfds, efds; 02953 struct select_args args; 02954 int r; 02955 VALUE ptr = (VALUE)&args; 02956 02957 args.as.fd = fd; 02958 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; 02959 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; 02960 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; 02961 args.tv = tv; 02962 02963 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); 02964 if (r == -1) 02965 errno = args.as.error; 02966 02967 return r; 02968 } 02969 #endif /* ! USE_POLL */ 02970 02971 /* 02972 * for GC 02973 */ 02974 02975 #ifdef USE_CONSERVATIVE_STACK_END 02976 void 02977 rb_gc_set_stack_end(VALUE **stack_end_p) 02978 { 02979 VALUE stack_end; 02980 *stack_end_p = &stack_end; 02981 } 02982 #endif 02983 02984 void 02985 rb_gc_save_machine_context(rb_thread_t *th) 02986 { 02987 FLUSH_REGISTER_WINDOWS; 02988 #ifdef __ia64 02989 th->machine_register_stack_end = rb_ia64_bsp(); 02990 #endif 02991 setjmp(th->machine_regs); 02992 } 02993 02994 /* 02995 * 02996 */ 02997 02998 void 02999 rb_threadptr_check_signal(rb_thread_t *mth) 03000 { 03001 /* mth must be main_thread */ 03002 if (rb_signal_buff_size() > 0) { 03003 /* wakeup main thread */ 03004 rb_threadptr_interrupt(mth); 03005 } 03006 } 03007 03008 static void 03009 timer_thread_function(void *arg) 03010 { 03011 rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ 03012 03013 /* for time slice */ 03014 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); 03015 03016 /* check signal */ 03017 rb_threadptr_check_signal(vm->main_thread); 03018 03019 #if 0 03020 /* prove profiler */ 03021 if (vm->prove_profile.enable) { 03022 rb_thread_t *th = vm->running_thread; 03023 03024 if (vm->during_gc) { 03025 /* GC prove profiling */ 03026 } 03027 } 03028 #endif 03029 } 03030 03031 void 03032 rb_thread_stop_timer_thread(int close_anyway) 03033 { 03034 if (timer_thread_id && native_stop_timer_thread(close_anyway)) { 03035 native_reset_timer_thread(); 03036 } 03037 } 03038 03039 void 03040 rb_thread_reset_timer_thread(void) 03041 { 03042 native_reset_timer_thread(); 03043 } 03044 03045 void 03046 rb_thread_start_timer_thread(void) 03047 { 03048 system_working = 1; 03049 rb_thread_create_timer_thread(); 03050 } 03051 03052 static int 03053 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) 03054 { 03055 int i; 03056 VALUE lines = (VALUE)val; 03057 03058 for (i = 0; i < RARRAY_LEN(lines); i++) { 03059 if (RARRAY_PTR(lines)[i] != Qnil) { 03060 RARRAY_PTR(lines)[i] = INT2FIX(0); 03061 } 03062 } 03063 return ST_CONTINUE; 03064 } 03065 03066 static void 03067 clear_coverage(void) 03068 { 03069 VALUE coverages = rb_get_coverages(); 03070 if (RTEST(coverages)) { 03071 st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0); 03072 } 03073 } 03074 03075 static void 03076 rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t)) 03077 { 03078 rb_thread_t *th = GET_THREAD(); 03079 rb_vm_t *vm = th->vm; 03080 VALUE thval = th->self; 03081 vm->main_thread = th; 03082 03083 gvl_atfork(th->vm); 03084 st_foreach(vm->living_threads, atfork, (st_data_t)th); 03085 st_clear(vm->living_threads); 03086 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id); 03087 vm->sleeper = 0; 03088 clear_coverage(); 03089 } 03090 03091 static int 03092 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th) 03093 { 03094 VALUE thval = key; 03095 rb_thread_t *th; 03096 GetThreadPtr(thval, th); 03097 03098 if (th != (rb_thread_t *)current_th) { 03099 if (th->keeping_mutexes) { 03100 rb_mutex_abandon_all(th->keeping_mutexes); 03101 } 03102 th->keeping_mutexes = NULL; 03103 thread_cleanup_func(th, TRUE); 03104 } 03105 return ST_CONTINUE; 03106 } 03107 03108 void 03109 rb_thread_atfork(void) 03110 { 03111 rb_thread_atfork_internal(terminate_atfork_i); 03112 GET_THREAD()->join_list_head = 0; 03113 03114 /* We don't want reproduce CVE-2003-0900. */ 03115 rb_reset_random_seed(); 03116 } 03117 03118 static int 03119 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th) 03120 { 03121 VALUE thval = key; 03122 rb_thread_t *th; 03123 GetThreadPtr(thval, th); 03124 03125 if (th != (rb_thread_t *)current_th) { 03126 thread_cleanup_func_before_exec(th); 03127 } 03128 return ST_CONTINUE; 03129 } 03130 03131 void 03132 rb_thread_atfork_before_exec(void) 03133 { 03134 rb_thread_atfork_internal(terminate_atfork_before_exec_i); 03135 } 03136 03137 struct thgroup { 03138 int enclosed; 03139 VALUE group; 03140 }; 03141 03142 static size_t 03143 thgroup_memsize(const void *ptr) 03144 { 03145 return ptr ? sizeof(struct thgroup) : 0; 03146 } 03147 03148 static const rb_data_type_t thgroup_data_type = { 03149 "thgroup", 03150 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 03151 }; 03152 03153 /* 03154 * Document-class: ThreadGroup 03155 * 03156 * <code>ThreadGroup</code> provides a means of keeping track of a number of 03157 * threads as a group. A <code>Thread</code> can belong to only one 03158 * <code>ThreadGroup</code> at a time; adding a thread to a new group will 03159 * remove it from any previous group. 03160 * 03161 * Newly created threads belong to the same group as the thread from which they 03162 * were created. 03163 */ 03164 03165 static VALUE 03166 thgroup_s_alloc(VALUE klass) 03167 { 03168 VALUE group; 03169 struct thgroup *data; 03170 03171 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); 03172 data->enclosed = 0; 03173 data->group = group; 03174 03175 return group; 03176 } 03177 03178 struct thgroup_list_params { 03179 VALUE ary; 03180 VALUE group; 03181 }; 03182 03183 static int 03184 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data) 03185 { 03186 VALUE thread = (VALUE)key; 03187 VALUE ary = ((struct thgroup_list_params *)data)->ary; 03188 VALUE group = ((struct thgroup_list_params *)data)->group; 03189 rb_thread_t *th; 03190 GetThreadPtr(thread, th); 03191 03192 if (th->thgroup == group) { 03193 rb_ary_push(ary, thread); 03194 } 03195 return ST_CONTINUE; 03196 } 03197 03198 /* 03199 * call-seq: 03200 * thgrp.list -> array 03201 * 03202 * Returns an array of all existing <code>Thread</code> objects that belong to 03203 * this group. 03204 * 03205 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>] 03206 */ 03207 03208 static VALUE 03209 thgroup_list(VALUE group) 03210 { 03211 VALUE ary = rb_ary_new(); 03212 struct thgroup_list_params param; 03213 03214 param.ary = ary; 03215 param.group = group; 03216 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param); 03217 return ary; 03218 } 03219 03220 03221 /* 03222 * call-seq: 03223 * thgrp.enclose -> thgrp 03224 * 03225 * Prevents threads from being added to or removed from the receiving 03226 * <code>ThreadGroup</code>. New threads can still be started in an enclosed 03227 * <code>ThreadGroup</code>. 03228 * 03229 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> 03230 * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> 03231 * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> 03232 * tg.add thr 03233 * 03234 * <em>produces:</em> 03235 * 03236 * ThreadError: can't move from the enclosed thread group 03237 */ 03238 03239 static VALUE 03240 thgroup_enclose(VALUE group) 03241 { 03242 struct thgroup *data; 03243 03244 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03245 data->enclosed = 1; 03246 03247 return group; 03248 } 03249 03250 03251 /* 03252 * call-seq: 03253 * thgrp.enclosed? -> true or false 03254 * 03255 * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also 03256 * ThreadGroup#enclose. 03257 */ 03258 03259 static VALUE 03260 thgroup_enclosed_p(VALUE group) 03261 { 03262 struct thgroup *data; 03263 03264 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03265 if (data->enclosed) 03266 return Qtrue; 03267 return Qfalse; 03268 } 03269 03270 03271 /* 03272 * call-seq: 03273 * thgrp.add(thread) -> thgrp 03274 * 03275 * Adds the given <em>thread</em> to this group, removing it from any other 03276 * group to which it may have previously belonged. 03277 * 03278 * puts "Initial group is #{ThreadGroup::Default.list}" 03279 * tg = ThreadGroup.new 03280 * t1 = Thread.new { sleep } 03281 * t2 = Thread.new { sleep } 03282 * puts "t1 is #{t1}" 03283 * puts "t2 is #{t2}" 03284 * tg.add(t1) 03285 * puts "Initial group now #{ThreadGroup::Default.list}" 03286 * puts "tg group now #{tg.list}" 03287 * 03288 * <em>produces:</em> 03289 * 03290 * Initial group is #<Thread:0x401bdf4c> 03291 * t1 is #<Thread:0x401b3c90> 03292 * t2 is #<Thread:0x401b3c18> 03293 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c> 03294 * tg group now #<Thread:0x401b3c90> 03295 */ 03296 03297 static VALUE 03298 thgroup_add(VALUE group, VALUE thread) 03299 { 03300 rb_thread_t *th; 03301 struct thgroup *data; 03302 03303 rb_secure(4); 03304 GetThreadPtr(thread, th); 03305 03306 if (OBJ_FROZEN(group)) { 03307 rb_raise(rb_eThreadError, "can't move to the frozen thread group"); 03308 } 03309 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03310 if (data->enclosed) { 03311 rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); 03312 } 03313 03314 if (!th->thgroup) { 03315 return Qnil; 03316 } 03317 03318 if (OBJ_FROZEN(th->thgroup)) { 03319 rb_raise(rb_eThreadError, "can't move from the frozen thread group"); 03320 } 03321 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); 03322 if (data->enclosed) { 03323 rb_raise(rb_eThreadError, 03324 "can't move from the enclosed thread group"); 03325 } 03326 03327 th->thgroup = group; 03328 return group; 03329 } 03330 03331 03332 /* 03333 * Document-class: Mutex 03334 * 03335 * Mutex implements a simple semaphore that can be used to coordinate access to 03336 * shared data from multiple concurrent threads. 03337 * 03338 * Example: 03339 * 03340 * require 'thread' 03341 * semaphore = Mutex.new 03342 * 03343 * a = Thread.new { 03344 * semaphore.synchronize { 03345 * # access shared resource 03346 * } 03347 * } 03348 * 03349 * b = Thread.new { 03350 * semaphore.synchronize { 03351 * # access shared resource 03352 * } 03353 * } 03354 * 03355 */ 03356 03357 #define GetMutexPtr(obj, tobj) \ 03358 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 03359 03360 static const char *rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 03361 03362 #define mutex_mark NULL 03363 03364 static void 03365 mutex_free(void *ptr) 03366 { 03367 if (ptr) { 03368 rb_mutex_t *mutex = ptr; 03369 if (mutex->th) { 03370 /* rb_warn("free locked mutex"); */ 03371 const char *err = rb_mutex_unlock_th(mutex, mutex->th); 03372 if (err) rb_bug("%s", err); 03373 } 03374 native_mutex_destroy(&mutex->lock); 03375 native_cond_destroy(&mutex->cond); 03376 } 03377 ruby_xfree(ptr); 03378 } 03379 03380 static size_t 03381 mutex_memsize(const void *ptr) 03382 { 03383 return ptr ? sizeof(rb_mutex_t) : 0; 03384 } 03385 03386 static const rb_data_type_t mutex_data_type = { 03387 "mutex", 03388 {mutex_mark, mutex_free, mutex_memsize,}, 03389 }; 03390 03391 VALUE 03392 rb_obj_is_mutex(VALUE obj) 03393 { 03394 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { 03395 return Qtrue; 03396 } 03397 else { 03398 return Qfalse; 03399 } 03400 } 03401 03402 static VALUE 03403 mutex_alloc(VALUE klass) 03404 { 03405 VALUE volatile obj; 03406 rb_mutex_t *mutex; 03407 03408 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); 03409 native_mutex_initialize(&mutex->lock); 03410 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); 03411 return obj; 03412 } 03413 03414 /* 03415 * call-seq: 03416 * Mutex.new -> mutex 03417 * 03418 * Creates a new Mutex 03419 */ 03420 static VALUE 03421 mutex_initialize(VALUE self) 03422 { 03423 return self; 03424 } 03425 03426 VALUE 03427 rb_mutex_new(void) 03428 { 03429 return mutex_alloc(rb_cMutex); 03430 } 03431 03432 /* 03433 * call-seq: 03434 * mutex.locked? -> true or false 03435 * 03436 * Returns +true+ if this lock is currently held by some thread. 03437 */ 03438 VALUE 03439 rb_mutex_locked_p(VALUE self) 03440 { 03441 rb_mutex_t *mutex; 03442 GetMutexPtr(self, mutex); 03443 return mutex->th ? Qtrue : Qfalse; 03444 } 03445 03446 static void 03447 mutex_locked(rb_thread_t *th, VALUE self) 03448 { 03449 rb_mutex_t *mutex; 03450 GetMutexPtr(self, mutex); 03451 03452 if (th->keeping_mutexes) { 03453 mutex->next_mutex = th->keeping_mutexes; 03454 } 03455 th->keeping_mutexes = mutex; 03456 } 03457 03458 /* 03459 * call-seq: 03460 * mutex.try_lock -> true or false 03461 * 03462 * Attempts to obtain the lock and returns immediately. Returns +true+ if the 03463 * lock was granted. 03464 */ 03465 VALUE 03466 rb_mutex_trylock(VALUE self) 03467 { 03468 rb_mutex_t *mutex; 03469 VALUE locked = Qfalse; 03470 GetMutexPtr(self, mutex); 03471 03472 native_mutex_lock(&mutex->lock); 03473 if (mutex->th == 0) { 03474 mutex->th = GET_THREAD(); 03475 locked = Qtrue; 03476 03477 mutex_locked(GET_THREAD(), self); 03478 } 03479 native_mutex_unlock(&mutex->lock); 03480 03481 return locked; 03482 } 03483 03484 static int 03485 lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) 03486 { 03487 int interrupted = 0; 03488 int err = 0; 03489 03490 mutex->cond_waiting++; 03491 for (;;) { 03492 if (!mutex->th) { 03493 mutex->th = th; 03494 break; 03495 } 03496 if (RUBY_VM_INTERRUPTED(th)) { 03497 interrupted = 1; 03498 break; 03499 } 03500 if (err == ETIMEDOUT) { 03501 interrupted = 2; 03502 break; 03503 } 03504 03505 if (timeout_ms) { 03506 struct timespec timeout_rel; 03507 struct timespec timeout; 03508 03509 timeout_rel.tv_sec = 0; 03510 timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; 03511 timeout = native_cond_timeout(&mutex->cond, timeout_rel); 03512 err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); 03513 } 03514 else { 03515 native_cond_wait(&mutex->cond, &mutex->lock); 03516 err = 0; 03517 } 03518 } 03519 mutex->cond_waiting--; 03520 03521 return interrupted; 03522 } 03523 03524 static void 03525 lock_interrupt(void *ptr) 03526 { 03527 rb_mutex_t *mutex = (rb_mutex_t *)ptr; 03528 native_mutex_lock(&mutex->lock); 03529 if (mutex->cond_waiting > 0) 03530 native_cond_broadcast(&mutex->cond); 03531 native_mutex_unlock(&mutex->lock); 03532 } 03533 03534 /* 03535 * At maximum, only one thread can use cond_timedwait and watch deadlock 03536 * periodically. Multiple polling thread (i.e. concurrent deadlock check) 03537 * introduces new race conditions. [Bug #6278] [ruby-core:44275] 03538 */ 03539 rb_thread_t *patrol_thread = NULL; 03540 03541 /* 03542 * call-seq: 03543 * mutex.lock -> self 03544 * 03545 * Attempts to grab the lock and waits if it isn't available. 03546 * Raises +ThreadError+ if +mutex+ was locked by the current thread. 03547 */ 03548 VALUE 03549 rb_mutex_lock(VALUE self) 03550 { 03551 03552 if (rb_mutex_trylock(self) == Qfalse) { 03553 rb_mutex_t *mutex; 03554 rb_thread_t *th = GET_THREAD(); 03555 GetMutexPtr(self, mutex); 03556 03557 if (mutex->th == GET_THREAD()) { 03558 rb_raise(rb_eThreadError, "deadlock; recursive locking"); 03559 } 03560 03561 while (mutex->th != th) { 03562 int interrupted; 03563 enum rb_thread_status prev_status = th->status; 03564 int timeout_ms = 0; 03565 struct rb_unblock_callback oldubf; 03566 03567 set_unblock_function(th, lock_interrupt, mutex, &oldubf); 03568 th->status = THREAD_STOPPED_FOREVER; 03569 th->locking_mutex = self; 03570 03571 native_mutex_lock(&mutex->lock); 03572 th->vm->sleeper++; 03573 /* 03574 * Carefully! while some contended threads are in lock_func(), 03575 * vm->sleepr is unstable value. we have to avoid both deadlock 03576 * and busy loop. 03577 */ 03578 if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && 03579 !patrol_thread) { 03580 timeout_ms = 100; 03581 patrol_thread = th; 03582 } 03583 03584 GVL_UNLOCK_BEGIN(); 03585 interrupted = lock_func(th, mutex, timeout_ms); 03586 native_mutex_unlock(&mutex->lock); 03587 GVL_UNLOCK_END(); 03588 03589 if (patrol_thread == th) 03590 patrol_thread = NULL; 03591 03592 reset_unblock_function(th, &oldubf); 03593 03594 th->locking_mutex = Qfalse; 03595 if (mutex->th && interrupted == 2) { 03596 rb_check_deadlock(th->vm); 03597 } 03598 if (th->status == THREAD_STOPPED_FOREVER) { 03599 th->status = prev_status; 03600 } 03601 th->vm->sleeper--; 03602 03603 if (mutex->th == th) mutex_locked(th, self); 03604 03605 if (interrupted) { 03606 RUBY_VM_CHECK_INTS(); 03607 } 03608 } 03609 } 03610 return self; 03611 } 03612 03613 static const char * 03614 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) 03615 { 03616 const char *err = NULL; 03617 rb_mutex_t *th_mutex; 03618 03619 native_mutex_lock(&mutex->lock); 03620 03621 if (mutex->th == 0) { 03622 err = "Attempt to unlock a mutex which is not locked"; 03623 } 03624 else if (mutex->th != th) { 03625 err = "Attempt to unlock a mutex which is locked by another thread"; 03626 } 03627 else { 03628 mutex->th = 0; 03629 if (mutex->cond_waiting > 0) 03630 native_cond_signal(&mutex->cond); 03631 } 03632 03633 native_mutex_unlock(&mutex->lock); 03634 03635 if (!err) { 03636 th_mutex = th->keeping_mutexes; 03637 if (th_mutex == mutex) { 03638 th->keeping_mutexes = mutex->next_mutex; 03639 } 03640 else { 03641 while (1) { 03642 rb_mutex_t *tmp_mutex; 03643 tmp_mutex = th_mutex->next_mutex; 03644 if (tmp_mutex == mutex) { 03645 th_mutex->next_mutex = tmp_mutex->next_mutex; 03646 break; 03647 } 03648 th_mutex = tmp_mutex; 03649 } 03650 } 03651 mutex->next_mutex = NULL; 03652 } 03653 03654 return err; 03655 } 03656 03657 /* 03658 * call-seq: 03659 * mutex.unlock -> self 03660 * 03661 * Releases the lock. 03662 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. 03663 */ 03664 VALUE 03665 rb_mutex_unlock(VALUE self) 03666 { 03667 const char *err; 03668 rb_mutex_t *mutex; 03669 GetMutexPtr(self, mutex); 03670 03671 err = rb_mutex_unlock_th(mutex, GET_THREAD()); 03672 if (err) rb_raise(rb_eThreadError, "%s", err); 03673 03674 return self; 03675 } 03676 03677 static void 03678 rb_mutex_abandon_all(rb_mutex_t *mutexes) 03679 { 03680 rb_mutex_t *mutex; 03681 03682 while (mutexes) { 03683 mutex = mutexes; 03684 mutexes = mutex->next_mutex; 03685 mutex->th = 0; 03686 mutex->next_mutex = 0; 03687 } 03688 } 03689 03690 static VALUE 03691 rb_mutex_sleep_forever(VALUE time) 03692 { 03693 rb_thread_sleep_deadly(); 03694 return Qnil; 03695 } 03696 03697 static VALUE 03698 rb_mutex_wait_for(VALUE time) 03699 { 03700 const struct timeval *t = (struct timeval *)time; 03701 rb_thread_wait_for(*t); 03702 return Qnil; 03703 } 03704 03705 VALUE 03706 rb_mutex_sleep(VALUE self, VALUE timeout) 03707 { 03708 time_t beg, end; 03709 struct timeval t; 03710 03711 if (!NIL_P(timeout)) { 03712 t = rb_time_interval(timeout); 03713 } 03714 rb_mutex_unlock(self); 03715 beg = time(0); 03716 if (NIL_P(timeout)) { 03717 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); 03718 } 03719 else { 03720 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); 03721 } 03722 end = time(0) - beg; 03723 return INT2FIX(end); 03724 } 03725 03726 /* 03727 * call-seq: 03728 * mutex.sleep(timeout = nil) -> number 03729 * 03730 * Releases the lock and sleeps +timeout+ seconds if it is given and 03731 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by 03732 * the current thread. 03733 */ 03734 static VALUE 03735 mutex_sleep(int argc, VALUE *argv, VALUE self) 03736 { 03737 VALUE timeout; 03738 03739 rb_scan_args(argc, argv, "01", &timeout); 03740 return rb_mutex_sleep(self, timeout); 03741 } 03742 03743 /* 03744 * call-seq: 03745 * mutex.synchronize { ... } -> result of the block 03746 * 03747 * Obtains a lock, runs the block, and releases the lock when the block 03748 * completes. See the example under +Mutex+. 03749 */ 03750 03751 VALUE 03752 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) 03753 { 03754 rb_mutex_lock(mutex); 03755 return rb_ensure(func, arg, rb_mutex_unlock, mutex); 03756 } 03757 03758 /* 03759 * Document-class: Barrier 03760 */ 03761 static void 03762 barrier_mark(void *ptr) 03763 { 03764 rb_gc_mark((VALUE)ptr); 03765 } 03766 03767 static const rb_data_type_t barrier_data_type = { 03768 "barrier", 03769 {barrier_mark, 0, 0,}, 03770 }; 03771 03772 static VALUE 03773 barrier_alloc(VALUE klass) 03774 { 03775 return TypedData_Wrap_Struct(klass, &barrier_data_type, (void *)mutex_alloc(0)); 03776 } 03777 03778 #define GetBarrierPtr(obj) ((VALUE)rb_check_typeddata((obj), &barrier_data_type)) 03779 03780 VALUE 03781 rb_barrier_new(void) 03782 { 03783 VALUE barrier = barrier_alloc(rb_cBarrier); 03784 rb_mutex_lock((VALUE)DATA_PTR(barrier)); 03785 return barrier; 03786 } 03787 03788 VALUE 03789 rb_barrier_wait(VALUE self) 03790 { 03791 VALUE mutex = GetBarrierPtr(self); 03792 rb_mutex_t *m; 03793 03794 if (!mutex) return Qfalse; 03795 GetMutexPtr(mutex, m); 03796 if (m->th == GET_THREAD()) return Qfalse; 03797 rb_mutex_lock(mutex); 03798 if (DATA_PTR(self)) return Qtrue; 03799 rb_mutex_unlock(mutex); 03800 return Qfalse; 03801 } 03802 03803 VALUE 03804 rb_barrier_release(VALUE self) 03805 { 03806 return rb_mutex_unlock(GetBarrierPtr(self)); 03807 } 03808 03809 VALUE 03810 rb_barrier_destroy(VALUE self) 03811 { 03812 VALUE mutex = GetBarrierPtr(self); 03813 DATA_PTR(self) = 0; 03814 return rb_mutex_unlock(mutex); 03815 } 03816 03817 /* variables for recursive traversals */ 03818 static ID recursive_key; 03819 03820 /* 03821 * Returns the current "recursive list" used to detect recursion. 03822 * This list is a hash table, unique for the current thread and for 03823 * the current __callee__. 03824 */ 03825 03826 static VALUE 03827 recursive_list_access(void) 03828 { 03829 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key); 03830 VALUE sym = ID2SYM(rb_frame_this_func()); 03831 VALUE list; 03832 if (NIL_P(hash) || TYPE(hash) != T_HASH) { 03833 hash = rb_hash_new(); 03834 OBJ_UNTRUST(hash); 03835 rb_thread_local_aset(rb_thread_current(), recursive_key, hash); 03836 list = Qnil; 03837 } 03838 else { 03839 list = rb_hash_aref(hash, sym); 03840 } 03841 if (NIL_P(list) || TYPE(list) != T_HASH) { 03842 list = rb_hash_new(); 03843 OBJ_UNTRUST(list); 03844 rb_hash_aset(hash, sym, list); 03845 } 03846 return list; 03847 } 03848 03849 /* 03850 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already 03851 * in the recursion list. 03852 * Assumes the recursion list is valid. 03853 */ 03854 03855 static VALUE 03856 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) 03857 { 03858 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); 03859 if (pair_list == Qundef) 03860 return Qfalse; 03861 if (paired_obj_id) { 03862 if (TYPE(pair_list) != T_HASH) { 03863 if (pair_list != paired_obj_id) 03864 return Qfalse; 03865 } 03866 else { 03867 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) 03868 return Qfalse; 03869 } 03870 } 03871 return Qtrue; 03872 } 03873 03874 /* 03875 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. 03876 * For a single obj_id, it sets list[obj_id] to Qtrue. 03877 * For a pair, it sets list[obj_id] to paired_obj_id if possible, 03878 * otherwise list[obj_id] becomes a hash like: 03879 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } 03880 * Assumes the recursion list is valid. 03881 */ 03882 03883 static void 03884 recursive_push(VALUE list, VALUE obj, VALUE paired_obj) 03885 { 03886 VALUE pair_list; 03887 03888 if (!paired_obj) { 03889 rb_hash_aset(list, obj, Qtrue); 03890 } 03891 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { 03892 rb_hash_aset(list, obj, paired_obj); 03893 } 03894 else { 03895 if (TYPE(pair_list) != T_HASH){ 03896 VALUE other_paired_obj = pair_list; 03897 pair_list = rb_hash_new(); 03898 OBJ_UNTRUST(pair_list); 03899 rb_hash_aset(pair_list, other_paired_obj, Qtrue); 03900 rb_hash_aset(list, obj, pair_list); 03901 } 03902 rb_hash_aset(pair_list, paired_obj, Qtrue); 03903 } 03904 } 03905 03906 /* 03907 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. 03908 * For a pair, if list[obj_id] is a hash, then paired_obj_id is 03909 * removed from the hash and no attempt is made to simplify 03910 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id 03911 * Assumes the recursion list is valid. 03912 */ 03913 03914 static void 03915 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj) 03916 { 03917 if (paired_obj) { 03918 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); 03919 if (pair_list == Qundef) { 03920 VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func())); 03921 VALUE thrname = rb_inspect(rb_thread_current()); 03922 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s", 03923 StringValuePtr(symname), StringValuePtr(thrname)); 03924 } 03925 if (TYPE(pair_list) == T_HASH) { 03926 rb_hash_delete(pair_list, paired_obj); 03927 if (!RHASH_EMPTY_P(pair_list)) { 03928 return; /* keep hash until is empty */ 03929 } 03930 } 03931 } 03932 rb_hash_delete(list, obj); 03933 } 03934 03935 struct exec_recursive_params { 03936 VALUE (*func) (VALUE, VALUE, int); 03937 VALUE list; 03938 VALUE obj; 03939 VALUE objid; 03940 VALUE pairid; 03941 VALUE arg; 03942 }; 03943 03944 static VALUE 03945 exec_recursive_i(VALUE tag, struct exec_recursive_params *p) 03946 { 03947 VALUE result = Qundef; 03948 int state; 03949 03950 recursive_push(p->list, p->objid, p->pairid); 03951 PUSH_TAG(); 03952 if ((state = EXEC_TAG()) == 0) { 03953 result = (*p->func)(p->obj, p->arg, FALSE); 03954 } 03955 POP_TAG(); 03956 recursive_pop(p->list, p->objid, p->pairid); 03957 if (state) 03958 JUMP_TAG(state); 03959 return result; 03960 } 03961 03962 /* 03963 * Calls func(obj, arg, recursive), where recursive is non-zero if the 03964 * current method is called recursively on obj, or on the pair <obj, pairid> 03965 * If outer is 0, then the innermost func will be called with recursive set 03966 * to Qtrue, otherwise the outermost func will be called. In the latter case, 03967 * all inner func are short-circuited by throw. 03968 * Implementation details: the value thrown is the recursive list which is 03969 * proper to the current method and unlikely to be catched anywhere else. 03970 * list[recursive_key] is used as a flag for the outermost call. 03971 */ 03972 03973 static VALUE 03974 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer) 03975 { 03976 VALUE result = Qundef; 03977 struct exec_recursive_params p; 03978 int outermost; 03979 p.list = recursive_list_access(); 03980 p.objid = rb_obj_id(obj); 03981 p.obj = obj; 03982 p.pairid = pairid; 03983 p.arg = arg; 03984 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); 03985 03986 if (recursive_check(p.list, p.objid, pairid)) { 03987 if (outer && !outermost) { 03988 rb_throw_obj(p.list, p.list); 03989 } 03990 return (*func)(obj, arg, TRUE); 03991 } 03992 else { 03993 p.func = func; 03994 03995 if (outermost) { 03996 recursive_push(p.list, ID2SYM(recursive_key), 0); 03997 result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p); 03998 recursive_pop(p.list, ID2SYM(recursive_key), 0); 03999 if (result == p.list) { 04000 result = (*func)(obj, arg, TRUE); 04001 } 04002 } 04003 else { 04004 result = exec_recursive_i(0, &p); 04005 } 04006 } 04007 *(volatile struct exec_recursive_params *)&p; 04008 return result; 04009 } 04010 04011 /* 04012 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04013 * current method is called recursively on obj 04014 */ 04015 04016 VALUE 04017 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04018 { 04019 return exec_recursive(func, obj, 0, arg, 0); 04020 } 04021 04022 /* 04023 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04024 * current method is called recursively on the ordered pair <obj, paired_obj> 04025 */ 04026 04027 VALUE 04028 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 04029 { 04030 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0); 04031 } 04032 04033 /* 04034 * If recursion is detected on the current method and obj, the outermost 04035 * func will be called with (obj, arg, Qtrue). All inner func will be 04036 * short-circuited using throw. 04037 */ 04038 04039 VALUE 04040 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04041 { 04042 return exec_recursive(func, obj, 0, arg, 1); 04043 } 04044 04045 /* tracer */ 04046 #define RUBY_EVENT_REMOVED 0x1000000 04047 04048 enum { 04049 EVENT_RUNNING_NOTHING, 04050 EVENT_RUNNING_TRACE = 1, 04051 EVENT_RUNNING_THREAD = 2, 04052 EVENT_RUNNING_VM = 4, 04053 EVENT_RUNNING_EVENT_MASK = EVENT_RUNNING_VM|EVENT_RUNNING_THREAD 04054 }; 04055 04056 static VALUE thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always); 04057 04058 struct event_call_args { 04059 rb_thread_t *th; 04060 VALUE klass; 04061 VALUE self; 04062 VALUE proc; 04063 ID id; 04064 rb_event_flag_t event; 04065 }; 04066 04067 static rb_event_hook_t * 04068 alloc_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04069 { 04070 rb_event_hook_t *hook = ALLOC(rb_event_hook_t); 04071 hook->func = func; 04072 hook->flag = events; 04073 hook->data = data; 04074 return hook; 04075 } 04076 04077 static void 04078 thread_reset_event_flags(rb_thread_t *th) 04079 { 04080 rb_event_hook_t *hook = th->event_hooks; 04081 rb_event_flag_t flag = th->event_flags & RUBY_EVENT_VM; 04082 04083 while (hook) { 04084 if (!(flag & RUBY_EVENT_REMOVED)) 04085 flag |= hook->flag; 04086 hook = hook->next; 04087 } 04088 th->event_flags = flag; 04089 } 04090 04091 static void 04092 rb_threadptr_add_event_hook(rb_thread_t *th, 04093 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04094 { 04095 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04096 hook->next = th->event_hooks; 04097 th->event_hooks = hook; 04098 thread_reset_event_flags(th); 04099 } 04100 04101 static rb_thread_t * 04102 thval2thread_t(VALUE thval) 04103 { 04104 rb_thread_t *th; 04105 GetThreadPtr(thval, th); 04106 return th; 04107 } 04108 04109 void 04110 rb_thread_add_event_hook(VALUE thval, 04111 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04112 { 04113 rb_threadptr_add_event_hook(thval2thread_t(thval), func, events, data); 04114 } 04115 04116 static int 04117 set_threads_event_flags_i(st_data_t key, st_data_t val, st_data_t flag) 04118 { 04119 VALUE thval = key; 04120 rb_thread_t *th; 04121 GetThreadPtr(thval, th); 04122 04123 if (flag) { 04124 th->event_flags |= RUBY_EVENT_VM; 04125 } 04126 else { 04127 th->event_flags &= (~RUBY_EVENT_VM); 04128 } 04129 return ST_CONTINUE; 04130 } 04131 04132 static void 04133 set_threads_event_flags(int flag) 04134 { 04135 st_foreach(GET_VM()->living_threads, set_threads_event_flags_i, (st_data_t) flag); 04136 } 04137 04138 static inline int 04139 exec_event_hooks(const rb_event_hook_t *hook, rb_event_flag_t flag, VALUE self, ID id, VALUE klass) 04140 { 04141 int removed = 0; 04142 for (; hook; hook = hook->next) { 04143 if (hook->flag & RUBY_EVENT_REMOVED) { 04144 removed++; 04145 continue; 04146 } 04147 if (flag & hook->flag) { 04148 (*hook->func)(flag, hook->data, self, id, klass); 04149 } 04150 } 04151 return removed; 04152 } 04153 04154 static int remove_defered_event_hook(rb_event_hook_t **root); 04155 04156 static VALUE 04157 thread_exec_event_hooks(VALUE args, int running) 04158 { 04159 struct event_call_args *argp = (struct event_call_args *)args; 04160 rb_thread_t *th = argp->th; 04161 rb_event_flag_t flag = argp->event; 04162 VALUE self = argp->self; 04163 ID id = argp->id; 04164 VALUE klass = argp->klass; 04165 const rb_event_flag_t wait_event = th->event_flags; 04166 int removed; 04167 04168 if (self == rb_mRubyVMFrozenCore) return 0; 04169 04170 if ((wait_event & flag) && !(running & EVENT_RUNNING_THREAD)) { 04171 th->tracing |= EVENT_RUNNING_THREAD; 04172 removed = exec_event_hooks(th->event_hooks, flag, self, id, klass); 04173 th->tracing &= ~EVENT_RUNNING_THREAD; 04174 if (removed) { 04175 remove_defered_event_hook(&th->event_hooks); 04176 } 04177 } 04178 if (wait_event & RUBY_EVENT_VM) { 04179 if (th->vm->event_hooks == NULL) { 04180 th->event_flags &= (~RUBY_EVENT_VM); 04181 } 04182 else if (!(running & EVENT_RUNNING_VM)) { 04183 th->tracing |= EVENT_RUNNING_VM; 04184 removed = exec_event_hooks(th->vm->event_hooks, flag, self, id, klass); 04185 th->tracing &= ~EVENT_RUNNING_VM; 04186 if (removed) { 04187 remove_defered_event_hook(&th->vm->event_hooks); 04188 } 04189 } 04190 } 04191 return 0; 04192 } 04193 04194 void 04195 rb_threadptr_exec_event_hooks(rb_thread_t *th, rb_event_flag_t flag, VALUE self, ID id, VALUE klass) 04196 { 04197 const VALUE errinfo = th->errinfo; 04198 struct event_call_args args; 04199 args.th = th; 04200 args.event = flag; 04201 args.self = self; 04202 args.id = id; 04203 args.klass = klass; 04204 args.proc = 0; 04205 thread_suppress_tracing(th, EVENT_RUNNING_EVENT_MASK, thread_exec_event_hooks, (VALUE)&args, FALSE); 04206 th->errinfo = errinfo; 04207 } 04208 04209 void 04210 rb_add_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04211 { 04212 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04213 rb_vm_t *vm = GET_VM(); 04214 04215 hook->next = vm->event_hooks; 04216 vm->event_hooks = hook; 04217 04218 set_threads_event_flags(1); 04219 } 04220 04221 static int 04222 defer_remove_event_hook(rb_event_hook_t *hook, rb_event_hook_func_t func) 04223 { 04224 while (hook) { 04225 if (func == 0 || hook->func == func) { 04226 hook->flag |= RUBY_EVENT_REMOVED; 04227 } 04228 hook = hook->next; 04229 } 04230 return -1; 04231 } 04232 04233 static int 04234 remove_event_hook(rb_event_hook_t **root, rb_event_hook_func_t func) 04235 { 04236 rb_event_hook_t *hook = *root, *next; 04237 04238 while (hook) { 04239 next = hook->next; 04240 if (func == 0 || hook->func == func || (hook->flag & RUBY_EVENT_REMOVED)) { 04241 *root = next; 04242 xfree(hook); 04243 } 04244 else { 04245 root = &hook->next; 04246 } 04247 hook = next; 04248 } 04249 return -1; 04250 } 04251 04252 static int 04253 remove_defered_event_hook(rb_event_hook_t **root) 04254 { 04255 rb_event_hook_t *hook = *root, *next; 04256 04257 while (hook) { 04258 next = hook->next; 04259 if (hook->flag & RUBY_EVENT_REMOVED) { 04260 *root = next; 04261 xfree(hook); 04262 } 04263 else { 04264 root = &hook->next; 04265 } 04266 hook = next; 04267 } 04268 return -1; 04269 } 04270 04271 static int 04272 rb_threadptr_remove_event_hook(rb_thread_t *th, rb_event_hook_func_t func) 04273 { 04274 int ret; 04275 if (th->tracing & EVENT_RUNNING_THREAD) { 04276 ret = defer_remove_event_hook(th->event_hooks, func); 04277 } 04278 else { 04279 ret = remove_event_hook(&th->event_hooks, func); 04280 } 04281 thread_reset_event_flags(th); 04282 return ret; 04283 } 04284 04285 int 04286 rb_thread_remove_event_hook(VALUE thval, rb_event_hook_func_t func) 04287 { 04288 return rb_threadptr_remove_event_hook(thval2thread_t(thval), func); 04289 } 04290 04291 static rb_event_hook_t * 04292 search_live_hook(rb_event_hook_t *hook) 04293 { 04294 while (hook) { 04295 if (!(hook->flag & RUBY_EVENT_REMOVED)) 04296 return hook; 04297 hook = hook->next; 04298 } 04299 return NULL; 04300 } 04301 04302 static int 04303 running_vm_event_hooks(st_data_t key, st_data_t val, st_data_t data) 04304 { 04305 rb_thread_t *th = thval2thread_t((VALUE)key); 04306 if (!(th->tracing & EVENT_RUNNING_VM)) return ST_CONTINUE; 04307 *(rb_thread_t **)data = th; 04308 return ST_STOP; 04309 } 04310 04311 static rb_thread_t * 04312 vm_event_hooks_running_thread(rb_vm_t *vm) 04313 { 04314 rb_thread_t *found = NULL; 04315 st_foreach(vm->living_threads, running_vm_event_hooks, (st_data_t)&found); 04316 return found; 04317 } 04318 04319 int 04320 rb_remove_event_hook(rb_event_hook_func_t func) 04321 { 04322 rb_vm_t *vm = GET_VM(); 04323 rb_event_hook_t *hook = search_live_hook(vm->event_hooks); 04324 int ret; 04325 04326 if (vm_event_hooks_running_thread(vm)) { 04327 ret = defer_remove_event_hook(vm->event_hooks, func); 04328 } 04329 else { 04330 ret = remove_event_hook(&vm->event_hooks, func); 04331 } 04332 04333 if (hook && !search_live_hook(vm->event_hooks)) { 04334 set_threads_event_flags(0); 04335 } 04336 04337 return ret; 04338 } 04339 04340 static int 04341 clear_trace_func_i(st_data_t key, st_data_t val, st_data_t flag) 04342 { 04343 rb_thread_t *th; 04344 GetThreadPtr((VALUE)key, th); 04345 rb_threadptr_remove_event_hook(th, 0); 04346 return ST_CONTINUE; 04347 } 04348 04349 void 04350 rb_clear_trace_func(void) 04351 { 04352 st_foreach(GET_VM()->living_threads, clear_trace_func_i, (st_data_t) 0); 04353 rb_remove_event_hook(0); 04354 } 04355 04356 static void call_trace_func(rb_event_flag_t, VALUE data, VALUE self, ID id, VALUE klass); 04357 04358 /* 04359 * call-seq: 04360 * set_trace_func(proc) -> proc 04361 * set_trace_func(nil) -> nil 04362 * 04363 * Establishes _proc_ as the handler for tracing, or disables 04364 * tracing if the parameter is +nil+. _proc_ takes up 04365 * to six parameters: an event name, a filename, a line number, an 04366 * object id, a binding, and the name of a class. _proc_ is 04367 * invoked whenever an event occurs. Events are: <code>c-call</code> 04368 * (call a C-language routine), <code>c-return</code> (return from a 04369 * C-language routine), <code>call</code> (call a Ruby method), 04370 * <code>class</code> (start a class or module definition), 04371 * <code>end</code> (finish a class or module definition), 04372 * <code>line</code> (execute code on a new line), <code>raise</code> 04373 * (raise an exception), and <code>return</code> (return from a Ruby 04374 * method). Tracing is disabled within the context of _proc_. 04375 * 04376 * class Test 04377 * def test 04378 * a = 1 04379 * b = 2 04380 * end 04381 * end 04382 * 04383 * set_trace_func proc { |event, file, line, id, binding, classname| 04384 * printf "%8s %s:%-2d %10s %8s\n", event, file, line, id, classname 04385 * } 04386 * t = Test.new 04387 * t.test 04388 * 04389 * line prog.rb:11 false 04390 * c-call prog.rb:11 new Class 04391 * c-call prog.rb:11 initialize Object 04392 * c-return prog.rb:11 initialize Object 04393 * c-return prog.rb:11 new Class 04394 * line prog.rb:12 false 04395 * call prog.rb:2 test Test 04396 * line prog.rb:3 test Test 04397 * line prog.rb:4 test Test 04398 * return prog.rb:4 test Test 04399 */ 04400 04401 static VALUE 04402 set_trace_func(VALUE obj, VALUE trace) 04403 { 04404 rb_remove_event_hook(call_trace_func); 04405 04406 if (NIL_P(trace)) { 04407 GET_THREAD()->tracing = EVENT_RUNNING_NOTHING; 04408 return Qnil; 04409 } 04410 04411 if (!rb_obj_is_proc(trace)) { 04412 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04413 } 04414 04415 rb_add_event_hook(call_trace_func, RUBY_EVENT_ALL, trace); 04416 return trace; 04417 } 04418 04419 static void 04420 thread_add_trace_func(rb_thread_t *th, VALUE trace) 04421 { 04422 if (!rb_obj_is_proc(trace)) { 04423 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04424 } 04425 04426 rb_threadptr_add_event_hook(th, call_trace_func, RUBY_EVENT_ALL, trace); 04427 } 04428 04429 /* 04430 * call-seq: 04431 * thr.add_trace_func(proc) -> proc 04432 * 04433 * Adds _proc_ as a handler for tracing. 04434 * See <code>Thread#set_trace_func</code> and +set_trace_func+. 04435 */ 04436 04437 static VALUE 04438 thread_add_trace_func_m(VALUE obj, VALUE trace) 04439 { 04440 rb_thread_t *th; 04441 GetThreadPtr(obj, th); 04442 thread_add_trace_func(th, trace); 04443 return trace; 04444 } 04445 04446 /* 04447 * call-seq: 04448 * thr.set_trace_func(proc) -> proc 04449 * thr.set_trace_func(nil) -> nil 04450 * 04451 * Establishes _proc_ on _thr_ as the handler for tracing, or 04452 * disables tracing if the parameter is +nil+. 04453 * See +set_trace_func+. 04454 */ 04455 04456 static VALUE 04457 thread_set_trace_func_m(VALUE obj, VALUE trace) 04458 { 04459 rb_thread_t *th; 04460 GetThreadPtr(obj, th); 04461 rb_threadptr_remove_event_hook(th, call_trace_func); 04462 04463 if (NIL_P(trace)) { 04464 th->tracing = EVENT_RUNNING_NOTHING; 04465 return Qnil; 04466 } 04467 thread_add_trace_func(th, trace); 04468 return trace; 04469 } 04470 04471 static const char * 04472 get_event_name(rb_event_flag_t event) 04473 { 04474 switch (event) { 04475 case RUBY_EVENT_LINE: 04476 return "line"; 04477 case RUBY_EVENT_CLASS: 04478 return "class"; 04479 case RUBY_EVENT_END: 04480 return "end"; 04481 case RUBY_EVENT_CALL: 04482 return "call"; 04483 case RUBY_EVENT_RETURN: 04484 return "return"; 04485 case RUBY_EVENT_C_CALL: 04486 return "c-call"; 04487 case RUBY_EVENT_C_RETURN: 04488 return "c-return"; 04489 case RUBY_EVENT_RAISE: 04490 return "raise"; 04491 default: 04492 return "unknown"; 04493 } 04494 } 04495 04496 static VALUE 04497 call_trace_proc(VALUE args, int tracing) 04498 { 04499 struct event_call_args *p = (struct event_call_args *)args; 04500 const char *srcfile = rb_sourcefile(); 04501 VALUE eventname = rb_str_new2(get_event_name(p->event)); 04502 VALUE filename = srcfile ? rb_str_new2(srcfile) : Qnil; 04503 VALUE argv[6]; 04504 int line = rb_sourceline(); 04505 ID id = 0; 04506 VALUE klass = 0; 04507 04508 if (p->klass != 0) { 04509 id = p->id; 04510 klass = p->klass; 04511 } 04512 else { 04513 rb_thread_method_id_and_class(p->th, &id, &klass); 04514 } 04515 if (id == ID_ALLOCATOR) 04516 return Qnil; 04517 if (klass) { 04518 if (TYPE(klass) == T_ICLASS) { 04519 klass = RBASIC(klass)->klass; 04520 } 04521 else if (FL_TEST(klass, FL_SINGLETON)) { 04522 klass = rb_iv_get(klass, "__attached__"); 04523 } 04524 } 04525 04526 argv[0] = eventname; 04527 argv[1] = filename; 04528 argv[2] = INT2FIX(line); 04529 argv[3] = id ? ID2SYM(id) : Qnil; 04530 argv[4] = (p->self && srcfile) ? rb_binding_new() : Qnil; 04531 argv[5] = klass ? klass : Qnil; 04532 04533 return rb_proc_call_with_block(p->proc, 6, argv, Qnil); 04534 } 04535 04536 static void 04537 call_trace_func(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 04538 { 04539 struct event_call_args args; 04540 04541 args.th = GET_THREAD(); 04542 args.event = event; 04543 args.proc = proc; 04544 args.self = self; 04545 args.id = id; 04546 args.klass = klass; 04547 ruby_suppress_tracing(call_trace_proc, (VALUE)&args, FALSE); 04548 } 04549 04550 VALUE 04551 ruby_suppress_tracing(VALUE (*func)(VALUE, int), VALUE arg, int always) 04552 { 04553 rb_thread_t *th = GET_THREAD(); 04554 return thread_suppress_tracing(th, EVENT_RUNNING_TRACE, func, arg, always); 04555 } 04556 04557 static VALUE 04558 thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always) 04559 { 04560 int state, tracing = th->tracing, running = tracing & ev; 04561 volatile int raised; 04562 volatile int outer_state; 04563 VALUE result = Qnil; 04564 04565 if (running == ev && !always) { 04566 return Qnil; 04567 } 04568 else { 04569 th->tracing |= ev; 04570 } 04571 04572 raised = rb_threadptr_reset_raised(th); 04573 outer_state = th->state; 04574 th->state = 0; 04575 04576 PUSH_TAG(); 04577 if ((state = EXEC_TAG()) == 0) { 04578 result = (*func)(arg, running); 04579 } 04580 04581 if (raised) { 04582 rb_threadptr_set_raised(th); 04583 } 04584 POP_TAG(); 04585 04586 th->tracing = tracing; 04587 if (state) { 04588 JUMP_TAG(state); 04589 } 04590 th->state = outer_state; 04591 04592 return result; 04593 } 04594 04595 /* 04596 * call-seq: 04597 * thr.backtrace -> array 04598 * 04599 * Returns the current back trace of the _thr_. 04600 */ 04601 04602 static VALUE 04603 rb_thread_backtrace_m(VALUE thval) 04604 { 04605 return rb_thread_backtrace(thval); 04606 } 04607 04608 /* 04609 * Document-class: ThreadError 04610 * 04611 * Raised when an invalid operation is attempted on a thread. 04612 * 04613 * For example, when no other thread has been started: 04614 * 04615 * Thread.stop 04616 * 04617 * <em>raises the exception:</em> 04618 * 04619 * ThreadError: stopping only thread 04620 */ 04621 04622 /* 04623 * +Thread+ encapsulates the behavior of a thread of 04624 * execution, including the main thread of the Ruby script. 04625 * 04626 * In the descriptions of the methods in this class, the parameter _sym_ 04627 * refers to a symbol, which is either a quoted string or a 04628 * +Symbol+ (such as <code>:name</code>). 04629 */ 04630 04631 void 04632 Init_Thread(void) 04633 { 04634 #undef rb_intern 04635 #define rb_intern(str) rb_intern_const(str) 04636 04637 VALUE cThGroup; 04638 rb_thread_t *th = GET_THREAD(); 04639 04640 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); 04641 rb_define_singleton_method(rb_cThread, "start", thread_start, -2); 04642 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); 04643 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); 04644 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); 04645 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); 04646 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); 04647 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); 04648 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); 04649 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); 04650 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); 04651 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); 04652 #if THREAD_DEBUG < 0 04653 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); 04654 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); 04655 #endif 04656 04657 rb_define_method(rb_cThread, "initialize", thread_initialize, -2); 04658 rb_define_method(rb_cThread, "raise", thread_raise_m, -1); 04659 rb_define_method(rb_cThread, "join", thread_join_m, -1); 04660 rb_define_method(rb_cThread, "value", thread_value, 0); 04661 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0); 04662 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0); 04663 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0); 04664 rb_define_method(rb_cThread, "run", rb_thread_run, 0); 04665 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); 04666 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); 04667 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); 04668 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); 04669 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); 04670 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); 04671 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); 04672 rb_define_method(rb_cThread, "status", rb_thread_status, 0); 04673 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0); 04674 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0); 04675 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0); 04676 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); 04677 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); 04678 rb_define_method(rb_cThread, "group", rb_thread_group, 0); 04679 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, 0); 04680 04681 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); 04682 04683 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); 04684 OBJ_TAINT(closed_stream_error); 04685 OBJ_FREEZE(closed_stream_error); 04686 04687 cThGroup = rb_define_class("ThreadGroup", rb_cObject); 04688 rb_define_alloc_func(cThGroup, thgroup_s_alloc); 04689 rb_define_method(cThGroup, "list", thgroup_list, 0); 04690 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0); 04691 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); 04692 rb_define_method(cThGroup, "add", thgroup_add, 1); 04693 04694 { 04695 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); 04696 rb_define_const(cThGroup, "Default", th->thgroup); 04697 } 04698 04699 rb_cMutex = rb_define_class("Mutex", rb_cObject); 04700 rb_define_alloc_func(rb_cMutex, mutex_alloc); 04701 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); 04702 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); 04703 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); 04704 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); 04705 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); 04706 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); 04707 04708 recursive_key = rb_intern("__recursive_key__"); 04709 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); 04710 04711 /* trace */ 04712 rb_define_global_function("set_trace_func", set_trace_func, 1); 04713 rb_define_method(rb_cThread, "set_trace_func", thread_set_trace_func_m, 1); 04714 rb_define_method(rb_cThread, "add_trace_func", thread_add_trace_func_m, 1); 04715 04716 /* init thread core */ 04717 { 04718 /* main thread setting */ 04719 { 04720 /* acquire global vm lock */ 04721 gvl_init(th->vm); 04722 gvl_acquire(th->vm, th); 04723 native_mutex_initialize(&th->interrupt_lock); 04724 } 04725 } 04726 04727 rb_thread_create_timer_thread(); 04728 04729 /* suppress warnings on cygwin, mingw and mswin.*/ 04730 (void)native_mutex_trylock; 04731 } 04732 04733 int 04734 ruby_native_thread_p(void) 04735 { 04736 rb_thread_t *th = ruby_thread_from_native(); 04737 04738 return th != 0; 04739 } 04740 04741 static int 04742 check_deadlock_i(st_data_t key, st_data_t val, int *found) 04743 { 04744 VALUE thval = key; 04745 rb_thread_t *th; 04746 GetThreadPtr(thval, th); 04747 04748 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { 04749 *found = 1; 04750 } 04751 else if (th->locking_mutex) { 04752 rb_mutex_t *mutex; 04753 GetMutexPtr(th->locking_mutex, mutex); 04754 04755 native_mutex_lock(&mutex->lock); 04756 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { 04757 *found = 1; 04758 } 04759 native_mutex_unlock(&mutex->lock); 04760 } 04761 04762 return (*found) ? ST_STOP : ST_CONTINUE; 04763 } 04764 04765 #ifdef DEBUG_DEADLOCK_CHECK 04766 static int 04767 debug_i(st_data_t key, st_data_t val, int *found) 04768 { 04769 VALUE thval = key; 04770 rb_thread_t *th; 04771 GetThreadPtr(thval, th); 04772 04773 printf("th:%p %d %d", th, th->status, th->interrupt_flag); 04774 if (th->locking_mutex) { 04775 rb_mutex_t *mutex; 04776 GetMutexPtr(th->locking_mutex, mutex); 04777 04778 native_mutex_lock(&mutex->lock); 04779 printf(" %p %d\n", mutex->th, mutex->cond_waiting); 04780 native_mutex_unlock(&mutex->lock); 04781 } 04782 else 04783 puts(""); 04784 04785 return ST_CONTINUE; 04786 } 04787 #endif 04788 04789 static void 04790 rb_check_deadlock(rb_vm_t *vm) 04791 { 04792 int found = 0; 04793 04794 if (vm_living_thread_num(vm) > vm->sleeper) return; 04795 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); 04796 if (patrol_thread && patrol_thread != GET_THREAD()) return; 04797 04798 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); 04799 04800 if (!found) { 04801 VALUE argv[2]; 04802 argv[0] = rb_eFatal; 04803 argv[1] = rb_str_new2("deadlock detected"); 04804 #ifdef DEBUG_DEADLOCK_CHECK 04805 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread); 04806 st_foreach(vm->living_threads, debug_i, (st_data_t)0); 04807 #endif 04808 vm->sleeper--; 04809 rb_threadptr_raise(vm->main_thread, 2, argv); 04810 } 04811 } 04812 04813 static void 04814 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 04815 { 04816 VALUE coverage = GET_THREAD()->cfp->iseq->coverage; 04817 if (coverage && RBASIC(coverage)->klass == 0) { 04818 long line = rb_sourceline() - 1; 04819 long count; 04820 if (RARRAY_PTR(coverage)[line] == Qnil) { 04821 return; 04822 } 04823 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1; 04824 if (POSFIXABLE(count)) { 04825 RARRAY_PTR(coverage)[line] = LONG2FIX(count); 04826 } 04827 } 04828 } 04829 04830 VALUE 04831 rb_get_coverages(void) 04832 { 04833 return GET_VM()->coverages; 04834 } 04835 04836 void 04837 rb_set_coverages(VALUE coverages) 04838 { 04839 GET_VM()->coverages = coverages; 04840 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); 04841 } 04842 04843 void 04844 rb_reset_coverages(void) 04845 { 04846 GET_VM()->coverages = Qfalse; 04847 rb_remove_event_hook(update_coverage); 04848 } 04849 04850