2 * Copyright (C) 1994-2002, Index Data
4 * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
6 * $Id: kinput.c,v 1.50 2002-04-23 13:39:10 adam Exp $
9 * - Allocates a lot of memory for the merge process, but never releases it.
10 * Doesn't matter, as the program terminates soon after.
/* Bytes in one stored key: a leading mode byte (read as key[0] by
 * heap_cread_item) followed by a struct it_key. */
27 #define KEY_SIZE (1+sizeof(struct it_key))
/* Capacity of every buffer that holds a term name; heap entries and the
 * read buffers (name followed by key bytes) are allocated with it too. */
28 #define INP_NAME_MAX 768
/* Initial size, in bytes, of the key buffer allocated by heap_inp. */
29 #define INP_BUF_START 60000
/* Number of bytes heap_inp adds to its key buffer each time it fills up. */
30 #define INP_BUF_ADD 400000
/* Reader state for one temporary key file; the file is consumed one block
 * at a time by key_file_chunk_read. */
35 off_t offset; /* file offset */
36 unsigned char *buf; /* buffer block */
37 size_t buf_size; /* number of read bytes in block */
38 size_t chunk; /* number of bytes allocated */
39 size_t buf_ptr; /* current position in buffer */
/* prev_name is allocated with INP_NAME_MAX bytes by key_file_init */
40 char *prev_name; /* last word read */
/* key_file_read adds each decoded value to sysno/seqno and stores the sum
 * back, so these hold the running values */
41 int sysno; /* last sysno */
42 int seqno; /* last seqno */
43 off_t length; /* length of file */
44 /* handler invoked in each read */
/* called by key_file_chunk_read as readHandler(f, f->readInfo) */
45 void (*readHandler)(struct key_file *keyp, void *rinfo);
/*
 * Write the path of temporary key file number `no` into fname, formatted as
 * "<dir>/key<no>.tmp".  <dir> is the "keyTmpDir" setting looked up in res,
 * with "." passed as the third argument (presumably the default value).
 * NOTE(review): sprintf does no bounds checking; fname must be large enough
 * for the directory plus the "/key<no>.tmp" suffix.
 */
50 void getFnameTmp (Res res, char *fname, int no)
54 pre = res_get_def (res, "keyTmpDir", ".");
55 sprintf (fname, "%s/key%d.tmp", pre, no);
/*
 * Same as getFnameTmp, but takes the resource handle from zh->res: writes
 * "<dir>/key<no>.tmp" into fname, where <dir> is the "keyTmpDir" setting
 * ("." is passed as the fallback argument).
 * NOTE(review): sprintf does no bounds checking; the caller must supply a
 * sufficiently large fname.
 */
58 void extract_get_fname_tmp (ZebraHandle zh, char *fname, int no)
62 pre = res_get_def (zh->res, "keyTmpDir", ".");
63 sprintf (fname, "%s/key%d.tmp", pre, no);
/*
 * Load the next block of key file f->no into f->buf.
 * The file name is built with getFnameTmp, the file's total size is stored
 * in f->length, and reading starts at byte f->offset.  Open, seek and read
 * failures are reported through logf with LOG_WARN|LOG_ERRNO.
 */
66 void key_file_chunk_read (struct key_file *f)
68 int nr = 0, r = 0, fd;
70 getFnameTmp (f->res, fname, f->no);
71 fd = open (fname, O_BINARY|O_RDONLY);
77 logf (LOG_WARN|LOG_ERRNO, "cannot open %s", fname);
/* seeking to the end yields the file size, which is kept in f->length */
82 if ((f->length = lseek (fd, 0L, SEEK_END)) == (off_t) -1)
84 logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
/* go back to the position where this block starts */
89 if (lseek (fd, f->offset, SEEK_SET) == -1)
91 logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
/* read() may deliver fewer bytes than requested, so keep asking for the
 * remainder; nr is the offset in f->buf where the next bytes are placed.
 * f->chunk is a size_t, so the subtraction is done in unsigned arithmetic */
