Add zebrasrv.pid
[idzebra-moved-to-github.git] / isamc / merge.c
1 /* $Id: merge.c,v 1.23 2003-06-23 15:36:11 adam Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23
24
25 #include <stdlib.h>
26 #include <assert.h>
27 #include <string.h>
28 #include <stdio.h>
29 #include <yaz/log.h>
30 #include "isamc-p.h"
31
32 struct isc_merge_block {
33     int offset;       /* offset in r_buf */
34     int block;        /* block number of file (0 if none) */
35     int dirty;        /* block is different from that on file */
36 };
37
38 #if 0
39 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
40                         int last)
41 {
42     int i, no_dirty = 0;
43     for (i = 0; i<ptr; i++)
44         if (mb[i].dirty)
45             no_dirty++;
46     if (no_dirty*4 < ptr*3)
47         return;
48     /* bubble-sort it */
49     for (i = 0; i<ptr; i++)
50     {
51         int tmp, j, j_min = -1;
52         for (j = i; j<ptr; j++)
53         {
54             if (j_min < 0 || mb[j_min].block > mb[j].block)
55                 j_min = j;
56         }
57         assert (j_min >= 0);
58         tmp = mb[j_min].block;
59         mb[j_min].block = mb[i].block;
60         mb[i].block = tmp;
61         mb[i].dirty = 1;
62     }
63     if (!last)
64         mb[i].dirty = 1;
65 }
66 #endif
67
68 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
69                           char *r_buf, int *firstpos, int cat, int last,
70                           int *numkeys)
71 {
72     int i;
73
74     for (i = 0; i<ptr; i++)
75     {
76         /* consider this block number */
77         if (!mb[i].block) 
78         {
79             mb[i].block = isc_alloc_block (is, cat);
80             mb[i].dirty = 1;
81         }
82
83         /* consider next block pointer */
84         if (last && i == ptr-1)
85             mb[i+1].block = 0;
86         else if (!mb[i+1].block)       
87         {
88             mb[i+1].block = isc_alloc_block (is, cat);
89             mb[i+1].dirty = 1;
90             mb[i].dirty = 1;
91         }
92     }
93
94     for (i = 0; i<ptr; i++)
95     {
96         char *src;
97         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
98
99         assert (ssize);
100
101         /* skip rest if not dirty */
102         if (!mb[i].dirty)
103         {
104             assert (mb[i].block);
105             if (!*firstpos)
106                 *firstpos = mb[i].block;
107             if (is->method->debug > 2)
108                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
109                      i, ssize, cat, mb[i].block);
110             ++(is->files[cat].no_skip_writes);
111             continue;
112         }
113         /* write block */
114
115         if (!*firstpos)
116         {
117             *firstpos = mb[i].block;
118             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
119             ssize += ISAMC_BLOCK_OFFSET_1;
120
121             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
122                     sizeof(*numkeys));
123             if (is->method->debug > 2)
124                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
125                      i, *numkeys, (int) ssize, mb[i+1].block);
126         }
127         else
128         {
129             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
130             ssize += ISAMC_BLOCK_OFFSET_N;
131             if (is->method->debug > 2)
132                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
133                      i, (int) ssize, mb[i+1].block);
134         }
135         memcpy (src, &mb[i+1].block, sizeof(int));
136         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
137         isc_write_block (is, cat, mb[i].block, src);
138     }
139 }
140
141 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
142                        int cat, int firstpos)
143 {
144    /* Border set to initial fill or block size depending on
145       whether we are creating a new one or updating and old one.
146     */
147     
148     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
149                                is->method->filecat[cat].ifill;
150     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
151     
152     assert (ptr < 199);
153
154     return mb[ptr].offset + fill - off;
155 }
156
157 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I *data)
158 {
159
160     char i_item[128], *i_item_ptr;
161     int i_more, i_mode, i;
162
163     ISAMC_PP pp; 
164     char f_item[128], *f_item_ptr;
165     int f_more;
166     int last_dirty = 0;
167     int debug = is->method->debug;
168  
169     struct isc_merge_block mb[200];
170
171     int firstpos = 0;
172     int cat = 0;
173     char r_item_buf[128]; /* temporary result output */
174     char *r_buf;          /* block with resulting data */
175     int r_offset = 0;     /* current offset in r_buf */
176     int ptr = 0;          /* pointer */
177     void *r_clientData;   /* encode client data */
178     int border;
179     int numKeys = 0;
180
181     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
182     r_buf = is->merge_buf + 128;
183
184     pp = isc_pp_open (is, ipos);
185     /* read first item from file. make sure f_more indicates no boundary */
186     f_item_ptr = f_item;
187     f_more = isc_read_item (pp, &f_item_ptr);
188     if (f_more > 0)
189         f_more = 1;
190     cat = pp->cat;
191
192     if (debug > 1)
193         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
194
195     /* read first item from i */
196     i_item_ptr = i_item;
197     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
198
199     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
200     mb[ptr].dirty = 0;
201     mb[ptr].offset = 0;
202
203     border = get_border (is, mb, ptr, cat, firstpos);
204     while (i_more || f_more)
205     {
206         char *r_item = r_item_buf;
207         int cmp;
208
209         if (f_more > 1)
210         {
211             /* block to block boundary in the original file. */
212             f_more = 1;
213             if (cat == pp->cat) 
214             {
215                 /* the resulting output is of the same category as the
216                    the original 
217                 */
218                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
219                 {
220                     /* the resulting output block is too small/empty. Delete
221                        the original (if any)
222                     */
223                     if (debug > 3)
224                         logf (LOG_LOG, "isc: release A");
225                     if (mb[ptr].block)
226                         isc_release_block (is, pp->cat, mb[ptr].block);
227                     mb[ptr].block = pp->pos;
228                     if (!mb[ptr].dirty)
229                         mb[ptr].dirty = 1;
230                     if (ptr > 0)
231                         mb[ptr-1].dirty = 1;
232                 }
233                 else
234                 {
235                     /* indicate new boundary based on the original file */
236                     mb[++ptr].block = pp->pos;
237                     mb[ptr].dirty = last_dirty;
238                     mb[ptr].offset = r_offset;
239                     if (debug > 3)
240                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
241                             ptr, r_offset);
242                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
243                     {
244                         /* We are dealing with block(s) of max size. Block(s)
245                            except 1 will be flushed.
246                          */
247                         if (debug > 2)
248                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
249                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
250                                       0, &pp->numKeys);
251                         mb[0].block = mb[ptr-1].block;
252                         mb[0].dirty = mb[ptr-1].dirty;
253                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
254                                 mb[ptr].offset - mb[ptr-1].offset);
255                         mb[0].offset = 0;
256
257                         mb[1].block = mb[ptr].block;
258                         mb[1].dirty = mb[ptr].dirty;
259                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
260                         ptr = 1;
261                         r_offset = mb[ptr].offset;
262                     }
263                 }
264             }
265             border = get_border (is, mb, ptr, cat, firstpos);
266         }
267         last_dirty = 0;
268         if (!f_more)
269             cmp = -1;
270         else if (!i_more)
271             cmp = 1;
272         else
273             cmp = (*is->method->compare_item)(i_item, f_item);
274         if (cmp == 0)                   /* insert i=f */
275         {
276             if (!i_mode)   /* delete item? */
277             {
278                 /* move i */
279                 i_item_ptr = i_item;
280                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
281                                            &i_mode);
282                 /* is next input item the same as current except
283                    for the delete flag? */
284                 cmp = (*is->method->compare_item)(i_item, f_item);
285                 if (!cmp && i_mode)   /* delete/insert nop? */
286                 {
287                     /* yes! insert as if it was an insert only */
288                     memcpy (r_item, i_item, i_item_ptr - i_item);
289                     i_item_ptr = i_item;
290                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
291                                                 &i_mode);
292                 }
293                 else
294                 {
295                     /* no! delete the item */
296                     r_item = NULL;
297                     last_dirty = 1;
298                     mb[ptr].dirty = 2;
299                 }
300             }
301             else
302             {
303                 memcpy (r_item, f_item, f_item_ptr - f_item);
304
305                 /* move i */
306                 i_item_ptr = i_item;
307                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
308                                            &i_mode);
309             }
310             /* move f */
311             f_item_ptr = f_item;
312             f_more = isc_read_item (pp, &f_item_ptr);
313         }
314         else if (cmp > 0)               /* insert f */
315         {
316             memcpy (r_item, f_item, f_item_ptr - f_item);
317             /* move f */
318             f_item_ptr = f_item;
319             f_more = isc_read_item (pp, &f_item_ptr);
320         }
321         else                            /* insert i */
322         {
323             if (!i_mode)                /* delete item which isn't there? */
324             {
325                 logf (LOG_FATAL, "Inconsistent register at offset %d",
326                                  r_offset);
327                 abort ();
328             }
329             memcpy (r_item, i_item, i_item_ptr - i_item);
330             mb[ptr].dirty = 2;
331             last_dirty = 1;
332             /* move i */
333             i_item_ptr = i_item;
334             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
335                                         &i_mode);
336         }
337         if (r_item)  /* insert resulting item? */
338         {
339             char *r_out_ptr = r_buf + r_offset;
340             int new_offset;
341
342             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
343                                      &r_out_ptr, &r_item);
344             new_offset = r_out_ptr - r_buf; 
345
346             numKeys++;
347
348             if (border < new_offset && border >= r_offset)
349             {
350                 if (debug > 2)
351                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
352                 /* Max size of current block category reached ...
353                    make new virtual block entry */
354                 mb[++ptr].block = 0;
355                 mb[ptr].dirty = 1;
356                 mb[ptr].offset = r_offset;
357                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
358                 {
359                     /* We are dealing with block(s) of max size. Block(s)
360                        except one will be flushed. Note: the block(s) are
361                        surely not the last one(s).
362                      */
363                     if (debug > 2)
364                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
365                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
366                                   0, &pp->numKeys);
367                     mb[0].block = mb[ptr-1].block;
368                     mb[0].dirty = mb[ptr-1].dirty;
369                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
370                             mb[ptr].offset - mb[ptr-1].offset);
371                     mb[0].offset = 0;
372
373                     mb[1].block = mb[ptr].block;
374                     mb[1].dirty = mb[0].dirty;
375                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
376                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
377                             new_offset - r_offset);
378                     new_offset = (new_offset - r_offset) + mb[1].offset;
379                     ptr = 1;
380                 }
381                 border = get_border (is, mb, ptr, cat, firstpos);
382             }
383             r_offset = new_offset;
384         }
385         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
386         {
387             /* Max number blocks in current category reached ->
388                must switch to next category (with larger block size) 
389             */
390             int j = 0;
391
392             (is->files[cat].no_remap)++;
393             /* delete all original block(s) read so far */
394             for (i = 0; i < ptr; i++)
395                 if (mb[i].block)
396                     isc_release_block (is, pp->cat, mb[i].block);
397             /* also delete all block to be read in the future */
398             pp->deleteFlag = 1;
399
400             /* remap block offsets */
401             assert (mb[j].offset == 0);
402             cat++;
403             mb[j].dirty = 1;
404             mb[j].block = 0;
405             mb[ptr].offset = r_offset;
406             for (i = 1; i < ptr; i++)
407             {
408                 int border = is->method->filecat[cat].ifill -
409                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
410                 if (debug > 3)
411                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
412                 if (mb[i+1].offset > border && mb[i].offset <= border)
413                 {
414                     if (debug > 3)
415                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
416                     mb[++j].dirty = 1;
417                     mb[j].block = 0;
418                     mb[j].offset = mb[i].offset;
419                 }
420             }
421             if (debug > 2)
422                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
423                       ptr, j, cat);
424             ptr = j;
425             border = get_border (is, mb, ptr, cat, firstpos);
426             if (debug > 3)
427                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
428         }
429     }
430     if (mb[ptr].offset < r_offset)
431     {   /* make the final boundary offset */
432         mb[++ptr].dirty = 1; 
433         mb[ptr].block = 0; 
434         mb[ptr].offset = r_offset;
435     }
436     else
437     {   /* empty output. Release last block if any */
438         if (cat == pp->cat && mb[ptr].block)
439         {
440             if (debug > 3)
441                 logf (LOG_LOG, "isc: release C");
442             isc_release_block (is, pp->cat, mb[ptr].block);
443             mb[ptr].block = 0;
444             if (ptr > 0)
445                 mb[ptr-1].dirty = 1;
446         }
447     }
448
449     if (debug > 2)
450         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
451
452     if (firstpos)
453     {
454         /* we have to patch initial block with num keys if that
455            has changed */
456         if (numKeys != isc_pp_num (pp))
457         {
458             if (debug > 2)
459                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
460                                 firstpos, numKeys);
461             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
462                       sizeof(numKeys), &numKeys);
463         }
464     }
465     else if (ptr > 0)
466     {   /* we haven't flushed initial block yet and there surely are some
467            blocks to flush. Make first block dirty if numKeys differ */
468         if (numKeys != isc_pp_num (pp))
469             mb[0].dirty = 1;
470     }
471     /* flush rest of block(s) in r_buf */
472     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
473
474     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
475     if (!firstpos)
476         cat = 0;
477     if (debug > 1)
478         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
479     isc_pp_close (pp);
480     return cat + firstpos * 8;
481 }
482
483 /*
484  * $Log: merge.c,v $
485  * Revision 1.23  2003-06-23 15:36:11  adam
486  * Implemented isamb_unlink.
487  *
488  * Revision 1.22  2003/03/05 16:41:10  adam
489  * Fix GCC warnings
490  *
491  * Revision 1.21  2002/08/02 19:26:56  adam
492  * Towards GPL
493  *
494  * Revision 1.20  1999/11/30 13:48:04  adam
495  * Improved installation. Updated for inclusion of YAZ header files.
496  *
497  * Revision 1.19  1999/07/14 12:12:07  heikki
498  * Large-block isam-h  (may not work too well... Abandoning for isam-d)
499  *
500  * Revision 1.17  1999/07/13 14:22:17  heikki
501  * Better allocation strategy in isamh_merge
502  *
503  * Revision 1.16  1999/07/08 14:23:27  heikki
504  * Fixed a bug in isamh_pp_read and cleaned up a bit
505  *
506  * Revision 1.15  1999/07/07 09:36:04  heikki
507  * Fixed an assertion in isamh
508  *
509  * Revision 1.13  1999/07/06 09:37:05  heikki
510  * Working on isamh - not ready yet.
511  *
512  * Revision 1.12  1999/06/30 15:03:55  heikki
513  * first take on isamh, the append-only isam structure
514  *
515  * Revision 1.11  1999/05/26 07:49:14  adam
516  * C++ compilation.
517  *
518  * Revision 1.10  1998/03/19 12:22:09  adam
519  * Minor change.
520  *
521  * Revision 1.9  1998/03/19 10:04:38  adam
522  * Minor changes.
523  *
524  * Revision 1.8  1998/03/18 09:23:55  adam
525  * Blocks are stored in chunks on free list - up to factor 2 in speed.
526  * Fixed bug that could occur in block category rearrangemen.
527  *
528  * Revision 1.7  1998/03/11 11:18:18  adam
529  * Changed the isc_merge to take into account the mfill (minimum-fill).
530  *
531  * Revision 1.6  1998/03/06 13:54:03  adam
532  * Fixed two nasty bugs in isc_merge.
533  *
534  * Revision 1.5  1997/02/12 20:42:43  adam
535  * Bug fix: during isc_merge operations, some pages weren't marked dirty
536  * even though they should be. At this point the merge operation marks
537  * a page dirty if the previous page changed at all. A better approach is
538  * to mark it dirty if the last key written changed in previous page.
539  *
540  * Revision 1.4  1996/11/08 11:15:31  adam
541  * Number of keys in chain are stored in first block and the function
542  * to retrieve this information, isc_pp_num is implemented.
543  *
544  * Revision 1.3  1996/11/04 14:08:59  adam
545  * Optimized free block usage.
546  *
547  * Revision 1.2  1996/11/01 13:36:46  adam
548  * New element, max_blocks_mem, that control how many blocks of max size
549  * to store in memory during isc_merge.
550  * Function isc_merge now ignores delete/update of identical keys and
551  * the proper blocks are then non-dirty and not written in flush_blocks.
552  *
553  * Revision 1.1  1996/11/01  08:59:15  adam
554  * First version of isc_merge that supports update/delete.
555  *
556  */
557
558