Forgot to release normalized xml record. Whoops.
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.10 2007-01-04 03:16:14 quinn Exp $ */;
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/comstack.h>
15 #include <yaz/tcpip.h>
16 #include <yaz/proto.h>
17 #include <yaz/readconf.h>
18 #include <yaz/pquery.h>
19 #include <yaz/yaz-util.h>
20 #include <yaz/nmem.h>
21
22 #include "pazpar2.h"
23 #include "eventl.h"
24 #include "command.h"
25 #include "http.h"
26 #include "termlists.h"
27 #include "reclists.h"
28 #include "relevance.h"
29 #include "config.h"
30
/* Version string placed in the global_parameters initializer below. */
31 #define PAZPAR2_VERSION "0.1"
/* Also placed in the global_parameters initializer; presumably the default for
 * global_parameters.chunk, which send_present() uses as the upper bound on
 * records requested per present -- TODO confirm against struct parameters. */
32 #define MAX_CHUNK 15
33
/* Forward declarations for functions that are used before their definitions
 * appear later in this file. */
34 static void client_fatal(struct client *cl);
35 static void connection_destroy(struct connection *co);
36 static int client_prep_connection(struct client *cl);
37 static void ingest_records(struct client *cl, Z_Records *r);
38 static struct conf_retrievalprofile *database_retrieval_profile(struct database *db);
39 void session_alert_watch(struct session *s, int what);
40
/* connection_create() pushes every new connection's IOCHAN onto this list. */
41 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
42
/* Recycled structures: connection_destroy() and client_destroy() push onto
 * these lists, connection_create() and client_create() pop from them before
 * falling back to xmalloc(). */
43 static struct connection *connection_freelist = 0;
44 static struct client *client_freelist = 0;
45
/* Both lists are filled by load_simpletargets(). */
46 static struct host *hosts = 0;  // The hosts we know about 
47 static struct database *databases = 0; // The databases we know about
48
/*
 * Printable names of the client states. The strings match the Client_*
 * identifiers assigned to cl->state throughout this file; presumably the
 * table is indexed by that state value -- TODO confirm that the order matches
 * the enum declaration (not visible in this file).
 */
49 static char *client_states[] = {
50     "Client_Connecting",
51     "Client_Connected",
52     "Client_Idle",
53     "Client_Initializing",
54     "Client_Searching",
55     "Client_Presenting",
56     "Client_Error",
57     "Client_Failed",
58     "Client_Disconnected",
59     "Client_Stopped"
60 };
61
/*
 * Process-wide settings, initialised positionally in the field order of
 * struct parameters (declared outside this file).
 *
 * Fields of this object read elsewhere in this file: odr_out, odr_in,
 * ccl_filter, implementationId, implementationName, implementationVersion,
 * chunk, toget and dump_records.
 *
 * NOTE(review): which value below lands in which field cannot be confirmed
 * from this file; check the struct declaration before reordering or adding
 * entries.
 */
