
Re: 20030731: Questions/issues with pqact




Steve,

If you are filing METARs to daily files and the decoders are
frequently exiting and restarting, I would check whether the decoders'
output to disk is slow, so that they are not reading from the pipe
fast enough. I have all decoders running here, and we have McIDAS and
netCDF decoders running as well, and I don't see the number of pipes
causing decoders to start and stop that frequently. In fact, my dcgrib2
action will stay running for days (since there is almost always GRIB data
arriving on CONDUIT, NOAAPORT, NOGAPS, CMC, etc.).

Have you noticed pqact running slowly? If you issue 2 "kill -USR2" commands
to the pqact process, it will report the delay that pqact is experiencing
between the time a product arrives in the queue and when pqact gets around to
processing it.

I have 5 pqact processes running out of ldmd.conf, since we have a lot of FILE
actions that would otherwise slow things down on disk I/O. It also helps separate
out the pqact.xxxxx entries for cutting distributions.
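
A minimal sketch of what such a split might look like in ldmd.conf. The feed type and the configuration-file names are placeholders chosen for illustration, not the entries in use here:

```
# Hypothetical entries: one pqact for FILE actions, one for decoder PIPEs
exec "pqact -f ANY etc/pqact.files"
exec "pqact -f ANY etc/pqact.decoders"
```

Each pqact reads the same product queue but only acts on the patterns in its own configuration file.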

Steve Chiswell


On Thu, 31 Jul 2003, Steve Emmerson wrote:

> Steven,
>
> >Date: Thu, 31 Jul 2003 11:23:40 -0500
> >From: Steven Danz <address@hidden>
> >Organization: Aviation Weather Center
> >To: address@hidden
> >Subject: Questions/issues with pqact
>
> The above message contained the following:
>
> > I've run across an interesting issue with pqact, and while I don't
> > have a solution, I have a minor (maybe) change for pqact and some
> > questions (of course).
> >
> > What I'm seeing: if I watch the logs for the GEMPAK decoders I'm
> > running, I see the metar, taf, etc. decoders (actually all of them)
> > start/stop/start every minute or so.  While thrashing through
> > starts and stops isn't great, it shouldn't break anything.  The
> > problem is that every once in a while a new decoder
> > starts up before the old decoder stops, which corrupts the output file.
> >
> > First, why the thrashing?  In its current form, filel.c only allows
> > for 32 open files or pipes.  When there are a large number of files
> > and pipes in use (which there are in my case; I have tons of files
> > being written), there is the potential for thrashing the pipes as
> > the decoders are shut down and restarted.  Why not use code similar to
> > action.c to determine the number of possible open files by asking the
> > OS, and then set some aside for std[in/out/err], shared memory, and
> > rpc?  Dividing by 2 seems a bit pessimistic, but it is an option.  I've
> > attached a filel.c (from 5.2.2; 6.1.14 has the same issue) where I'm
> > trying this, and it seems to work.
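
The sizing rule described above can be sketched on its own. This mirrors the thresholds in the attached filel.c (reserve 16 descriptors when the limit is above 32, otherwise keep half), with one addition that is my assumption rather than part of the attachment: a fallback for the case where sysconf() returns -1. The function names are hypothetical.

```c
#include <assert.h>
#include <limits.h>
#include <unistd.h>

/*
 * Hypothetical helper: turn the OS descriptor limit into the number of
 * files/pipes the cache may hold open.  `fallback` is used when the
 * limit is unknown.
 */
static int fd_budget(long os_limit, int fallback)
{
    long budget;

    assert(fallback > 0);
    if (os_limit <= 0)              /* sysconf() failed or no fixed limit */
        return fallback;
    if (os_limit > INT_MAX)
        os_limit = INT_MAX;
    if (os_limit > 32)
        budget = os_limit - 16;     /* hold a fixed handful in reserve */
    else
        budget = os_limit / 2;      /* small limit: keep half in reserve */
    return budget < 1 ? 1 : (int)budget;
}

/* Ask the OS, falling back to the old compile-time value of 32. */
static int max_cached_entries(void)
{
    return fd_budget(sysconf(_SC_OPEN_MAX), 32);
}
```

Without the fallback, a failed sysconf() would produce a budget of zero, and every lookup miss would close the least recently used entry.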
>
> Good points.  Unfortunately, I haven't worked with pqact(1) yet, so
> I don't yet know what to say.  I'll put this on my list of things to
> investigate.
>
> One thing springs to mind: if I put the LDM package on SourceForge,
> would you be willing to be a developer?
>
> > Second, why the startup of a decoder before the old one shuts down? I
> > haven't had a chance to work my way through the code yet, but I'm
> > wondering if the mechanism used to close the pipe waits until the
> > application at the other end is completely closed, or does it just
> > close the pipe at the pqact end and move on, not waiting for the
> > process to terminate?
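
Judging only from the comment in pipe_close() in the attached filel.c, the close does not wait: the child is expected to exit when it reads EOF and is reaped later from a SIGCHLD handler. For comparison, here is a minimal sketch of the blocking alternative, where the writer closes the pipe and then waits for the reader to exit before anything else can start. The function names are hypothetical and the child is a stand-in for a decoder, not LDM code.

```c
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Fork a child that reads its end of a pipe until EOF and then exits 0.
 * Returns the parent's write descriptor (or -1) and stores the child's pid.
 */
static int start_reader(pid_t *pidp)
{
    int pfd[2];
    pid_t pid;

    assert(pidp != NULL);
    if (pipe(pfd) == -1)
        return -1;
    pid = fork();
    if (pid == -1) {
        (void)close(pfd[0]);
        (void)close(pfd[1]);
        return -1;
    }
    if (pid == 0) {                 /* child: stand-in for a decoder */
        char buf[512];

        (void)close(pfd[1]);
        while (read(pfd[0], buf, sizeof buf) > 0)
            ;                       /* discard input until EOF */
        _exit(0);
    }
    (void)close(pfd[0]);            /* parent keeps only the write end */
    *pidp = pid;
    return pfd[1];
}

/*
 * Close the write end, then block until the child has terminated.
 * Returns the child's exit status, or -1 on error or abnormal exit.
 */
static int close_and_wait(int wfd, pid_t pid)
{
    int status;

    if (close(wfd) == -1)
        return -1;
    while (waitpid(pid, &status, 0) == -1) {
        if (errno != EINTR)
            return -1;
    }
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

The trade-off is that a blocking wait stalls the writer for as long as the child takes to finish its disk output, which is presumably why an asynchronous reap is attractive for a program that has to keep up with incoming products.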
>
> I'm afraid I don't yet know the answer.
>
> > As a workaround I'm thinking of running two pqacts, one with a
> > configuration with just pipes and execs, the other with all the files.
> > Is this supported?
>
> Yes.  In fact, that's typically the way the GEMPAK and McIDAS decoders
> work: they have their own pqact(1) configuration-files.
>
> > I wasn't sure if running multiple pqacts on the
> > same queue would work or not.
>
> Shouldn't be a problem.
>
> > Thanks for your time
> >
> > Steven
> >
> > --
> > Steven Danz
> > Senior Software Development Engineer
> > Aviation Weather Center (NOAA/NWS/NCEP)
> > 7220 NW 101st Terrace, Room 101
> > Kansas City, MO 64153-2371
> >
> > Email: address@hidden
> > Phone: 816.584.7251
> > Fax:   816.880.0650
> > URL:   http://aviationweather.gov/
> >
> > The opinions expressed in this message do not necessarily reflect those
> > of the National Weather Service, or the Aviation Weather Center.
>
> Regards,
> Steve Emmerson
>
> > [Attachment: filel.c (text/plain, inline)]
> >
> > /*
> >  *   Copyright 1993, University Corporation for Atmospheric Research
> >  *   See ../COPYRIGHT file for copying and redistribution conditions.
> >  */
> > /* $Id: filel.c,v 1.2 2003/07/31 05:07:55 sdanz Exp $ */
> >
> > /* #define _POSIX_SOURCE */
> >
> > #include <features.h>
> > #include <ldmconfig.h>
> > #include <stdio.h>
> > #include <stdlib.h> /* malloc, free, rand */
> > #include <assert.h>
> > #include <string.h>
> > #include <ctype.h>
> > #include <limits.h> /* PATH_MAX */
> > #ifndef PATH_MAX
> > #define PATH_MAX 255
> > #endif /* !PATH_MAX */
> > #include <sys/types.h>
> > #include <sys/time.h> /* gettimeofday */
> > #include <sys/stat.h>
> > #include <fcntl.h> /* O_RDONLY et al */
> > #include <unistd.h> /* access, lseek */
> > #include <signal.h>
> > #include <errno.h>
> >
> > #if defined(_AIX) && !defined(NO_WAITPID)
> > /*
> >  * Use POSIX wait macros, not _BSD
> >  */
> > #define _H_M_WAIT
> > #endif
> > #include <sys/wait.h>
> >
> >
> > #include "filel.h"
> > #include "action.h"
> > #include "ldm.h"
> > #include "ldmalloc.h"
> > #include "mkdirs_open.h"
> > #include "ulog.h"
> > #include "pbuf.h"
> >
> > /*
> >  * Defined in pqact.c
> >  */
> > extern int pipe_timeo;
> >
> > #ifndef NO_DB
> >
> > /*
> >  * Define DB_XPROD non zero if you want the whole "product" data
> >  * structure put into the database, otherwise just the data goes in.
> >  * Be sure this is consistent with ../dbcat and whatever else used
> >  * to read the files.
> >  */
> > # ifndef DB_XPROD
> > # define DB_XPROD 1
> > # endif
> >
> > /*
> >  * Backward compatibility.
> >  * If you want to use gdbm interfaces, define USE_GDBM
> >  */
> > # ifdef USE_GDBM
> > # include "gdbm.h"
> > # else
> > # include <ndbm.h>
> > # endif
> >
> > #endif /* !NO_DB */
> >
> > /*
> >  * Tuning parameter: MAXENTRIES is the number of descriptors
> >  * you wish to allocate to this list. You need to leave enough around
> >  * for the error output and for rpc.
> >  */
> > /* #define MAXENTRIES OPEN_MAX/2 */
> > #define MAXENTRIES 32
> >
> >
> > /*
> >  *
> >  */
> > typedef enum {
> >     FT_NONE = 0,
> >     UNIXIO,
> >     STDIO,
> >     PIPE,
> >     FT_DB
> > } ft_t ;
> >
> >
> > union f_handle {
> >     int fd;
> >     FILE *stream;
> >     pbuf *pbuf;
> > #ifndef NO_DB
> > # ifdef USE_GDBM
> >     GDBM_FILE db;
> > # else
> >     DBM *db;
> > # endif
> > #endif /*!NO_DB*/
> > };
> > typedef union f_handle f_handle;
> >
> >
> > struct fl_entry {
> >     int flags;
> >     ft_t type;
> >     struct fl_entry *next;
> >     struct fl_entry *prev;
> >     struct fl_ops *ops;
> >     f_handle handle;
> >     long serial;    /* a serial number for this connection */
> >     unsigned long private;          /* pid, hstat *, read/write flg */
> >     char path[PATH_MAX+1];
> > };
> > typedef struct fl_entry fl_entry;
> >
> > #if defined(__cplusplus) || defined(__STDC__)
> > struct fl_ops {
> >     int (*cmp)(fl_entry *, int, char**);
> >     int (*open)(fl_entry *, int, char**);
> >     void (*close)(fl_entry *);
> >     int (*sync)(fl_entry *, int);
> >     int (*put)(fl_entry *, const char *,
> >             const void *, size_t );
> > };
> > #else /* Old Style C */
> > struct fl_ops {
> >     int (*cmp)();
> >     int (*open)();
> >     void (*close)();
> >     int (*sync)();
> >     int (*dbufput)();
> > };
> > #endif
> >
> >
> > /*
> >  * the one global list of open files
> >  */
> > static struct fl {
> >     int size;
> >     fl_entry *head;
> >     fl_entry *tail;
> > } thefl[] = {
> >     0 ,
> >     NULL ,
> >     NULL
> > };
> >
> >
> > #define TO_HEAD(entry) \
> >     if(thefl->head != entry) to_head(entry)
> >
> > static void
> > to_head(fl_entry *entry)
> > {
> >     if(thefl->head == entry)
> >             return;
> >
> >     if(entry->prev != NULL)
> >             entry->prev->next = entry->next;
> >     if(entry->next != NULL)
> >             entry->next->prev = entry->prev;
> >
> >     if(thefl->head != NULL)
> >             thefl->head->prev = entry;
> >     if(thefl->tail == entry)
> >             thefl->tail = entry->prev;
> >
> >     entry->next = thefl->head;
> >     entry->prev = NULL;
> >     thefl->head = entry;
> >     if(thefl->tail == NULL)
> >             thefl->tail = entry;
> > }
> >
> >
> > static void
> > free_fl_entry(fl_entry *entry)
> > {
> >     if(entry == NULL) return;
> >
> >     if(entry->ops != NULL)
> >     {
> >             entry->ops->close(entry);
> >     }
> >     free(entry);
> > }
> >
> >
> > /*
> >  * generate a serial number, used to validate
> >  * Action cache entries.
> >  */
> > static long
> > newSerial(void)
> > {
> >     struct timeval now;
> >     if( gettimeofday(&now, NULL) < 0)
> >     {
> >             serror("newSerial: gettimeofday");
> >             return ((long) rand() );
> >     }
> >     return ( ((now.tv_sec % 1000000) * 1000) + (now.tv_usec / 1000));
> > }
> >
> > /* forward reference */
> > static fl_entry * new_fl_entry(ft_t type, int argc, char **argv);
> >
> > #ifdef FL_DEBUG
> > static void
> > dump_fl(void)
> > {
> >     fl_entry *entry;
> >     int fd;
> >
> >     udebug("      thefl->size %d", thefl->size);
> >     for(entry = thefl->head; entry != NULL;
> >                     entry = entry->next )
> >     {
> >             switch (entry->type) {
> >             case UNIXIO :
> >                     fd = entry->handle.fd;
> >                     break;
> >             case STDIO :
> >                     fd = entry->handle.stream == NULL
> >                             ? -1 : fileno(entry->handle.stream);
> >                     break;
> >             case PIPE :
> >                     fd = entry->handle.pbuf == NULL
> >                             ? -1 : entry->handle.pbuf->pfd;
> >                     break;
> >             case FT_DB :
> > #ifndef NO_DB
> >                     fd = entry->handle.db == NULL
> >                             ? -1 : -2;
> >                     break;
> > #endif /* !NO_DB */
> >             default :
> >                     fd = -2;
> >             }
> >             udebug("       %d %s", fd, entry->path);
> >     }
> > }
> > #endif
> >
> > static fl_entry *
> > lookup_fl_entry(ft_t type, int argc, char **argv)
> > {
> >     fl_entry *entry = NULL;
> >
> >     for(entry = thefl->head; entry != NULL;
> >                     entry = entry->next )
> >     {
> >             if(entry->type == type &&
> >                             entry->ops->cmp(entry, argc, argv) == 0)
> >                     break;
> >     }
> >     return entry;
> > }
> >
> >
> > static void
> > delete_entry(fl_entry *entry)
> > {
> >     /* assert(thefl->size >= 1); */
> >     if(entry == NULL) return;
> >
> >     if(entry->prev != NULL)
> >             entry->prev->next = entry->next;
> >     if(entry->next != NULL)
> >             entry->next->prev = entry->prev;
> >     if(thefl->head == entry)
> >             thefl->head = entry->next;
> >     if(thefl->tail == entry)
> >             thefl->tail = entry->prev;
> >     thefl->size--;
> >
> >     free_fl_entry(entry);
> > }
> >
> >
> > /*
> >  * sync up to nentries entries, tail to head.
> >  */
> > void
> > fl_sync(int nentries,
> >      int block) /* bool_t, FALSE => nonblocking */
> > {
> >     fl_entry *entry, *prev;
> >
> > /*  udebug("  fl_sync"); */
> >
> >     if(thefl->size <= 0)
> >             return;
> >     if(nentries == -1)      /* sync everyone */
> >             nentries = thefl->size;
> >
> >     for(entry = thefl->tail;
> >             entry != NULL && nentries >= 0; entry = prev, nentries--)
> >     {
> >             prev = entry->prev;
> >             if(entry->flags & FL_NEEDS_SYNC)
> >             {
> >                     if(entry->ops->sync(entry, block) == -1)
> >                             delete_entry(entry);
> >             }
> >     }
> > }
> >
> >
> > /*
> >  * close the "least recently used" entry
> >  */
> > void
> > close_lru(int skipflags)
> > {
> >     fl_entry *entry, *prev;
> >
> >     if(thefl->size <= 0)
> >             return;
> >     entry = thefl->tail;
> >
> >
> >     for(entry = thefl->tail;
> >             entry != NULL; entry = prev)
> >     {
> >             prev = entry->prev;
> >             /* twisted logic */
> >             if(entry->flags & skipflags)
> >                     continue;
> >             /* else */
> >     /*      udebug("   close_lru: %s", entry->path); */
> >             delete_entry(entry);
> >             return;
> >     }
> > }
> >
> >
> > void
> > fl_close_all(void)
> > {
> >     while(thefl->size > 0)
> >     {
> >             close_lru(0);
> >     }
> > }
> >
> >
> > /*
> >  * Look for an fl_entry in the list.
> >  * If there isn't one there that matches what you need, make a new one.
> >  */
> > static fl_entry *
> > get_fl_entry(ft_t type, int argc, char **argv)
> > {
> >     fl_entry *entry;
> >
> >     static int open_max = 0; /* number of descriptors */
> >     if (!open_max)
> >     {
> > #ifdef _SC_OPEN_MAX
> >             open_max = (int) sysconf(_SC_OPEN_MAX);
> >             /* Hold some aside for std[in/out/err], shm handles and libraries */
> >             if (open_max <= 0) {
> >                     open_max = 32; /* sysconf() failed or no limit: punt */
> >             } else if (open_max > 32) {
> >                     open_max = open_max - 16;
> >             } else {
> >                     open_max = open_max/2;
> >             }
> > #else
> >             open_max = 32; /* punt */
> > #endif
> >     }
> >
> >     entry = lookup_fl_entry(type, argc, argv);
> >     if( entry != NULL )
> >     {
> >             TO_HEAD(entry);
> > #ifdef FL_DEBUG
> >             dump_fl();
> > #endif
> >             return entry;
> >     }
> >     /* else */
> >
> >     if(thefl->size >= open_max)
> >             close_lru(0);
> >
> >     entry = new_fl_entry(type, argc, argv);
> >     if( entry == NULL )
> >     {
> >             return NULL; /* malloc or open failed */
> >     }
> >
> >     /* to front */
> >     if(thefl->head != NULL)
> >             thefl->head->prev = entry;
> >     entry->next = thefl->head;
> >     entry->prev = NULL;
> >     thefl->head = entry;
> >     if(thefl->tail == NULL)
> >             thefl->tail = entry;
> >     thefl->size++;
> >
> > #ifdef FL_DEBUG
> >     dump_fl();
> > #endif
> >     return entry;
> > }
> >
> >
> > static int
> > atFinishedArgs(int ac,
> >     char *av[],
> >     fl_entry *entry)
> > {
> >     int status = 0;
> >     int syncflag = 0;
> >     int closeflag = 0;
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-close",3) == 0)
> >             {
> >                     closeflag = !0;
> >             }
> >             else if( strncmp(*av,"-flush",3) == 0)
> >             {
> >                     syncflag = !0;
> >             }
> >     }
> >     if(syncflag)
> >             status = (*entry->ops->sync)(entry, syncflag);
> >     if(closeflag)
> >             delete_entry(entry);
> >     return status;
> > }
> >
> >
> > /*
> >  * Given a dbuf, return a copy with the non '\n'
> >  * control characters removed.
> >  * Remember to free the result.
> >  */
> > static void *
> > dupstrip(const void *in, size_t len, size_t *outlenp)
> > {
> >     void *out;
> >     size_t blen;
> >     const unsigned char *ip;
> >     char *op;
> >
> >     if(in == NULL || len == 0)
> >             return NULL;
> >
> >     out = malloc(len);
> >     if(out == NULL)
> >     {
> >             serror("dupstrip: malloc %ld failed", (long) len);
> >             return NULL;
> >     }
> >
> >     for(blen = len, ip = in, op = out, *outlenp = 0;
> >             blen != 0; blen--, ip++)
> >     {
> >             if(((int)*ip) > 127
> >                             || (iscntrl(*ip) && *ip != '\n'))
> >                     continue;
> >             /* else */
> >             *op++ = *ip;
> >             (*outlenp)++;
> >     }
> >
> >     return out;
> > }
> >
> >
> > /* Begin UNIXIO */
> > static int
> > str_cmp( fl_entry *entry, int argc, char **argv)
> > {
> >     char *path;
> >
> >     assert(argc > 0);
> >     assert(argv[argc -1] != NULL);
> >     assert(*argv[argc -1] != 0);
> >
> >     path = argv[argc-1];
> >     return(strcmp(path, entry->path));
> > }
> >
> >
> > static int
> > unio_open(fl_entry *entry, int ac, char **av)
> > {
> >     char *path;
> >     int flags = (O_WRONLY|O_CREAT);
> >
> >     assert(ac > 0);
> >     assert(av[ac -1] != NULL);
> >     assert(*av[ac -1] != 0);
> >
> >     entry->handle.fd = -1;
> >
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-overwrite",3) == 0)
> >             {
> >                     flags |= O_TRUNC;
> >             }
> >             else if( strncmp(*av,"-strip",3) == 0)
> >             {
> >                     entry->flags |= FL_STRIP;
> >             }
> >     }
> >
> >     path = av[ac-1];
> >
> >     entry->handle.fd = mkdirs_open(path, flags, 0666);
> >     if(entry->handle.fd == -1)
> >     {
> >             if(errno == EMFILE)
> >             {
> >                     /* Too many open files */
> >                     close_lru(0);
> >                     close_lru(0);
> >             }
> >             serror("unio_open: %s", path);
> >             return -1;
> >     }
> >     if(!(flags & O_TRUNC))
> >             if(lseek(entry->handle.fd, 0, SEEK_END) < 0)
> >                     serror("unio_open:lseek: %s", path);
> >                             /* fatal ? what about devices? */
> >
> >     strncpy(entry->path, path, PATH_MAX);
> >     entry->path[PATH_MAX] = 0; /* just in case */
> >     udebug("    unio_open: %d", entry->handle.fd);
> >     return entry->handle.fd;
> > }
> >
> >
> > static void
> > unio_close(fl_entry *entry)
> > {
> >     udebug("    unio_close: %d", entry->handle.fd);
> >     if(entry->handle.fd != -1)
> >     {
> >             if(close(entry->handle.fd) == -1)
> >             {
> >                     serror("close: %s", entry->path);
> >             }
> >     }
> >     entry->path[0] = 0;
> >     entry->handle.fd = -1;
> > }
> >
> >
> > static int
> > unio_sync(fl_entry *entry, int block)
> > {
> >     /*
> >      * Some systems may not have an fsync(2) call.
> >      * The best you can do then would be to make this
> >      * routine a noop which returns 0.
> >      */
> >     int status  = 0;
> >     udebug("    unio_sync: %d %s",
> >             entry->handle.fd, block ? "" : "non-block");
> >     if(block)
> >     {
> > #ifndef NO_FSYNC
> >             if(entry->handle.fd != -1)
> >                     status = fsync(entry->handle.fd);
> >             if(status == -1)
> >             {
> >                     serror("fsync: %s", entry->path);
> >             }
> > #endif
> >             entry->flags &= ~FL_NEEDS_SYNC;
> >     }
> >     return status;
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > unio_put(fl_entry *entry, const char *ignored,
> >             const void *data, size_t sz)
> > {
> >     int nwrote;
> >
> >     TO_HEAD(entry);
> >     udebug("    unio_dbufput: %d", entry->handle.fd);
> >
> >     nwrote = (int) write(entry->handle.fd, data, sz);
> >     if(nwrote != sz)
> >     {
> >             serror("unio_put: %s write error",
> >                     entry->path);
> >             /* don't waste time syncing an errored entry */
> >             entry->flags &= ~FL_NEEDS_SYNC;
> >             delete_entry(entry);
> >             return -1;
> >     }
> >     /* else */
> >     entry->flags |= FL_NEEDS_SYNC;
> >     return 0;
> > }
> >
> >
> > static struct fl_ops unio_ops = {
> >     str_cmp,
> >     unio_open,
> >     unio_close,
> >     unio_sync,
> >     unio_put,
> > };
> >
> >
> > /*ARGSUSED*/
> > int
> > unio_prodput(const product *prodp, int argc, char **argv,
> >             const void *ignored, size_t also_ignored)
> > {
> >     int status = 0;
> >     void *data = prodp->data;
> >     size_t sz = prodp->info.sz;
> >     fl_entry *entry = get_fl_entry(UNIXIO, argc, argv);
> >
> >     udebug("    unio_prodput: %d %s",
> >             entry == NULL ? -1 : entry->handle.fd , prodp->info.ident);
> >     if(entry == NULL)
> >             return -1;
> >
> >     if(entry->flags & FL_STRIP)
> >     {
> >             data = dupstrip(prodp->data, prodp->info.sz, &sz);
> >             if(data == NULL)
> >                     return -1;
> >     }
> >
> >     status = unio_put(entry, prodp->info.ident, data, sz);
> >     if(data != prodp->data)
> >             free(data);
> >     if(status != -1)
> >             status = atFinishedArgs(argc, argv, entry);
> >     return status;
> > }
> >
> >
> > /* End UNIXIO */
> >
> > /* Begin STDIO */
> > static int
> > stdio_open(fl_entry *entry, int ac, char **av)
> > {
> >     char *path;
> >     int flags = (O_WRONLY|O_CREAT);
> >     int fd;
> >     char *mode = "a";
> >     /* extern FILE *fdopen(int, const char *); */
> >
> >     assert(ac > 0);
> >     assert(av[ac -1] != NULL);
> >     assert(*av[ac -1] != 0);
> >
> >     entry->handle.stream = NULL;
> >
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-overwrite",3) == 0)
> >             {
> >                     flags |= O_TRUNC;
> >                     mode = "w";
> >             }
> >             else if( strncmp(*av,"-strip",3) == 0)
> >             {
> >                     entry->flags |= FL_STRIP;
> >             }
> >     }
> >
> >     path = av[ac-1];
> >
> >     fd = mkdirs_open(path, flags, 0666);
> >     if(fd == -1)
> >     {
> >             if(errno == EMFILE)
> >             {
> >                     /* Too many open files */
> >                     close_lru(0);
> >                     close_lru(0);
> >             }
> >             serror("mkdirs_open: %s", path);
> >             return -1;
> >     }
> >     entry->handle.stream = fdopen(fd, mode);
> >     if(entry->handle.stream == NULL)
> >     {
> >             serror("fdopen: %s", path);
> >             return -1;
> >     }
> >     strncpy(entry->path, path, PATH_MAX);
> >     entry->path[PATH_MAX] = 0; /* just in case */
> >     udebug("    stdio_open: %d", fileno(entry->handle.stream));
> >     return fileno(entry->handle.stream);
> > }
> >
> >
> > static void
> > stdio_close(fl_entry *entry)
> > {
> >     udebug("    stdio_close: %d",
> >             entry->handle.stream ? fileno(entry->handle.stream) : -1);
> >     if(entry->handle.stream != NULL)
> >     {
> >             if(fclose(entry->handle.stream) == EOF)
> >             {
> >                     serror("fclose: %s", entry->path);
> >             }
> >     }
> >     entry->path[0] = 0;
> >     entry->handle.stream = NULL;
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > stdio_sync(fl_entry *entry, int block)
> > {
> >     int status  = 0;
> >     udebug("    stdio_sync: %d",
> >             entry->handle.stream ? fileno(entry->handle.stream) : -1);
> >     if(fflush(entry->handle.stream) == EOF)
> >     {
> >             serror("fflush: %s", entry->path);
> >             status = -1;
> >     }
> >     entry->flags &= ~FL_NEEDS_SYNC;
> >     return status;
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > stdio_put(fl_entry *entry, const char *ignored,
> >             const void *data, size_t sz)
> > {
> >     int nwrote;
> >
> >     TO_HEAD(entry);
> >     udebug("    stdio_dbufput: %d", fileno(entry->handle.stream));
> >
> >     /* else */
> >     nwrote = (int) fwrite(data, sz, 1,
> >                      entry->handle.stream);
> >     if(nwrote != 1)
> >     {
> >             serror("stdio_put: %s fwrite error",
> >                     entry->path);
> >             /* don't waste time syncing an errored entry */
> >             entry->flags &= ~FL_NEEDS_SYNC;
> >             delete_entry(entry);
> >             return -1;
> >     }
> >     /* else */
> >     entry->flags |= FL_NEEDS_SYNC;
> >     return 0;
> > }
> >
> >
> > static struct fl_ops stdio_ops = {
> >     str_cmp,
> >     stdio_open,
> >     stdio_close,
> >     stdio_sync,
> >     stdio_put,
> > };
> >
> >
> > /*ARGSUSED*/
> > int
> > stdio_prodput(const product *prodp, int argc, char **argv,
> >             const void *ignored, size_t also_ignored)
> > {
> >     int status = 0;
> >     void *data = prodp->data;
> >     size_t sz = prodp->info.sz;
> >     fl_entry *entry = get_fl_entry(STDIO, argc, argv);
> >
> >     udebug("    stdio_prodput: %d %s",
> >             entry == NULL ? -1 :
> >                      fileno(entry->handle.stream) , prodp->info.ident);
> >     if(entry == NULL)
> >             return -1;
> >
> >     if(entry->flags & FL_STRIP)
> >     {
> >             data = dupstrip(prodp->data, prodp->info.sz, &sz);
> >             if(data == NULL)
> >                     return -1;
> >     }
> >
> >     status = stdio_put(entry, prodp->info.ident, data, sz);
> >     if(data != prodp->data)
> >             free(data);
> >     if(status != -1)
> >             status = atFinishedArgs(argc, argv, entry);
> >     return status;
> > }
> >
> > /* End STDIO */
> >
> >
> > /* Begin PIPE */
> > static int
> > argcat(char *buf, int len, int argc, char **argv)
> > {
> >     int cnt = 0;
> >     char *cp;
> >
> >     while(argc-- > 0 && (cp = *argv++) != NULL)
> >     {
> >             /* stop one short of len to leave room for the terminator */
> >             while(*cp != 0 && cnt < len - 1)
> >             {
> >                     buf[cnt++] = *cp++;
> >             }
> >     }
> >     buf[cnt] = 0;
> >     return cnt;
> > }
> >
> >
> > static int
> > argcat_cmp(fl_entry *entry, int argc, char **argv)
> > {
> >     char buf[PATH_MAX+1];
> >
> >     assert(argc > 0);
> >     assert(argv[0] != NULL);
> >     assert(*argv[0] != 0);
> >
> >     argcat(buf, sizeof(buf), argc, argv);
> >     return(strcmp(buf, entry->path));
> > }
> >
> >
> > /*
> >  * Set to non-root privilege if possible.
> >  * Do it in such a way that it is safe to fork.
> >  * TODO: this is duplicated from ../server/priv.c
> >  */
> > void
> > endpriv(void)
> > {
> >     const uid_t euid = geteuid();
> >     const uid_t uid = getuid();
> >
> >     /* if either euid or uid is unprivileged, use it */
> >     if(euid > 0)
> >             setuid(euid);
> >     else if(uid > 0)
> >             setuid(uid);
> >
> >     /* else warn??? or set to nobody??? */
> > }
> >
> >
> > static int
> > pipe_open(fl_entry *entry, int argc, char **argv)
> > {
> >     int ac = argc;
> >     char **av = argv;
> >     int pfd[2];
> >     pid_t pid;
> >
> >     assert(argc >= 1);
> >     assert(argv[0] != NULL && *argv[0] != 0);
> >
> >     entry->handle.pbuf = NULL;
> >
> >     entry->flags |= FL_NOTRANSIENT;
> >     /* handle any options */
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-transient",3) == 0)
> >             {
> >                     entry->flags &= ~FL_NOTRANSIENT;
> >             }
> >             else if( strncmp(*av,"-strip",3) == 0)
> >             {
> >                     entry->flags |= FL_STRIP;
> >             }
> >     }
> >
> >     if( pipe(pfd) == -1 )
> >     {
> >             if(errno == EMFILE)
> >             {
> >                     /* Too many open files */
> >                     close_lru(0);
> >                     close_lru(0);
> >             }
> >             serror("pipe");
> >             return -1;
> >     }
> >
> >     pid = fork();
> >     if(pid == -1)
> >     {       /* failure */
> >             serror("pipe_open: fork");
> >             goto error_out;
> >     }
> >     /* else */
> >
> >     if(pid == 0)
> >     {       /* child */
> >
> >             (void)signal(SIGCHLD, SIG_DFL);
> >             (void)signal(SIGTERM, SIG_DFL);
> >
> >             if( dup2(pfd[0] , 0) == -1 )
> >             {
> >                     serror("pipe: child dup2");
> >                     _exit(-1);
> >             }
> >             close_all();
> >             /* Set up fd 1, stdout */
> >             (void) close(1);
> >             {
> >                     int fd = open("/dev/null", O_WRONLY);
> >                     if(fd >= 0 && fd != 1)
> >                     {
> >                             (void) dup2(fd, 1);
> >                             (void) close(fd);
> >                     }
> >             }
> >             /* we leave stderr alone */
> >
> >             endpriv();
> >
> >             assert(av[ac] == NULL);
> >             (void) execvp(av[0], &av[0]);
> >             serror("pipe: execvp: %s", av[0]);
> >             _exit(127);
> >     }
> >     /* else, parent */
> >
> >     if(close(pfd[0]) == -1)
> >     {
> >             /* How can this ever happen? */
> >             serror("pipe: parent close");
> >             /* not fatal ? */
> >     }
> >
> >     /* set up pfd[1] as output descriptor */
> >     entry->handle.pbuf = new_pbuf(pfd[1], 512); /* _POSIX_PIPE_BUF */
> >     if(entry->handle.pbuf == NULL)
> >             goto error_out;
> >
> >     entry->private = pid;
> >     argcat(entry->path, PATH_MAX, argc, argv);
> >
> >     udebug("    pipe_open: %d %d", pfd[1], pid);
> >
> >     return pfd[1];
> >
> > error_out:
> >     (void)close(pfd[1]);
> >     (void)close(pfd[0]);
> >     return -1;
> > }
> >
> >
> > /*
> >  * Used by reap() to delete a PIPE entry
> >  */
> > static void
> > tag_pid_entry(pid_t pid)
> > {
> >     fl_entry *entry = NULL;
> >
> >     if(pid == -1)
> >             return;
> >
> >     for(entry = thefl->tail; entry != NULL;
> >                     entry = entry->prev )
> >     {
> >             if(entry->type == PIPE
> >                             && pid == entry->private)
> >                     break;
> >     }
> >
> >     if(entry != NULL)
> >     {
> >             udebug("       reap(%d): %s", pid, entry->path);
> >             /* mark the entry as dead */
> >             entry->private = -1;
> >             /* and delete it. Unsafe in interrupt context */
> >             delete_entry(entry);
> >     }
> >     else
> >     {
> >             udebug("       reap(%d): proc not on filel", pid);
> >     }
> > }
> >
> >
> > int
> > reap(pid_t pid, int options)
> > {
> >     pid_t wpid = 0;
> >     int status = 0;
> >
> > #ifndef NO_WAITPID
> >     wpid = waitpid(pid, &status, options);
> > #else
> >     if(options == 0)
> >             wpid = wait(&status);
> >     /* customize here for older systems, use wait3 or whatever */
> > #endif
> >     if(wpid == -1)
> >     {
> >             if(!(errno == ECHILD && pid == -1)) /* Only complain when relevant */
> >                     serror("waitpid");
> >             return -1;
> >     }
> >     /* else */
> >
> >     if(wpid != 0)
> >     {
> >
> > #if !defined(WIFSIGNALED) && !defined(WIFEXITED)
> > #error "Can't decode wait status"
> > #endif
> >
> > #if defined(WIFSTOPPED)
> >             if(WIFSTOPPED(status))
> >             {
> >                     unotice("child %d stopped by signal %d",
> >                             wpid, WSTOPSIG(status));
> >             }
> >             else
> > #endif
> > #if defined(WIFSIGNALED)
> >             if(WIFSIGNALED(status))
> >             {
> >                     tag_pid_entry(wpid);
> >                     unotice("child %d terminated by signal %d",
> >                             wpid, WTERMSIG(status));
> >             }
> >             else
> > #endif
> > #if defined(WIFEXITED)
> >             if(WIFEXITED(status))
> >             {
> >                     tag_pid_entry(wpid);
> >                     if(WEXITSTATUS(status) != 0)
> >                             unotice("child %d exited with status %d",
> >                                     wpid, WEXITSTATUS(status));
> >             }
> > #endif
> >     }
> >
> >     return wpid;
> > }
> >
> >
> > static int
> > pipe_sync(fl_entry *entry, int block)
> > {
> >     int status = ENOERR;
> >     udebug("    pipe_sync: %d %s",
> >             entry->handle.pbuf ? entry->handle.pbuf->pfd : -1,
> >             block ? "" : "non-block");
> >     status = pbuf_flush(entry->handle.pbuf, block, pipe_timeo);
> >     if(status != ENOERR)
> >             entry->flags &= ~FL_NEEDS_SYNC;
> >     return status;
> > }
> >
> >
> > static void
> > pipe_close(fl_entry *entry)
> > {
> >     pid_t pid = (pid_t)entry->private;
> >     int pfd = -1;
> >
> >     udebug("    pipe_close: %d, %d",
> >             entry->handle.pbuf ? entry->handle.pbuf->pfd : -1, pid);
> >     if(entry->handle.pbuf != NULL)
> >     {
> >             if(pid >= 0 && (entry->flags & FL_NEEDS_SYNC))
> >             {
> >                     (void) pipe_sync(entry, TRUE);
> >             }
> >             pfd = entry->handle.pbuf->pfd;
> >             free_pbuf(entry->handle.pbuf);
> >     }
> >     if(pfd != -1)
> >     {
> >             if(close(pfd) == -1)
> >             {
> >                     serror("pipe close: %s", entry->path);
> >             }
> >             /*
> >              * The close should cause termination of the child
> >              * as the child reads EOF. The child is wait()'ed
> >              * upon asynchronously in a SIGCHLD handler.
> >              */
> >     }
> >     entry->path[0] = 0;
> >     entry->handle.pbuf = NULL;
> >     entry->private = 0;
> > }
> >
> >
> > /*
> >  * N.B. New return convention:
> >  * returns ENOERR (0) or, on failure, the errno.
> >  */
> > /*ARGSUSED*/
> > static int
> > pipe_put(fl_entry *entry, const char *ignored,
> >             const void *data, size_t sz)
> > {
> >     int status = ENOERR;
> >
> >     udebug("    pipe_put: %d",
> >             entry->handle.pbuf ? entry->handle.pbuf->pfd : -1);
> >     TO_HEAD(entry);
> >     if(entry->handle.pbuf == NULL)
> >             return EINVAL;
> >
> >     status = pbuf_write(entry->handle.pbuf,
> >             data, sz, pipe_timeo);
> >
> >     if(status != ENOERR)
> >     {
> >             uerror("pipe_put: %s write error",
> >                             entry->path);
> >             /* don't waste time syncing an errored entry */
> >             entry->flags &= ~FL_NEEDS_SYNC;
> >             delete_entry(entry);
> >             return status;
> >     }
> >     entry->flags |= FL_NEEDS_SYNC;
> >     return ENOERR;
> > }
> >
> >
> > static struct fl_ops pipe_ops = {
> >     argcat_cmp,
> >     pipe_open,
> >     pipe_close,
> >     pipe_sync,
> >     pipe_put,
> > };
> >
> >
> > /*ARGSUSED*/
> > int
> > pipe_prodput(const product *prodp, int argc, char **argv,
> >             const void *ignored, size_t also_ignored)
> > {
> >     int status = 0;
> >     void *data = prodp->data;
> >     size_t sz = prodp->info.sz;
> >     fl_entry *entry = get_fl_entry(PIPE, argc, argv);
> >
> >     udebug("    pipe_prodput: %d %s",
> >             (entry != NULL && entry->handle.pbuf)
> >             ?  entry->handle.pbuf->pfd : -1,
> >             prodp->info.ident);
> >
> >     if(entry == NULL)
> >             return -1;
> >
> >     if(entry->flags & FL_STRIP)
> >     {
> >             data = dupstrip(prodp->data, prodp->info.sz, &sz);
> >             if(data == NULL)
> >                     return -1;
> >     }
> >
> >     status = pipe_put(entry, prodp->info.ident, data, sz);
> >     if(status == EPIPE)
> >     {
> >             /*
> >              * In case the decoder exited and we haven't yet reaped,
> >              * try again once.
> >              */
> >             uerror("pipe_prodput: trying again");
> >             entry = get_fl_entry(PIPE, argc, argv);
> >             if(entry == NULL)
> >             {
> >                     /* don't leak the stripped copy */
> >                     if(data != prodp->data)
> >                             free(data);
> >                     return -1;
> >             }
> >             status = pipe_put(entry, prodp->info.ident, data, sz);
> >     }
> >     if(data != prodp->data)
> >             free(data);
> >
> >     if(status != ENOERR)
> >             return -1;
> >
> >     return atFinishedArgs(argc, argv, entry);
> > }
> >
> >
> > /*ARGSUSED*/
> > int
> > spipe_prodput(const product *prod, int argc, char **argv,
> >             const void *ignored, size_t also_ignored)
> > {
> >     fl_entry *entry;
> >         char *buffer;
> >     size_t len;
> >         unsigned long offset;
> >     int status = ENOERR;
> >
> >     typedef union {
> >             unsigned long u_long;
> >             char cu_long[sizeof(unsigned long)];
> >     } conv;
> >     conv key_len;
> >     conv data_len;
> >     conv sync;
> >
> >
> >     entry = get_fl_entry(PIPE, argc, argv);
> >     udebug("    spipe_prodput: %d %s",
> >             (entry != NULL && entry->handle.pbuf)
> >                     ?  entry->handle.pbuf->pfd : -1,
> >                     prod->info.ident);
> >     if(entry == NULL)
> >             return -1;
> >
> >     /*
> >         **---------------------------------------------------------
> >     ** Place the following information into dbuf_val for
> >     ** writing to the pipe:
> >     **
> >     ** unsigned long SPIPE_SYNC
> >     ** unsigned long key_len
> >     ** char *key
> >     ** unsigned long data_len  (this includes ETX/RS markers)
> >     ** char *data
> >     ** char SPIPE_ETX
> >     ** char SPIPE_RS
> >     **
> >     ** First, get lengths of key and data to allocate space
> >     ** in a temporary buffer.
> >     **
> >         **---------------------------------------------------------
> >     */
> > #ifndef SPIPE_SYNC
> > #define SPIPE_SYNC 0x1DFCCF1A
> > #endif /* !SPIPE_SYNC */
> >
> > #ifndef SPIPE_ETX
> > #define SPIPE_ETX '\003'
> > #endif /* !SPIPE_ETX */
> >
> > #ifndef SPIPE_RS
> > #define SPIPE_RS '\036'
> > #endif /* !SPIPE_RS */
> >
> >     key_len.u_long = strlen(prod->info.ident);
> >     data_len.u_long = prod->info.sz + 2;
> >     sync.u_long = SPIPE_SYNC;
> >
> >     len = (unsigned ) (sizeof(unsigned long) +
> >       sizeof(key_len.cu_long) + strlen(prod->info.ident) +
> >       sizeof(data_len.cu_long) + prod->info.sz + 2);
> >
> >     buffer = calloc(1, len);
> >     if(buffer == NULL)
> >     {
> >             serror("spipe_prodput: calloc");
> >             return -1;
> >     }
> >
> >         /*---------------------------------------------------------
> >         ** Now place the individual items into the buffer
> >         **-------------------------------------------------------*/
> >
> >     offset = 0;
> >
> >         memcpy (buffer+offset, sync.cu_long, sizeof(sync.cu_long));
> >     offset = offset + sizeof(unsigned long);
> >
> >     memcpy(buffer+offset, key_len.cu_long, sizeof(key_len.cu_long));
> >     offset = offset + sizeof(key_len);
> >
> >     memcpy(buffer+offset, prod->info.ident, key_len.u_long);
> >     offset = offset + key_len.u_long;
> >
> >     memcpy(buffer+offset, data_len.cu_long, sizeof(data_len.cu_long));
> >     offset = offset + sizeof(data_len);
> >
> >         memcpy(buffer+offset, prod->data, prod->info.sz);
> >
> >         /*---------------------------------------------------------
> >         ** Terminate the message with ETX & RS
> >         **-------------------------------------------------------*/
> >         buffer[len - 2] = SPIPE_ETX;
> >         buffer[len - 1] = SPIPE_RS;
> >
> >     uerror("spipe_prodput: size = %d\t%d %d %d", prod->info.sz,
> >           buffer[len - 3], buffer[len - 2], buffer[len - 1]);
> >
> >         /*---------------------------------------------------------
> >         ** Send this stuff and tidy up
> >         **-------------------------------------------------------*/
> >     status = pipe_put(entry, prod->info.ident, buffer, len);
> >     if(status == EPIPE)
> >     {
> >             /*
> >              * In case the decoder exited and we haven't yet reaped,
> >              * try again once.
> >              */
> >             uerror("spipe_prodput: trying again");
> >             entry = get_fl_entry(PIPE, argc, argv);
> >             if(entry == NULL)
> >             {
> >                     /* don't leak the framed copy */
> >                     free(buffer);
> >                     return -1;
> >             }
> >             status = pipe_put(entry, prod->info.ident, buffer, len);
> >     }
> >         free(buffer);
> >     if(status != ENOERR)
> >             return -1;
> >
> >     return atFinishedArgs(argc, argv, entry);
> >
> > }
> >
> >
> > int
> > xpipe_prodput(const product *prod, int argc, char **argv,
> >             const void *xprod, size_t xlen)
> > {
> >     int status = ENOERR;
> >     fl_entry *entry;
> >
> >     entry = get_fl_entry(PIPE, argc, argv);
> >     udebug("    xpipe_prodput: %d %s",
> >             (entry != NULL && entry->handle.pbuf)
> >                     ?  entry->handle.pbuf->pfd : -1, prod->info.ident);
> >     if(entry == NULL)
> >             return -1;
> >
> >     status = pipe_put(entry, prod->info.ident, xprod, xlen);
> >     if(status == EPIPE)
> >     {
> >             /*
> >              * In case the decoder exited and we haven't yet reaped,
> >              * try again once.
> >              */
> >             uerror("xpipe_prodput: trying again");
> >             entry = get_fl_entry(PIPE, argc, argv);
> >             if(entry == NULL)
> >                     return -1;
> >             status = pipe_put(entry, prod->info.ident, xprod, xlen);
> >     }
> >
> >     if(status != ENOERR)
> >             return -1;
> >
> >     return atFinishedArgs(argc, argv, entry);
> > }
> > /* End PIPE */
> >
> >
> > #ifndef NO_DB
> > # ifdef USE_GDBM
> > /* namespace conflict with gdbm_open, etc, so using prefix ldmdb_ */
> >
> >
> > /*
> >  * called in gdbm when it tries to punt
> >  * If we didn't provide this function, gdbm would print the
> >  * message and call exit(-1).
> >  */
> > static void
> > ldmdb_fatal ( char * str)
> > {
> >     serror("%s", str);
> > }
> >
> >
> > /*
> >  * two or three args:
> >  *          pathname flag [dblocksize]
> >  *  flag is passed to gdbm_open() as the open mode: GDBM_READER opens
> >  *  read-only; anything else (e.g. GDBM_WRCREAT) opens for writing
> >  */
> > static int
> > ldmdb_open(fl_entry *entry, int argc, char **argv)
> > {
> >     char *path;
> >     GDBM_FILE db;
> >     long tmp = 0;
> >     int read_write = GDBM_WRCREAT;
> >     /* default: choose to optimize for space over time */
> > #define DEFAULT_DBLOCKSIZE 512
> >     int dblocksize = DEFAULT_DBLOCKSIZE;
> >
> >     entry->handle.db = NULL;
> >     path = argv[0];
> >     read_write = atoi(argv[1]);
> >
> >     if(argc > 2)
> >     {
> >             if ( (tmp = atoi(argv[2])) > 0 )
> >             {
> >                     dblocksize = (int)tmp;
> >             }
> >             else
> >             {
> >                     uerror("%s: ldmdb_open: -dblocksize %s invalid",
> >                             path, argv[2] );
> >             }
> >     }
> >
> >     if(read_write != GDBM_READER) /* not read only */
> >     {
> >             /* create directories if needed */
> >             if(diraccess(path, (R_OK | W_OK), !0) == -1)
> >             {
> >                     serror("Couldn't access directories leading to %s",
> >                             path);
> >                     return -1;
> >             }
> >     }
> >
> >     db = gdbm_open(path, dblocksize, read_write, 0664, ldmdb_fatal);
> >     if(db == NULL)
> >     {
> >             if(errno == EMFILE)
> >             {
> >                     /* Too many open files */
> >                     close_lru(0);
> >                     close_lru(0);
> >             }
> >             serror("gdbm_open: %s", path);
> >             return -1;
> >     }
> >     entry->handle.db = db;
> >     entry->private = read_write;
> >     strncpy(entry->path, path, PATH_MAX);
> >     entry->path[PATH_MAX] = 0; /* just in case */
> >     udebug("    ldmdb_open: %s", entry->path);
> >     return 0;
> > }
> >
> >
> > static void
> > ldmdb_close(fl_entry *entry)
> > {
> >     udebug("    ldmdb_close: %s", entry->path);
> >     if(entry->handle.db != NULL)
> >             gdbm_close(entry->handle.db);
> >     entry->private = 0;
> >     entry->path[0] = 0;
> >     entry->handle.db = NULL;
> > }
> >
> >
> > static int
> > ldmdb_cmp(fl_entry *entry, int argc, char **argv)
> > {
> >     char *path;
> >     int read_write;
> >     int cmp;
> >
> >     assert(argc > 1);
> >     assert(argv[0] != NULL);
> >     assert(*argv[0] != 0);
> >
> >     path = argv[0];
> >     read_write = atoi(argv[1]);
> >
> >     cmp = strcmp(path, entry->path);
> >     if(cmp == 0)
> >     {
> >             if(read_write != GDBM_READER &&
> >                     read_write != entry->private)
> >             {
> >                     /*
> >                      * the flags don't match, so close and reopen
> >                      */
> >                     ldmdb_close(entry);
> >                     if(ldmdb_open(entry, argc, argv) < 0)
> >                             cmp = -1;
> >             }
> >     }
> >     return cmp;
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > ldmdb_sync(fl_entry *entry, int block)
> > {
> >     /* there is no gdbm_sync */
> >     udebug("    ldmdb_sync: %s",
> >             entry->handle.db ? entry->path : "");
> >     entry->flags &= ~FL_NEEDS_SYNC;
> >     return(0);
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > ldmdb_put(fl_entry *entry, const char *keystr,
> >             const void *data, size_t sz)
> > {
> >     datum key, content;
> >     int status;
> >
> >     key.dptr = (char *) keystr /* N.B. cast away const */;
> >     key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */
> >
> >     content.dptr = (char *) data; /* N.B. cast away const */
> >     content.dsize = (int)sz;
> >
> > #if defined(DB_CONCAT) && !DB_XPROD
> >     /* concatenate duplicate keys  */
> >     /*
> >      * Code for concatenating data when the key is a duplicate.
> >      * Contributed 9/17/91 JCaron/PNeilley/LCarson
> >      * Wrecks idea of "product" when applied at this layer, so
> >      * only define DB_CONCAT when DB_XPROD is not defined.
> >      */
> >
> >     status = gdbm_store(entry->handle.db, key, content, GDBM_INSERT);
> >         if (status == 1 )
> >             {
> >             int             size;
> >             datum       old_stuff, new_stuff;
> >             old_stuff = gdbm_fetch(entry->handle.db, key);
> >                     udebug("\tConcatenating data under key %s", key.dptr);
> >             if (NULL == old_stuff.dptr)
> >                 {
> >                 serror("ldmdb_prodput: Inconsistent Duplicate Key storage");
> >                 return -1;
> >                 }
> >             size = content.dsize+old_stuff.dsize;
> >             if (NULL == (new_stuff.dptr = malloc(size)))
> >                 {
> >                 serror("ldmdb_prodput: malloc failed");
> >                 free (old_stuff.dptr);
> >                 return -1;
> >                 }
> >             memcpy(new_stuff.dptr, old_stuff.dptr, old_stuff.dsize);
> >             memcpy(&new_stuff.dptr[old_stuff.dsize],
> >                     content.dptr, content.dsize);
> >             new_stuff.dsize = size;
> >             status = gdbm_store(entry->handle.db, key, new_stuff,
> >                     GDBM_REPLACE);
> >             free (new_stuff.dptr);
> >             free (old_stuff.dptr);
> >             }
> >
> > #else
> >     /* TODO: replace flag */
> >     status = gdbm_store(entry->handle.db, key, content, GDBM_REPLACE);
> > #endif
> >     return status;
> > }
> >
> > # else /*USE_GDBM*/
> >
> > /*
> >  * two or 3 args:
> >  *          pathname flag [dblocksize]
> >  *  if flag is 0 open read/write/create, otherwise open readonly
> >  */
> > static int
> > ldmdb_open(fl_entry *entry, int ac, char **av)
> > {
> >     const char *path;
> >     int flags = (O_WRONLY|O_CREAT);
> >
> >     assert(ac > 0);
> >     assert(av[ac -1] != NULL);
> >     assert(*av[ac -1] != 0);
> >
> >     entry->handle.db = NULL;
> >
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-overwrite",3) == 0)
> >             {
> >                     flags |= O_TRUNC;
> >             }
> >             else if( strncmp(*av,"-strip",3) == 0)
> >             {
> >                     entry->flags |= FL_STRIP;
> >             }
> >     }
> >
> >     path = av[ac-1];
> >
> >     /* create directories if needed */
> >     if(diraccess(path, (R_OK | W_OK), !0) == -1)
> >     {
> >             serror("Couldn't access directories leading to %s", path);
> >             return -1;
> >     }
> >
> >     entry->handle.db = dbm_open(path, flags, 0666);
> >     if(entry->handle.db == NULL)
> >     {
> >             if(errno == EMFILE)
> >             {
> >                     /* Too many open files */
> >                     close_lru(0);
> >                     close_lru(0);
> >                     close_lru(0);
> >                     close_lru(0);
> >             }
> >             serror("ldmdb_open: %s", path);
> >             return -1;
> >     }
> >     strncpy(entry->path, path, PATH_MAX);
> >     entry->path[PATH_MAX] = 0; /* just in case */
> >     udebug("    ldmdb_open: %s", entry->path);
> >     return 0;
> > }
> >
> >
> > static void
> > ldmdb_close(fl_entry *entry)
> > {
> >     udebug("    ldmdb_close: %s", entry->path);
> >     if(entry->handle.db != NULL)
> >             dbm_close(entry->handle.db);
> >     entry->private = 0;
> >     entry->path[0] = 0;
> >     entry->handle.db = NULL;
> > }
> >
> >
> > static int
> > ldmdb_cmp(fl_entry *entry, int argc, char **argv)
> > {
> >     return str_cmp(entry, argc, argv);
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > ldmdb_sync(fl_entry *entry, int block)
> > {
> >     /* there is no dbm_sync */
> >     udebug("    ldmdb_sync: %s",
> >             entry->handle.db ? entry->path : "");
> >     entry->flags &= ~FL_NEEDS_SYNC;
> >     return(0);
> > }
> >
> >
> > /*ARGSUSED*/
> > static int
> > ldmdb_put(fl_entry *entry, const char *keystr,
> >             const void *data, size_t sz)
> > {
> >     datum key, content;
> >     int status;
> >
> >     key.dptr = (char *) keystr /* N.B. cast away const */;
> >     key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */
> >
> >     content.dptr = (char *) data; /* N.B. cast away const */
> >     content.dsize = (int)sz;
> >
> > #if defined(DB_CONCAT) && !DB_XPROD
> >     /* concatenate duplicate keys  */
> >     /*
> >      * Code for concatenating data when the key is a duplicate.
> >      * Contributed 9/17/91 JCaron/PNeilley/LCarson
> >      * Wrecks idea of "product" when applied at this layer, so
> >      * only define DB_CONCAT when DB_XPROD is not defined.
> >      */
> >
> >     status = dbm_store(entry->handle.db, key, content, DBM_INSERT);
> >         if (status == 1 )
> >             {
> >             int             size;
> >             datum       old_stuff, new_stuff;
> >             old_stuff = dbm_fetch(entry->handle.db, key);
> >                     udebug("\tConcatenating data under key %s", key.dptr);
> >             if (NULL == old_stuff.dptr)
> >                 {
> >                 serror("ldmdb_prodput: Inconsistent Duplicate Key storage");
> >                 return -1;
> >                 }
> >             size = content.dsize+old_stuff.dsize;
> >             if (NULL == (new_stuff.dptr = malloc(size)))
> >                 {
> >                 serror("ldmdb_prodput: malloc failed");
> >                 free (old_stuff.dptr);
> >                 return -1;
> >                 }
> >             memcpy(new_stuff.dptr, old_stuff.dptr, old_stuff.dsize);
> >             memcpy(&((char *)new_stuff.dptr)[old_stuff.dsize],
> >                     content.dptr, content.dsize);
> >             new_stuff.dsize = size;
> >             status = dbm_store(entry->handle.db, key, new_stuff,
> >                     DBM_REPLACE);
> >             free (new_stuff.dptr);
> >             free (old_stuff.dptr);
> >             }
> >
> > #else
> >     /* TODO: replace flag */
> >     status = dbm_store(entry->handle.db, key, content, DBM_REPLACE);
> > #endif
> >     return status;
> > }
> > # endif /*USE_GDBM*/
> >
> >
> > static struct fl_ops ldmdb_ops = {
> >     ldmdb_cmp,
> >     ldmdb_open,
> >     ldmdb_close,
> >     ldmdb_sync,
> >     ldmdb_put,
> > };
> >
> >
> > /*ARGSUSED*/
> > int
> > ldmdb_prodput(const product *prod, int ac, char **av,
> >             const void *xp, size_t xlen)
> > {
> >     fl_entry *entry;
> >     int status;
> >     int closeflag = 0;
> >
> >     const char *keystr;
> >     char *dblocksizep = NULL;
> >     char *gdbm_wrcreat = "2";
> >
> >     for(; ac > 1 && *av[0] == '-'; ac-- , av++)
> >     {
> >             if( strncmp(*av,"-close",3) == 0)
> >                     closeflag = !0;
> >             else if( strncmp(*av,"-dblocksize",3) == 0)
> >             {
> >                     ac--; av++;
> >                     dblocksizep = *av;
> >             } else
> >                     uerror("dbfile: Invalid argument %s", *av);
> >
> >     }
> >
> >     {
> >             /* set up simple argc, argv for ldmdb_open */
> >             int argc = 0;
> >             char *argv[4];
> >             argv[argc++] = av[0];
> >             argv[argc++] = gdbm_wrcreat;
> >             if(dblocksizep != NULL)
> >                     argv[argc++] = dblocksizep;
> >             argv[argc] = NULL;
> >             entry = get_fl_entry(FT_DB, argc, argv);
> >             udebug("    ldmdb_prodput: %s %s",
> >                     entry == NULL ? "" : entry->path, prod->info.ident);
> >             if(entry == NULL) return -1;
> >     }
> >
> >     ac--; av++;
> >
> >     if(ac >= 0 && av[0] != NULL && *av[0] != 0)
> >     {
> >             /* use command line arg as key */
> >             keystr = av[0];
> >     }
> >     else
> >     {
> >             /* use product->ident */
> >             keystr = prod->info.ident;
> >     }
> >
> > #if DB_XPROD
> >     status = ldmdb_put(entry, keystr, xp, xlen);
> > #else
> >     status = ldmdb_put(entry, keystr, prod->data, prod->info.sz);
> > #endif
> >
> >     if(status == -1)
> >     {
> >             uerror("db_put: %s error for %s, dbkey %s",
> >                     entry->path, prod->info.ident, keystr);
> >     }
> >     if(closeflag || status == -1)
> >     {
> >             delete_entry(entry);
> >     }
> >
> >     return status;
> > }
> >
> > #endif /* !NO_DB */
> >
> >
> > static fl_entry *
> > new_fl_entry(ft_t type, int argc, char **argv)
> > {
> >     fl_entry *entry = NULL;
> >
> >     entry = Alloc(1, fl_entry);
> >     if(entry == NULL)
> >     {
> >             serror("new_fl_entry: malloc");
> >             return NULL;
> >     }
> >     entry->path[0] = 0;
> >
> >     switch (type) {
> >     case UNIXIO :
> >             entry->ops = &unio_ops;
> >             break;
> >     case STDIO :
> >             entry->ops = &stdio_ops;
> >             break;
> >     case PIPE :
> >             entry->ops = &pipe_ops;
> >             break;
> >     case FT_DB :
> > #ifndef NO_DB
> >             entry->ops = &ldmdb_ops;
> > #else
> >             uerror("new_fl_entry: DB type not enabled");
> >             goto err;
> >             /*NOTREACHED*/
> > #endif /* !NO_DB */
> >             break;
> >     default :
> >             uerror("new_fl_entry: unknown type %d", type);
> >             goto err;
> >     }
> >
> >     entry->flags = 0;
> >     entry->type = type;
> >     entry->next = NULL;
> >     entry->prev = NULL;
> >     entry->path[0] = 0;
> >     entry->private = 0;
> >
> >     if( entry->ops->open(entry, argc, argv) == -1 )
> >             goto err;
> >     entry->serial = newSerial();
> >
> >     return entry;
> > err :
> >     free_fl_entry(entry);
> >     return NULL;
> > }
> >
> > --------------070609090501010602000202--
>
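
For anyone writing the reading end of an SPIPE action, the record layout
described in the spipe_prodput() comment quoted above can be sketched
roughly as follows. This is only an illustration, not LDM code: the helper
names spipe_frame() and spipe_parse() are made up, and it assumes the
reader is built for the same machine as pqact, since the quoted code
copies each unsigned long in host byte order and native width.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Values copied from the quoted spipe_prodput(). */
#define SPIPE_SYNC 0x1DFCCF1A
#define SPIPE_ETX '\003'
#define SPIPE_RS '\036'

/*
 * Hypothetical helper (not part of LDM): build one record as
 * sync, key_len, key, data_len, data, ETX, RS.
 * Returns a malloc()ed buffer the caller must free, or NULL.
 */
static char *
spipe_frame(const char *key, const void *data, size_t sz, size_t *lenp)
{
    unsigned long sync = SPIPE_SYNC;
    unsigned long key_len = strlen(key);
    unsigned long data_len = sz + 2;    /* payload plus ETX and RS */
    size_t len = 3 * sizeof(unsigned long) + key_len + data_len;
    char *buf = malloc(len);
    char *p = buf;

    if(buf == NULL)
        return NULL;
    memcpy(p, &sync, sizeof(sync));         p += sizeof(sync);
    memcpy(p, &key_len, sizeof(key_len));   p += sizeof(key_len);
    memcpy(p, key, key_len);                p += key_len;
    memcpy(p, &data_len, sizeof(data_len)); p += sizeof(data_len);
    memcpy(p, data, sz);                    p += sz;
    *p++ = SPIPE_ETX;
    *p++ = SPIPE_RS;
    assert((size_t)(p - buf) == len);
    *lenp = len;
    return buf;
}

/*
 * Hypothetical reader side: validate one record held in buf[0..len).
 * On success returns 0 and points *keyp and *datap into buf;
 * returns -1 if the record is malformed.
 */
static int
spipe_parse(const char *buf, size_t len,
        const char **keyp, size_t *key_lenp,
        const char **datap, size_t *szp)
{
    const size_t w = sizeof(unsigned long);
    unsigned long sync, key_len, data_len;
    size_t off = 0;

    if(len < 3 * w + 2)
        return -1;      /* too short for the headers and trailer */
    memcpy(&sync, buf + off, w);    off += w;
    if(sync != SPIPE_SYNC)
        return -1;
    memcpy(&key_len, buf + off, w); off += w;
    if(key_len > len - off - w - 2)
        return -1;      /* key would overrun the buffer */
    *keyp = buf + off;              off += key_len;
    *key_lenp = key_len;
    memcpy(&data_len, buf + off, w); off += w;
    if(data_len < 2 || data_len > len - off)
        return -1;      /* data_len counts the ETX/RS trailer */
    if(buf[off + data_len - 2] != SPIPE_ETX
            || buf[off + data_len - 1] != SPIPE_RS)
        return -1;
    *datap = buf + off;
    *szp = data_len - 2;
    return 0;
}
```

A decoder reading a pipe would first read the three fixed-width fields to
learn how many bytes to expect, then hand the complete record to something
like spipe_parse(). Checking the sync word and the ETX/RS trailer gives it
a way to notice when it has lost its place in the stream.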