X-Git-Url: http://jsfdemo.indexdata.com/?a=blobdiff_plain;f=src%2Fthread_pool_observer.cpp;h=6bbe84b2e49bd7b1430a1ba6d24403dc485a9379;hb=098f68aa472b0827a0dfdfda38a338cde577ef98;hp=367c43a8731bc31abfa651c0143097e9cfd4c06f;hpb=747bd27ce6cec1595cb3f7c5620dce794ba55e4b;p=metaproxy-moved-to-github.git diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index 367c43a..6bbe84b 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,8 +1,7 @@ +/* $Id: thread_pool_observer.cpp,v 1.17 2006-06-10 14:29:13 adam Exp $ + Copyright (c) 2005-2006, Index Data. -/* $Id: thread_pool_observer.cpp,v 1.9 2005-11-04 11:06:52 adam Exp $ - Copyright (c) 2005, Index Data. - -%LICENSE% + See the LICENSE file for details */ #include "config.hpp" @@ -13,6 +12,10 @@ #include #endif +#if HAVE_SYS_SOCKET_H +#include +#endif + #include #include #include @@ -20,12 +23,15 @@ #include #include -#include +#include + +#include #include #include "thread_pool_observer.hpp" +#include "pipe.hpp" -namespace yp2 { +namespace metaproxy_1 { class ThreadPoolSocketObserver::Worker { public: Worker(ThreadPoolSocketObserver *s) : m_s(s) {}; @@ -42,7 +48,7 @@ namespace yp2 { ~Rep(); private: yazpp_1::ISocketObservable *m_socketObservable; - int m_fd[2]; + Pipe m_pipe; boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; @@ -56,10 +62,10 @@ namespace yp2 { using namespace yazpp_1; -using namespace yp2; +using namespace metaproxy_1; -ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs) - : m_socketObservable(obs) +ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs) + : m_socketObservable(obs), m_pipe(9123) { } @@ -72,12 +78,11 @@ IThreadPoolMsg::~IThreadPoolMsg() } -ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, - int no_threads) +ThreadPoolSocketObserver::ThreadPoolSocketObserver( + yazpp_1::ISocketObservable *obs, int no_threads) : m_p(new Rep(obs)) { - pipe(m_p->m_fd); - obs->addObserver(m_p->m_fd[0], this); + obs->addObserver(m_p->m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); m_p->m_stop_flag = false; @@ -100,9 +105,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver() m_p->m_thrds.join_all(); m_p->m_socketObservable->deleteObserver(this); - - close(m_p->m_fd[0]); - close(m_p->m_fd[1]); } void ThreadPoolSocketObserver::socketNotify(int event) @@ -110,7 +112,7 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_p->m_fd[0], buf, 1); + recv(m_p->m_pipe.read_fd(), buf, 1, 0); IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -141,7 +143,7 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); - write(m_p->m_fd[1], "", 1); + send(m_p->m_pipe.write_fd(), "", 1, 0); } } }