62 struct parameters global_parameters = 
63 {
64     0,
65     30,
66     "81",
67     "Index Data PazPar2 (MasterKey)",
68     PAZPAR2_VERSION,
69     600, // 10 minutes
70     60,
71     100,
72     MAX_CHUNK,
73     0,
74     0,
75     0,
76     0
77 };
78
79
80 static int send_apdu(struct client *c, Z_APDU *a)
81 {
82     struct connection *co = c->connection;
83     char *buf;
84     int len, r;
85
86     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
87     {
88         odr_perror(global_parameters.odr_out, "Encoding APDU");
89         abort();
90     }
91     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
92     r = cs_put(co->link, buf, len);
93     if (r < 0)
94     {
95         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
96         return -1;
97     }
98     else if (r == 1)
99     {
100         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
101         exit(1);
102     }
103     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
104     co->state = Conn_Waiting;
105     return 0;
106 }
107
108
109 static void send_init(IOCHAN i)
110 {
111     struct connection *co = iochan_getdata(i);
112     struct client *cl = co->client;
113     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
114
115     a->u.initRequest->implementationId = global_parameters.implementationId;
116     a->u.initRequest->implementationName = global_parameters.implementationName;
117     a->u.initRequest->implementationVersion =
118         global_parameters.implementationVersion;
119     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
120     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
121     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
122
123     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
124     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
125     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
126     if (send_apdu(cl, a) >= 0)
127     {
128         iochan_setflags(i, EVENT_INPUT);
129         cl->state = Client_Initializing;
130     }
131     else
132         cl->state = Client_Error;
133     odr_reset(global_parameters.odr_out);
134 }
135
136 static void send_search(IOCHAN i)
137 {
138     struct connection *co = iochan_getdata(i);
139     struct client *cl = co->client; 
140     struct session *se = cl->session;
141     struct database *db = cl->database;
142     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
143     int ndb, cerror, cpos;
144     char **databaselist;
145     Z_Query *zquery;
146     struct ccl_rpn_node *cn;
147     int ssub = 0, lslb = 100000, mspn = 10;
148
149     yaz_log(YLOG_DEBUG, "Sending search");
150
151     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
152     if (!cn)
153         return;
154     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
155             sizeof(Z_Query));
156     zquery->which = Z_Query_type_1;
157     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
158     ccl_rpn_delete(cn);
159
160     for (ndb = 0; db->databases[ndb]; ndb++)
161         ;
162     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
163     for (ndb = 0; db->databases[ndb]; ndb++)
164         databaselist[ndb] = db->databases[ndb];
165
166     a->u.presentRequest->preferredRecordSyntax =
167             yaz_oidval_to_z3950oid(global_parameters.odr_out,
168             CLASS_RECSYN, VAL_USMARC);
169     a->u.searchRequest->smallSetUpperBound = &ssub;
170     a->u.searchRequest->largeSetLowerBound = &lslb;
171     a->u.searchRequest->mediumSetPresentNumber = &mspn;
172     a->u.searchRequest->resultSetName = "Default";
173     a->u.searchRequest->databaseNames = databaselist;
174     a->u.searchRequest->num_databaseNames = ndb;
175
176     if (send_apdu(cl, a) >= 0)
177     {
178         iochan_setflags(i, EVENT_INPUT);
179         cl->state = Client_Searching;
180         cl->requestid = se->requestid;
181     }
182     else
183         cl->state = Client_Error;
184
185     odr_reset(global_parameters.odr_out);
186 }
187
188 static void send_present(IOCHAN i)
189 {
190     struct connection *co = iochan_getdata(i);
191     struct client *cl = co->client; 
192     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
193     int toget;
194     int start = cl->records + 1;
195
196     toget = global_parameters.chunk;
197     if (toget > cl->hits - cl->records)
198         toget = cl->hits - cl->records;
199
200     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
201
202     a->u.presentRequest->resultSetStartPoint = &start;
203     a->u.presentRequest->numberOfRecordsRequested = &toget;
204
205     a->u.presentRequest->resultSetId = "Default";
206
207     a->u.presentRequest->preferredRecordSyntax =
208             yaz_oidval_to_z3950oid(global_parameters.odr_out,
209             CLASS_RECSYN, VAL_USMARC);
210
211     if (send_apdu(cl, a) >= 0)
212     {
213         iochan_setflags(i, EVENT_INPUT);
214         cl->state = Client_Presenting;
215     }
216     else
217         cl->state = Client_Error;
218     odr_reset(global_parameters.odr_out);
219 }
220
221 static void do_initResponse(IOCHAN i, Z_APDU *a)
222 {
223     struct connection *co = iochan_getdata(i);
224     struct client *cl = co->client;
225     Z_InitResponse *r = a->u.initResponse;
226
227     yaz_log(YLOG_DEBUG, "Received init response");
228
229     if (*r->result)
230     {
231         cl->state = Client_Idle;
232     }
233     else
234         cl->state = Client_Failed; // FIXME need to do something to the connection
235 }
236
237 static void do_searchResponse(IOCHAN i, Z_APDU *a)
238 {
239     struct connection *co = iochan_getdata(i);
240     struct client *cl = co->client;
241     struct session *se = cl->session;
242     Z_SearchResponse *r = a->u.searchResponse;
243
244     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
245
246     if (*r->searchStatus)
247     {
248         cl->hits = *r->resultCount;
249         se->total_hits += cl->hits;
250         if (r->presentStatus && !*r->presentStatus && r->records)
251         {
252             yaz_log(YLOG_DEBUG, "Records in search response");
253             cl->records += *r->numberOfRecordsReturned;
254             ingest_records(cl, r->records);
255         }
256         cl->state = Client_Idle;
257     }
258     else
259     {          /*"FAILED"*/
260         cl->hits = 0;
261         cl->state = Client_Error;
262         if (r->records) {
263             Z_Records *recs = r->records;
264             if (recs->which == Z_Records_NSD)
265             {
266                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
267                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
268                 cl->state = Client_Error;
269             }
270         }
271     }
272 }
273
/*
 * Normalise a merge key in place and return `buf`.
 *
 * Each run of alphanumeric characters is lower-cased and kept; every run of
 * other characters between two such words collapses to a single space.
 * Leading and trailing non-alphanumerics are dropped, so "Title." and
 * "title" yield the same key. A buffer without any alphanumeric character
 * becomes the empty string.
 *
 * The result is never longer than the input, so no extra space is needed.
 */
