obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
- m_p->m_min_threads = m_p->m_no_threads = min_threads;
+ m_p->m_no_threads = 0;
+ m_p->m_min_threads = min_threads;
m_p->m_max_threads = max_threads;
m_p->m_waiting_threads = 0;
m_p->m_stack_size = stack_size;
unsigned i;
- for (i = 0; i < m_p->m_no_threads; i++)
- {
- Worker w(this);
- boost::thread::attributes attrs;
- if (m_p->m_stack_size)
- attrs.set_stack_size(m_p->m_stack_size);
-
- boost::thread *x = new boost::thread(attrs, w);
-
- m_p->m_thrds.add_thread(x);
- }
+ for (i = 0; i < min_threads; i++)
+ add_worker();
}
ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
m_p->m_socketObservable->deleteObserver(this);
}
+void ThreadPoolSocketObserver::add_worker(void)
+{
+ Worker w(this);
+#if BOOST_VERSION >= 1050000
+ boost::thread::attributes attrs;
+ if (m_p->m_stack_size)
+ attrs.set_stack_size(m_stack_size);
+ boost::thread *x = new boost::thread(attrs, w);
+#else
+ boost::thread *x = new boost::thread(w);
+#endif
+ m_p->m_no_threads++;
+ m_p->m_thrds.add_thread(x);
+}
+
void ThreadPoolSocketObserver::socketNotify(int event)
{
if (event & SOCKET_OBSERVE_READ)
if (m_p->m_waiting_threads == 0 &&
m_p->m_no_threads < m_p->m_max_threads)
{
- m_p->m_no_threads++;
- Worker w(this);
-
- boost::thread::attributes attrs;
- if (m_p->m_stack_size)
- attrs.set_stack_size(m_p->m_stack_size);
- boost::thread *x = new boost::thread(attrs, w);
-
- m_p->m_thrds.add_thread(x);
+ add_worker();
}
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);