X-Git-Url: http://jsfdemo.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=c0b70c37b121795f78d352d7abd3a2d5b018ae74;hb=1063240efa2d4f03a72353936e8a7b73ee249b0d;hp=744e793df97d80886bada051292059acb359079b;hpb=5c45ffbb2b430a6f41277c303a5e9b94242dab96;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 744e793..c0b70c3 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -4,7 +4,14 @@ * Sebastian Hammer, Adam Dickmeiss * * $Log: yaz-pdu-assoc.cpp,v $ - * Revision 1.7 1999-04-21 12:09:01 adam + * Revision 1.9 1999-12-06 13:52:45 adam + * Modified for new location of YAZ header files. Experimental threaded + * operation. + * + * Revision 1.8 1999/04/28 13:04:03 adam + * Fixed setting of proxy otherInfo so that database(s) are removed. + * + * Revision 1.7 1999/04/21 12:09:01 adam * Many improvements. Modified to proxy server to work with "sessions" * based on cookies. * @@ -34,14 +41,13 @@ #include -#include -#include +#include +#include -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, - COMSTACK cs) +Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable) { m_state = Closed; - m_cs = cs; + m_cs = 0; m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; @@ -51,11 +57,13 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, m_parent = 0; m_next = 0; m_destroyed = 0; + m_idleTime = 0; + m_log = LOG_DEBUG; } IYaz_PDU_Observable *Yaz_PDU_Assoc::clone() { - Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0); + Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable); return copy; } @@ -66,8 +74,8 @@ Yaz_PDU_Assoc::~Yaz_PDU_Assoc() void Yaz_PDU_Assoc::socketNotify(int event) { - logf (LOG_LOG, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event); - if (m_state == Connected) + logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event); + if (0 /* m_state == Connected */) { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -82,7 +90,11 @@ void Yaz_PDU_Assoc::socketNotify(int event) close(); m_PDU_Observer->failNotify(); } - else + else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); + } + else { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -108,20 +120,26 @@ void Yaz_PDU_Assoc::socketNotify(int event) if (!(new_line = cs_accept(m_cs))) return; - Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable, - new_line); + /* 1. create socket-manager + 2. create pdu-assoc + 3. create top-level object + setup observer for child fileid in pdu-assoc + 4. start thread + */ + int fd = cs_fileno(new_line); + cs_fileno(new_line) = -1; + cs_close (new_line); +#if 1 + childNotify(fd); +#else + Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable); assoc->m_parent = this; assoc->m_next = m_children; m_children = assoc; - + assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc); - assoc->m_state = Ready; - assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc); - assoc->m_socketObservable->maskObserver(assoc, - YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); - assoc->m_socketObservable->timeoutObserver(assoc, - assoc->m_idleTime); + socket(fd); +#endif } } else if (m_state == Ready) @@ -139,7 +157,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; else if (res <= 0) { - logf (LOG_LOG, "Connection closed by client"); + logf (m_log, "Connection closed by peer"); close(); m_PDU_Observer->failNotify(); return; @@ -166,7 +184,7 @@ void Yaz_PDU_Assoc::close() m_state = Closed; if (m_cs) { - logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; @@ -227,9 +245,10 @@ int Yaz_PDU_Assoc::flush_PDU() { int r; - logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU"); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU"); if (m_state != Ready) { + logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; } PDU_Queue *q = m_queue_out; @@ -251,11 +270,11 @@ int Yaz_PDU_Assoc::flush_PDU() m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT| YAZ_SOCKET_OBSERVE_WRITE); - logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)", + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)", q->m_len); return r; } - logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; delete q; @@ -268,13 +287,13 @@ int Yaz_PDU_Assoc::flush_PDU() int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) { - logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU"); + logf (m_log, "Yaz_PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); if (!m_cs) { - logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); return -1; } while (*pq) @@ -283,7 +302,7 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) if (is_idle) return flush_PDU (); else - logf (LOG_LOG, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", cs_fileno(m_cs)); return 0; } @@ -293,8 +312,7 @@ COMSTACK Yaz_PDU_Assoc::comstack() if (!m_cs) { CS_TYPE cs_type = tcpip_type; - int protocol = PROTO_Z3950; - m_cs = cs_create (cs_type, 0, protocol); + m_cs = cs_create (cs_type, 0, PROTO_Z3950); } return m_cs; } @@ -306,7 +324,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, void *ap; COMSTACK cs = comstack(); - logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr); + logf (m_log, "Yaz_PDU_Assoc::listen %s", addr); m_PDU_Observer = observer; if (!cs) return; @@ -324,54 +342,98 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, void Yaz_PDU_Assoc::idleTime(int idleTime) { m_idleTime = idleTime; - logf (LOG_LOG, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); m_socketObservable->timeoutObserver(this, m_idleTime); } void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr); + logf (m_log, "Yaz_PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; COMSTACK cs = comstack(); void *ap = cs_straddr (cs, addr); if (!ap) { - logf (LOG_LOG, "cs_straddr failed"); + logf (m_log, "cs_straddr failed"); return; } int res = cs_connect (cs, ap); - if (res < 0) + logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res); + m_socketObservable->addObserver(cs_fileno(cs), this); + m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT| + YAZ_SOCKET_OBSERVE_WRITE); + m_state = Connecting; +} + +void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd) +{ + close(); + m_PDU_Observer = observer; + if (fd >= 0) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect failed"); + CS_TYPE cs_type = tcpip_type; + m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950); + m_state = Ready; + m_socketObservable->addObserver(fd, this); + m_socketObservable->maskObserver(this, + YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT); + m_socketObservable->timeoutObserver(this, m_idleTime); + } +} + #if 1 - logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs)); - m_socketObservable->addObserver(cs_fileno(cs), this); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_WRITE); - m_state = Connecting; +void Yaz_PDU_Assoc::childNotify(int fd) +{ + /// Clone PDU Observable (keep socket manager) + IYaz_PDU_Observable *new_observable = clone(); + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); +} #else - close (); -#endif - } - else - { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs)); - m_socketObservable->addObserver(cs_fileno(cs), this); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_WRITE); - if (res == 1) - { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect pending"); - m_state = Connecting; - } - else - { - logf (LOG_LOG, "Yaz_PDU_Assoc::Connect complete"); - m_state = Connected; - } - } + +#include +#include + +class thread_info { + Yaz_SocketManager *socketManager; + IYaz_PDU_Observable * + +}; + +static void *events(void *p) +{ + Yaz_SocketManager *s = (Yaz_SocketManager *) p; + + while (s->processEvent() > 0) + ; + return 0; +} + +void Yaz_PDU_Assoc::childNotify(int fd) +{ + Yaz_SocketManager *socket_observable = new Yaz_SocketManager; + IYaz_PDU_Observable *new_observable = clone(); + + m_socketObservable = socket_observable; + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); + + pthread_t type; + + int id = pthread_create (&type, 0, events, socket_observable); + logf (LOG_LOG, "pthread_create returned id=%d", id); } +#endif +