95 while (f->chunk - nr > 0)
97 r = read (fd, f->buf + nr, f->chunk - nr);
104 logf (LOG_WARN|LOG_ERRNO, "read of %s", fname);
/* notify the per-read callback registered on this key file */
110 (*f->readHandler)(f, f->readInfo);
/* Release the resources owned by key file reader f; the name buffer
 * allocated by key_file_init is freed here. */
114 void key_file_destroy (struct key_file *f)
117 xfree (f->prev_name);
/*
 * Create a reader for temporary key file number `no`.
 * Allocates the reader, a block buffer of f->chunk bytes and an
 * INP_NAME_MAX-byte buffer for the previous name (initially the empty
 * string), then loads the first block with key_file_chunk_read.
 * The read handler starts out as NULL; callers install one afterwards.
 */
121 struct key_file *key_file_init (int no, int chunk, Res res)
125 f = (struct key_file *) xmalloc (sizeof(*f));
133 f->readHandler = NULL;
134 f->buf = (unsigned char *) xmalloc (f->chunk);
135 f->prev_name = (char *) xmalloc (INP_NAME_MAX);
/* no previous name yet */
136 *f->prev_name = '\0';
/* prime the buffer so the first key_file_getc has data to return */
137 key_file_chunk_read (f);
/*
 * Return the next byte of key file f, refilling the block buffer from the
 * file when the current block has been used up.
 */
141 int key_file_getc (struct key_file *f)
/* fast path: a byte is still available in the current block */
143 if (f->buf_ptr < f->buf_size)
144 return f->buf[(f->buf_ptr)++];
/* the last read delivered fewer bytes than the buffer can hold */
145 if (f->buf_size < f->chunk)
/* advance the file position past the consumed block and load the next one */
147 f->offset += f->buf_size;
148 key_file_chunk_read (f);
149 if (f->buf_ptr < f->buf_size)
150 return f->buf[(f->buf_ptr)++];
/*
 * Decode one variable-length integer from key file f.
 * The low six bits of the first byte c are the most significant part of the
 * value; one, two or three further bytes follow, each shifted in as the next
 * lower 8 bits.
 * NOTE(review): the selection between the three forms is presumably made on
 * the top two bits of c - confirm.
 */
155 int key_file_decode (struct key_file *f)
159 c = key_file_getc (f);
/* one continuation byte */
166 d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
/* two continuation bytes */
169 d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
170 d = (d << 8) + (key_file_getc (f) & 0xff);
/* three continuation bytes */
173 d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
174 d = (d << 8) + (key_file_getc (f) & 0xff);
175 d = (d << 8) + (key_file_getc (f) & 0xff);
/*
 * Read the next entry of key file f into key.
 * On return key holds the term name followed by a struct it_key; the return
 * value is the number of bytes stored.  Callers in this file treat a return
 * value of 0 as "no more entries".
 */
181 int key_file_read (struct key_file *f, char *key)
186 c = key_file_getc (f);
/* reuse the name of the previous entry */
189 strcpy (key, f->prev_name);
/* otherwise copy name bytes up to and including the terminating zero.
 * NOTE(review): no bound on i is visible here; key and prev_name buffers
 * are INP_NAME_MAX bytes elsewhere in this file - confirm names cannot be
 * longer */
198 while ((key[i++] = key_file_getc (f)))
/* remember the name for the next entry */
200 strcpy (f->prev_name, key);
/* sysno is stored relative to the previous sysno of this file */
203 d = key_file_decode (f);
206 itkey.sysno = d + f->sysno;
209 f->sysno = itkey.sysno;
/* seqno is stored relative to the previous seqno of this file */
212 d = key_file_decode (f);
213 itkey.seqno = d + f->seqno;
214 f->seqno = itkey.seqno;
/* the it_key is placed at offset i, directly after the name bytes */
215 memcpy (key + i, &itkey, sizeof(struct it_key));
216 return i + sizeof (struct it_key);
/* key file that supplied the entry in each slot (set by key_heap_insert) */
221 struct key_file **file;
/* ordering function; it is called with the addresses of two entries of
 * info.buf, i.e. pointers to the buffer pointers */
