-/* $Id: pazpar2.c,v 1.3 2006-11-21 18:46:43 quinn Exp $ */
+/* $Id: pazpar2.c,v 1.7 2006-11-27 19:44:26 quinn Exp $ */
#include <stdlib.h>
#include <stdio.h>
#include <yaz/readconf.h>
#include <yaz/pquery.h>
#include <yaz/yaz-util.h>
+#include <yaz/ccl.h>
+#include <yaz/yaz-ccl.h>
#include "pazpar2.h"
#include "eventl.h"
#include "command.h"
#include "http.h"
+#include "termlists.h"
+#include "reclists.h"
+#include "relevance.h"
#define PAZPAR2_VERSION "0.1"
#define MAX_DATABASES 512
#define MAX_CHUNK 10
+static void target_destroy(IOCHAN i);
+
struct target
{
struct session *session;
int setno;
int requestid; // ID of current outstanding request
int diagnostic;
+ IOCHAN iochan;
enum target_state
{
No_connection,
Presenting,
Error,
Idle,
+ Stopped,
Failed
} state;
};
struct timeval base_time;
int toget;
int chunk;
+ CCL_bibset ccl_filter;
} global_parameters =
{
30,
PAZPAR2_VERSION,
{0,0},
100,
- MAX_CHUNK
+ MAX_CHUNK,
+ 0
};
t->state = Initializing;
}
else
- {
- iochan_destroy(i);
- t->state = Failed;
- cs_close(t->link);
- }
+ target_destroy(i);
}
static void send_search(IOCHAN i)
struct target *t = iochan_getdata(i);
struct session *s = t->session;
Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
- int ndb;
+ int ndb, cerror, cpos;
char **databaselist;
Z_Query *zquery;
+ struct ccl_rpn_node *cn;
yaz_log(YLOG_DEBUG, "Sending search");
+
+ cn = ccl_find_str(global_parameters.ccl_filter, s->query, &cerror, &cpos);
+ if (!cn)
+ return;
a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
zquery->which = Z_Query_type_1;
- zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
+ zquery->u.type_1 = ccl_rpn_query(t->odr_out, cn);
+ ccl_rpn_delete(cn);
for (ndb = 0; *t->databases[ndb]; ndb++)
;
}
else
{
- iochan_destroy(i);
- t->state = Failed;
- cs_close(t->link);
+ target_destroy(i);
+ return;
}
odr_reset(t->odr_out);
}
a->u.presentRequest->resultSetId = "Default";
+ a->u.presentRequest->preferredRecordSyntax = yaz_oidval_to_z3950oid(t->odr_out,
+ CLASS_RECSYN, VAL_USMARC);
+
if (send_apdu(t, a) >= 0)
{
iochan_setflags(i, EVENT_INPUT);
}
else
{
- iochan_destroy(i);
- t->state = Failed;
- cs_close(t->link);
+ target_destroy(i);
+ return;
}
odr_reset(t->odr_out);
}
t->state = Idle;
}
else
- {
- t->state = Failed;
- iochan_destroy(i);
- cs_close(t->link);
- }
+ target_destroy(i);
}
static void do_searchResponse(IOCHAN i, Z_APDU *a)
{
t->hits = *r->resultCount;
t->state = Idle;
+ t->session->total_hits += t->hits;
}
else
{ /*"FAILED"*/
while (*line)
{
+ const char *eol;
+
if (!strncmp(line, field, 3) && line[3] == ' ')
return line;
- while (*(line++) != '\n')
- ;
+ while (*line && *line != '\n')
+ line++;
+ if (!(eol = strchr(line, '\n')))
+ return 0;
+ line = eol + 1;
}
return 0;
}
p++;
if (*p == '\t' && *(++p) == subfield) {
if (*(++p) == ' ')
- return ++p;
+ {
+ while (isspace(*p))
+ p++;
+ return p;
+ }
}
}
return 0;
}
// Extract 245 $a $b 100 $a
+/*
+ * Build a display title for the record text `rec`.
+ *
+ * The result is assembled in s->wrbuf (which is rewound first) as:
+ *     <245 $a>[ <245 $b>][, by <100 $a>]
+ * Each subfield is taken up to the next tab or the end of its line,
+ * whichever comes first; a subfield whose line has no terminating newline
+ * is left out.
+ *
+ * Returns a copy allocated with nmem_strdup() from s->nmem, or 0 when the
+ * record has no 245 field or the 245 field has no $a subfield. In the copy,
+ * '&', '<', control characters and every byte above 'z' (122) are replaced
+ * by a space.
+ * NOTE(review): presumably this is so the title can be dropped into markup
+ * unescaped -- confirm against the caller.
+ */
+char *extract_title(struct session *s, const char *rec)
+{
+    const char *field, *subfield;
+    const char *e, *ef;
+    char *obuf;
+    unsigned char *p;
+
+    wrbuf_rewind(s->wrbuf);
+
+    if (!(field = find_field(rec, "245")))
+        return 0;
+    if (!(subfield = find_subfield(field, 'a')))
+        return 0;
+    // Only order the two pointers once we know the newline exists; relational
+    // comparison against a null pointer is undefined.
+    ef = strchr(subfield, '\n');
+    if (ef && (e = strchr(subfield, '\t')) && e < ef)
+        ef = e;
+    if (ef)
+    {
+        wrbuf_write(s->wrbuf, subfield, ef - subfield);
+        if ((subfield = find_subfield(field, 'b')))
+        {
+            ef = strchr(subfield, '\n');
+            if (ef && (e = strchr(subfield, '\t')) && e < ef)
+                ef = e;
+            if (ef)
+            {
+                wrbuf_putc(s->wrbuf, ' ');
+                wrbuf_write(s->wrbuf, subfield, ef - subfield);
+            }
+        }
+    }
+    if ((field = find_field(rec, "100")))
+    {
+        if ((subfield = find_subfield(field, 'a')))
+        {
+            ef = strchr(subfield, '\n');
+            if (ef && (e = strchr(subfield, '\t')) && e < ef)
+                ef = e;
+            if (ef)
+            {
+                wrbuf_puts(s->wrbuf, ", by ");
+                wrbuf_write(s->wrbuf, subfield, ef - subfield);
+            }
+        }
+    }
+    // Terminate the buffer so it can be duplicated as a C string
+    wrbuf_putc(s->wrbuf, '\0');
+    obuf = nmem_strdup(s->nmem, wrbuf_buf(s->wrbuf));
+    // Walk the copy as unsigned bytes so the "> 122" test also catches bytes
+    // with the high bit set, whatever the signedness of plain char.
+    for (p = (unsigned char *) obuf; *p; p++)
+        if (*p == '&' || *p == '<' || *p > 122 || *p < ' ')
+            *p = ' ';
+    return obuf;
+}
+
+// Extract 245 $a $b 100 $a
char *extract_mergekey(struct session *s, const char *rec)
{
const char *field, *subfield;
return out;
}
+#ifdef RECHEAP
static void push_record(struct session *s, struct record *r)
{
int p;
static struct record *pop_record(struct session *s)
{
- struct record *res = s->recheap[0];
+ struct record *res;
int p = 0;
int lastnonleaf = (s->recheap_max - 1) >> 1;
if (s->recheap_max < 0)
return 0;
+ res = s->recheap[0];
+
s->recheap[p] = s->recheap[s->recheap_max--];
while (p <= lastnonleaf)
}
}
+#endif
+
+// FIXME needs to be generalized. Should flexibly generate X lists per search
+/*
+ * Feed every 650 $a value found in the record text `rec` into s->termlist.
+ *
+ * For each 650 field the $a subfield is taken up to the next tab or the end
+ * of the line, then trailing characters that are neither alphabetic nor ')'
+ * are trimmed. Non-empty results are passed to termlist_insert().
+ *
+ * Values longer than the local buffer are truncated rather than aborting the
+ * process: the record text comes from a remote target and its length is not
+ * under our control. Processing stops at the first $a whose line has no
+ * terminating newline.
+ */
+static void extract_subject(struct session *s, const char *rec)
+{
+    const char *field, *subfield;
+
+    while ((field = find_field(rec, "650")))
+    {
+        rec = field + 1; // Crude way to cause a loop through repeating fields
+        if ((subfield = find_subfield(field, 'a')))
+        {
+            const char *e, *ef;
+            char buf[1024];
+            size_t len;
+
+            ef = strchr(subfield, '\n');
+            if (!ef)
+                return;
+            if ((e = strchr(subfield, '\t')) && e < ef)
+                ef = e;
+            // <ctype.h> classifiers need an unsigned char value; a plain char
+            // holding a byte >= 0x80 may be negative
+            while (ef > subfield && !isalpha((unsigned char) ef[-1]) && ef[-1] != ')')
+                ef--;
+            len = (size_t) (ef - subfield);
+            if (len >= sizeof(buf))
+                len = sizeof(buf) - 1; // truncate instead of asserting
+            memcpy(buf, subfield, len);
+            buf[len] = '\0';
+            if (*buf)
+                termlist_insert(s->termlist, buf);
+        }
+    }
+}
+
+/*
+ * Hand every occurrence of the field with tag `field` in the record text
+ * `rec` to relevance_countwords() for `head`, passing `mult` through.
+ *
+ * For each matching line, the span handed over starts at the first tab on
+ * that line and runs up to (not including) the terminating newline. A
+ * matching line without a tab is skipped; a matching line without a
+ * terminating newline ends the scan.
+ *
+ * The search is resumed at the line following the one just processed, so
+ * repeating fields are all visited. (Advancing via the `field` tag argument
+ * instead of the record position would stop after the first occurrence.)
+ */
+static void pull_relevance_field(struct session *s, struct record *head, const char *rec,
+        char *field, int mult)
+{
+    const char *fb;
+
+    while ((fb = find_field(rec, field)))
+    {
+        const char *eol = strchr(fb, '\n');
+        char *ffield;
+
+        if (!eol)
+            return;
+        rec = eol + 1; // continue with the next line of the record
+        // Bound the tab search to this line so data from a later field is
+        // never attributed to this one
+        ffield = memchr(fb, '\t', (size_t) (eol - fb));
+        if (!ffield)
+            continue;
+        relevance_countwords(s->relevance, head, ffield, eol - ffield, mult);
+    }
+}
+
+/*
+ * Register the text of record `rec` with the session's relevance object on
+ * behalf of `head`: relevance_newrec() is called first, then each listed
+ * field is pulled from rec->buf with its multiplier, and finally
+ * relevance_donerecord() is called.
+ */
+static void pull_relevance_keys(struct session *s, struct record *head, struct record *rec)
+{
+    // Field tag and the value passed as pull_relevance_field()'s `mult`
+    // argument, in processing order. Tag 530 is deliberately not listed.
+    static const struct
+    {
+        char *tag;
+        int mult;
+    } sources[] = {
+        { "100", 2 },
+        { "245", 4 },
+        { "630", 1 },
+        { "650", 1 },
+        { "700", 1 }
+    };
+    size_t k;
+
+    relevance_newrec(s->relevance, head);
+    for (k = 0; k < sizeof(sources) / sizeof(sources[0]); k++)
+        pull_relevance_field(s, head, rec->buf, sources[k].tag, sources[k].mult);
+    relevance_donerecord(s->relevance, head);
+}
+
struct record *ingest_record(struct target *t, char *buf, int len)
{
struct session *s = t->session;
struct record *res;
+ struct record *head;
const char *recbuf;
wrbuf_rewind(s->wrbuf);
recbuf = wrbuf_buf(s->wrbuf);
res = nmem_malloc(s->nmem, sizeof(struct record));
+ res->buf = nmem_strdup(s->nmem, recbuf);
+
+ extract_subject(s, res->buf);
- res->merge_key = extract_mergekey(s, recbuf);
+ res->title = extract_title(s, res->buf);
+ res->merge_key = extract_mergekey(s, res->buf);
if (!res->merge_key)
return 0;
- res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
res->target = t;
res->next_cluster = 0;
res->target_offset = -1;
+ res->term_frequency_vec = 0;
- yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
+ head = reclist_insert(s->reclist, res);
- push_record(s, res);
+ pull_relevance_keys(s, head, res);
+
+ s->total_records++;
return res;
}
rec = ingest_record(t, buf, len);
if (!rec)
continue;
- yaz_log(YLOG_DEBUG, "Ingested a fooking record");
}
}
t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
t->state = Error;
}
- else
- {
- yaz_log(YLOG_DEBUG, "Got Records!");
- }
}
if (!*r->presentStatus && t->state != Error)
else
{
yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
- cs_close(t->link);
- t->state = Failed;
- iochan_destroy(i);
+ target_destroy(i);
+ return;
}
}
if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
&errlen) < 0 || errcode != 0)
{
- cs_close(t->link);
- iochan_destroy(i);
- t->state = Failed;
+ target_destroy(i);
return;
}
else
if (len < 0)
{
- cs_close(t->link);
- iochan_destroy(i);
- t->state = Failed;
+ target_destroy(i);
return;
}
if (len == 0)
{
- cs_close(t->link);
- iochan_destroy(i);
- t->state = Failed;
+ target_destroy(i);
return;
}
else if (len > 1)
odr_setbuf(t->odr_in, t->ibuf, len, 0);
if (!z_APDU(t->odr_in, &a, 0, 0))
{
- cs_close(t->link);
- iochan_destroy(i);
- t->state = Failed;
+ target_destroy(i);
return;
}
- yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
switch (a->which)
{
case Z_APDU_initResponse:
break;
default:
yaz_log(YLOG_WARN, "Unexpected result from server");
- cs_close(t->link);
- iochan_destroy(i);
- t->state = Failed;
+ target_destroy(i);
return;
}
// if (cs_more(t->link))
// iochan_setevent(i, EVENT_INPUT);
}
else // we throw away response and go to idle mode
+ {
+ yaz_log(YLOG_DEBUG, "Ignoring result to previous operation");
t->state = Idle;
+ }
}
/* if len==1 we do nothing but wait for more input */
}
if (t->state == Idle)
{
- if (t->requestid != s->requestid) {
+ if (t->requestid != s->requestid && *s->query) {
send_search(i);
}
else if (t->hits > 0 && t->records < global_parameters.toget &&
}
}
+/*
+ * Tear down the target attached to I/O channel `i`.
+ *
+ * Releases the target's input buffer, closes its link, destroys both ODR
+ * streams, unlinks the target from its session's target list, frees the
+ * target itself and finally destroys the channel. The channel must be one
+ * whose callback is handler(); this is asserted.
+ *
+ * Neither the target nor `i` may be used after this returns.
+ */
+static void target_destroy(IOCHAN i)
+{
+    struct target *victim = iochan_getdata(i);
+    struct session *owner = victim->session;
+    struct target **link;
+
+    assert(iochan_getfun(i) == handler);
+
+    yaz_log(YLOG_DEBUG, "Destroying target");
+
+    // Release everything the target holds
+    if (victim->ibuf)
+        xfree(victim->ibuf);
+    cs_close(victim->link);
+    if (victim->odr_in)
+        odr_destroy(victim->odr_in);
+    if (victim->odr_out)
+        odr_destroy(victim->odr_out);
+
+    // Find the pointer that refers to the target in the session's list
+    // and splice the target out, if it is there
+    link = &owner->targets;
+    while (*link && *link != victim)
+        link = &(*link)->next;
+    if (*link)
+        *link = victim->next;
+
+    xfree(victim);
+    iochan_destroy(i);
+}
+
int load_targets(struct session *s, const char *fn)
{
FILE *f = fopen(fn, "r");
return -1;
}
+ while (s->targets)
+ target_destroy(s->targets->iochan);
+
+ s->query[0] = '\0';
target_p = &s->targets;
while (fgets(line, 255, f))
{
target->state = Failed;
continue;
}
- new = iochan_create(cs_fileno(target->link), handler, 0);
+ target->iochan = new = iochan_create(cs_fileno(target->link), handler, 0);
+ assert(new);
iochan_setdata(new, target);
iochan_setevent(new, EVENT_EXCEPT);
new->next = channel_list;
return 0;
}
-void search(struct session *s, char *query)
+/*
+ * Collect the terms of the CCL RPN tree rooted at `n`.
+ *
+ * Each term node's string is duplicated with nmem_strdup() from `nmem` and
+ * stored at termlist[*num], after which *num is incremented. AND, OR, NOT
+ * and PROX nodes are descended into, first operand before second; any other
+ * node kind is ignored.
+ *
+ * No bound is checked here: the caller must supply an array large enough
+ * for every term in the tree.
+ */
+static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
+{
+    if (n->kind == CCL_RPN_TERM)
+    {
+        termlist[*num] = nmem_strdup(nmem, n->u.t.term);
+        (*num)++;
+        return;
+    }
+
+    if (n->kind != CCL_RPN_AND && n->kind != CCL_RPN_OR &&
+        n->kind != CCL_RPN_NOT && n->kind != CCL_RPN_PROX)
+        return; // not an operator with two operands: nothing to collect
+
+    pull_terms(nmem, n->u.p[0], termlist, num);
+    pull_terms(nmem, n->u.p[1], termlist, num);
+}
+
+// Extract terms from query into null-terminated termlist
+/*
+ * Parse `query` as CCL against global_parameters.ccl_filter and store its
+ * terms in `termlist` as a null-terminated array of strings allocated from
+ * `nmem`.
+ *
+ * Returns 0 on success and -1 when the query cannot be parsed. `termlist`
+ * is null-terminated in both cases (it is empty on failure), so it is safe
+ * to walk even if the caller does not check the return value.
+ *
+ * The caller must supply an array with room for every term plus the
+ * terminator; no capacity is passed in, so none can be enforced here.
+ */
+static int extract_terms(NMEM nmem, char *query, char **termlist)
+{
+    int error, pos;
+    struct ccl_rpn_node *n;
+    int num = 0;
+
+    termlist[0] = 0; // valid empty list until parsing succeeds
+
+    n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
+    if (!n)
+        return -1;
+    pull_terms(nmem, n, termlist, &num);
+    termlist[num] = 0;
+    ccl_rpn_delete(n);
+    return 0;
+}
+
+char *search(struct session *s, char *query)
{
IOCHAN c;
int live_channels = 0;
}
if (live_channels)
{
+ char *p[512];
int maxrecs = live_channels * global_parameters.toget;
- if (!s->recheap_size)
- {
- s->recheap = xmalloc(maxrecs * sizeof(struct record *));
- s->recheap_size = maxrecs;
- }
- else if (s->recheap_size < maxrecs)
- {
- s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
- s->recheap_size = maxrecs;
- }
+ s->termlist = termlist_create(s->nmem, maxrecs, 15);
+ s->reclist = reclist_create(s->nmem, maxrecs);
+ extract_terms(s->nmem, query, p);
+ s->relevance = relevance_create(s->nmem, (const char **) p, maxrecs);
+ s->total_records = s->total_hits = 0;
}
- s->recheap_max = -1;
- s->recheap_scratch = -1;
+ else
+ return "NOTARGETS";
+
+ return 0;
}
struct session *new_session()
yaz_log(YLOG_DEBUG, "New pazpar2 session");
+ session->total_hits = 0;
+ session->total_records = 0;
+ session->termlist = 0;
+ session->reclist = 0;
session->requestid = -1;
session->targets = 0;
session->pqf_parser = yaz_pqf_create();
session->yaz_marc = yaz_marc_create();
yaz_marc_subfield_str(session->yaz_marc, "\t");
session->wrbuf = wrbuf_alloc();
- session->recheap = 0;
- session->recheap_size = 0;
return session;
}
return res;
}
-struct record **show(struct session *s, int start, int *num)
+/*
+ * Return the result of termlist_highscore() for the session's term list.
+ * `num` is passed straight through to that call.
+ */
+struct termlist_score **termlist(struct session *s, int *num)
+{
+    struct termlist_score **scores = termlist_highscore(s->termlist, num);
+
+    return scores;
+}
+
+struct record **show(struct session *s, int start, int *num, int *total, int *sumhits)
{
struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
int i;
- // FIXME -- skip initial records
+ relevance_prepare_read(s->relevance, s->reclist);
+
+ *total = s->reclist->num_records;
+ *sumhits = s->total_hits;
+
+ for (i = 0; i < start; i++)
+ if (!reclist_read_record(s->reclist))
+ {
+ *num = 0;
+ return 0;
+ }
for (i = 0; i < *num; i++)
{
- recs[i] = read_recheap(s);
- if (!recs[i])
+ struct record *r = reclist_read_record(s->reclist);
+ if (!r)
{
*num = i;
break;
}
+ recs[i] = r;
}
- rewind_recheap(s);
return recs;
}
default: break;
}
}
+ stat->num_hits = s->total_hits;
+ stat->num_records = s->total_records;
stat->num_connections = i;
}
+/*
+ * Create a CCL bibset and load qualifier definitions into it from the file
+ * named `fn`.
+ *
+ * Returns the new bibset. If ccl_qual_fname() reports failure, the file name
+ * is logged at fatal level (with errno) and the process exits with status 1.
+ */
+static CCL_bibset load_cclfile(const char *fn)
+{
+    CCL_bibset bibset = ccl_qual_mk();
+    int status = ccl_qual_fname(bibset, fn);
+
+    if (status < 0)
+    {
+        yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
+        exit(1);
+    }
+    return bibset;
+}
+
int main(int argc, char **argv)
{
int ret;
yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
- while ((ret = options("c:h:", argv, argc, &arg)) != -2)
+ while ((ret = options("c:h:p:C:", argv, argc, &arg)) != -2)
{
switch (ret) {
case 0:
case 'c':
command_init(atoi(arg));
break;
+ case 'C':
+ global_parameters.ccl_filter = load_cclfile(arg);
+ break;
case 'h':
http_init(atoi(arg));
break;
+ case 'p':
+ http_set_proxyaddr(arg);
+ break;
default:
fprintf(stderr, "Usage: pazpar2 -d comport");
exit(1);
}
+ if (!global_parameters.ccl_filter)
+ global_parameters.ccl_filter = load_cclfile("default.bib");
+
event_loop(&channel_list);
return 0;