friend class FrontendNet;
int m_no_threads;
+ int m_max_threads;
std::vector<Port> m_ports;
int m_listen_duration;
int m_session_timeout;
yf::FrontendNet::Rep::Rep()
{
- m_no_threads = 5;
+ m_max_threads = m_no_threads = 5;
m_listen_duration = 0;
m_session_timeout = 300; // 5 minutes
m_connect_max = 0;
tt = new My_Timer_Thread(&m_p->mySocketManager,
m_p->m_listen_duration);
- ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads);
+ ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads,
+ m_p->m_max_threads);
for (i = 0; i<m_p->m_ports.size(); i++)
{
+ threads_str);
m_p->m_no_threads = threads;
}
+ else if (!strcmp((const char *) ptr->name, "max-threads"))
+ {
+ std::string threads_str = mp::xml::get_text(ptr);
+ int threads = atoi(threads_str.c_str());
+ if (threads < 1)
+ throw yf::FilterException("Bad value for threads: "
+ + threads_str);
+ m_p->m_max_threads = threads;
+ }
else if (!strcmp((const char *) ptr->name, "timeout"))
{
std::string timeout_str = mp::xml::get_text(ptr);
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
unsigned m_no_threads;
- unsigned m_no_threads_waiting;
+ unsigned m_min_threads;
+ unsigned m_max_threads;
+ unsigned m_waiting_threads;
};
const unsigned int queue_size_per_thread = 64;
}
}
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
- yazpp_1::ISocketObservable *obs, int no_threads)
+ yazpp_1::ISocketObservable *obs,
+ unsigned min_threads, unsigned max_threads)
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
- m_p->m_no_threads = no_threads;
- m_p->m_no_threads_waiting = 0;
- int i;
- for (i = 0; i<no_threads; i++)
+ m_p->m_min_threads = m_p->m_no_threads = min_threads;
+ m_p->m_max_threads = max_threads;
+ m_p->m_waiting_threads = 0;
+ unsigned i;
+ for (i = 0; i < m_p->m_no_threads; i++)
{
Worker w(this);
m_p->m_thrds.add_thread(new boost::thread(w));
m_p->m_cond_input_data.notify_all();
}
m_p->m_thrds.join_all();
-
m_p->m_socketObservable->deleteObserver(this);
}
out = m_p->m_output.front();
m_p->m_output.pop_front();
}
-
-
if (out)
{
std::ostringstream os;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
os << "tbusy/total " <<
- m_p->m_no_threads - m_p->m_no_threads_waiting <<
+ m_p->m_no_threads - m_p->m_waiting_threads <<
"/" << m_p->m_no_threads
<< " queue in/out " << m_p->m_input.size() << "/"
<< m_p->m_output.size();
void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
{
- tbusy = m_p->m_no_threads - m_p->m_no_threads_waiting;
+ tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
total = m_p->m_no_threads;
}
IThreadPoolMsg *in = 0;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
- m_p->m_no_threads_waiting++;
+ m_p->m_waiting_threads++;
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
- m_p->m_no_threads_waiting--;
+ m_p->m_waiting_threads--;
if (m_p->m_stop_flag)
break;
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
-
+ if (m_p->m_waiting_threads == 0 &&
+ m_p->m_no_threads < m_p->m_max_threads)
+ {
+ m_p->m_no_threads++;
+ Worker w(this);
+ m_p->m_thrds.add_thread(new boost::thread(w));
+ }
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);