226 int (*cmp)(const void *p1, const void *p2);
/* register whose dictionary and ISAM handles the heap_inp* functions use */
227 struct zebra_register *reg;
/*
 * Allocate a heap for nkeys key files.
 * All arrays get 1+nkeys slots (indices 0..nkeys) and each slot receives an
 * INP_NAME_MAX-byte entry buffer; the statistics counters start at zero.
 * cmp is the comparison function used to order entries.
 */
236 struct heap_info *key_heap_init (int nkeys,
237 int (*cmp)(const void *p1, const void *p2))
239 struct heap_info *hi;
242 hi = (struct heap_info *) xmalloc (sizeof(*hi));
243 hi->info.file = (struct key_file **)
244 xmalloc (sizeof(*hi->info.file) * (1+nkeys));
245 hi->info.buf = (char **) xmalloc (sizeof(*hi->info.buf) * (1+nkeys));
247 hi->ptr = (int *) xmalloc (sizeof(*hi->ptr) * (1+nkeys));
/* note the inclusive bound: slot nkeys is initialised as well */
249 for (i = 0; i<= nkeys; i++)
252 hi->info.buf[i] = (char *) xmalloc (INP_NAME_MAX);
257 hi->no_deletions = 0;
258 hi->no_insertions = 0;
259 hi->no_iterations = 0;
/*
 * Free a heap created by key_heap_init.  nkeys must be the value that was
 * passed to key_heap_init, since slots 0..nkeys are released.
 */
263 void key_heap_destroy (struct heap_info *hi, int nkeys)
266 yaz_log (LOG_LOG, "key_heap_destroy");
/* per-slot entry buffers first, then the arrays that hold them */
267 for (i = 0; i<=nkeys; i++)
268 xfree (hi->info.buf[i]);
270 xfree (hi->info.buf);
272 xfree (hi->info.file);
/* Exchange the slot indices stored at heap positions i1 and i2; only the
 * ptr[] entries move, the entry buffers stay where they are. */
276 static void key_heap_swap (struct heap_info *hi, int i1, int i2)
281 hi->ptr[i1] = hi->ptr[i2];
/*
 * Remove the entry at the top of the heap (position 1).
 * The top is exchanged with the last position and the heap order is then
 * restored by moving the new top downwards.  The heap must not be empty.
 */
