e1c992cb992ef175e0914c089b205029f07c5288
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.73 2007-04-11 18:42:25 quinn Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <netdb.h>
29 #include <signal.h>
30 #include <ctype.h>
31 #include <assert.h>
32
33 #include <yaz/marcdisp.h>
34 #include <yaz/comstack.h>
35 #include <yaz/tcpip.h>
36 #include <yaz/proto.h>
37 #include <yaz/readconf.h>
38 #include <yaz/pquery.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/yaz-util.h>
41 #include <yaz/nmem.h>
42
43 #if HAVE_CONFIG_H
44 #include "cconfig.h"
45 #endif
46
47 #define USE_TIMING 0
48 #if USE_TIMING
49 #include <yaz/timing.h>
50 #endif
51
52 #include <netinet/in.h>
53
54 #include "pazpar2.h"
55 #include "eventl.h"
56 #include "http.h"
57 #include "termlists.h"
58 #include "reclists.h"
59 #include "relevance.h"
60 #include "config.h"
61 #include "database.h"
62 #include "settings.h"
63
64 #define MAX_CHUNK 15
65
// Forward declarations of functions that are referenced in this file before
// the point where they are defined (or that are defined elsewhere).
66 static void client_fatal(struct client *cl);
67 static void connection_destroy(struct connection *co);
68 static int client_prep_connection(struct client *cl);
69 static void ingest_records(struct client *cl, Z_Records *r);
70 void session_alert_watch(struct session *s, int what);
71
72 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
73
// Heads of the free lists of connection and client structures, both initially
// empty. NOTE(review): presumably used to recycle structures instead of
// releasing them; the code that uses these lists is not visible here.
74 static struct connection *connection_freelist = 0;
75 static struct client *client_freelist = 0;
76
// Printable names of the client states.
// NOTE(review): the order presumably mirrors the client state enumeration
// declared in pazpar2.h so that a state value can be used as an index here;
// confirm against the header before reordering either one.
77 static char *client_states[] = {
78     "Client_Connecting",
79     "Client_Connected",
80     "Client_Idle",
81     "Client_Initializing",
82     "Client_Searching",
83     "Client_Presenting",
84     "Client_Error",
85     "Client_Failed",
86     "Client_Disconnected",
87     "Client_Stopped"
88 };
89
// Process-wide parameters, initialized positionally: each value below is
// assigned to the members of struct parameters in their declaration order.
// NOTE(review): the struct is declared outside this file, so the mapping is
// an assumption to confirm there -- the three values "81", the product name
// and VERSION look like implementationId / implementationName /
// implementationVersion (the fields send_init reads), and MAX_CHUNK looks
// like the 'chunk' size used by send_present.
90 // Note: Some things in this structure will eventually move to configuration
91 struct parameters global_parameters = 
92 {
93     "",
94     "",
95     "",
96     "",
97     0,
98     0,
99     30,
100     "81",
101     "Index Data PazPar2 (MasterKey)",
102     VERSION,
103     600, // 10 minutes
104     60,
105     100,
106     MAX_CHUNK,
107     0,
108     0
109 };
110
111 static int send_apdu(struct client *c, Z_APDU *a)
112 {
113     struct connection *co = c->connection;
114     char *buf;
115     int len, r;
116
117     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
118     {
119         odr_perror(global_parameters.odr_out, "Encoding APDU");
120         abort();
121     }
122     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
123     r = cs_put(co->link, buf, len);
124     if (r < 0)
125     {
126         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
127         return -1;
128     }
129     else if (r == 1)
130     {
131         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
132         exit(1);
133     }
134     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
135     co->state = Conn_Waiting;
136     return 0;
137 }
138
139 // Set authentication token in init if one is set for the client
140 // TODO: Extend this to handle other schemes than open (should be simple)
141 static void init_authentication(struct client *cl, Z_InitRequest *req)
142 {
143     struct session_database *sdb = cl->database;
144     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
145
146     if (auth)
147     {
148         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
149                 sizeof(*idAuth));
150         idAuth->which = Z_IdAuthentication_open;
151         idAuth->u.open = auth;
152         req->idAuthentication = idAuth;
153     }
154 }
155
156 static void send_init(IOCHAN i)
157 {
158
159     struct connection *co = iochan_getdata(i);
160     struct client *cl = co->client;
161     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
162
163     a->u.initRequest->implementationId = global_parameters.implementationId;
164     a->u.initRequest->implementationName = global_parameters.implementationName;
165     a->u.initRequest->implementationVersion =
166         global_parameters.implementationVersion;
167     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
168     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
169     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
170
171     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
172     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
173     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
174
175     init_authentication(cl, a->u.initRequest);
176
177     /* add virtual host if tunneling through Z39.50 proxy */
178     
179     if (0 < strlen(global_parameters.zproxy_override) 
180         && 0 < strlen(cl->database->database->url))
181         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo, 
182                                  global_parameters.odr_out, VAL_PROXY,
183                                  1, cl->database->database->url);
184
185     if (send_apdu(cl, a) >= 0)
186     {
187         iochan_setflags(i, EVENT_INPUT);
188         cl->state = Client_Initializing;
189     }
190     else
191         cl->state = Client_Error;
192     odr_reset(global_parameters.odr_out);
193 }
194
195 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
196 {
197     switch (n->kind)
198     {
199         case CCL_RPN_AND:
200         case CCL_RPN_OR:
201         case CCL_RPN_NOT:
202         case CCL_RPN_PROX:
203             pull_terms(nmem, n->u.p[0], termlist, num);
204             pull_terms(nmem, n->u.p[1], termlist, num);
205             break;
206         case CCL_RPN_TERM:
207             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
208             break;
209         default: // NOOP
210             break;
211     }
212 }
213
214 // Extract terms from query into null-terminated termlist
215 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
216 {
217     int num = 0;
218
219     pull_terms(nmem, query, termlist, &num);
220     termlist[num] = 0;
221 }
222
223 static void send_search(IOCHAN i)
224 {
225     struct connection *co = iochan_getdata(i);
226     struct client *cl = co->client; 
227     struct session *se = cl->session;
228     struct session_database *sdb = cl->database;
229     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
230     int ndb, cerror, cpos;
231     char **databaselist;
232     Z_Query *zquery;
233     struct ccl_rpn_node *cn;
234     int ssub = 0, lslb = 100000, mspn = 10;
235     char *recsyn;
236     char *piggyback;
237
238     yaz_log(YLOG_DEBUG, "Sending search");
239
240     cn = ccl_find_str(sdb->database->ccl_map, se->query, &cerror, &cpos);
241     if (!cn)
242         return;
243
244     if (!se->relevance)
245     {
246         // Initialize relevance structure with query terms
247         char *p[512];
248         extract_terms(se->nmem, cn, p);
249         se->relevance = relevance_create(se->nmem, (const char **) p,
250                 se->expected_maxrecs);
251     }
252
253     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
254             sizeof(Z_Query));
255     zquery->which = Z_Query_type_1;
256     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
257     ccl_rpn_delete(cn);
258
259     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
260         ;
261     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
262     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
263         databaselist[ndb] = sdb->database->databases[ndb];
264
265     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
266     {
267         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
268             a->u.searchRequest->preferredRecordSyntax =
269                     yaz_str_to_z3950oid(global_parameters.odr_out,
270                     CLASS_RECSYN, recsyn);
271         a->u.searchRequest->smallSetUpperBound = &ssub;
272         a->u.searchRequest->largeSetLowerBound = &lslb;
273         a->u.searchRequest->mediumSetPresentNumber = &mspn;
274     }
275     a->u.searchRequest->resultSetName = "Default";
276     a->u.searchRequest->databaseNames = databaselist;
277     a->u.searchRequest->num_databaseNames = ndb;
278
279     if (send_apdu(cl, a) >= 0)
280     {
281         iochan_setflags(i, EVENT_INPUT);
282         cl->state = Client_Searching;
283         cl->requestid = se->requestid;
284     }
285     else
286         cl->state = Client_Error;
287
288     odr_reset(global_parameters.odr_out);
289 }
290
291 static void send_present(IOCHAN i)
292 {
293     struct connection *co = iochan_getdata(i);
294     struct client *cl = co->client; 
295     struct session_database *sdb = cl->database;
296     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
297     int toget;
298     int start = cl->records + 1;
299     char *recsyn;
300
301     toget = global_parameters.chunk;
302     if (toget > global_parameters.toget - cl->records)
303         toget = global_parameters.toget - cl->records;
304     if (toget > cl->hits - cl->records)
305         toget = cl->hits - cl->records;
306
307     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
308
309     a->u.presentRequest->resultSetStartPoint = &start;
310     a->u.presentRequest->numberOfRecordsRequested = &toget;
311
312     a->u.presentRequest->resultSetId = "Default";
313
314     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
315         a->u.presentRequest->preferredRecordSyntax =
316                 yaz_str_to_z3950oid(global_parameters.odr_out,
317                 CLASS_RECSYN, recsyn);
318
319     if (send_apdu(cl, a) >= 0)
320     {
321         iochan_setflags(i, EVENT_INPUT);
322         cl->state = Client_Presenting;
323     }
324     else
325         cl->state = Client_Error;
326     odr_reset(global_parameters.odr_out);
327 }
328
329 static void do_initResponse(IOCHAN i, Z_APDU *a)
330 {
331     struct connection *co = iochan_getdata(i);
332     struct client *cl = co->client;
333     Z_InitResponse *r = a->u.initResponse;
334
335     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
336
337     if (*r->result)
338     {
339         cl->state = Client_Idle;
340     }
341     else
342         cl->state = Client_Failed; // FIXME need to do something to the connection
343 }
344
345 static void do_searchResponse(IOCHAN i, Z_APDU *a)
346 {
347     struct connection *co = iochan_getdata(i);
348     struct client *cl = co->client;
349     struct session *se = cl->session;
350     Z_SearchResponse *r = a->u.searchResponse;
351
352     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
353             cl->database->database->url, *r->searchStatus);
354
355     if (*r->searchStatus)
356     {
357         cl->hits = *r->resultCount;
358         se->total_hits += cl->hits;
359         if (r->presentStatus && !*r->presentStatus && r->records)
360         {
361             yaz_log(YLOG_DEBUG, "Records in search response %s", 
362                     cl->database->database->url);
363             ingest_records(cl, r->records);
364         }
365         cl->state = Client_Idle;
366     }
367     else
368     {          /*"FAILED"*/
369         cl->hits = 0;
370         cl->state = Client_Error;
371         if (r->records) {
372             Z_Records *recs = r->records;
373             if (recs->which == Z_Records_NSD)
374             {
375                 yaz_log(YLOG_WARN, 
376                         "Search response: Non-surrogate diagnostic %s",
377                         cl->database->database->url);
378                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
379                 cl->state = Client_Error;
380             }
381         }
382     }
383 }
384
385 static void do_closeResponse(IOCHAN i, Z_APDU *a)
386 {
387     struct connection *co = iochan_getdata(i);
388     struct client *cl = co->client;
389     /* Z_Close *r = a->u.close; */
390
391     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
392
393     cl->state = Client_Failed;
394     connection_destroy(co);
395 }
396
397
// Normalize 'buf' in place for use as a merge or sort key and return it:
// letters are lower-cased, every run of non-alphanumeric characters is
// collapsed into one space, and leading/trailing separators are removed.
// A string without any alphanumeric character becomes the empty string.
// If 'skiparticle' is non-zero and the first word is one of a small set of
// articles, that word is left out of the result -- unless it is the only
// word, in which case it is kept so the key does not become empty.
// The result is never longer than the input.
char *normalize_mergekey(char *buf, int skiparticle)
{
    char *p = buf, *pout = buf;

    if (skiparticle)
    {
        char firstword[64];
        // Space-delimited on both sides so that only whole words match
        const char articles[] = " the den der die des an a ";
        char *w = firstword;
        const char *rest;

        while (*p && !isalnum((unsigned char) *p))
            p++;
        // Build " word " from the first word (truncated if overlong; a
        // truncated word cannot match an article anyway)
        *(w++) = ' ';
        while (*p && *p != ' ' && w - firstword < 62)
            *(w++) = (char) tolower((unsigned char) *(p++));
        *(w++) = ' ';
        *w = '\0';

        // Is there anything left to build a key from after the word?
        for (rest = p; *rest && !isalnum((unsigned char) *rest); rest++)
            ;
        if (!strstr(articles, firstword) || !*rest)
            p = buf;
    }

    // The output never overtakes the input: each separator run yields at
    // most one space and each alphanumeric exactly one character
    while (*p)
    {
        while (*p && !isalnum((unsigned char) *p))
            p++;
        while (isalnum((unsigned char) *p))
            *(pout++) = (char) tolower((unsigned char) *(p++));
        if (*p)
            *(pout++) = ' ';
        while (*p && !isalnum((unsigned char) *p))
            p++;
    }

    // Drop the separator that may trail the last word, then terminate.
    // This also terminates the (empty) result when nothing was written.
    while (pout > buf && pout[-1] == ' ')
        pout--;
    *pout = '\0';

    return buf;
}
438
439 static void add_facet(struct session *s, const char *type, const char *value)
440 {
441     int i;
442
443     if (!*value)
444         return;
445     for (i = 0; i < s->num_termlists; i++)
446         if (!strcmp(s->termlists[i].name, type))
447             break;
448     if (i == s->num_termlists)
449     {
450         if (i == SESSION_MAX_TERMLISTS)
451         {
452             yaz_log(YLOG_FATAL, "Too many termlists");
453             exit(1);
454         }
455         s->termlists[i].name = nmem_strdup(s->nmem, type);
456         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
457         s->num_termlists = i + 1;
458     }
459     termlist_insert(s->termlists[i].termlist, value);
460 }
461
462 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
463 {
464     struct database_retrievalmap *m;
465     struct database *db = cl->database->database;
466     xmlNode *res;
467     xmlDoc *rdoc;
468
469     // First normalize to XML
470     if (db->yaz_marc)
471     {
472         char *buf;
473         int len;
474         if (rec->which != Z_External_octet)
475         {
476             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
477                     cl->database->database->url);
478             return 0;
479         }
480         buf = (char*) rec->u.octet_aligned->buf;
481         len = rec->u.octet_aligned->len;
482         if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
483         {
484             yaz_log(YLOG_WARN, "Failed to decode MARC %s",
485                     cl->database->database->url);
486             return 0;
487         }
488         if (yaz_marc_write_xml(db->yaz_marc, &res,
489                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
490         {
491             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
492                     cl->database->database->url);
493             return 0;
494         }
495         rdoc = xmlNewDoc((xmlChar *) "1.0");
496         xmlDocSetRootElement(rdoc, res);
497     }
498     else
499     {
500         yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
501         exit(1);
502     }
503
504     if (global_parameters.dump_records)
505     {
506         fprintf(stderr, "Input Record (normalized):\n----------------\n");
507 #if LIBXML_VERSION >= 20600
508         xmlDocFormatDump(stderr, rdoc, 1);
509 #else
510         xmlDocDump(stderr, rdoc);
511 #endif
512     }
513
514     for (m = db->map; m; m = m->next)
515     {
516         xmlDoc *new;
517         if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
518         {
519             yaz_log(YLOG_WARN, "XSLT transformation failed");
520             return 0;
521         }
522         xmlFreeDoc(rdoc);
523         rdoc = new;
524     }
525     if (global_parameters.dump_records)
526     {
527         fprintf(stderr, "Record:\n----------------\n");
528 #if LIBXML_VERSION >= 20600
529         xmlDocFormatDump(stderr, rdoc, 1);
530 #else
531         xmlDocDump(stderr, rdoc);
532 #endif
533     }
534     return rdoc;
535 }
536
// Extract what appears to be years from buf, storing highest and
// lowest values. Only runs of exactly four digits count as a year; shorter
// and longer digit runs are skipped. '*first' receives the lowest and
// '*last' the highest year found; both are -1 if there is none.
// Returns '*first', i.e. a negative value when no year was found.
static int extract_years(const char *buf, int *first, int *last)
{
    *first = -1;
    *last = -1;
    while (*buf)
    {
        const char *e;
        int len = 0;

        // The casts matter: the text may hold bytes above 127 (e.g. UTF-8),
        // which are negative as plain char and not valid isdigit arguments
        while (*buf && !isdigit((unsigned char) *buf))
            buf++;
        for (e = buf; isdigit((unsigned char) *e); e++)
            len++;
        if (len == 4)
        {
            // atoi stops at the first non-digit, i.e. at 'e'
            int value = atoi(buf);
            if (*first < 0 || value < *first)
                *first = value;
            if (*last < 0 || value > *last)
                *last = value;
        }
        buf = e;
    }
    return *first;
}
565
566 static struct record *ingest_record(struct client *cl, Z_External *rec)
567 {
568     xmlDoc *xdoc = normalize_record(cl, rec);
569     xmlNode *root, *n;
570     struct record *res;
571     struct record_cluster *cluster;
572     struct session *se = cl->session;
573     xmlChar *mergekey, *mergekey_norm;
574     xmlChar *type = 0;
575     xmlChar *value = 0;
576     struct conf_service *service = global_parameters.server->service;
577
578     if (!xdoc)
579         return 0;
580
581     root = xmlDocGetRootElement(xdoc);
582     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
583     {
584         yaz_log(YLOG_WARN, "No mergekey found in record");
585         xmlFreeDoc(xdoc);
586         return 0;
587     }
588
589     res = nmem_malloc(se->nmem, sizeof(struct record));
590     res->next = 0;
591     res->client = cl;
592     res->metadata = nmem_malloc(se->nmem,
593             sizeof(struct record_metadata*) * service->num_metadata);
594     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
595
596     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
597     xmlFree(mergekey);
598     normalize_mergekey((char *) mergekey_norm, 0);
599
600     cluster = reclist_insert(se->reclist, res, (char *) mergekey_norm, 
601                              &se->total_merged);
602     if (global_parameters.dump_records)
603         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
604                 cl->database->database->url, cl->records);
605     if (!cluster)
606     {
607         /* no room for record */
608         xmlFreeDoc(xdoc);
609         return 0;
610     }
611     relevance_newrec(se->relevance, cluster);
612
613     for (n = root->children; n; n = n->next)
614     {
615         if (type)
616             xmlFree(type);
617         if (value)
618             xmlFree(value);
619         type = value = 0;
620
621         if (n->type != XML_ELEMENT_NODE)
622             continue;
623         if (!strcmp((const char *) n->name, "metadata"))
624         {
625             struct conf_metadata *md = 0;
626             struct conf_sortkey *sk = 0;
627             struct record_metadata **wheretoput, *newm;
628             int imeta;
629             int first, last;
630
631             type = xmlGetProp(n, (xmlChar *) "type");
632             value = xmlNodeListGetString(xdoc, n->children, 0);
633
634             if (!type || !value)
635                 continue;
636
637             // First, find out what field we're looking at
638             for (imeta = 0; imeta < service->num_metadata; imeta++)
639                 if (!strcmp((const char *) type, service->metadata[imeta].name))
640                 {
641                     md = &service->metadata[imeta];
642                     if (md->sortkey_offset >= 0)
643                         sk = &service->sortkeys[md->sortkey_offset];
644                     break;
645                 }
646             if (!md)
647             {
648                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
649                 continue;
650             }
651
652             // Find out where we are putting it
653             if (md->merge == Metadata_merge_no)
654                 wheretoput = &res->metadata[imeta];
655             else
656                 wheretoput = &cluster->metadata[imeta];
657             
658             // Put it there
659             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
660             newm->next = 0;
661             if (md->type == Metadata_type_generic)
662             {
663                 char *p, *pe;
664                 for (p = (char *) value; *p && isspace(*p); p++)
665                     ;
666                 for (pe = p + strlen(p) - 1;
667                         pe > p && strchr(" ,/.:([", *pe); pe--)
668                     *pe = '\0';
669                 newm->data.text = nmem_strdup(se->nmem, p);
670
671             }
672             else if (md->type == Metadata_type_year)
673             {
674                 if (extract_years((char *) value, &first, &last) < 0)
675                     continue;
676             }
677             else
678             {
679                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
680                 continue;
681             }
682             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
683             {
684                 yaz_log(YLOG_WARN, "Only range merging supported for years");
685                 continue;
686             }
687             if (md->merge == Metadata_merge_unique)
688             {
689                 struct record_metadata *mnode;
690                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
691                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
692                         break;
693                 if (!mnode)
694                 {
695                     newm->next = *wheretoput;
696                     *wheretoput = newm;
697                 }
698             }
699             else if (md->merge == Metadata_merge_longest)
700             {
701                 if (!*wheretoput ||
702                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
703                 {
704                     *wheretoput = newm;
705                     if (sk)
706                     {
707                         char *s = nmem_strdup(se->nmem, newm->data.text);
708                         if (!cluster->sortkeys[md->sortkey_offset])
709                             cluster->sortkeys[md->sortkey_offset] = 
710                                 nmem_malloc(se->nmem, sizeof(union data_types));
711                         normalize_mergekey(s,
712                                 (sk->type == Metadata_sortkey_skiparticle));
713                         cluster->sortkeys[md->sortkey_offset]->text = s;
714                     }
715                 }
716             }
717             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
718             {
719                 newm->next = *wheretoput;
720                 *wheretoput = newm;
721             }
722             else if (md->merge == Metadata_merge_range)
723             {
724                 assert(md->type == Metadata_type_year);
725                 if (!*wheretoput)
726                 {
727                     *wheretoput = newm;
728                     (*wheretoput)->data.number.min = first;
729                     (*wheretoput)->data.number.max = last;
730                     if (sk)
731                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
732                 }
733                 else
734                 {
735                     if (first < (*wheretoput)->data.number.min)
736                         (*wheretoput)->data.number.min = first;
737                     if (last > (*wheretoput)->data.number.max)
738                         (*wheretoput)->data.number.max = last;
739                 }
740 #ifdef GAGA
741                 if (sk)
742                 {
743                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
744                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
745                 }
746 #endif
747             }
748             else
749                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
750
751             if (md->rank)
752                 relevance_countwords(se->relevance, cluster, 
753                                      (char *) value, md->rank);
754             if (md->termlist)
755             {
756                 if (md->type == Metadata_type_year)
757                 {
758                     char year[64];
759                     sprintf(year, "%d", last);
760                     add_facet(se, (char *) type, year);
761                     if (first != last)
762                     {
763                         sprintf(year, "%d", first);
764                         add_facet(se, (char *) type, year);
765                     }
766                 }
767                 else
768                     add_facet(se, (char *) type, (char *) value);
769             }
770             xmlFree(type);
771             xmlFree(value);
772             type = value = 0;
773         }
774         else
775             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
776     }
777     if (type)
778         xmlFree(type);
779     if (value)
780         xmlFree(value);
781
782     xmlFreeDoc(xdoc);
783
784     relevance_donerecord(se->relevance, cluster);
785     se->total_records++;
786
787     return res;
788 }
789
790 // Retrieve first defined value for 'name' for given database.
791 // Will be extended to take into account user associated with session
792 char *session_setting_oneval(struct session_database *db, int offset)
793 {
794     if (!db->settings[offset])
795         return "";
796     return db->settings[offset]->value;
797 }
798
799 static void ingest_records(struct client *cl, Z_Records *r)
800 {
801 #if USE_TIMING
802     yaz_timing_t t = yaz_timing_create();
803 #endif
804     struct record *rec;
805     struct session *s = cl->session;
806     Z_NamePlusRecordList *rlist;
807     int i;
808
809     if (r->which != Z_Records_DBOSD)
810         return;
811     rlist = r->u.databaseOrSurDiagnostics;
812     for (i = 0; i < rlist->num_records; i++)
813     {
814         Z_NamePlusRecord *npr = rlist->records[i];
815
816         cl->records++;
817         if (npr->which != Z_NamePlusRecord_databaseRecord)
818         {
819             yaz_log(YLOG_WARN, 
820                     "Unexpected record type, probably diagnostic %s",
821                     cl->database->database->url);
822             continue;
823         }
824
825         rec = ingest_record(cl, npr->u.databaseRecord);
826         if (!rec)
827             continue;
828     }
829     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
830         session_alert_watch(s, SESSION_WATCH_RECORDS);
831
832 #if USE_TIMING
833     yaz_timing_stop(t);
834     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
835             yaz_timing_get_real(t), yaz_timing_get_user(t),
836             yaz_timing_get_sys(t));
837     yaz_timing_destroy(&t);
838 #endif
839 }
840
841 static void do_presentResponse(IOCHAN i, Z_APDU *a)
842 {
843     struct connection *co = iochan_getdata(i);
844     struct client *cl = co->client;
845     Z_PresentResponse *r = a->u.presentResponse;
846
847     if (r->records) {
848         Z_Records *recs = r->records;
849         if (recs->which == Z_Records_NSD)
850         {
851             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
852                     cl->database->database->url);
853             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
854             cl->state = Client_Error;
855         }
856     }
857
858     if (!*r->presentStatus && cl->state != Client_Error)
859     {
860         yaz_log(YLOG_DEBUG, "Good Present response %s",
861                 cl->database->database->url);
862         ingest_records(cl, r->records);
863         cl->state = Client_Idle;
864     }
865     else if (*r->presentStatus) 
866     {
867         yaz_log(YLOG_WARN, "Bad Present response %s",
868                 cl->database->database->url);
869         cl->state = Client_Error;
870     }
871 }
872
873 static void handler(IOCHAN i, int event)
874 {
875     struct connection *co = iochan_getdata(i);
876     struct client *cl = co->client;
877     struct session *se = 0;
878
879     if (cl)
880         se = cl->session;
881     else
882     {
883         yaz_log(YLOG_WARN, "Destroying orphan connection");
884         connection_destroy(co);
885         return;
886     }
887
888     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
889     {
890         int errcode;
891         socklen_t errlen = sizeof(errcode);
892
893         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
894             &errlen) < 0 || errcode != 0)
895         {
896             client_fatal(cl);
897             return;
898         }
899         else
900         {
901             yaz_log(YLOG_DEBUG, "Connect OK");
902             co->state = Conn_Open;
903             if (cl)
904                 cl->state = Client_Connected;
905         }
906     }
907
908     else if (event & EVENT_INPUT)
909     {
910         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
911
912         if (len < 0)
913         {
914             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
915                     cl->database->database->url);
916             connection_destroy(co);
917             return;
918         }
919         else if (len == 0)
920         {
921             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
922             connection_destroy(co);
923             return;
924         }
925         else if (len > 1) // We discard input if we have no connection
926         {
927             co->state = Conn_Open;
928
929             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
930             {
931                 Z_APDU *a;
932
933                 odr_reset(global_parameters.odr_in);
934                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
935                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
936                 {
937                     client_fatal(cl);
938                     return;
939                 }
940                 switch (a->which)
941                 {
942                     case Z_APDU_initResponse:
943                         do_initResponse(i, a);
944                         break;
945                     case Z_APDU_searchResponse:
946                         do_searchResponse(i, a);
947                         break;
948                     case Z_APDU_presentResponse:
949                         do_presentResponse(i, a);
950                         break;
951                     case Z_APDU_close:
952                         do_closeResponse(i, a);
953                         break;
954                     default:
955                         yaz_log(YLOG_WARN, 
956                                 "Unexpected Z39.50 response from %s",  
957                                 cl->database->database->url);
958                         client_fatal(cl);
959                         return;
960                 }
961                 // We aren't expecting staggered output from target
962                 // if (cs_more(t->link))
963                 //    iochan_setevent(i, EVENT_INPUT);
964             }
965             else  // we throw away response and go to idle mode
966             {
967                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
968                 cl->state = Client_Idle;
969             }
970         }
971         /* if len==1 we do nothing but wait for more input */
972     }
973
974     if (cl->state == Client_Connected) {
975         send_init(i);
976     }
977
978     if (cl->state == Client_Idle)
979     {
980         if (cl->requestid != se->requestid && *se->query) {
981             send_search(i);
982         }
983         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
984             cl->records < cl->hits) {
985             send_present(i);
986         }
987     }
988 }
989
990 // Disassociate connection from client
991 static void connection_release(struct connection *co)
992 {
993     struct client *cl = co->client;
994
995     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
996     if (!cl)
997         return;
998     cl->connection = 0;
999     co->client = 0;
1000 }
1001
1002 // Close connection and recycle structure
1003 static void connection_destroy(struct connection *co)
1004 {
1005     struct host *h = co->host;
1006     cs_close(co->link);
1007     iochan_destroy(co->iochan);
1008
1009     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1010     if (h->connections == co)
1011         h->connections = co->next;
1012     else
1013     {
1014         struct connection *pco;
1015         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1016             ;
1017         if (pco)
1018             pco->next = co->next;
1019         else
1020             abort();
1021     }
1022     if (co->client)
1023     {
1024         if (co->client->state != Client_Idle)
1025             co->client->state = Client_Disconnected;
1026         co->client->connection = 0;
1027     }
1028     co->next = connection_freelist;
1029     connection_freelist = co;
1030 }
1031
1032 // Creates a new connection for client, associated with the host of 
1033 // client's database
1034 static struct connection *connection_create(struct client *cl)
1035 {
1036     struct connection *new;
1037     COMSTACK link; 
1038     int res;
1039     void *addr;
1040
1041
1042     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1043         {
1044             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1045             exit(1);
1046         }
1047     
1048     if (0 == strlen(global_parameters.zproxy_override)){
1049         /* no Z39.50 proxy needed - direct connect */
1050         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1051         
1052         if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
1053             {
1054                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1055                         "Lookup of IP address %s failed", 
1056                         cl->database->database->host->ipport);
1057                 return 0;
1058             }
1059     
1060     } else {
1061         /* Z39.50 proxy connect */
1062         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1063                 cl->database->database->url, global_parameters.zproxy_override);
1064
1065         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1066             {
1067                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1068                         "Lookup of IP address %s failed", 
1069                         global_parameters.zproxy_override);
1070                 return 0;
1071             }
1072     }
1073
1074     res = cs_connect(link, addr);
1075     if (res < 0)
1076     {
1077         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1078         return 0;
1079     }
1080
1081     if ((new = connection_freelist))
1082         connection_freelist = new->next;
1083     else
1084     {
1085         new = xmalloc(sizeof (struct connection));
1086         new->ibuf = 0;
1087         new->ibufsize = 0;
1088     }
1089     new->state = Conn_Connecting;
1090     new->host = cl->database->database->host;
1091     new->next = new->host->connections;
1092     new->host->connections = new;
1093     new->client = cl;
1094     cl->connection = new;
1095     new->link = link;
1096
1097     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1098     iochan_setdata(new->iochan, new);
1099     new->iochan->next = channel_list;
1100     channel_list = new->iochan;
1101     return new;
1102 }
1103
1104 // Close connection and set state to error
1105 static void client_fatal(struct client *cl)
1106 {
1107     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1108     connection_destroy(cl->connection);
1109     cl->state = Client_Error;
1110 }
1111
1112 // Ensure that client has a connection associated
1113 static int client_prep_connection(struct client *cl)
1114 {
1115     struct connection *co;
1116     struct session *se = cl->session;
1117     struct host *host = cl->database->database->host;
1118
1119     co = cl->connection;
1120
1121     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1122
1123     if (!co)
1124     {
1125         // See if someone else has an idle connection
1126         // We should look at timestamps here to select the longest-idle connection
1127         for (co = host->connections; co; co = co->next)
1128             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1129                 break;
1130         if (co)
1131         {
1132             connection_release(co);
1133             cl->connection = co;
1134             co->client = cl;
1135         }
1136         else
1137             co = connection_create(cl);
1138     }
1139     if (co)
1140     {
1141         if (co->state == Conn_Connecting)
1142         {
1143             cl->state = Client_Connecting;
1144             iochan_setflag(co->iochan, EVENT_OUTPUT);
1145         }
1146         else if (co->state == Conn_Open)
1147         {
1148             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1149                 cl->state = Client_Idle;
1150             iochan_setflag(co->iochan, EVENT_OUTPUT);
1151         }
1152         return 1;
1153     }
1154     else
1155         return 0;
1156 }
1157
1158 static struct client *client_create(void)
1159 {
1160     struct client *r;
1161     if (client_freelist)
1162     {
1163         r = client_freelist;
1164         client_freelist = client_freelist->next;
1165     }
1166     else
1167         r = xmalloc(sizeof(struct client));
1168     r->database = 0;
1169     r->connection = 0;
1170     r->session = 0;
1171     r->hits = 0;
1172     r->records = 0;
1173     r->setno = 0;
1174     r->requestid = -1;
1175     r->diagnostic = 0;
1176     r->state = Client_Disconnected;
1177     r->next = 0;
1178     return r;
1179 }
1180
1181 void client_destroy(struct client *c)
1182 {
1183     struct session *se = c->session;
1184     if (c == se->clients)
1185         se->clients = c->next;
1186     else
1187     {
1188         struct client *cc;
1189         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1190             ;
1191         if (cc)
1192             cc->next = c->next;
1193     }
1194     if (c->connection)
1195         connection_release(c->connection);
1196     c->next = client_freelist;
1197     client_freelist = c;
1198 }
1199
1200 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1201 {
1202     s->watchlist[what].fun = fun;
1203     s->watchlist[what].data = data;
1204 }
1205
1206 void session_alert_watch(struct session *s, int what)
1207 {
1208     if (!s->watchlist[what].fun)
1209         return;
1210     (*s->watchlist[what].fun)(s->watchlist[what].data);
1211     s->watchlist[what].fun = 0;
1212     s->watchlist[what].data = 0;
1213 }
1214
1215 //callback for grep_databases
1216 static void select_targets_callback(void *context, struct session_database *db)
1217 {
1218     struct session *se = (struct session*) context;
1219     struct client *cl = client_create();
1220     cl->database = db;
1221     cl->session = se;
1222     cl->next = se->clients;
1223     se->clients = cl;
1224 }
1225
1226 // Associates a set of clients with a session;
1227 // Note: Session-databases represent databases with per-session setting overrides
1228 int select_targets(struct session *se, struct database_criterion *crit)
1229 {
1230     while (se->clients)
1231         client_destroy(se->clients);
1232
1233     return session_grep_databases(se, crit, select_targets_callback);
1234 }
1235
1236 int session_active_clients(struct session *s)
1237 {
1238     struct client *c;
1239     int res = 0;
1240
1241     for (c = s->clients; c; c = c->next)
1242         if (c->connection && (c->state == Client_Connecting ||
1243                     c->state == Client_Initializing ||
1244                     c->state == Client_Searching ||
1245                     c->state == Client_Presenting))
1246             res++;
1247
1248     return res;
1249 }
1250
1251 // parses crit1=val1,crit2=val2|val3,...
1252 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1253 {
1254     struct database_criterion *res = 0;
1255     char **values;
1256     int num;
1257     int i;
1258
1259     if (!buf || !*buf)
1260         return 0;
1261     nmem_strsplit(m, ",", buf,  &values, &num);
1262     for (i = 0; i < num; i++)
1263     {
1264         char **subvalues;
1265         int subnum;
1266         int subi;
1267         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1268         char *eq = strchr(values[i], '=');
1269         if (!eq)
1270         {
1271             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1272             return 0;
1273         }
1274         *(eq++) = '\0';
1275         new->name = values[i];
1276         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1277         new->values = 0;
1278         for (subi = 0; subi < subnum; subi++)
1279         {
1280             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1281             newv->value = subvalues[subi];
1282             newv->next = new->values;
1283             new->values = newv;
1284         }
1285         new->next = res;
1286         res = new;
1287     }
1288     return res;
1289 }
1290
1291 char *search(struct session *se, char *query, char *filter)
1292 {
1293     int live_channels = 0;
1294     struct client *cl;
1295     struct database_criterion *criteria;
1296
1297     yaz_log(YLOG_DEBUG, "Search");
1298
1299     nmem_reset(se->nmem);
1300     criteria = parse_filter(se->nmem, filter);
1301     strcpy(se->query, query);
1302     se->requestid++;
1303     select_targets(se, criteria);
1304     for (cl = se->clients; cl; cl = cl->next)
1305     {
1306         if (client_prep_connection(cl))
1307             live_channels++;
1308     }
1309     if (live_channels)
1310     {
1311         int maxrecs = live_channels * global_parameters.toget;
1312         se->num_termlists = 0;
1313         se->reclist = reclist_create(se->nmem, maxrecs);
1314         // This will be initialized in send_search()
1315         se->relevance = 0;
1316         se->total_records = se->total_hits = se->total_merged = 0;
1317         se->expected_maxrecs = maxrecs;
1318     }
1319     else
1320         return "NOTARGETS";
1321
1322     return 0;
1323 }
1324
1325 // Apply a session override to a database
1326 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1327 {
1328     struct session_database *sdb;
1329
1330     for (sdb = se->databases; sdb; sdb = sdb->next)
1331         if (!strcmp(dbname, sdb->database->url))
1332         {
1333             struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1334             int offset = settings_offset(setting);
1335
1336             if (offset < 0)
1337             {
1338                 yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1339                 return;
1340             }
1341             new->precedence = 0;
1342             new->target = dbname;
1343             new->name = setting;
1344             new->value = value;
1345             new->user = "";
1346             new->next = sdb->settings[offset];
1347             sdb->settings[offset] = new;
1348             break;
1349         }
1350     if (!sdb)
1351         yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
1352 }
1353
1354 void session_init_databases_fun(void *context, struct database *db)
1355 {
1356     struct session *se = (struct session *) context;
1357     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1358     int num = settings_num();
1359     int i;
1360
1361     new->database = db;
1362     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1363     for (i = 0; i < num; i++)
1364         new->settings[i] = db->settings[i];
1365     new->next = se->databases;
1366     se->databases = new;
1367 }
1368
1369 // Initialize session_database list -- this represents this session's view
1370 // of the database list -- subject to modification by the settings ws command
1371 void session_init_databases(struct session *se)
1372 {
1373     se->databases = 0;
1374     grep_databases(se, 0, session_init_databases_fun);
1375 }
1376
1377 void destroy_session(struct session *s)
1378 {
1379     yaz_log(YLOG_LOG, "Destroying session");
1380     while (s->clients)
1381         client_destroy(s->clients);
1382     nmem_destroy(s->nmem);
1383     wrbuf_destroy(s->wrbuf);
1384 }
1385
1386 struct session *new_session(NMEM nmem) 
1387 {
1388     int i;
1389     struct session *session = nmem_malloc(nmem, sizeof(*session));
1390
1391     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1392     
1393     session->total_hits = 0;
1394     session->total_records = 0;
1395     session->num_termlists = 0;
1396     session->reclist = 0;
1397     session->requestid = -1;
1398     session->clients = 0;
1399     session->expected_maxrecs = 0;
1400     session->query[0] = '\0';
1401     session->session_nmem = nmem;
1402     session->nmem = nmem_create();
1403     session->wrbuf = wrbuf_alloc();
1404     session_init_databases(session);
1405     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1406     {
1407         session->watchlist[i].data = 0;
1408         session->watchlist[i].fun = 0;
1409     }
1410
1411     return session;
1412 }
1413
1414 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1415 {
1416     static struct hitsbytarget res[1000]; // FIXME MM
1417     struct client *cl;
1418
1419     *count = 0;
1420     for (cl = se->clients; cl; cl = cl->next)
1421     {
1422         char *name = session_setting_oneval(cl->database, PZ_NAME);
1423
1424         res[*count].id = cl->database->database->url;
1425         res[*count].name = *name ? name : "Unknown";
1426         res[*count].hits = cl->hits;
1427         res[*count].records = cl->records;
1428         res[*count].diagnostic = cl->diagnostic;
1429         res[*count].state = client_states[cl->state];
1430         res[*count].connected  = cl->connection ? 1 : 0;
1431         (*count)++;
1432     }
1433
1434     return res;
1435 }
1436
1437 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1438 {
1439     int i;
1440
1441     for (i = 0; i < s->num_termlists; i++)
1442         if (!strcmp((const char *) s->termlists[i].name, name))
1443             return termlist_highscore(s->termlists[i].termlist, num);
1444     return 0;
1445 }
1446
#ifdef MISSING_HEADERS
// Log the two nmem counters (memory in use, memory free) at YLOG_LOG
// level. Only compiled when MISSING_HEADERS is defined.
void report_nmem_stats(void)
{
    size_t used;
    size_t avail;

    nmem_get_memory_in_use(&used);
    nmem_get_memory_free(&avail);

    yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld",
            (long) used, (long) avail);
}
#endif
1459
1460 struct record_cluster *show_single(struct session *s, int id)
1461 {
1462     struct record_cluster *r;
1463
1464     reclist_rewind(s->reclist);
1465     while ((r = reclist_read_record(s->reclist)))
1466         if (r->recid == id)
1467             return r;
1468     return 0;
1469 }
1470
1471 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1472         int *num, int *total, int *sumhits, NMEM nmem_show)
1473 {
1474     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1475                                        * sizeof(struct record_cluster *));
1476     struct reclist_sortparms *spp;
1477     int i;
1478 #if USE_TIMING    
1479     yaz_timing_t t = yaz_timing_create();
1480 #endif
1481
1482     for (spp = sp; spp; spp = spp->next)
1483         if (spp->type == Metadata_sortkey_relevance)
1484         {
1485             relevance_prepare_read(s->relevance, s->reclist);
1486             break;
1487         }
1488     reclist_sort(s->reclist, sp);
1489
1490     *total = s->reclist->num_records;
1491     *sumhits = s->total_hits;
1492
1493     for (i = 0; i < start; i++)
1494         if (!reclist_read_record(s->reclist))
1495         {
1496             *num = 0;
1497             recs = 0;
1498             break;
1499         }
1500
1501     for (i = 0; i < *num; i++)
1502     {
1503         struct record_cluster *r = reclist_read_record(s->reclist);
1504         if (!r)
1505         {
1506             *num = i;
1507             break;
1508         }
1509         recs[i] = r;
1510     }
1511 #if USE_TIMING
1512     yaz_timing_stop(t);
1513     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1514             yaz_timing_get_real(t), yaz_timing_get_user(t),
1515             yaz_timing_get_sys(t));
1516     yaz_timing_destroy(&t);
1517 #endif
1518     return recs;
1519 }
1520
1521 void statistics(struct session *se, struct statistics *stat)
1522 {
1523     struct client *cl;
1524     int count = 0;
1525
1526     memset(stat, 0, sizeof(*stat));
1527     for (cl = se->clients; cl; cl = cl->next)
1528     {
1529         if (!cl->connection)
1530             stat->num_no_connection++;
1531         switch (cl->state)
1532         {
1533             case Client_Connecting: stat->num_connecting++; break;
1534             case Client_Initializing: stat->num_initializing++; break;
1535             case Client_Searching: stat->num_searching++; break;
1536             case Client_Presenting: stat->num_presenting++; break;
1537             case Client_Idle: stat->num_idle++; break;
1538             case Client_Failed: stat->num_failed++; break;
1539             case Client_Error: stat->num_error++; break;
1540             default: break;
1541         }
1542         count++;
1543     }
1544     stat->num_hits = se->total_hits;
1545     stat->num_records = se->total_records;
1546
1547     stat->num_clients = count;
1548 }
1549
1550 static void start_http_listener(void)
1551 {
1552     char hp[128] = "";
1553     struct conf_server *ser = global_parameters.server;
1554
1555     if (*global_parameters.listener_override)
1556         strcpy(hp, global_parameters.listener_override);
1557     else
1558     {
1559         strcpy(hp, ser->host ? ser->host : "");
1560         if (ser->port)
1561         {
1562             if (*hp)
1563                 strcat(hp, ":");
1564             sprintf(hp + strlen(hp), "%d", ser->port);
1565         }
1566     }
1567     http_init(hp);
1568 }
1569
1570 static void start_proxy(void)
1571 {
1572     char hp[128] = "";
1573     struct conf_server *ser = global_parameters.server;
1574
1575     if (*global_parameters.proxy_override)
1576         strcpy(hp, global_parameters.proxy_override);
1577     else if (ser->proxy_host || ser->proxy_port)
1578     {
1579         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1580         if (ser->proxy_port)
1581         {
1582             if (*hp)
1583                 strcat(hp, ":");
1584             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1585         }
1586     }
1587     else
1588         return;
1589
1590     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1591 }
1592
1593 static void start_zproxy(void)
1594 {
1595     struct conf_server *ser = global_parameters.server;
1596
1597     if (*global_parameters.zproxy_override){
1598         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1599                 global_parameters.zproxy_override);
1600         return;
1601     }
1602
1603     else if (ser->zproxy_host || ser->zproxy_port)
1604     {
1605         char hp[128] = "";
1606
1607         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1608         if (ser->zproxy_port)
1609         {
1610             if (*hp)
1611                 strcat(hp, ":");
1612             else
1613                 strcat(hp, "@:");
1614
1615             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1616         }
1617         strcpy(global_parameters.zproxy_override, hp);
1618         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1619                 global_parameters.zproxy_override);
1620
1621     }
1622     else
1623         return;
1624 }
1625
1626
1627
1628 int main(int argc, char **argv)
1629 {
1630     int ret;
1631     char *arg;
1632
1633     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
1634         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1635
1636     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1637
1638     while ((ret = options("t:f:x:h:p:z:s:d", argv, argc, &arg)) != -2)
1639     {
1640         switch (ret) {
1641             case 'f':
1642                 if (!read_config(arg))
1643                     exit(1);
1644                 break;
1645             case 'h':
1646                 strcpy(global_parameters.listener_override, arg);
1647                 break;
1648             case 'p':
1649                 strcpy(global_parameters.proxy_override, arg);
1650                 break;
1651             case 'z':
1652                 strcpy(global_parameters.zproxy_override, arg);
1653                 break;
1654             case 't':
1655                 strcpy(global_parameters.settings_path_override, arg);
1656                 break;
1657             case 's':
1658                 load_simpletargets(arg);
1659                 break;
1660             case 'd':
1661                 global_parameters.dump_records = 1;
1662                 break;
1663             default:
1664                 fprintf(stderr, "Usage: pazpar2\n"
1665                         "    -f configfile\n"
1666                         "    -h [host:]port          (REST protocol listener)\n"
1667                         "    -C cclconfig\n"
1668                         "    -s simpletargetfile\n"
1669                         "    -p hostname[:portno]    (HTTP proxy)\n"
1670                         "    -z hostname[:portno]    (Z39.50 proxy)\n"
1671                         "    -d                      (show internal records)\n");
1672                 exit(1);
1673         }
1674     }
1675
1676     if (!config)
1677     {
1678         yaz_log(YLOG_FATAL, "Load config with -f");
1679         exit(1);
1680     }
1681     global_parameters.server = config->servers;
1682
1683     start_http_listener();
1684     start_proxy();
1685     start_zproxy();
1686
1687     if (*global_parameters.settings_path_override)
1688         settings_read(global_parameters.settings_path_override);
1689     else if (global_parameters.server->settings)
1690         settings_read(global_parameters.server->settings);
1691     else
1692         yaz_log(YLOG_WARN, "No settings-directory specified. Problems may well ensue!");
1693     prepare_databases();
1694     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1695     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1696
1697     event_loop(&channel_list);
1698
1699     return 0;
1700 }
1701
1702 /*
1703  * Local variables:
1704  * c-basic-offset: 4
1705  * indent-tabs-mode: nil
1706  * End:
1707  * vim: shiftwidth=4 tabstop=8 expandtab
1708  */