char *normalize_mergekey(char *buf)
{
    /* Read through unsigned char: passing a negative plain char to the
     * <ctype.h> functions is undefined behaviour. */
    const unsigned char *in = (const unsigned char *) buf;
    char *out = buf;

    while (*in)
    {
        while (*in && !isalnum(*in))
            in++;
        if (!*in)
            break;
        /* Separate words with one space, but never start with one. */
        if (out != buf)
            *(out++) = ' ';
        while (isalnum(*in))
            *(out++) = (char) tolower(*(in++));
    }
    *out = '\0';

    return buf;
}
294
295
#ifdef GAGA
// FIXME needs to be generalized. Should flexibly generate X lists per search
/*
 * Disabled code (compiled only when GAGA is defined). Walks the "650" fields
 * of a textual record and copies the text of each subfield 'a' -- up to the
 * first tab or newline, with trailing characters that are neither letters
 * nor ')' trimmed -- into a local buffer. Inserting the result into a
 * termlist is itself disabled behind FIXME. Stops at the first subfield that
 * is not terminated by a newline.
 */
static void extract_subject(struct session *s, const char *rec)
{
    const char *field;

    while ((field = find_field(rec, "650")))
    {
        const char *subfield;
        const char *stop, *tab;
        char buf[1024];
        int len;

        rec = field;
        subfield = find_subfield(field, 'a');
        if (!subfield)
            continue;

        stop = strchr(subfield, '\n');
        if (!stop)
            return;
        tab = strchr(subfield, '\t');
        if (tab && tab < stop)
            stop = tab;
        while (stop > subfield && !isalpha(*(stop - 1)) && *(stop - 1) != ')')
            stop--;

        len = stop - subfield;
        assert(len < 1023);
        memcpy(buf, subfield, len);
        buf[len] = '\0';
#ifdef FIXME
        if (*buf)
            termlist_insert(s->termlist, buf);
#endif
    }
}
#endif
330
331 static void add_facet(struct session *s, const char *type, const char *value)
332 {
333     int i;
334
335     for (i = 0; i < s->num_termlists; i++)
336         if (!strcmp(s->termlists[i].name, type))
337             break;
338     if (i == s->num_termlists)
339     {
340         if (i == SESSION_MAX_TERMLISTS)
341         {
342             yaz_log(YLOG_FATAL, "Too many termlists");
343             exit(1);
344         }
345         s->termlists[i].name = nmem_strdup(s->nmem, type);
346         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
347         s->num_termlists = i + 1;
348     }
349     termlist_insert(s->termlists[i].termlist, value);
350 }
351
352 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
353 {
354     struct conf_retrievalprofile *rprofile = cl->database->rprofile;
355     struct conf_retrievalmap *m;
356     xmlNode *res;
357     xmlDoc *rdoc;
358
359     // First normalize to XML
360     if (rprofile->native_syntax == Nativesyn_iso2709)
361     {
362         char *buf;
363         int len;
364         if (rec->which != Z_External_octet)
365         {
366             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
367             return 0;
368         }
369         buf = (char*) rec->u.octet_aligned->buf;
370         len = rec->u.octet_aligned->len;
371         if (yaz_marc_read_iso2709(rprofile->yaz_marc, buf, len) < 0)
372         {
373             yaz_log(YLOG_WARN, "Failed to decode MARC");
374             return 0;
375         }
376         if (yaz_marc_write_xml(rprofile->yaz_marc, &res,
377                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
378         {
379             yaz_log(YLOG_WARN, "Failed to encode as XML");
380             return 0;
381         }
382         rdoc = xmlNewDoc("1.0");
383         xmlDocSetRootElement(rdoc, res);
384     }
385     else
386     {
387         yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
388         exit(1);
389     }
390     for (m = rprofile->maplist; m; m = m->next)
391     {
392         xmlDoc *new;
393         if (m->type != Map_xslt)
394         {
395             yaz_log(YLOG_WARN, "Unknown map type");
396             return 0;
397         }
398         if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
399         {
400             yaz_log(YLOG_WARN, "XSLT transformation failed");
401             return 0;
402         }
403         xmlFreeDoc(rdoc);
404         rdoc = new;
405     }
406     if (global_parameters.dump_records)
407     {
408         fprintf(stderr, "Record:\n----------------\n");
409         xmlDocFormatDump(stderr, rdoc, 1);
410     }
411     return rdoc;
412 }
413
414 static struct record *ingest_record(struct client *cl, Z_External *rec)
415 {
416     xmlDoc *xdoc = normalize_record(cl, rec);
417     xmlNode *root, *n;
418     struct record *res, *head;
419     struct session *se = cl->session;
420     xmlChar *mergekey, *mergekey_norm;
421
422     if (!xdoc)
423         return 0;
424
425     root = xmlDocGetRootElement(xdoc);
426     if (!(mergekey = xmlGetProp(root, "mergekey")))
427     {
428         yaz_log(YLOG_WARN, "No mergekey found in record");
429         return 0;
430     }
431
432     res = nmem_malloc(se->nmem, sizeof(struct record));
433     res->next_cluster = 0;
434     res->target_offset = -1;
435     res->term_frequency_vec = 0;
436     res->title = "Unknown";
437     res->relevance = 0;
438
439     mergekey_norm = nmem_strdup(se->nmem, (char*) mergekey);
440     xmlFree(mergekey);
441     res->merge_key = normalize_mergekey(mergekey_norm);
442
443     head = reclist_insert(se->reclist, res);
444     relevance_newrec(se->relevance, head);
445
446     for (n = root->children; n; n = n->next)
447     {
448         if (n->type != XML_ELEMENT_NODE)
449             continue;
450         if (!strcmp(n->name, "facet"))
451         {
452             xmlChar *type = xmlGetProp(n, "type");
453             xmlChar *value = xmlNodeListGetString(xdoc, n->children, 0);
454             add_facet(se, type, value);
455             relevance_countwords(se->relevance, head, value, 1);
456             xmlFree(type);
457             xmlFree(value);
458         }
459         else if (!strcmp(n->name, "metadata"))
460         {
461             xmlChar *type = xmlGetProp(n, "type"), *value;
462             if (!strcmp(type, "title"))
463                 res->title = nmem_strdup(se->nmem,
464                         value = xmlNodeListGetString(xdoc, n->children, 0));
465
466             relevance_countwords(se->relevance, head, value, 4);
467             xmlFree(type);
468             xmlFree(value);
469         }
470         else
471             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
472     }
473
474     xmlFreeDoc(xdoc);
475
476     relevance_donerecord(se->relevance, head);
477     se->total_records++;
478
479     return res;
480 }
481
482 static void ingest_records(struct client *cl, Z_Records *r)
483 {
484     struct record *rec;
485     struct session *s = cl->session;
486     Z_NamePlusRecordList *rlist;
487     int i;
488
489     if (r->which != Z_Records_DBOSD)
490         return;
491     rlist = r->u.databaseOrSurDiagnostics;
492     for (i = 0; i < rlist->num_records; i++)
493     {
494         Z_NamePlusRecord *npr = rlist->records[i];
495
496         if (npr->which != Z_NamePlusRecord_databaseRecord)
497         {
498             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
499             continue;
500         }
501
502         rec = ingest_record(cl, npr->u.databaseRecord);
503         if (!rec)
504             continue;
505     }
506     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
507         session_alert_watch(s, SESSION_WATCH_RECORDS);
508 }
509
510 static void do_presentResponse(IOCHAN i, Z_APDU *a)
511 {
512     struct connection *co = iochan_getdata(i);
513     struct client *cl = co->client;
514     Z_PresentResponse *r = a->u.presentResponse;
515
516     if (r->records) {
517         Z_Records *recs = r->records;
518         if (recs->which == Z_Records_NSD)
519         {
520             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
521             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
522             cl->state = Client_Error;
523         }
524     }
525
526     if (!*r->presentStatus && cl->state != Client_Error)
527     {
528         yaz_log(YLOG_DEBUG, "Good Present response");
529         cl->records += *r->numberOfRecordsReturned;
530         ingest_records(cl, r->records);
531         cl->state = Client_Idle;
532     }
533     else if (*r->presentStatus) 
534     {
535         yaz_log(YLOG_WARN, "Bad Present response");
536         cl->state = Client_Error;
537     }
538 }
539
540 static void handler(IOCHAN i, int event)
541 {
542     struct connection *co = iochan_getdata(i);
543     struct client *cl = co->client;
544     struct session *se = 0;
545
546     if (cl)
547         se = cl->session;
548     else
549     {
550         yaz_log(YLOG_WARN, "Destroying orphan connection");
551         connection_destroy(co);
552         return;
553     }
554
555     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
556     {
557         int errcode;
558         socklen_t errlen = sizeof(errcode);
559
560         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
561             &errlen) < 0 || errcode != 0)
562         {
563             client_fatal(cl);
564             return;
565         }
566         else
567         {
568             yaz_log(YLOG_DEBUG, "Connect OK");
569             co->state = Conn_Open;
570             if (cl)
571                 cl->state = Client_Connected;
572         }
573     }
574
575     else if (event & EVENT_INPUT)
576     {
577         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
578
579         if (len < 0)
580         {
581             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
582             connection_destroy(co);
583             return;
584         }
585         else if (len == 0)
586         {
587             yaz_log(YLOG_WARN, "EOF reading from Z server");
588             connection_destroy(co);
589             return;
590         }
591         else if (len > 1) // We discard input if we have no connection
592         {
593             co->state = Conn_Open;
594
595             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
596             {
597                 Z_APDU *a;
598
599                 odr_reset(global_parameters.odr_in);
600                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
601                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
602                 {
603                     client_fatal(cl);
604                     return;
605                 }
606                 switch (a->which)
607                 {
608                     case Z_APDU_initResponse:
609                         do_initResponse(i, a);
610                         break;
611                     case Z_APDU_searchResponse:
612                         do_searchResponse(i, a);
613                         break;
614                     case Z_APDU_presentResponse:
615                         do_presentResponse(i, a);
616                         break;
617                     default:
618                         yaz_log(YLOG_WARN, "Unexpected result from server");
619                         client_fatal(cl);
620                         return;
621                 }
622                 // We aren't expecting staggered output from target
623                 // if (cs_more(t->link))
624                 //    iochan_setevent(i, EVENT_INPUT);
625             }
626             else  // we throw away response and go to idle mode
627             {
628                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
629                 cl->state = Client_Idle;
630             }
631         }
632         /* if len==1 we do nothing but wait for more input */
633     }
634
635     if (cl->state == Client_Connected) {
636         send_init(i);
637     }
638
639     if (cl->state == Client_Idle)
640     {
641         if (cl->requestid != se->requestid && *se->query) {
642             send_search(i);
643         }
644         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
645             cl->records < cl->hits) {
646             send_present(i);
647         }
648     }
649 }
650
651 // Disassociate connection from client
652 static void connection_release(struct connection *co)
653 {
654     struct client *cl = co->client;
655
656     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
657     if (!cl)
658         return;
659     cl->connection = 0;
660     co->client = 0;
661 }
662
663 // Close connection and recycle structure
664 static void connection_destroy(struct connection *co)
665 {
666     struct host *h = co->host;
667     cs_close(co->link);
668     iochan_destroy(co->iochan);
669
670     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
671     if (h->connections == co)
672         h->connections = co->next;
673     else
674     {
675         struct connection *pco;
676         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
677             ;
678         if (pco)
679             pco->next = co->next;
680         else
681             abort();
682     }
683     if (co->client)
684     {
685         if (co->client->state != Client_Idle)
686             co->client->state = Client_Disconnected;
687         co->client->connection = 0;
688     }
689     co->next = connection_freelist;
690     connection_freelist = co;
691 }
692
693 // Creates a new connection for client, associated with the host of 
694 // client's database
695 static struct connection *connection_create(struct client *cl)
696 {
697     struct connection *new;
698     COMSTACK link; 
699     int res;
700     void *addr;
701
702     yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
703     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
704     {
705         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
706         exit(1);
707     }
708
709     if (!(addr = cs_straddr(link, cl->database->host->ipport)))
710     {
711         yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
712         return 0;
713     }
714
715     res = cs_connect(link, addr);
716     if (res < 0)
717     {
718         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
719         return 0;
720     }
721
722     if ((new = connection_freelist))
723         connection_freelist = new->next;
724     else
725     {
726         new = xmalloc(sizeof (struct connection));
727         new->ibuf = 0;
728         new->ibufsize = 0;
729     }
730     new->state = Conn_Connecting;
731     new->host = cl->database->host;
732     new->next = new->host->connections;
733     new->host->connections = new;
734     new->client = cl;
735     cl->connection = new;
736     new->link = link;
737
738     new->iochan = iochan_create(cs_fileno(link), handler, 0);
739     iochan_setdata(new->iochan, new);
740     new->iochan->next = channel_list;
741     channel_list = new->iochan;
742     return new;
743 }
744
745 // Close connection and set state to error
746 static void client_fatal(struct client *cl)
747 {
748     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
749     connection_destroy(cl->connection);
750     cl->state = Client_Error;
751 }
752
753 // Ensure that client has a connection associated
754 static int client_prep_connection(struct client *cl)
755 {
756     struct connection *co;
757     struct session *se = cl->session;
758     struct host *host = cl->database->host;
759
760     co = cl->connection;
761
762     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
763
764     if (!co)
765     {
766         // See if someone else has an idle connection
767         // We should look at timestamps here to select the longest-idle connection
768         for (co = host->connections; co; co = co->next)
769             if (co->state == Conn_Open && (!co->client || co->client->session != se))
770                 break;
771         if (co)
772         {
773             connection_release(co);
774             cl->connection = co;
775             co->client = cl;
776         }
777         else
778             co = connection_create(cl);
779     }
780     if (co)
781     {
782         if (co->state == Conn_Connecting)
783         {
784             cl->state = Client_Connecting;
785             iochan_setflag(co->iochan, EVENT_OUTPUT);
786         }
787         else if (co->state == Conn_Open)
788         {
789             if (cl->state == Client_Error || cl->state == Client_Disconnected)
790                 cl->state = Client_Idle;
791             iochan_setflag(co->iochan, EVENT_OUTPUT);
792         }
793         return 1;
794     }
795     else
796         return 0;
797 }
798
// This function will most likely vanish when a proper target profile mechanism is
// introduced.
//
// Reads target definitions from file `fn`. Every line of the form
//     target host[:port][/database]
// adds a database to the global `databases` list; other lines are ignored.
// The port defaults to "210" and the database name to "Default". Hosts are
// shared between databases: a host:port string seen before reuses the
// existing entry in `hosts`, otherwise the name is resolved (IPv4 only) and
// a new host entry is created. Targets that cannot be resolved are skipped
// with a warning. Exits if the file cannot be opened.
void load_simpletargets(const char *fn)
{
    FILE *f = fopen(fn, "r");
    char line[256];

    if (!f)
    {
        yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
        exit(1);
    }

    while (fgets(line, sizeof(line), f))
    {
        char *url, *db;
        struct host *host;
        struct database *database;

        if (strncmp(line, "target ", 7))
            continue;
        url = line + 7;
        // Strip the line terminator, if any. This is also safe for an empty
        // remainder and for a last line without a trailing newline.
        url[strcspn(url, "\r\n")] = '\0';
        if (!*url)
        {
            yaz_log(YLOG_WARN, "Ignoring empty target line");
            continue;
        }
        yaz_log(YLOG_DEBUG, "Target: %s", url);
        if ((db = strchr(url, '/')))
            *(db++) = '\0';
        else
            db = "Default";

        for (host = hosts; host; host = host->next)
            if (!strcmp(url, host->hostport))
                break;
        if (!host)
        {
            struct addrinfo *addrinfo, hints;
            // Split a private copy so that url keeps its host:port form for
            // host->hostport and database->url below.
            char *hostname = xstrdup(url);
            char *port;
            char ipport[128];
            unsigned char addrbuf[4];
            int res, n;

            if ((port = strchr(hostname, ':')))
                *(port++) = '\0';
            else
                port = "210";

            memset(&hints, 0, sizeof(hints));
            hints.ai_family = PF_INET;
            hints.ai_socktype = SOCK_STREAM;
            hints.ai_protocol = IPPROTO_TCP;
            // This is not robust code. It assumes that getaddrinfo returns AF_INET
            // address.
            if ((res = getaddrinfo(hostname, port, &hints, &addrinfo)))
            {
                yaz_log(YLOG_WARN, "Failed to resolve %s: %s", hostname,
                        gai_strerror(res));
                xfree(hostname);
                continue;
            }
            assert(addrinfo->ai_family == PF_INET);
            memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
            freeaddrinfo(addrinfo);

            // Octets are unsigned: a signed conversion would print values
            // above 127 as negative numbers.
            n = snprintf(ipport, sizeof(ipport), "%u.%u.%u.%u:%s",
                    (unsigned) addrbuf[0], (unsigned) addrbuf[1],
                    (unsigned) addrbuf[2], (unsigned) addrbuf[3], port);
            if (n < 0 || (size_t) n >= sizeof(ipport))
            {
                yaz_log(YLOG_WARN, "Address of %s is too long, skipping", url);
                xfree(hostname);
                continue;
            }
            xfree(hostname);

            host = xmalloc(sizeof(struct host));
            host->hostport = xstrdup(url);
            host->connections = 0;
            host->ipport = xstrdup(ipport);
            host->next = hosts;
            hosts = host;
        }
        database = xmalloc(sizeof(struct database));
        database->host = host;
        database->url = xmalloc(strlen(url) + strlen(db) + 2);
        strcpy(database->url, url);
        strcat(database->url, "/");
        strcat(database->url, db);
        
        database->databases = xmalloc(2 * sizeof(char *));
        database->databases[0] = xstrdup(db);
        database->databases[1] = 0;
        database->errors = 0;
        database->qprofile = 0;
        database->rprofile = database_retrieval_profile(database);
        database->next = databases;
        databases = database;

    }
    fclose(f);
}
893
894 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
895 {
896     switch (n->kind)
897     {
898         case CCL_RPN_AND:
899         case CCL_RPN_OR:
900         case CCL_RPN_NOT:
901         case CCL_RPN_PROX:
902             pull_terms(nmem, n->u.p[0], termlist, num);
903             pull_terms(nmem, n->u.p[1], termlist, num);
904             break;
905         case CCL_RPN_TERM:
906             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
907             break;
908         default: // NOOP
909             break;
910     }
911 }
912
913 // Extract terms from query into null-terminated termlist
914 static int extract_terms(NMEM nmem, char *query, char **termlist)
915 {
916     int error, pos;
917     struct ccl_rpn_node *n;
918     int num = 0;
919
920     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
921     if (!n)
922         return -1;
923     pull_terms(nmem, n, termlist, &num);
924     termlist[num] = 0;
925     ccl_rpn_delete(n);
926     return 0;
927 }
928
929 static struct client *client_create(void)
930 {
931     struct client *r;
932     if (client_freelist)
933     {
934         r = client_freelist;
935         client_freelist = client_freelist->next;
936     }
937     else
938         r = xmalloc(sizeof(struct client));
939     r->database = 0;
940     r->connection = 0;
941     r->session = 0;
942     r->hits = 0;
943     r->records = 0;
944     r->setno = 0;
945     r->requestid = -1;
946     r->diagnostic = 0;
947     r->state = Client_Disconnected;
948     r->next = 0;
949     return r;
950 }
951
952 void client_destroy(struct client *c)
953 {
954     struct session *se = c->session;
955     if (c == se->clients)
956         se->clients = c->next;
957     else
958     {
959         struct client *cc;
960         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
961             ;
962         if (cc)
963             cc->next = c->next;
964     }
965     if (c->connection)
966         connection_release(c->connection);
967     c->next = client_freelist;
968     client_freelist = c;
969 }
970
971 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
972 {
973     s->watchlist[what].fun = fun;
974     s->watchlist[what].data = data;
975 }
976
977 void session_alert_watch(struct session *s, int what)
978 {
979     if (!s->watchlist[what].fun)
980         return;
981     (*s->watchlist[what].fun)(s->watchlist[what].data);
982     s->watchlist[what].fun = 0;
983     s->watchlist[what].data = 0;
984 }
985
986 // This needs to be extended with selection criteria
987 static struct conf_retrievalprofile *database_retrieval_profile(struct database *db)
988 {
989     if (!config)
990     {
991         yaz_log(YLOG_FATAL, "Must load configuration (-f)");
992         exit(1);
993     }
994     if (!config->retrievalprofiles)
995     {
996         yaz_log(YLOG_FATAL, "No retrieval profiles defined");
997     }
998     return config->retrievalprofiles;
999 }
1000
1001 // This should be extended with parameters to control selection criteria
1002 // Associates a set of clients with a session;
1003 int select_targets(struct session *se)
1004 {
1005     struct database *db;
1006     int c = 0;
1007
1008     while (se->clients)
1009         client_destroy(se->clients);
1010     for (db = databases; db; db = db->next)
1011     {
1012         struct client *cl = client_create();
1013         cl->database = db;
1014         cl->session = se;
1015         cl->next = se->clients;
1016         se->clients = cl;
1017         c++;
1018     }
1019     return c;
1020 }
1021
1022 int session_active_clients(struct session *s)
1023 {
1024     struct client *c;
1025     int res = 0;
1026
1027     for (c = s->clients; c; c = c->next)
1028         if (c->connection && (c->state == Client_Connecting ||
1029                     c->state == Client_Initializing ||
1030                     c->state == Client_Searching ||
1031                     c->state == Client_Presenting))
1032             res++;
1033
1034     return res;
1035 }
1036
1037 char *search(struct session *se, char *query)
1038 {
1039     int live_channels = 0;
1040     struct client *cl;
1041
1042     yaz_log(YLOG_DEBUG, "Search");
1043
1044     strcpy(se->query, query);
1045     se->requestid++;
1046     nmem_reset(se->nmem);
1047     for (cl = se->clients; cl; cl = cl->next)
1048     {
1049         cl->hits = -1;
1050         cl->records = 0;
1051         cl->diagnostic = 0;
1052
1053         if (client_prep_connection(cl))
1054             live_channels++;
1055     }
1056     if (live_channels)
1057     {
1058         char *p[512];
1059         int maxrecs = live_channels * global_parameters.toget;
1060         se->num_termlists = 0;
1061         se->reclist = reclist_create(se->nmem, maxrecs);
1062         extract_terms(se->nmem, query, p);
1063         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1064         se->total_records = se->total_hits = 0;
1065         se->expected_maxrecs = maxrecs;
1066     }
1067     else
1068         return "NOTARGETS";
1069
1070     return 0;
1071 }
1072
1073 void destroy_session(struct session *s)
1074 {
1075     yaz_log(YLOG_LOG, "Destroying session");
1076     while (s->clients)
1077         client_destroy(s->clients);
1078     nmem_destroy(s->nmem);
1079     wrbuf_free(s->wrbuf, 1);
1080 }
1081
1082 struct session *new_session() 
1083 {
1084     int i;
1085     struct session *session = xmalloc(sizeof(*session));
1086
1087     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1088     
1089     session->total_hits = 0;
1090     session->total_records = 0;
1091     session->num_termlists = 0;
1092     session->reclist = 0;
1093     session->requestid = -1;
1094     session->clients = 0;
1095     session->expected_maxrecs = 0;
1096     session->query[0] = '\0';
1097     session->nmem = nmem_create();
1098     session->wrbuf = wrbuf_alloc();
1099     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1100     {
1101         session->watchlist[i].data = 0;
1102         session->watchlist[i].fun = 0;
1103     }
1104
1105     select_targets(session);
1106
1107     return session;
1108 }
1109
1110 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1111 {
1112     static struct hitsbytarget res[1000]; // FIXME MM
1113     struct client *cl;
1114
1115     *count = 0;
1116     for (cl = se->clients; cl; cl = cl->next)
1117     {
1118         strcpy(res[*count].id, cl->database->host->hostport);
1119         res[*count].hits = cl->hits;
1120         res[*count].records = cl->records;
1121         res[*count].diagnostic = cl->diagnostic;
1122         res[*count].state = client_states[cl->state];
1123         res[*count].connected  = cl->connection ? 1 : 0;
1124         (*count)++;
1125     }
1126
1127     return res;
1128 }
1129
1130 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1131 {
1132     int i;
1133
1134     for (i = 0; i < s->num_termlists; i++)
1135         if (!strcmp(s->termlists[i].name, name))
1136             return termlist_highscore(s->termlists[i].termlist, num);
1137     return 0;
1138 }
1139
#ifdef REPORT_NMEM
// Conditional compilation added by SH: this code led to a warning with the
// YAZ header files currently installed on us1.
//
// Logs the nmem memory figures reported by YAZ: bytes in use and bytes free.
void report_nmem_stats(void)
{
    size_t used_bytes;
    size_t free_bytes;

    nmem_get_memory_in_use(&used_bytes);
    nmem_get_memory_free(&free_bytes);

    yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", (long) used_bytes,
            (long) free_bytes);
}
#endif
1154
1155 struct record **show(struct session *s, int start, int *num, int *total,
1156                      int *sumhits, NMEM nmem_show)
1157 {
1158     struct record **recs = nmem_malloc(nmem_show, *num 
1159                                        * sizeof(struct record *));
1160     int i;
1161
1162     relevance_prepare_read(s->relevance, s->reclist);
1163
1164     *total = s->reclist->num_records;
1165     *sumhits = s->total_hits;
1166
1167     for (i = 0; i < start; i++)
1168         if (!reclist_read_record(s->reclist))
1169         {
1170             *num = 0;
1171             return 0;
1172         }
1173
1174     for (i = 0; i < *num; i++)
1175     {
1176         struct record *r = reclist_read_record(s->reclist);
1177         if (!r)
1178         {
1179             *num = i;
1180             break;
1181         }
1182         recs[i] = r;
1183     }
1184     return recs;
1185 }
1186
1187 void statistics(struct session *se, struct statistics *stat)
1188 {
1189     struct client *cl;
1190     int count = 0;
1191
1192     bzero(stat, sizeof(*stat));
1193     for (cl = se->clients; cl; cl = cl->next)
1194     {
1195         if (!cl->connection)
1196             stat->num_no_connection++;
1197         switch (cl->state)
1198         {
1199             case Client_Connecting: stat->num_connecting++; break;
1200             case Client_Initializing: stat->num_initializing++; break;
1201             case Client_Searching: stat->num_searching++; break;
1202             case Client_Presenting: stat->num_presenting++; break;
1203             case Client_Idle: stat->num_idle++; break;
1204             case Client_Failed: stat->num_failed++; break;
1205             case Client_Error: stat->num_error++; break;
1206             default: break;
1207         }
1208         count++;
1209     }
1210     stat->num_hits = se->total_hits;
1211     stat->num_records = se->total_records;
1212
1213     stat->num_clients = count;
1214 }
1215
1216 static CCL_bibset load_cclfile(const char *fn)
1217 {
1218     CCL_bibset res = ccl_qual_mk();
1219     if (ccl_qual_fname(res, fn) < 0)
1220     {
1221         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1222         exit(1);
1223     }
1224     return res;
1225 }
1226
1227 int main(int argc, char **argv)
1228 {
1229     int ret;
1230     char *arg;
1231     int setport = 0;
1232
1233     if (signal(SIGPIPE, SIG_IGN) < 0)
1234         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1235
1236     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1237
1238     while ((ret = options("f:x:c:h:p:C:s:d", argv, argc, &arg)) != -2)
1239     {
1240         switch (ret) {
1241             case 'f':
1242                 if (!read_config(arg))
1243                     exit(1);
1244                 break;
1245             case 'c':
1246                 command_init(atoi(arg));
1247                 setport++;
1248                 break;
1249             case 'h':
1250                 http_init(arg);
1251                 setport++;
1252                 break;
1253             case 'C':
1254                 global_parameters.ccl_filter = load_cclfile(arg);
1255                 break;
1256             case 'p':
1257                 http_set_proxyaddr(arg);
1258                 break;
1259             case 's':
1260                 load_simpletargets(arg);
1261                 break;
1262             case 'd':
1263                 global_parameters.dump_records = 1;
1264                 break;
1265             default:
1266                 fprintf(stderr, "Usage: pazpar2\n"
1267                         "    -f configfile\n"
1268                         "    -h [host:]port          (REST protocol listener)\n"
1269                         "    -c cmdport              (telnet-style)\n"
1270                         "    -C cclconfig\n"
1271                         "    -s simpletargetfile\n"
1272                         "    -p hostname[:portno]    (HTTP proxy)\n");
1273                 exit(1);
1274         }
1275     }
1276
1277     if (!setport)
1278     {
1279         fprintf(stderr, "Set command port with -h or -c\n");
1280         exit(1);
1281     }
1282
1283     global_parameters.ccl_filter = load_cclfile("../etc/default.bib");
1284     global_parameters.yaz_marc = yaz_marc_create();
1285     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1286     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1287     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1288
1289     event_loop(&channel_list);
1290
1291     return 0;
1292 }
1293
1294 /*
1295  * Local variables:
1296  * c-basic-offset: 4
1297  * indent-tabs-mode: nil
1298  * End:
1299  * vim: shiftwidth=4 tabstop=8 expandtab
1300  */