-/* $Id: pazpar2.c,v 1.7 2006-11-27 19:44:26 quinn Exp $ */;
+/* $Id: pazpar2.c,v 1.8 2006-12-03 06:43:24 quinn Exp $ */;
#include <stdlib.h>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/socket.h>
+#include <netdb.h>
#include <signal.h>
#include <ctype.h>
#include <assert.h>
#include "relevance.h"
#define PAZPAR2_VERSION "0.1"
-#define MAX_DATABASES 512
#define MAX_CHUNK 10
-static void target_destroy(IOCHAN i);
-
-struct target
-{
- struct session *session;
- char fullname[256];
- char hostport[128];
- char *ibuf;
- int ibufsize;
- char databases[MAX_DATABASES][128];
- COMSTACK link;
- ODR odr_in, odr_out;
- struct target *next;
- void *addr;
- int hits;
- int records;
- int setno;
- int requestid; // ID of current outstanding request
- int diagnostic;
- IOCHAN iochan;
- enum target_state
- {
- No_connection,
- Connecting,
- Connected,
- Initializing,
- Searching,
- Presenting,
- Error,
- Idle,
- Stopped,
- Failed
- } state;
+static void client_fatal(struct client *cl);
+static void connection_destroy(struct connection *co);
+static int client_prep_connection(struct client *cl);
+
+IOCHAN channel_list = 0; // Master list of connections we're listening to.
+
+static struct connection *connection_freelist = 0;
+static struct client *client_freelist = 0;
+
+static struct host *hosts = 0; // The hosts we know about
+static struct database *databases = 0; // The databases we know about
+
+static char *client_states[] = {
+ "Client_Connecting",
+ "Client_Connected",
+ "Client_Idle",
+ "Client_Initializing",
+ "Client_Searching",
+ "Client_Presenting",
+ "Client_Error",
+ "Client_Failed",
+ "Client_Disconnected",
+ "Client_Stopped"
};
-static char *state_strings[] = {
- "No_connection",
- "Connecting",
- "Connected",
- "Initializing",
- "Searching",
- "Presenting",
- "Error",
- "Idle",
- "Failed"
-};
-
-
-IOCHAN channel_list = 0;
-
static struct parameters {
int timeout; /* operations timeout, in seconds */
char implementationId[128];
char implementationName[128];
char implementationVersion[128];
- struct timeval base_time;
+ int target_timeout; // seconds
int toget;
int chunk;
CCL_bibset ccl_filter;
+ yaz_marc_t yaz_marc;
+ ODR odr_out;
+ ODR odr_in;
} global_parameters =
{
30,
"81",
"Index Data PazPar2 (MasterKey)",
PAZPAR2_VERSION,
- {0,0},
+ 600, // 10 minutes
100,
MAX_CHUNK,
+ 0,
+ 0,
+ 0,
0
};
-static int send_apdu(struct target *t, Z_APDU *a)
+static int send_apdu(struct client *c, Z_APDU *a)
{
+ struct connection *co = c->connection;
char *buf;
int len, r;
- if (!z_APDU(t->odr_out, &a, 0, 0))
+ if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
{
- odr_perror(t->odr_out, "Encoding APDU");
+ odr_perror(global_parameters.odr_out, "Encoding APDU");
abort();
}
- buf = odr_getbuf(t->odr_out, &len, 0);
- r = cs_put(t->link, buf, len);
+ buf = odr_getbuf(global_parameters.odr_out, &len, 0);
+ r = cs_put(co->link, buf, len);
if (r < 0)
{
- yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
+ yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
return -1;
}
else if (r == 1)
{
fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
+ exit(1);
}
- odr_reset(t->odr_out); /* release the APDU structure */
+ odr_reset(global_parameters.odr_out); /* release the APDU structure */
+ co->state = Conn_Waiting;
return 0;
}
static void send_init(IOCHAN i)
{
- struct target *t = iochan_getdata(i);
- Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
+ Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
a->u.initRequest->implementationId = global_parameters.implementationId;
a->u.initRequest->implementationName = global_parameters.implementationName;
ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
- if (send_apdu(t, a) >= 0)
+ if (send_apdu(cl, a) >= 0)
{
iochan_setflags(i, EVENT_INPUT);
- t->state = Initializing;
+ cl->state = Client_Initializing;
}
else
- target_destroy(i);
+ cl->state = Client_Error;
+ odr_reset(global_parameters.odr_out);
}
static void send_search(IOCHAN i)
{
- struct target *t = iochan_getdata(i);
- struct session *s = t->session;
- Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
+ struct session *se = cl->session;
+ struct database *db = cl->database;
+ Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
int ndb, cerror, cpos;
char **databaselist;
Z_Query *zquery;
yaz_log(YLOG_DEBUG, "Sending search");
- cn = ccl_find_str(global_parameters.ccl_filter, s->query, &cerror, &cpos);
+ cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
if (!cn)
return;
- a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
+ a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
+ sizeof(Z_Query));
zquery->which = Z_Query_type_1;
- zquery->u.type_1 = ccl_rpn_query(t->odr_out, cn);
+ zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
ccl_rpn_delete(cn);
- for (ndb = 0; *t->databases[ndb]; ndb++)
+ for (ndb = 0; *db->databases[ndb]; ndb++)
;
- databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
- for (ndb = 0; *t->databases[ndb]; ndb++)
- databaselist[ndb] = t->databases[ndb];
+ databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
+ for (ndb = 0; *db->databases[ndb]; ndb++)
+ databaselist[ndb] = db->databases[ndb];
a->u.searchRequest->resultSetName = "Default";
a->u.searchRequest->databaseNames = databaselist;
a->u.searchRequest->num_databaseNames = ndb;
- if (send_apdu(t, a) >= 0)
+ if (send_apdu(cl, a) >= 0)
{
iochan_setflags(i, EVENT_INPUT);
- t->state = Searching;
- t->requestid = s->requestid;
+ cl->state = Client_Searching;
+ cl->requestid = se->requestid;
}
else
- {
- target_destroy(i);
- return;
- }
- odr_reset(t->odr_out);
+ cl->state = Client_Error;
+
+ odr_reset(global_parameters.odr_out);
}
static void send_present(IOCHAN i)
{
- struct target *t = iochan_getdata(i);
- Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
+ Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
int toget;
- int start = t->records + 1;
+ int start = cl->records + 1;
toget = global_parameters.chunk;
- if (toget > t->hits - t->records)
- toget = t->hits - t->records;
+ if (toget > cl->hits - cl->records)
+ toget = cl->hits - cl->records;
yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
a->u.presentRequest->resultSetId = "Default";
- a->u.presentRequest->preferredRecordSyntax = yaz_oidval_to_z3950oid(t->odr_out,
+ a->u.presentRequest->preferredRecordSyntax =
+ yaz_oidval_to_z3950oid(global_parameters.odr_out,
CLASS_RECSYN, VAL_USMARC);
- if (send_apdu(t, a) >= 0)
+ if (send_apdu(cl, a) >= 0)
{
iochan_setflags(i, EVENT_INPUT);
- t->state = Presenting;
+ cl->state = Client_Presenting;
}
else
- {
- target_destroy(i);
- return;
- }
- odr_reset(t->odr_out);
+ cl->state = Client_Error;
+ odr_reset(global_parameters.odr_out);
}
static void do_initResponse(IOCHAN i, Z_APDU *a)
{
- struct target *t = iochan_getdata(i);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
Z_InitResponse *r = a->u.initResponse;
yaz_log(YLOG_DEBUG, "Received init response");
if (*r->result)
{
- t->state = Idle;
+ cl->state = Client_Idle;
}
else
- target_destroy(i);
+ cl->state = Client_Failed; // FIXME need to do something to the connection
}
static void do_searchResponse(IOCHAN i, Z_APDU *a)
{
- struct target *t = iochan_getdata(i);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
+ struct session *se = cl->session;
Z_SearchResponse *r = a->u.searchResponse;
yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
if (*r->searchStatus)
{
- t->hits = *r->resultCount;
- t->state = Idle;
- t->session->total_hits += t->hits;
+ cl->hits = *r->resultCount;
+ cl->state = Client_Idle;
+ se->total_hits += cl->hits;
}
else
{ /*"FAILED"*/
- t->hits = 0;
- t->state = Failed;
+ cl->hits = 0;
+ cl->state = Client_Error;
if (r->records) {
Z_Records *recs = r->records;
if (recs->which == Z_Records_NSD)
{
yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
- t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
- t->state = Error;
+ cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
+ cl->state = Client_Error;
}
}
}
const char *find_field(const char *rec, const char *field)
{
- const char *line = rec;
+ char lbuf[5];
+ char *line;
- while (*line)
- {
- const char *eol;
+ lbuf[0] = '\n';
+ strcpy(lbuf + 1, field);
- if (!strncmp(line, field, 3) && line[3] == ' ')
- return line;
- while (*line && *line != '\n')
- line++;
- if (!(eol = strchr(line, '\n')))
- return 0;
- line = eol + 1;
- }
- return 0;
+ if ((line = strstr(rec, lbuf)))
+ return ++line;
+ else
+ return 0;
}
const char *find_subfield(const char *field, char subfield)
while ((field = find_field(rec, "650")))
{
- rec = field + 1; // Crude way to cause a loop through repeating fields
+ rec = field;
if ((subfield = find_subfield(field, 'a')))
{
char *e, *ef;
relevance_donerecord(s->relevance, head);
}
-struct record *ingest_record(struct target *t, char *buf, int len)
+struct record *ingest_record(struct client *cl, char *buf, int len)
{
- struct session *s = t->session;
+ struct session *se = cl->session;
struct record *res;
struct record *head;
const char *recbuf;
- wrbuf_rewind(s->wrbuf);
- yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
- if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
+ wrbuf_rewind(se->wrbuf);
+ yaz_marc_xml(global_parameters.yaz_marc, YAZ_MARC_LINE);
+ if (yaz_marc_decode_wrbuf(global_parameters.yaz_marc, buf, len, se->wrbuf) < 0)
{
yaz_log(YLOG_WARN, "Failed to decode MARC record");
return 0;
}
- wrbuf_putc(s->wrbuf, '\0');
- recbuf = wrbuf_buf(s->wrbuf);
+ wrbuf_putc(se->wrbuf, '\0');
+ recbuf = wrbuf_buf(se->wrbuf);
- res = nmem_malloc(s->nmem, sizeof(struct record));
- res->buf = nmem_strdup(s->nmem, recbuf);
+ res = nmem_malloc(se->nmem, sizeof(struct record));
+ res->buf = nmem_strdup(se->nmem, recbuf);
- extract_subject(s, res->buf);
+ extract_subject(se, res->buf);
- res->title = extract_title(s, res->buf);
- res->merge_key = extract_mergekey(s, res->buf);
+ res->title = extract_title(se, res->buf);
+ res->merge_key = extract_mergekey(se, res->buf);
if (!res->merge_key)
return 0;
- res->target = t;
+ res->client = cl;
res->next_cluster = 0;
res->target_offset = -1;
res->term_frequency_vec = 0;
- head = reclist_insert(s->reclist, res);
+ head = reclist_insert(se->reclist, res);
- pull_relevance_keys(s, head, res);
+ pull_relevance_keys(se, head, res);
- s->total_records++;
+ se->total_records++;
return res;
}
-void ingest_records(struct target *t, Z_Records *r)
+void ingest_records(struct client *cl, Z_Records *r)
{
- //struct session *s = t->session;
struct record *rec;
Z_NamePlusRecordList *rlist;
int i;
buf = (char*) e->u.octet_aligned->buf;
len = e->u.octet_aligned->len;
- rec = ingest_record(t, buf, len);
+ rec = ingest_record(cl, buf, len);
if (!rec)
continue;
}
static void do_presentResponse(IOCHAN i, Z_APDU *a)
{
- struct target *t = iochan_getdata(i);
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
Z_PresentResponse *r = a->u.presentResponse;
if (r->records) {
if (recs->which == Z_Records_NSD)
{
yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
- t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
- t->state = Error;
+ cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
+ cl->state = Client_Error;
}
}
- if (!*r->presentStatus && t->state != Error)
+ if (!*r->presentStatus && cl->state != Client_Error)
{
yaz_log(YLOG_DEBUG, "Good Present response");
- t->records += *r->numberOfRecordsReturned;
- ingest_records(t, r->records);
- t->state = Idle;
+ cl->records += *r->numberOfRecordsReturned;
+ ingest_records(cl, r->records);
+ cl->state = Client_Idle;
}
else if (*r->presentStatus)
{
yaz_log(YLOG_WARN, "Bad Present response");
- t->state = Error;
+ cl->state = Client_Error;
}
}
static void handler(IOCHAN i, int event)
{
- struct target *t = iochan_getdata(i);
- struct session *s = t->session;
- //static int waiting = 0;
+ struct connection *co = iochan_getdata(i);
+ struct client *cl = co->client;
+ struct session *se = 0;
- if (t->state == No_connection) /* Start connection */
- {
- int res = cs_connect(t->link, t->addr);
+ if (cl)
+ se = cl->session;
- t->state = Connecting;
- if (!res) /* we are go */
- iochan_setevent(i, EVENT_OUTPUT);
- else if (res == 1)
- iochan_setflags(i, EVENT_OUTPUT);
- else
- {
- yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
- target_destroy(i);
- return;
- }
- }
-
- else if (t->state == Connecting && event & EVENT_OUTPUT)
+ if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
{
int errcode;
socklen_t errlen = sizeof(errcode);
- if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
+ if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
&errlen) < 0 || errcode != 0)
{
- target_destroy(i);
+ client_fatal(cl);
return;
}
else
{
yaz_log(YLOG_DEBUG, "Connect OK");
- t->state = Connected;
+ co->state = Conn_Open;
+ if (cl)
+ cl->state = Client_Connected;
}
}
else if (event & EVENT_INPUT)
{
- int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
+ int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
if (len < 0)
{
- target_destroy(i);
+ client_fatal(cl);
return;
}
- if (len == 0)
+ else if (len == 0)
{
- target_destroy(i);
+ client_fatal(cl);
return;
}
- else if (len > 1)
+ else if (len > 1) // We discard input if we have no connection
{
- if (t->requestid == s->requestid || t->state == Initializing)
+ co->state = Conn_Open;
+
+ if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
{
Z_APDU *a;
- odr_reset(t->odr_in);
- odr_setbuf(t->odr_in, t->ibuf, len, 0);
- if (!z_APDU(t->odr_in, &a, 0, 0))
+ odr_reset(global_parameters.odr_in);
+ odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
+ if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
{
- target_destroy(i);
+ client_fatal(cl);
return;
}
switch (a->which)
break;
default:
yaz_log(YLOG_WARN, "Unexpected result from server");
- target_destroy(i);
+ client_fatal(cl);
return;
}
+ // We aren't expecting staggered output from target
// if (cs_more(t->link))
// iochan_setevent(i, EVENT_INPUT);
}
else // we throw away response and go to idle mode
{
- yaz_log(YLOG_DEBUG, "Ignoring result to previous operation");
- t->state = Idle;
+ yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
+ cl->state = Client_Idle;
}
}
/* if len==1 we do nothing but wait for more input */
}
- else if (t->state == Connected) {
+ if (cl->state == Client_Connected) {
send_init(i);
}
- if (t->state == Idle)
+ if (cl->state == Client_Idle)
{
- if (t->requestid != s->requestid && *s->query) {
+ if (cl->requestid != se->requestid && *se->query) {
send_search(i);
}
- else if (t->hits > 0 && t->records < global_parameters.toget &&
- t->records < t->hits) {
+ else if (cl->hits > 0 && cl->records < global_parameters.toget &&
+ cl->records < cl->hits) {
send_present(i);
}
}
}
-static void target_destroy(IOCHAN i)
+// Disassociate connection from client
+static void connection_release(struct connection *co)
{
- struct target *t = iochan_getdata(i);
- struct session *s = t->session;
- struct target **p;
- assert(iochan_getfun(i) == handler);
-
- yaz_log(YLOG_DEBUG, "Destroying target");
-
- if (t->ibuf)
- xfree(t->ibuf);
- cs_close(t->link);
- if (t->odr_in)
- odr_destroy(t->odr_in);
- if (t->odr_out)
- odr_destroy(t->odr_out);
- for (p = &s->targets; *p; p = &(*p)->next)
- if (*p == t)
+ struct client *cl = co->client;
+
+ yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
+ if (!cl)
+ return;
+ cl->connection = 0;
+ co->client = 0;
+}
+
+// Close connection and recycle structure
+static void connection_destroy(struct connection *co)
+{
+ struct host *h = co->host;
+ cs_close(co->link);
+ iochan_destroy(co->iochan);
+
+ yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
+ if (h->connections == co)
+ h->connections = co->next;
+ else
+ {
+ struct connection *pco;
+ for (pco = h->connections; pco && pco->next != co; pco = pco->next)
+ ;
+ if (pco)
+ pco->next = co->next;
+ else
+ abort();
+ }
+ if (co->client)
+ {
+ if (co->client->state != Client_Idle)
+ co->client->state = Client_Disconnected;
+ co->client->connection = 0;
+ }
+ co->next = connection_freelist;
+ connection_freelist = co;
+}
+
+// Creates a new connection for client, associated with the host of
+// client's database
+static struct connection *connection_create(struct client *cl)
+{
+ struct connection *new;
+ COMSTACK link;
+ int res;
+ void *addr;
+
+ yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
+ if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
+ {
+ yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
+ exit(1);
+ }
+
+ if (!(addr = cs_straddr(link, cl->database->host->ipport)))
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
+ return 0;
+ }
+
+ res = cs_connect(link, addr);
+ if (res < 0)
+ {
+ yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
+ return 0;
+ }
+
+ if ((new = connection_freelist))
+ connection_freelist = new->next;
+ else
+ {
+ new = xmalloc(sizeof (struct connection));
+ new->ibuf = 0;
+ new->ibufsize = 0;
+ }
+ new->state = Conn_Connecting;
+ new->host = cl->database->host;
+ new->next = new->host->connections;
+ new->host->connections = new;
+ new->client = cl;
+ cl->connection = new;
+ new->link = link;
+
+ new->iochan = iochan_create(cs_fileno(link), handler, 0);
+ iochan_setdata(new->iochan, new);
+ new->iochan->next = channel_list;
+ channel_list = new->iochan;
+ return new;
+}
+
+// Close connection and set state to error
+static void client_fatal(struct client *cl)
+{
+ yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
+ connection_destroy(cl->connection);
+ cl->state = Client_Error;
+}
+
+// Ensure that client has a connection associated
+static int client_prep_connection(struct client *cl)
+{
+ struct connection *co;
+ struct session *se = cl->session;
+ struct host *host = cl->database->host;
+
+ co = cl->connection;
+
+ yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
+
+ if (!co)
+ {
+ // See if someone else has an idle connection
+ // We should look at timestamps here to select the longest-idle connection
+ for (co = host->connections; co; co = co->next)
+ if (co->state == Conn_Open && (!co->client || co->client->session != se))
+ break;
+ if (co)
{
- *p = (*p)->next;
- break;
+ connection_release(co);
+ cl->connection = co;
+ co->client = cl;
+ }
+ else
+ co = connection_create(cl);
+ }
+ if (co)
+ {
+ if (co->state == Conn_Connecting)
+ cl->state = Client_Connecting;
+ else if (co->state == Conn_Open)
+ {
+ if (cl->state == Client_Error || cl->state == Client_Disconnected)
+ cl->state = Client_Idle;
}
- xfree(t);
- iochan_destroy(i);
+ iochan_setflag(co->iochan, EVENT_OUTPUT);
+ return 1;
+ }
+ else
+ return 0;
}
-int load_targets(struct session *s, const char *fn)
+void load_simpletargets(const char *fn)
{
FILE *f = fopen(fn, "r");
char line[256];
- struct target **target_p;
if (!f)
{
yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
- return -1;
+ exit(1);
}
- while (s->targets)
- target_destroy(s->targets->iochan);
-
- s->query[0] = '\0';
- target_p = &s->targets;
while (fgets(line, 255, f))
{
- char *url, *p;
- struct target *target;
- IOCHAN new;
+ char *url, *db;
+ struct host *host;
+ struct database *database;
if (strncmp(line, "target ", 7))
continue;
url = line + 7;
url[strlen(url) - 1] = '\0';
yaz_log(LOG_DEBUG, "Target: %s", url);
-
- *target_p = target = xmalloc(sizeof(**target_p));
- target->next = 0;
- target_p = &target->next;
- target->state = No_connection;
- target->ibuf = 0;
- target->ibufsize = 0;
- target->odr_in = odr_createmem(ODR_DECODE);
- target->odr_out = odr_createmem(ODR_ENCODE);
- target->hits = -1;
- target->setno = 0;
- target->session = s;
- target->requestid = -1;
- target->records = 0;
- target->diagnostic = 0;
- strcpy(target->fullname, url);
- if ((p = strchr(url, '/')))
- {
- *p = '\0';
- strcpy(target->hostport, url);
- *p = '/';
- p++;
- strcpy(target->databases[0], p);
- target->databases[1][0] = '\0';
- }
+ if ((db = strchr(url, '/')))
+ *(db++) = '\0';
else
- {
- strcpy(target->hostport, url);
- strcpy(target->databases[0], "Default");
- target->databases[1][0] = '\0';
- }
+ db = "Default";
- if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
+ for (host = hosts; host; host = host->next)
+ if (!strcmp(url, host->hostport))
+ break;
+ if (!host)
{
- yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
- exit(1);
+ struct addrinfo *addrinfo, hints;
+ char *port;
+ char ipport[128];
+ unsigned char addrbuf[4];
+ int res;
+
+ host = xmalloc(sizeof(struct host));
+ host->hostport = xstrdup(url);
+ host->connections = 0;
+
+ if ((port = strchr(url, ':')))
+ *(port++) = '\0';
+ else
+ port = "210";
+
+ hints.ai_flags = 0;
+ hints.ai_family = PF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+ hints.ai_addrlen = 0;
+ hints.ai_addr = 0;
+ hints.ai_canonname = 0;
+ hints.ai_next = 0;
+ // This is not robust code. It assumes that getaddrinfo returns AF_INET
+ // address.
+ if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
+ {
+ yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
+ continue;
+ }
+ assert(addrinfo->ai_family == PF_INET);
+ memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
+ sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
+ addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
+ host->ipport = xstrdup(ipport);
+ freeaddrinfo(addrinfo);
+ host->next = hosts;
+ hosts = host;
}
- if (!(target->addr = cs_straddr(target->link, target->hostport)))
- {
- printf("ERROR %s bad-address", target->hostport);
- target->state = Failed;
- continue;
- }
- target->iochan = new = iochan_create(cs_fileno(target->link), handler, 0);
- assert(new);
- iochan_setdata(new, target);
- iochan_setevent(new, EVENT_EXCEPT);
- new->next = channel_list;
- channel_list = new;
+ database = xmalloc(sizeof(struct database));
+ database->host = host;
+ database->url = xmalloc(strlen(url) + strlen(db) + 2);
+ strcpy(database->url, url);
+ strcat(database->url, "/");
+ strcat(database->url, db);
+ strcpy(database->databases[0], db);
+ *database->databases[1] = '\0';
+ database->errors = 0;
+ database->next = databases;
+ databases = database;
+
}
fclose(f);
-
- return 0;
}
static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
return 0;
}
-char *search(struct session *s, char *query)
+static struct client *client_create(void)
{
- IOCHAN c;
- int live_channels = 0;
+ struct client *r;
+ if (client_freelist)
+ {
+ r = client_freelist;
+ client_freelist = client_freelist->next;
+ }
+ else
+ r = xmalloc(sizeof(struct client));
+ r->database = 0;
+ r->connection = 0;
+ r->session = 0;
+ r->hits = 0;
+ r->records = 0;
+ r->setno = 0;
+ r->requestid = -1;
+ r->diagnostic = 0;
+ r->state = Client_Disconnected;
+ r->next = 0;
+ return r;
+}
- yaz_log(YLOG_DEBUG, "Search");
+void client_destroy(struct client *c)
+{
+ struct session *se = c->session;
+ if (c == se->clients)
+ se->clients = c->next;
+ else
+ {
+ struct client *cc;
+ for (cc = se->clients; cc && cc->next != c; cc = cc->next)
+ ;
+ if (cc)
+ cc->next = c->next;
+ }
+ if (c->connection)
+ connection_destroy(c->connection);
+ c->next = client_freelist;
+ client_freelist = c;
+}
- // Determine what iochans belong to this session
- // It might have been better to have a list of them
+// This should be extended with parameters to control selection criteria
+// Associates a set of clients with a session;
+int select_targets(struct session *se)
+{
+ struct database *db;
+ int c = 0;
- strcpy(s->query, query);
- s->requestid++;
- nmem_reset(s->nmem);
- for (c = channel_list; c; c = c->next)
+ while (se->clients)
+ client_destroy(se->clients);
+ for (db = databases; db; db = db->next)
{
- struct target *t;
+ struct client *cl = client_create();
+ cl->database = db;
+ cl->session = se;
+ cl->next = se->clients;
+ se->clients = cl;
+ c++;
+ }
+ return c;
+}
- if (iochan_getfun(c) != handler) // Not a Z target
- continue;
- t = iochan_getdata(c);
- if (t->session == s)
- {
- t->hits = -1;
- t->records = 0;
- t->diagnostic = 0;
+char *search(struct session *se, char *query)
+{
+ int live_channels = 0;
+ struct client *cl;
- if (t->state == Error)
- t->state = Idle;
+ yaz_log(YLOG_DEBUG, "Search");
- if (t->state == Idle)
- iochan_setflag(c, EVENT_OUTPUT);
+ strcpy(se->query, query);
+ se->requestid++;
+ nmem_reset(se->nmem);
+ for (cl = se->clients; cl; cl = cl->next)
+ {
+ cl->hits = -1;
+ cl->records = 0;
+ cl->diagnostic = 0;
+ if (client_prep_connection(cl))
live_channels++;
- }
}
if (live_channels)
{
char *p[512];
int maxrecs = live_channels * global_parameters.toget;
- s->termlist = termlist_create(s->nmem, maxrecs, 15);
- s->reclist = reclist_create(s->nmem, maxrecs);
- extract_terms(s->nmem, query, p);
- s->relevance = relevance_create(s->nmem, (const char **) p, maxrecs);
- s->total_records = s->total_hits = 0;
+ se->termlist = termlist_create(se->nmem, maxrecs, 15);
+ se->reclist = reclist_create(se->nmem, maxrecs);
+ extract_terms(se->nmem, query, p);
+ se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
+ se->total_records = se->total_hits = 0;
}
else
return "NOTARGETS";
session->termlist = 0;
session->reclist = 0;
session->requestid = -1;
- session->targets = 0;
- session->pqf_parser = yaz_pqf_create();
+ session->clients = 0;
session->query[0] = '\0';
session->nmem = nmem_create();
- session->yaz_marc = yaz_marc_create();
- yaz_marc_subfield_str(session->yaz_marc, "\t");
session->wrbuf = wrbuf_alloc();
+ select_targets(session);
+
return session;
}
// FIXME do some shit here!!!!
}
-struct hitsbytarget *hitsbytarget(struct session *s, int *count)
+struct hitsbytarget *hitsbytarget(struct session *se, int *count)
{
static struct hitsbytarget res[1000]; // FIXME MM
- IOCHAN c;
+ struct client *cl;
*count = 0;
- for (c = channel_list; c; c = c->next)
- if (iochan_getfun(c) == handler)
- {
- struct target *t = iochan_getdata(c);
- if (t->session == s)
- {
- strcpy(res[*count].id, t->hostport);
- res[*count].hits = t->hits;
- res[*count].records = t->records;
- res[*count].diagnostic = t->diagnostic;
- res[*count].state = state_strings[(int) t->state];
- (*count)++;
- }
- }
+ for (cl = se->clients; cl; cl = cl->next)
+ {
+ strcpy(res[*count].id, cl->database->host->hostport);
+ res[*count].hits = cl->hits;
+ res[*count].records = cl->records;
+ res[*count].diagnostic = cl->diagnostic;
+ res[*count].state = client_states[cl->state];
+ res[*count].connected = cl->connection ? 1 : 0;
+ (*count)++;
+ }
return res;
}
return recs;
}
-void statistics(struct session *s, struct statistics *stat)
+void statistics(struct session *se, struct statistics *stat)
{
- IOCHAN c;
- int i;
+ struct client *cl;
+ int count = 0;
bzero(stat, sizeof(*stat));
- for (i = 0, c = channel_list; c; i++, c = c->next)
+ for (cl = se->clients; cl; cl = cl->next)
{
- struct target *t;
- if (iochan_getfun(c) != handler)
- continue;
- t = iochan_getdata(c);
- switch (t->state)
+ if (!cl->connection)
+ stat->num_no_connection++;
+ switch (cl->state)
{
- case No_connection: stat->num_no_connection++; break;
- case Connecting: stat->num_connecting++; break;
- case Initializing: stat->num_initializing++; break;
- case Searching: stat->num_searching++; break;
- case Presenting: stat->num_presenting++; break;
- case Idle: stat->num_idle++; break;
- case Failed: stat->num_failed++; break;
- case Error: stat->num_error++; break;
+ case Client_Connecting: stat->num_connecting++; break;
+ case Client_Initializing: stat->num_initializing++; break;
+ case Client_Searching: stat->num_searching++; break;
+ case Client_Presenting: stat->num_presenting++; break;
+ case Client_Idle: stat->num_idle++; break;
+ case Client_Failed: stat->num_failed++; break;
+ case Client_Error: stat->num_error++; break;
default: break;
}
+ count++;
}
- stat->num_hits = s->total_hits;
- stat->num_records = s->total_records;
+ stat->num_hits = se->total_hits;
+ stat->num_records = se->total_records;
- stat->num_connections = i;
+ stat->num_clients = count;
}
static CCL_bibset load_cclfile(const char *fn)
yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
- while ((ret = options("c:h:p:C:", argv, argc, &arg)) != -2)
+ while ((ret = options("c:h:p:C:s:", argv, argc, &arg)) != -2)
{
switch (ret) {
case 0:
case 'p':
http_set_proxyaddr(arg);
break;
+ case 's':
+ load_simpletargets(arg);
+ break;
default:
fprintf(stderr, "Usage: pazpar2 -d comport");
exit(1);
}
-
}
- if (!global_parameters.ccl_filter)
- global_parameters.ccl_filter = load_cclfile("default.bib");
+ global_parameters.ccl_filter = load_cclfile("default.bib");
+ global_parameters.yaz_marc = yaz_marc_create();
+ yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
+ global_parameters.odr_in = odr_createmem(ODR_DECODE);
+ global_parameters.odr_out = odr_createmem(ODR_ENCODE);
event_loop(&channel_list);