286 static void key_heap_delete (struct heap_info *hi)
/* heap positions are 1-based: the top is 1 and its first child is 2 */
288 int cur = 1, child = 2;
290 assert (hi->heapnum > 0);
292 key_heap_swap (hi, 1, hi->heapnum);
294 while (child <= hi->heapnum) {
/* when a second child exists, compare the two children */
295 if (child < hi->heapnum &&
296 (*hi->cmp)(&hi->info.buf[hi->ptr[child]],
297 &hi->info.buf[hi->ptr[child+1]]) > 0)
/* the current entry orders after the child: move it down */
299 if ((*hi->cmp)(&hi->info.buf[hi->ptr[cur]],
300 &hi->info.buf[hi->ptr[child]]) > 0)
302 key_heap_swap (hi, cur, child);
/*
 * Add an entry to the heap: the nbytes bytes at buf are copied into the
 * slot for the new last position, kf is recorded as their source file, and
 * the entry is then moved towards the top until its parent no longer
 * orders after it.
 */
311 static void key_heap_insert (struct heap_info *hi, const char *buf, int nbytes,
/* the new entry takes the next free heap position */
316 cur = ++(hi->heapnum);
317 memcpy (hi->info.buf[hi->ptr[cur]], buf, nbytes);
318 hi->info.file[hi->ptr[cur]] = kf;
/* stop at the top (parent == 0) or once the parent is not greater */
321 while (parent && (*hi->cmp)(&hi->info.buf[hi->ptr[parent]],
322 &hi->info.buf[hi->ptr[cur]]) > 0)
324 key_heap_swap (hi, cur, parent);
/*
 * Take one entry from the heap: its term name is copied to name and
 * KEY_SIZE key bytes are copied to key.  The entry is then removed from the
 * heap and replaced by the next entry from the same key file, if there is
 * one.  Callers use the return value as a "more data" flag.
 */
330 static int heap_read_one (struct heap_info *hi, char *name, char *key)
/* holds the replacement entry read from the key file */
333 char rbuf[INP_NAME_MAX];
339 strcpy (name, hi->info.buf[n]);
340 kf = hi->info.file[n];
/* the key bytes start r+1 bytes into the entry (presumably r is the length
 * of the name, so this skips the name and its terminator - confirm) */
342 memcpy (key, hi->info.buf[n] + r+1, KEY_SIZE);
343 key_heap_delete (hi);
/* a non-zero length means the file produced another entry */
344 if ((r = key_file_read (kf, rbuf)))
345 key_heap_insert (hi, rbuf, r, kf);
/* State shared between a heap_inp* merge loop and its heap_cread_item
 * callback. */
350 struct heap_cread_info {
/* name of the entry handed out before the current one */
351 char prev_name[INP_NAME_MAX];
/* name of the entry most recently read from the heap */
352 char cur_name[INP_NAME_MAX];
/* heap the entries are read from */
354 struct heap_info *hi;
/*
 * Item callback installed as read_item by the heap_inp* functions.
 * vp is the struct heap_cread_info passed as clientData.  An item is
 * delivered by storing the mode byte key[0] in *insertMode, copying the
 * struct it_key that follows it to *dst and advancing *dst past it.
 */
359 int heap_cread_item (void *vp, char **dst, int *insertMode)
361 struct heap_cread_info *p = (struct heap_cread_info *) vp;
362 struct heap_info *hi = p->hi;
/* deliver the key currently held in p->key */
366 *insertMode = p->key[0];
367 memcpy (*dst, p->key+1, sizeof(struct it_key));
368 (*dst) += sizeof(struct it_key);
/* fetch the next entry, remembering the name that was current until now */
372 strcpy (p->prev_name, p->cur_name);
373 if (!(p->more = heap_read_one (hi, p->cur_name, p->key)))
/* a non-empty name that differs from the previous one starts a new term */
375 if (*p->cur_name && strcmp (p->cur_name, p->prev_name))
/* deliver the entry that was just read */
380 *insertMode = p->key[0];
381 memcpy (*dst, p->key+1, sizeof(struct it_key));
382 (*dst) += sizeof(struct it_key);
/*
 * Merge the heap contents into the register's ISAMC index.
 * For each term the keys are fed to isc_merge through heap_cread_item and
 * the term's dictionary entry is inserted, replaced or deleted to match.
 */
386 int heap_inpc (struct heap_info *hi)
388 struct heap_cread_info hci;
389 ISAMC_I isamc_i = (ISAMC_I) xmalloc (sizeof(*isamc_i));
391 hci.key = (char *) xmalloc (KEY_SIZE);
/* read the first entry so hci.cur_name / hci.key are valid for the loop */
394 hci.more = heap_read_one (hi, hci.cur_name, hci.key);
396 isamc_i->clientData = &hci;
397 isamc_i->read_item = heap_cread_item;
401 char this_name[INP_NAME_MAX];
402 ISAMC_P isamc_p, isamc_p2;
/* keep a copy: hci.cur_name is overwritten while the merge reads items */
405 strcpy (this_name, hci.cur_name);
406 assert (hci.cur_name[1]);
/* term already in the dictionary: merge into its existing position */
408 if ((dict_info = dict_lookup (hi->reg->dict, hci.cur_name)))
/* the position is stored one byte into the dictionary value */
410 memcpy (&isamc_p, dict_info+1, sizeof(ISAMC_P));
411 isamc_p2 = isc_merge (hi->reg->isamc, isamc_p, isamc_i);
/* remove the term; a zero return from dict_delete selects the branch below */
415 if (!dict_delete (hi->reg->dict, this_name))
/* the merge returned a different position: store it */
421 if (isamc_p2 != isamc_p)
422 dict_insert (hi->reg->dict, this_name,
423 sizeof(ISAMC_P), &isamc_p2);
/* new term: merge with position 0 and record the result */
428 isamc_p = isc_merge (hi->reg->isamc, 0, isamc_i);
430 dict_insert (hi->reg->dict, this_name, sizeof(ISAMC_P), &isamc_p);
438 /* for debugging only */
/* Log dictionary string s after passing it through zebra_maps_output;
 * the converted text is collected in keybuf. */
439 static void print_dict_item (ZebraMaps zm, const char *s)
442 char keybuf[IT_MAX_WORD+1];
/* conversion starts after the first two bytes of s (presumably prefix
 * bytes that are not part of the word - confirm) */
444 const char *from = s + 2;
448 const char *res = zebra_maps_output (zm, reg_type, &from);
456 yaz_log (LOG_LOG, "%s", keybuf);
/*
 * Merge the heap contents into the register's ISAMB index.
 * For each term the keys are fed to isamb_merge through heap_cread_item and
 * the term's dictionary entry is inserted, replaced or deleted to match.
 */
459 int heap_inpb (struct heap_info *hi)
461 struct heap_cread_info hci;
462 ISAMC_I isamc_i = (ISAMC_I) xmalloc (sizeof(*isamc_i));
464 hci.key = (char *) xmalloc (KEY_SIZE);
/* read the first entry so hci.cur_name / hci.key are valid for the loop */
467 hci.more = heap_read_one (hi, hci.cur_name, hci.key);
469 isamc_i->clientData = &hci;
470 isamc_i->read_item = heap_cread_item;
474 char this_name[INP_NAME_MAX];
475 ISAMC_P isamc_p, isamc_p2;
/* keep a copy: hci.cur_name is overwritten while the merge reads items */
478 strcpy (this_name, hci.cur_name);
479 assert (hci.cur_name[1]);
/* debugging aid: log the term being processed */
483 print_dict_item (hi->reg->zebra_maps, hci.cur_name);
/* term already in the dictionary: merge into its existing position */
485 if ((dict_info = dict_lookup (hi->reg->dict, hci.cur_name)))
/* the position is stored one byte into the dictionary value */
487 memcpy (&isamc_p, dict_info+1, sizeof(ISAMC_P));
488 isamc_p2 = isamb_merge (hi->reg->isamb, isamc_p, isamc_i);
/* remove the term; a zero return from dict_delete selects the branch below */
492 if (!dict_delete (hi->reg->dict, this_name))
/* the merge returned a different position: store it */
498 if (isamc_p2 != isamc_p)
499 dict_insert (hi->reg->dict, this_name,
500 sizeof(ISAMC_P), &isamc_p2);
/* new term: merge with position 0 and record the result */
505 isamc_p = isamb_merge (hi->reg->isamb, 0, isamc_i);
507 dict_insert (hi->reg->dict, this_name, sizeof(ISAMC_P), &isamc_p);
/*
 * Merge the heap contents into the register's ISAMD index.
 * For each term the keys are fed to isamd_append through heap_cread_item
 * and the term's dictionary entry is inserted, replaced or deleted to match.
 */
515 int heap_inpd (struct heap_info *hi)
517 struct heap_cread_info hci;
518 ISAMD_I isamd_i = (ISAMD_I) xmalloc (sizeof(*isamd_i));
520 hci.key = (char *) xmalloc (KEY_SIZE);
/* read the first entry so hci.cur_name / hci.key are valid for the loop */
523 hci.more = heap_read_one (hi, hci.cur_name, hci.key);
525 isamd_i->clientData = &hci;
526 isamd_i->read_item = heap_cread_item;
530 char this_name[INP_NAME_MAX];
531 ISAMD_P isamd_p, isamd_p2;
/* keep a copy: hci.cur_name is overwritten while items are being read */
534 strcpy (this_name, hci.cur_name);
535 assert (hci.cur_name[1]);
/* term already in the dictionary: append to its existing position */
537 if ((dict_info = dict_lookup (hi->reg->dict, hci.cur_name)))
/* the position is stored one byte into the dictionary value */
539 memcpy (&isamd_p, dict_info+1, sizeof(ISAMD_P));
540 isamd_p2 = isamd_append (hi->reg->isamd, isamd_p, isamd_i);
/* remove the term; a zero return from dict_delete selects the branch below */
544 if (!dict_delete (hi->reg->dict, this_name))
/* the append returned a different position: store it */
550 if (isamd_p2 != isamd_p)
551 dict_insert (hi->reg->dict, this_name,
552 sizeof(ISAMD_P), &isamd_p2);
/* new term: append with position 0 and record the result */
557 isamd_p = isamd_append (hi->reg->isamd, 0, isamd_i);
559 dict_insert (hi->reg->dict, this_name, sizeof(ISAMD_P), &isamd_p);
/*
 * Merge the heap contents into the register's ISAM index.
 * Unlike the callback-based variants, all keys of one term are first
 * collected in a contiguous buffer (KEY_SIZE bytes each) and then handed to
 * is_merge together with their count.
 */
566 int heap_inp (struct heap_info *hi)
569 char next_name[INP_NAME_MAX];
570 char cur_name[INP_NAME_MAX];
571 int key_buf_size = INP_BUF_START;
577 next_key = (char *) xmalloc (KEY_SIZE);
578 key_buf = (char *) xmalloc (key_buf_size);
/* the first key of the first term goes straight into key_buf */
579 more = heap_read_one (hi, cur_name, key_buf);
580 while (more) /* EOF ? */
/* key_buf already contains one key for the current term */
583 key_buf_ptr = KEY_SIZE;
586 if (!(more = heap_read_one (hi, next_name, next_key)))
/* a non-empty name that differs from cur_name ends the current term */
588 if (*next_name && strcmp (next_name, cur_name))
/* same term: append the key to the buffer */
590 memcpy (key_buf + key_buf_ptr, next_key, KEY_SIZE);
591 key_buf_ptr += KEY_SIZE;
/* enlarge the buffer by INP_BUF_ADD before the next key could overflow it */
592 if (key_buf_ptr+(int) KEY_SIZE >= key_buf_size)
595 new_key_buf = (char *) xmalloc (key_buf_size + INP_BUF_ADD);
596 memcpy (new_key_buf, key_buf, key_buf_size);
597 key_buf_size += INP_BUF_ADD;
599 key_buf = new_key_buf;
/* number of keys collected; the buffer must hold whole keys only */
603 nmemb = key_buf_ptr / KEY_SIZE;
604 assert (nmemb * (int) KEY_SIZE == key_buf_ptr);
/* term already in the dictionary: merge into its existing position */
605 if ((info = dict_lookup (hi->reg->dict, cur_name)))
607 ISAM_P isam_p, isam_p2;
/* the position is stored one byte into the dictionary value */
608 memcpy (&isam_p, info+1, sizeof(ISAM_P));
609 isam_p2 = is_merge (hi->reg->isam, isam_p, nmemb, key_buf);
/* remove the term; a zero return from dict_delete selects the branch below */
613 if (!dict_delete (hi->reg->dict, cur_name))
/* the merge returned a different position: store it */
619 if (isam_p2 != isam_p)
620 dict_insert (hi->reg->dict, cur_name,
621 sizeof(ISAM_P), &isam_p2);
/* new term: merge with position 0 and record the result */
628 isam_p = is_merge (hi->reg->isam, 0, nmemb, key_buf);
629 dict_insert (hi->reg->dict, cur_name, sizeof(ISAM_P), &isam_p);
/* the key that ended this term becomes the first key of the next term */
631 memcpy (key_buf, next_key, KEY_SIZE);
632 strcpy (cur_name, next_name);
/*
 * Merge the heap contents into the register's ISAMS index.
 * Only terms that are not yet in the dictionary are handled; finding an
 * existing term is logged as a fatal error.
 */
637 int heap_inps (struct heap_info *hi)
639 struct heap_cread_info hci;
640 ISAMS_I isams_i = (ISAMS_I) xmalloc (sizeof(*isams_i));
642 hci.key = (char *) xmalloc (KEY_SIZE);
/* read the first entry so hci.cur_name / hci.key are valid for the loop */
645 hci.more = heap_read_one (hi, hci.cur_name, hci.key);
647 isams_i->clientData = &hci;
648 isams_i->read_item = heap_cread_item;
652 char this_name[INP_NAME_MAX];
/* keep a copy: hci.cur_name is overwritten while the merge reads items */
656 strcpy (this_name, hci.cur_name);
657 assert (hci.cur_name[1]);
/* note the negation: this branch handles terms NOT found in the dictionary */
659 if (!(dict_info = dict_lookup (hi->reg->dict, hci.cur_name)))
661 isams_p = isams_merge (hi->reg->isams, isams_i);
663 dict_insert (hi->reg->dict, this_name, sizeof(ISAMS_P), &isams_p);
/* the term already exists */
667 logf (LOG_FATAL, "isams doesn't support this kind of update");
675 struct progressInfo {
/*
 * Read handler that logs merge progress.
 * info points to the struct progressInfo shared by all key files; keyp is
 * the key file that has just read a block.  Nothing is reported when the
 * block is empty or the total size is not positive.
 */
682 void progressFunc (struct key_file *keyp, void *info)
684 struct progressInfo *p = (struct progressInfo *) info;
685 time_t now, remaining;
687 if (keyp->buf_size <= 0 || p->totalBytes <= 0)
/* report only once at least 10 seconds have passed since lastTime */
691 if (now >= p->lastTime+10)
/* estimate: elapsed time scaled by the ratio of unread to read bytes */
694 remaining = (time_t) ((now - p->startTime)*
695 ((double) p->totalBytes/p->totalOffset - 1.0));
/* short estimates are shown in seconds, longer ones in minutes */
696 if (remaining <= 130)
697 logf (LOG_LOG, "Merge %2.1f%% completed; %ld seconds remaining",
698 (100.0*p->totalOffset) / p->totalBytes, (long) remaining);
700 logf (LOG_LOG, "Merge %2.1f%% completed; %ld minutes remaining",
701 (100.0*p->totalOffset) / p->totalBytes, (long) remaining/60);
/* account for the block that was just read */
703 p->totalOffset += keyp->buf_size;
/*
 * Merge the temporary key files of zh's register into its index.
 * One reader is opened per key file, the first entry of each file is placed
 * in a heap, the heap is processed, statistics are logged and the register's
 * key file count is reset to 0.
 */
710 void zebra_index_merge (ZebraHandle zh)
712 struct key_file **kf;
715 struct heap_info *hi;
716 struct progressInfo progressInfo;
717 int nkeys = zh->reg->key_file_no;
/* test whether temporary key file number nkeys+1 exists and is readable */
725 extract_get_fname_tmp (zh, fname, nkeys+1);
726 if (access (fname, R_OK) == -1)
/* readers are indexed 1..nkeys, so the array gets one extra slot */
733 kf = (struct key_file **) xmalloc ((1+nkeys) * sizeof(*kf));
734 progressInfo.totalBytes = 0;
735 progressInfo.totalOffset = 0;
736 time (&progressInfo.startTime);
737 time (&progressInfo.lastTime);
738 for (i = 1; i<=nkeys; i++)
/* 8192-byte blocks; key_file_init has already read the first block, so its
 * size and the file length are added to the progress totals here */
740 kf[i] = key_file_init (i, 8192, zh->res);
741 kf[i]->readHandler = progressFunc;
742 kf[i]->readInfo = &progressInfo;
743 progressInfo.totalBytes += kf[i]->length;
744 progressInfo.totalOffset += kf[i]->buf_size;
746 hi = key_heap_init (nkeys, key_qsort_compare);
/* seed the heap with the first entry of every non-empty key file */
749 for (i = 1; i<=nkeys; i++)
750 if ((r = key_file_read (kf[i], rbuf)))
751 key_heap_insert (hi, rbuf, r, kf[i]);
/* build the name of each temporary key file in rbuf */
763 for (i = 1; i<=nkeys; i++)
765 extract_get_fname_tmp (zh, rbuf, i);
/* merge statistics */
768 logf (LOG_LOG, "Iterations . . .%7d", hi->no_iterations);
769 logf (LOG_LOG, "Distinct words .%7d", hi->no_diffs);
770 logf (LOG_LOG, "Updates. . . . .%7d", hi->no_updates);
771 logf (LOG_LOG, "Deletions. . . .%7d", hi->no_deletions);
772 logf (LOG_LOG, "Insertions . . .%7d", hi->no_insertions);
/* no key files remain pending for this register */
773 zh->reg->key_file_no = 0;
775 key_heap_destroy (hi, nkeys);
776 for (i = 1; i<=nkeys; i++)
777 key_file_destroy (kf[i]);