Basic target management
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.8 2006-12-03 06:43:24 quinn Exp $ */;
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/comstack.h>
15 #include <yaz/tcpip.h>
16 #include <yaz/proto.h>
17 #include <yaz/readconf.h>
18 #include <yaz/pquery.h>
19 #include <yaz/yaz-util.h>
20 #include <yaz/ccl.h>
21 #include <yaz/yaz-ccl.h>
22
23 #include "pazpar2.h"
24 #include "eventl.h"
25 #include "command.h"
26 #include "http.h"
27 #include "termlists.h"
28 #include "reclists.h"
29 #include "relevance.h"
30
31 #define PAZPAR2_VERSION "0.1"
32 #define MAX_CHUNK 10
33
34 static void client_fatal(struct client *cl);
35 static void connection_destroy(struct connection *co);
36 static int client_prep_connection(struct client *cl);
37
38 IOCHAN channel_list = 0;  // Master list of connections we're listening to.
39
40 static struct connection *connection_freelist = 0;
41 static struct client *client_freelist = 0;
42
43 static struct host *hosts = 0;  // The hosts we know about 
44 static struct database *databases = 0; // The databases we know about
45
46 static char *client_states[] = {
47     "Client_Connecting",
48     "Client_Connected",
49     "Client_Idle",
50     "Client_Initializing",
51     "Client_Searching",
52     "Client_Presenting",
53     "Client_Error",
54     "Client_Failed",
55     "Client_Disconnected",
56     "Client_Stopped"
57 };
58
59 static struct parameters {
60     int timeout;                /* operations timeout, in seconds */
61     char implementationId[128];
62     char implementationName[128];
63     char implementationVersion[128];
64     int target_timeout; // seconds
65     int toget;
66     int chunk;
67     CCL_bibset ccl_filter;
68     yaz_marc_t yaz_marc;
69     ODR odr_out;
70     ODR odr_in;
71 } global_parameters = 
72 {
73     30,
74     "81",
75     "Index Data PazPar2 (MasterKey)",
76     PAZPAR2_VERSION,
77     600, // 10 minutes
78     100,
79     MAX_CHUNK,
80     0,
81     0,
82     0,
83     0
84 };
85
86
87 static int send_apdu(struct client *c, Z_APDU *a)
88 {
89     struct connection *co = c->connection;
90     char *buf;
91     int len, r;
92
93     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
94     {
95         odr_perror(global_parameters.odr_out, "Encoding APDU");
96         abort();
97     }
98     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
99     r = cs_put(co->link, buf, len);
100     if (r < 0)
101     {
102         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
103         return -1;
104     }
105     else if (r == 1)
106     {
107         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
108         exit(1);
109     }
110     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
111     co->state = Conn_Waiting;
112     return 0;
113 }
114
115
116 static void send_init(IOCHAN i)
117 {
118     struct connection *co = iochan_getdata(i);
119     struct client *cl = co->client;
120     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
121
122     a->u.initRequest->implementationId = global_parameters.implementationId;
123     a->u.initRequest->implementationName = global_parameters.implementationName;
124     a->u.initRequest->implementationVersion =
125         global_parameters.implementationVersion;
126     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
127     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
128     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
129
130     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
131     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
132     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
133     if (send_apdu(cl, a) >= 0)
134     {
135         iochan_setflags(i, EVENT_INPUT);
136         cl->state = Client_Initializing;
137     }
138     else
139         cl->state = Client_Error;
140     odr_reset(global_parameters.odr_out);
141 }
142
143 static void send_search(IOCHAN i)
144 {
145     struct connection *co = iochan_getdata(i);
146     struct client *cl = co->client; 
147     struct session *se = cl->session;
148     struct database *db = cl->database;
149     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
150     int ndb, cerror, cpos;
151     char **databaselist;
152     Z_Query *zquery;
153     struct ccl_rpn_node *cn;
154
155     yaz_log(YLOG_DEBUG, "Sending search");
156
157     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
158     if (!cn)
159         return;
160     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
161             sizeof(Z_Query));
162     zquery->which = Z_Query_type_1;
163     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
164     ccl_rpn_delete(cn);
165
166     for (ndb = 0; *db->databases[ndb]; ndb++)
167         ;
168     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
169     for (ndb = 0; *db->databases[ndb]; ndb++)
170         databaselist[ndb] = db->databases[ndb];
171
172     a->u.searchRequest->resultSetName = "Default";
173     a->u.searchRequest->databaseNames = databaselist;
174     a->u.searchRequest->num_databaseNames = ndb;
175
176     if (send_apdu(cl, a) >= 0)
177     {
178         iochan_setflags(i, EVENT_INPUT);
179         cl->state = Client_Searching;
180         cl->requestid = se->requestid;
181     }
182     else
183         cl->state = Client_Error;
184
185     odr_reset(global_parameters.odr_out);
186 }
187
188 static void send_present(IOCHAN i)
189 {
190     struct connection *co = iochan_getdata(i);
191     struct client *cl = co->client; 
192     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
193     int toget;
194     int start = cl->records + 1;
195
196     toget = global_parameters.chunk;
197     if (toget > cl->hits - cl->records)
198         toget = cl->hits - cl->records;
199
200     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
201
202     a->u.presentRequest->resultSetStartPoint = &start;
203     a->u.presentRequest->numberOfRecordsRequested = &toget;
204
205     a->u.presentRequest->resultSetId = "Default";
206
207     a->u.presentRequest->preferredRecordSyntax =
208             yaz_oidval_to_z3950oid(global_parameters.odr_out,
209             CLASS_RECSYN, VAL_USMARC);
210
211     if (send_apdu(cl, a) >= 0)
212     {
213         iochan_setflags(i, EVENT_INPUT);
214         cl->state = Client_Presenting;
215     }
216     else
217         cl->state = Client_Error;
218     odr_reset(global_parameters.odr_out);
219 }
220
221 static void do_initResponse(IOCHAN i, Z_APDU *a)
222 {
223     struct connection *co = iochan_getdata(i);
224     struct client *cl = co->client;
225     Z_InitResponse *r = a->u.initResponse;
226
227     yaz_log(YLOG_DEBUG, "Received init response");
228
229     if (*r->result)
230     {
231         cl->state = Client_Idle;
232     }
233     else
234         cl->state = Client_Failed; // FIXME need to do something to the connection
235 }
236
237 static void do_searchResponse(IOCHAN i, Z_APDU *a)
238 {
239     struct connection *co = iochan_getdata(i);
240     struct client *cl = co->client;
241     struct session *se = cl->session;
242     Z_SearchResponse *r = a->u.searchResponse;
243
244     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
245
246     if (*r->searchStatus)
247     {
248         cl->hits = *r->resultCount;
249         cl->state = Client_Idle;
250         se->total_hits += cl->hits;
251     }
252     else
253     {          /*"FAILED"*/
254         cl->hits = 0;
255         cl->state = Client_Error;
256         if (r->records) {
257             Z_Records *recs = r->records;
258             if (recs->which == Z_Records_NSD)
259             {
260                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
261                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
262                 cl->state = Client_Error;
263             }
264         }
265     }
266 }
267
268 const char *find_field(const char *rec, const char *field)
269 {
270     char lbuf[5];
271     char *line;
272
273     lbuf[0] = '\n';
274     strcpy(lbuf + 1, field);
275
276     if ((line = strstr(rec, lbuf)))
277         return ++line;
278     else
279         return 0;
280 }
281
282 const char *find_subfield(const char *field, char subfield)
283 {
284     const char *p = field;
285
286     while (*p && *p != '\n')
287     {
288         while (*p != '\n' && *p != '\t')
289             p++;
290         if (*p == '\t' && *(++p) == subfield) {
291             if (*(++p) == ' ')
292             {
293                 while (isspace(*p))
294                     p++;
295                 return p;
296             }
297         }
298     }
299     return 0;
300 }
301
302 // Extract 245 $a $b 100 $a
303 char *extract_title(struct session *s, const char *rec)
304 {
305     const char *field, *subfield;
306     char *e, *ef;
307     unsigned char *obuf, *p;
308
309     wrbuf_rewind(s->wrbuf);
310
311     if (!(field = find_field(rec, "245")))
312         return 0;
313     if (!(subfield = find_subfield(field, 'a')))
314         return 0;
315     ef = index(subfield, '\n');
316     if ((e = index(subfield, '\t')) && e < ef)
317         ef = e;
318     if (ef)
319     {
320         wrbuf_write(s->wrbuf, subfield, ef - subfield);
321         if ((subfield = find_subfield(field, 'b'))) 
322         {
323             ef = index(subfield, '\n');
324             if ((e = index(subfield, '\t')) && e < ef)
325                 ef = e;
326             if (ef)
327             {
328                 wrbuf_putc(s->wrbuf, ' ');
329                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
330             }
331         }
332     }
333     if ((field = find_field(rec, "100")))
334     {
335         if ((subfield = find_subfield(field, 'a')))
336         {
337             ef = index(subfield, '\n');
338             if ((e = index(subfield, '\t')) && e < ef)
339                 ef = e;
340             if (ef)
341             {
342                 wrbuf_puts(s->wrbuf, ", by ");
343                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
344             }
345         }
346     }
347     wrbuf_putc(s->wrbuf, '\0');
348     obuf = nmem_strdup(s->nmem, wrbuf_buf(s->wrbuf));
349     for (p = obuf; *p; p++)
350         if (*p == '&' || *p == '<' || *p > 122 || *p < ' ')
351             *p = ' ';
352     return obuf;
353 }
354
355 // Extract 245 $a $b 100 $a
356 char *extract_mergekey(struct session *s, const char *rec)
357 {
358     const char *field, *subfield;
359     char *e, *ef;
360     char *out, *p, *pout;
361
362     wrbuf_rewind(s->wrbuf);
363
364     if (!(field = find_field(rec, "245")))
365         return 0;
366     if (!(subfield = find_subfield(field, 'a')))
367         return 0;
368     ef = index(subfield, '\n');
369     if ((e = index(subfield, '\t')) && e < ef)
370         ef = e;
371     if (ef)
372     {
373         wrbuf_write(s->wrbuf, subfield, ef - subfield);
374         if ((subfield = find_subfield(field, 'b'))) 
375         {
376             ef = index(subfield, '\n');
377             if ((e = index(subfield, '\t')) && e < ef)
378                 ef = e;
379             if (ef)
380             {
381                 wrbuf_puts(s->wrbuf, " field "); 
382                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
383             }
384         }
385     }
386     if ((field = find_field(rec, "100")))
387     {
388         if ((subfield = find_subfield(field, 'a')))
389         {
390             ef = index(subfield, '\n');
391             if ((e = index(subfield, '\t')) && e < ef)
392                 ef = e;
393             if (ef)
394             {
395                 wrbuf_puts(s->wrbuf, " field "); 
396                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
397             }
398         }
399     }
400     wrbuf_putc(s->wrbuf, '\0');
401     p = wrbuf_buf(s->wrbuf);
402     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
403
404     while (*p)
405     {
406         while (isalnum(*p))
407             *(pout++) = tolower(*(p++));
408         while (*p && !isalnum(*p))
409             p++;
410         *(pout++) = ' ';
411     }
412     if (out != pout)
413         *(--pout) = '\0';
414
415     return out;
416 }
417
418 #ifdef RECHEAP
419 static void push_record(struct session *s, struct record *r)
420 {
421     int p;
422     assert(s->recheap_max + 1 < s->recheap_size);
423
424     s->recheap[p = ++s->recheap_max] = r;
425     while (p > 0)
426     {
427         int parent = (p - 1) >> 1;
428         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
429         {
430             struct record *tmp;
431             tmp = s->recheap[parent];
432             s->recheap[parent] = s->recheap[p];
433             s->recheap[p] = tmp;
434             p = parent;
435         }
436         else
437             break;
438     }
439 }
440
441 static struct record *top_record(struct session *s)
442 {
443     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
444 }
445
446 static struct record *pop_record(struct session *s)
447 {
448     struct record *res;
449     int p = 0;
450     int lastnonleaf = (s->recheap_max - 1) >> 1;
451
452     if (s->recheap_max < 0)
453         return 0;
454
455     res = s->recheap[0];
456
457     s->recheap[p] = s->recheap[s->recheap_max--];
458
459     while (p <= lastnonleaf)
460     {
461         int right = (p + 1) << 1;
462         int left = right - 1;
463         int min = left;
464
465         if (right < s->recheap_max &&
466                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
467             min = right;
468         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
469         {
470             struct record *tmp = s->recheap[min];
471             s->recheap[min] = s->recheap[p];
472             s->recheap[p] = tmp;
473             p = min;
474         }
475         else
476             break;
477     }
478     return res;
479 }
480
481 // Like pop_record but collapses identical (merge_key) records
482 // The heap will contain multiple independent matching records and possibly
483 // one cluster, created the last time the list was scanned
484 static struct record *pop_mrecord(struct session *s)
485 {
486     struct record *this;
487     struct record *next;
488
489     if (!(this = pop_record(s)))
490         return 0;
491
492     // Collapse identical records
493     while ((next = top_record(s)))
494     {
495         struct record *p, *tmpnext;
496         if (strcmp(this->merge_key, next->merge_key))
497             break;
498         // Absorb record (and clustersiblings) into a supercluster
499         for (p = next; p; p = tmpnext) {
500             tmpnext = p->next_cluster;
501             p->next_cluster = this->next_cluster;
502             this->next_cluster = p;
503         }
504
505         pop_record(s);
506     }
507     return this;
508 }
509
510 // Reads records in sort order. Store records in top of heapspace until rewind is called.
511 static struct record *read_recheap(struct session *s)
512 {
513     struct record *r = pop_mrecord(s);
514
515     if (r)
516     {
517         if (s->recheap_scratch < 0)
518             s->recheap_scratch = s->recheap_size;
519         s->recheap[--s->recheap_scratch] = r;
520     }
521
522     return r;
523 }
524
525 // Return records to heap after read
526 static void rewind_recheap(struct session *s)
527 {
528     while (s->recheap_scratch >= 0) {
529         push_record(s, s->recheap[s->recheap_scratch++]);
530         if (s->recheap_scratch >= s->recheap_size)
531             s->recheap_scratch = -1;
532     }
533 }
534
535 #endif
536
537 // FIXME needs to be generalized. Should flexibly generate X lists per search
538 static void extract_subject(struct session *s, const char *rec)
539 {
540     const char *field, *subfield;
541
542     while ((field = find_field(rec, "650")))
543     {
544         rec = field; 
545         if ((subfield = find_subfield(field, 'a')))
546         {
547             char *e, *ef;
548             char buf[1024];
549             int len;
550
551             ef = index(subfield, '\n');
552             if (!ef)
553                 return;
554             if ((e = index(subfield, '\t')) && e < ef)
555                 ef = e;
556             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
557                 ef--;
558             len = ef - subfield;
559             assert(len < 1023);
560             memcpy(buf, subfield, len);
561             buf[len] = '\0';
562             if (*buf)
563                 termlist_insert(s->termlist, buf);
564         }
565     }
566 }
567
568 static void pull_relevance_field(struct session *s, struct record *head, const char *rec,
569         char *field, int mult)
570 {
571     const char *fb;
572     while ((fb = find_field(rec, field)))
573     {
574         char *ffield = strchr(fb, '\t');
575         if (!ffield)
576             return;
577         char *eol = strchr(ffield, '\n');
578         if (!eol)
579             return;
580         relevance_countwords(s->relevance, head, ffield, eol - ffield, mult);
581         rec = field + 1; // Crude way to cause a loop through repeating fields
582     }
583 }
584
585 static void pull_relevance_keys(struct session *s, struct record *head,  struct record *rec)
586 {
587     relevance_newrec(s->relevance, head);
588     pull_relevance_field(s, head, rec->buf, "100", 2);
589     pull_relevance_field(s, head, rec->buf, "245", 4);
590     //pull_relevance_field(s, head, rec->buf, "530", 1);
591     pull_relevance_field(s, head, rec->buf, "630", 1);
592     pull_relevance_field(s, head, rec->buf, "650", 1);
593     pull_relevance_field(s, head, rec->buf, "700", 1);
594     relevance_donerecord(s->relevance, head);
595 }
596
597 struct record *ingest_record(struct client *cl, char *buf, int len)
598 {
599     struct session *se = cl->session;
600     struct record *res;
601     struct record *head;
602     const char *recbuf;
603
604     wrbuf_rewind(se->wrbuf);
605     yaz_marc_xml(global_parameters.yaz_marc, YAZ_MARC_LINE);
606     if (yaz_marc_decode_wrbuf(global_parameters.yaz_marc, buf, len, se->wrbuf) < 0)
607     {
608         yaz_log(YLOG_WARN, "Failed to decode MARC record");
609         return 0;
610     }
611     wrbuf_putc(se->wrbuf, '\0');
612     recbuf = wrbuf_buf(se->wrbuf);
613
614     res = nmem_malloc(se->nmem, sizeof(struct record));
615     res->buf = nmem_strdup(se->nmem, recbuf);
616
617     extract_subject(se, res->buf);
618
619     res->title = extract_title(se, res->buf);
620     res->merge_key = extract_mergekey(se, res->buf);
621     if (!res->merge_key)
622         return 0;
623     res->client = cl;
624     res->next_cluster = 0;
625     res->target_offset = -1;
626     res->term_frequency_vec = 0;
627
628     head = reclist_insert(se->reclist, res);
629
630     pull_relevance_keys(se, head, res);
631
632     se->total_records++;
633
634     return res;
635 }
636
637 void ingest_records(struct client *cl, Z_Records *r)
638 {
639     struct record *rec;
640     Z_NamePlusRecordList *rlist;
641     int i;
642
643     if (r->which != Z_Records_DBOSD)
644         return;
645     rlist = r->u.databaseOrSurDiagnostics;
646     for (i = 0; i < rlist->num_records; i++)
647     {
648         Z_NamePlusRecord *npr = rlist->records[i];
649         Z_External *e;
650         char *buf;
651         int len;
652
653         if (npr->which != Z_NamePlusRecord_databaseRecord)
654         {
655             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
656             continue;
657         }
658         e = npr->u.databaseRecord;
659         if (e->which != Z_External_octet)
660         {
661             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
662             continue;
663         }
664         buf = (char*) e->u.octet_aligned->buf;
665         len = e->u.octet_aligned->len;
666
667         rec = ingest_record(cl, buf, len);
668         if (!rec)
669             continue;
670     }
671 }
672
673 static void do_presentResponse(IOCHAN i, Z_APDU *a)
674 {
675     struct connection *co = iochan_getdata(i);
676     struct client *cl = co->client;
677     Z_PresentResponse *r = a->u.presentResponse;
678
679     if (r->records) {
680         Z_Records *recs = r->records;
681         if (recs->which == Z_Records_NSD)
682         {
683             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
684             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
685             cl->state = Client_Error;
686         }
687     }
688
689     if (!*r->presentStatus && cl->state != Client_Error)
690     {
691         yaz_log(YLOG_DEBUG, "Good Present response");
692         cl->records += *r->numberOfRecordsReturned;
693         ingest_records(cl, r->records);
694         cl->state = Client_Idle;
695     }
696     else if (*r->presentStatus) 
697     {
698         yaz_log(YLOG_WARN, "Bad Present response");
699         cl->state = Client_Error;
700     }
701 }
702
703 static void handler(IOCHAN i, int event)
704 {
705     struct connection *co = iochan_getdata(i);
706     struct client *cl = co->client;
707     struct session *se = 0;
708
709     if (cl)
710         se = cl->session;
711
712     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
713     {
714         int errcode;
715         socklen_t errlen = sizeof(errcode);
716
717         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
718             &errlen) < 0 || errcode != 0)
719         {
720             client_fatal(cl);
721             return;
722         }
723         else
724         {
725             yaz_log(YLOG_DEBUG, "Connect OK");
726             co->state = Conn_Open;
727             if (cl)
728                 cl->state = Client_Connected;
729         }
730     }
731
732     else if (event & EVENT_INPUT)
733     {
734         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
735
736         if (len < 0)
737         {
738             client_fatal(cl);
739             return;
740         }
741         else if (len == 0)
742         {
743             client_fatal(cl);
744             return;
745         }
746         else if (len > 1) // We discard input if we have no connection
747         {
748             co->state = Conn_Open;
749
750             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
751             {
752                 Z_APDU *a;
753
754                 odr_reset(global_parameters.odr_in);
755                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
756                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
757                 {
758                     client_fatal(cl);
759                     return;
760                 }
761                 switch (a->which)
762                 {
763                     case Z_APDU_initResponse:
764                         do_initResponse(i, a);
765                         break;
766                     case Z_APDU_searchResponse:
767                         do_searchResponse(i, a);
768                         break;
769                     case Z_APDU_presentResponse:
770                         do_presentResponse(i, a);
771                         break;
772                     default:
773                         yaz_log(YLOG_WARN, "Unexpected result from server");
774                         client_fatal(cl);
775                         return;
776                 }
777                 // We aren't expecting staggered output from target
778                 // if (cs_more(t->link))
779                 //    iochan_setevent(i, EVENT_INPUT);
780             }
781             else  // we throw away response and go to idle mode
782             {
783                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
784                 cl->state = Client_Idle;
785             }
786         }
787         /* if len==1 we do nothing but wait for more input */
788     }
789
790     if (cl->state == Client_Connected) {
791         send_init(i);
792     }
793
794     if (cl->state == Client_Idle)
795     {
796         if (cl->requestid != se->requestid && *se->query) {
797             send_search(i);
798         }
799         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
800             cl->records < cl->hits) {
801             send_present(i);
802         }
803     }
804 }
805
806 // Disassociate connection from client
807 static void connection_release(struct connection *co)
808 {
809     struct client *cl = co->client;
810
811     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
812     if (!cl)
813         return;
814     cl->connection = 0;
815     co->client = 0;
816 }
817
818 // Close connection and recycle structure
819 static void connection_destroy(struct connection *co)
820 {
821     struct host *h = co->host;
822     cs_close(co->link);
823     iochan_destroy(co->iochan);
824
825     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
826     if (h->connections == co)
827         h->connections = co->next;
828     else
829     {
830         struct connection *pco;
831         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
832             ;
833         if (pco)
834             pco->next = co->next;
835         else
836             abort();
837     }
838     if (co->client)
839     {
840         if (co->client->state != Client_Idle)
841             co->client->state = Client_Disconnected;
842         co->client->connection = 0;
843     }
844     co->next = connection_freelist;
845     connection_freelist = co;
846 }
847
848 // Creates a new connection for client, associated with the host of 
849 // client's database
850 static struct connection *connection_create(struct client *cl)
851 {
852     struct connection *new;
853     COMSTACK link; 
854     int res;
855     void *addr;
856
857     yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
858     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
859     {
860         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
861         exit(1);
862     }
863
864     if (!(addr = cs_straddr(link, cl->database->host->ipport)))
865     {
866         yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
867         return 0;
868     }
869
870     res = cs_connect(link, addr);
871     if (res < 0)
872     {
873         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
874         return 0;
875     }
876
877     if ((new = connection_freelist))
878         connection_freelist = new->next;
879     else
880     {
881         new = xmalloc(sizeof (struct connection));
882         new->ibuf = 0;
883         new->ibufsize = 0;
884     }
885     new->state = Conn_Connecting;
886     new->host = cl->database->host;
887     new->next = new->host->connections;
888     new->host->connections = new;
889     new->client = cl;
890     cl->connection = new;
891     new->link = link;
892
893     new->iochan = iochan_create(cs_fileno(link), handler, 0);
894     iochan_setdata(new->iochan, new);
895     new->iochan->next = channel_list;
896     channel_list = new->iochan;
897     return new;
898 }
899
900 // Close connection and set state to error
901 static void client_fatal(struct client *cl)
902 {
903     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
904     connection_destroy(cl->connection);
905     cl->state = Client_Error;
906 }
907
908 // Ensure that client has a connection associated
909 static int client_prep_connection(struct client *cl)
910 {
911     struct connection *co;
912     struct session *se = cl->session;
913     struct host *host = cl->database->host;
914
915     co = cl->connection;
916
917     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
918
919     if (!co)
920     {
921         // See if someone else has an idle connection
922         // We should look at timestamps here to select the longest-idle connection
923         for (co = host->connections; co; co = co->next)
924             if (co->state == Conn_Open && (!co->client || co->client->session != se))
925                 break;
926         if (co)
927         {
928             connection_release(co);
929             cl->connection = co;
930             co->client = cl;
931         }
932         else
933             co = connection_create(cl);
934     }
935     if (co)
936     {
937         if (co->state == Conn_Connecting)
938             cl->state = Client_Connecting;
939         else if (co->state == Conn_Open)
940         {
941             if (cl->state == Client_Error || cl->state == Client_Disconnected)
942                 cl->state = Client_Idle;
943         }
944         iochan_setflag(co->iochan, EVENT_OUTPUT);
945         return 1;
946     }
947     else
948         return 0;
949 }
950
951 void load_simpletargets(const char *fn)
952 {
953     FILE *f = fopen(fn, "r");
954     char line[256];
955
956     if (!f)
957     {
958         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
959         exit(1);
960     }
961
962     while (fgets(line, 255, f))
963     {
964         char *url, *db;
965         struct host *host;
966         struct database *database;
967
968         if (strncmp(line, "target ", 7))
969             continue;
970         url = line + 7;
971         url[strlen(url) - 1] = '\0';
972         yaz_log(LOG_DEBUG, "Target: %s", url);
973         if ((db = strchr(url, '/')))
974             *(db++) = '\0';
975         else
976             db = "Default";
977
978         for (host = hosts; host; host = host->next)
979             if (!strcmp(url, host->hostport))
980                 break;
981         if (!host)
982         {
983             struct addrinfo *addrinfo, hints;
984             char *port;
985             char ipport[128];
986             unsigned char addrbuf[4];
987             int res;
988
989             host = xmalloc(sizeof(struct host));
990             host->hostport = xstrdup(url);
991             host->connections = 0;
992
993             if ((port = strchr(url, ':')))
994                 *(port++) = '\0';
995             else
996                 port = "210";
997
998             hints.ai_flags = 0;
999             hints.ai_family = PF_INET;
1000             hints.ai_socktype = SOCK_STREAM;
1001             hints.ai_protocol = IPPROTO_TCP;
1002             hints.ai_addrlen = 0;
1003             hints.ai_addr = 0;
1004             hints.ai_canonname = 0;
1005             hints.ai_next = 0;
1006             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1007             // address.
1008             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1009             {
1010                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1011                 continue;
1012             }
1013             assert(addrinfo->ai_family == PF_INET);
1014             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1015             sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
1016                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1017             host->ipport = xstrdup(ipport);
1018             freeaddrinfo(addrinfo);
1019             host->next = hosts;
1020             hosts = host;
1021         }
1022         database = xmalloc(sizeof(struct database));
1023         database->host = host;
1024         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1025         strcpy(database->url, url);
1026         strcat(database->url, "/");
1027         strcat(database->url, db);
1028         strcpy(database->databases[0], db);
1029         *database->databases[1] = '\0';
1030         database->errors = 0;
1031         database->next = databases;
1032         databases = database;
1033
1034     }
1035     fclose(f);
1036 }
1037
1038 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
1039 {
1040     switch (n->kind)
1041     {
1042         case CCL_RPN_AND:
1043         case CCL_RPN_OR:
1044         case CCL_RPN_NOT:
1045         case CCL_RPN_PROX:
1046             pull_terms(nmem, n->u.p[0], termlist, num);
1047             pull_terms(nmem, n->u.p[1], termlist, num);
1048             break;
1049         case CCL_RPN_TERM:
1050             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
1051             break;
1052         default: // NOOP
1053             break;
1054     }
1055 }
1056
1057 // Extract terms from query into null-terminated termlist
1058 static int extract_terms(NMEM nmem, char *query, char **termlist)
1059 {
1060     int error, pos;
1061     struct ccl_rpn_node *n;
1062     int num = 0;
1063
1064     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
1065     if (!n)
1066         return -1;
1067     pull_terms(nmem, n, termlist, &num);
1068     termlist[num] = 0;
1069     ccl_rpn_delete(n);
1070     return 0;
1071 }
1072
1073 static struct client *client_create(void)
1074 {
1075     struct client *r;
1076     if (client_freelist)
1077     {
1078         r = client_freelist;
1079         client_freelist = client_freelist->next;
1080     }
1081     else
1082         r = xmalloc(sizeof(struct client));
1083     r->database = 0;
1084     r->connection = 0;
1085     r->session = 0;
1086     r->hits = 0;
1087     r->records = 0;
1088     r->setno = 0;
1089     r->requestid = -1;
1090     r->diagnostic = 0;
1091     r->state = Client_Disconnected;
1092     r->next = 0;
1093     return r;
1094 }
1095
1096 void client_destroy(struct client *c)
1097 {
1098     struct session *se = c->session;
1099     if (c == se->clients)
1100         se->clients = c->next;
1101     else
1102     {
1103         struct client *cc;
1104         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1105             ;
1106         if (cc)
1107             cc->next = c->next;
1108     }
1109     if (c->connection)
1110         connection_destroy(c->connection);
1111     c->next = client_freelist;
1112     client_freelist = c;
1113 }
1114
1115 // This should be extended with parameters to control selection criteria
1116 // Associates a set of clients with a session;
1117 int select_targets(struct session *se)
1118 {
1119     struct database *db;
1120     int c = 0;
1121
1122     while (se->clients)
1123         client_destroy(se->clients);
1124     for (db = databases; db; db = db->next)
1125     {
1126         struct client *cl = client_create();
1127         cl->database = db;
1128         cl->session = se;
1129         cl->next = se->clients;
1130         se->clients = cl;
1131         c++;
1132     }
1133     return c;
1134 }
1135
1136 char *search(struct session *se, char *query)
1137 {
1138     int live_channels = 0;
1139     struct client *cl;
1140
1141     yaz_log(YLOG_DEBUG, "Search");
1142
1143     strcpy(se->query, query);
1144     se->requestid++;
1145     nmem_reset(se->nmem);
1146     for (cl = se->clients; cl; cl = cl->next)
1147     {
1148         cl->hits = -1;
1149         cl->records = 0;
1150         cl->diagnostic = 0;
1151
1152         if (client_prep_connection(cl))
1153             live_channels++;
1154     }
1155     if (live_channels)
1156     {
1157         char *p[512];
1158         int maxrecs = live_channels * global_parameters.toget;
1159         se->termlist = termlist_create(se->nmem, maxrecs, 15);
1160         se->reclist = reclist_create(se->nmem, maxrecs);
1161         extract_terms(se->nmem, query, p);
1162         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1163         se->total_records = se->total_hits = 0;
1164     }
1165     else
1166         return "NOTARGETS";
1167
1168     return 0;
1169 }
1170
1171 struct session *new_session() 
1172 {
1173     struct session *session = xmalloc(sizeof(*session));
1174
1175     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1176     
1177     session->total_hits = 0;
1178     session->total_records = 0;
1179     session->termlist = 0;
1180     session->reclist = 0;
1181     session->requestid = -1;
1182     session->clients = 0;
1183     session->query[0] = '\0';
1184     session->nmem = nmem_create();
1185     session->wrbuf = wrbuf_alloc();
1186
1187     select_targets(session);
1188
1189     return session;
1190 }
1191
1192 void session_destroy(struct session *s)
1193 {
1194     // FIXME do some shit here!!!!
1195 }
1196
1197 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1198 {
1199     static struct hitsbytarget res[1000]; // FIXME MM
1200     struct client *cl;
1201
1202     *count = 0;
1203     for (cl = se->clients; cl; cl = cl->next)
1204     {
1205         strcpy(res[*count].id, cl->database->host->hostport);
1206         res[*count].hits = cl->hits;
1207         res[*count].records = cl->records;
1208         res[*count].diagnostic = cl->diagnostic;
1209         res[*count].state = client_states[cl->state];
1210         res[*count].connected  = cl->connection ? 1 : 0;
1211         (*count)++;
1212     }
1213
1214     return res;
1215 }
1216
1217 struct termlist_score **termlist(struct session *s, int *num)
1218 {
1219     return termlist_highscore(s->termlist, num);
1220 }
1221
1222 struct record **show(struct session *s, int start, int *num, int *total, int *sumhits)
1223 {
1224     struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
1225     int i;
1226
1227     relevance_prepare_read(s->relevance, s->reclist);
1228
1229     *total = s->reclist->num_records;
1230     *sumhits = s->total_hits;
1231
1232     for (i = 0; i < start; i++)
1233         if (!reclist_read_record(s->reclist))
1234         {
1235             *num = 0;
1236             return 0;
1237         }
1238
1239     for (i = 0; i < *num; i++)
1240     {
1241         struct record *r = reclist_read_record(s->reclist);
1242         if (!r)
1243         {
1244             *num = i;
1245             break;
1246         }
1247         recs[i] = r;
1248     }
1249     return recs;
1250 }
1251
1252 void statistics(struct session *se, struct statistics *stat)
1253 {
1254     struct client *cl;
1255     int count = 0;
1256
1257     bzero(stat, sizeof(*stat));
1258     for (cl = se->clients; cl; cl = cl->next)
1259     {
1260         if (!cl->connection)
1261             stat->num_no_connection++;
1262         switch (cl->state)
1263         {
1264             case Client_Connecting: stat->num_connecting++; break;
1265             case Client_Initializing: stat->num_initializing++; break;
1266             case Client_Searching: stat->num_searching++; break;
1267             case Client_Presenting: stat->num_presenting++; break;
1268             case Client_Idle: stat->num_idle++; break;
1269             case Client_Failed: stat->num_failed++; break;
1270             case Client_Error: stat->num_error++; break;
1271             default: break;
1272         }
1273         count++;
1274     }
1275     stat->num_hits = se->total_hits;
1276     stat->num_records = se->total_records;
1277
1278     stat->num_clients = count;
1279 }
1280
1281 static CCL_bibset load_cclfile(const char *fn)
1282 {
1283     CCL_bibset res = ccl_qual_mk();
1284     if (ccl_qual_fname(res, fn) < 0)
1285     {
1286         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1287         exit(1);
1288     }
1289     return res;
1290 }
1291
1292 int main(int argc, char **argv)
1293 {
1294     int ret;
1295     char *arg;
1296
1297     if (signal(SIGPIPE, SIG_IGN) < 0)
1298         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1299
1300     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
1301
1302     while ((ret = options("c:h:p:C:s:", argv, argc, &arg)) != -2)
1303     {
1304         switch (ret) {
1305             case 0:
1306                 break;
1307             case 'c':
1308                 command_init(atoi(arg));
1309                 break;
1310             case 'C':
1311                 global_parameters.ccl_filter = load_cclfile(arg);
1312                 break;
1313             case 'h':
1314                 http_init(atoi(arg));
1315                 break;
1316             case 'p':
1317                 http_set_proxyaddr(arg);
1318                 break;
1319             case 's':
1320                 load_simpletargets(arg);
1321                 break;
1322             default:
1323                 fprintf(stderr, "Usage: pazpar2 -d comport");
1324                 exit(1);
1325         }
1326     }
1327
1328     global_parameters.ccl_filter = load_cclfile("default.bib");
1329     global_parameters.yaz_marc = yaz_marc_create();
1330     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1331     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1332     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1333
1334     event_loop(&channel_list);
1335
1336     return 0;
1337 }
1338
1339 /*
1340  * Local variables:
1341  * c-basic-offset: 4
1342  * indent-tabs-mode: nil
1343  * End:
1344  * vim: shiftwidth=4 tabstop=8 expandtab
1345  */