/*
 * call-seq:
 *    conn.wait_for_notify( [ timeout ] ) -> String or nil
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid| block }
 *
 * Blocks while waiting for a notification, or until the optional
 * _timeout_ is reached, whichever comes first.  _timeout_ is
 * measured in seconds and can be fractional; omitting it (or passing
 * +nil+) waits indefinitely.
 *
 * Returns +nil+ if _timeout_ is reached, the name of the NOTIFY
 * event otherwise.  If used in block form, passes the name of the
 * NOTIFY +event+ and the generating +pid+ into the block.  The block
 * is only called when a notification was actually received.
 *
 * Exactly one notification is consumed per call.  Any further
 * notifications that arrived at the same time stay queued in libpq and
 * are returned by subsequent calls without blocking.
 *
 * Note: _timeout_ bounds each wait for socket activity.  Traffic on the
 * connection that does not carry a notification restarts the wait.
 *
 * Raises a system call error if waiting on the socket fails, and
 * PGError if input could not be read from the connection.
 */
static VALUE
pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
{
	PGconn *conn = get_pgconn(self);
	PGnotify *notify;
	int sd = PQsocket(conn);
	int ret;
	struct timeval timeout;
	struct timeval wait;
	struct timeval *pwait = NULL;
	VALUE timeout_in = Qnil, relname, be_pid;
	double timeout_sec;
	fd_set sd_rset;

	if (sd < 0)
		rb_bug("PQsocket(conn): couldn't fetch the connection's socket!");

	rb_scan_args(argc, argv, "01", &timeout_in);

	/* An omitted or nil timeout means "block until a notification arrives". */
	if (timeout_in != Qnil) {
		timeout_sec = NUM2DBL(timeout_in);
		timeout.tv_sec = (long)timeout_sec;
		timeout.tv_usec = (long)((timeout_sec - (long)timeout_sec) * 1e6);
		pwait = &wait;
	}

	/*
	 * Check libpq's queue before touching the socket: a notification may
	 * already have been read in while processing an earlier command, in
	 * which case the socket would never become readable for it.
	 */
	while ((notify = PQnotifies(conn)) == NULL) {
		/* select() may modify the timeval, so re-arm it for every wait. */
		if (pwait != NULL)
			wait = timeout;

		FD_ZERO(&sd_rset);
		FD_SET(sd, &sd_rset);

		ret = rb_thread_select(sd+1, &sd_rset, NULL, NULL, pwait);
		if (ret < 0)
			rb_sys_fail(0);

		/* Timed out without any socket activity. */
		if (ret == 0)
			return Qnil;

		/* Pull the pending input into libpq so PQnotifies() can see it. */
		if ( (ret = PQconsumeInput(conn)) != 1 ) {
			rb_raise(rb_ePGError, "PQconsumeInput == %d: %s",
				ret, PQerrorMessage(conn));
		}
	}

	/*
	 * Hand back only this notification; the remaining ones stay queued in
	 * libpq instead of being read and thrown away.
	 */
	relname = rb_tainted_str_new2(notify->relname);
	be_pid = INT2NUM(notify->be_pid);
	PQfreemem(notify);

	if (rb_block_given_p())
		rb_yield( rb_ary_new3(2, relname, be_pid) );

	return relname;
}