[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: 19991103: LDM 5.0.8 and 'expanding input buffer size' problemwithAFOS feed



Gregory,


I actually found the patch file, added it as a attachment. Here's the tag
on how to apply it:

http://www.unidata.ucar.edu/glimpse/ldm/2821


According to the patch, pqing.c 1.66 should work, then the PIL code can be
added. I'll attach 1.66


I forgot about David Copley problem, here's logs for pqing.c. It looks 
like the PIL code was added after NMC2 stuff.

----------------------------
revision 1.68
date: 1999/06/17 16:11:07;  author: steve;  state: Exp;  lines: +3 -2
Replaced missing "#include <ldmconfig.h>" directive that was removed at
the last checkin.
----------------------------
revision 1.67
date: 1999/06/03 21:12:38;  author: rkambic;  state: Exp;  lines: +27 -8
PIL header addition
----------------------------
revision 1.66
date: 1999/02/09 03:17:09;  author: davis;  state: Exp;  lines: +6 -6
Fix CONDUIT hack.
----------------------------
revision 1.65
date: 1998/10/16 19:28:49;  author: steve;  state: Exp;  lines: +3 -2
Ported to HP-UX B.11.00.
Modified configuration mechanism: now uses configuration file
    "ldmconfig.h.in".
----------------------------

You might have add the line to pqing.c:

#include <ldmconfig.h>



Robb...


On Thu, 4 Nov 1999, Gregory Grosshans wrote:

> Robb,
> 
> I tried a feedtest and received the following:
> 
> bin/feedtest -vxl - -b 9600 -p none -f AFOS /dev/ttyC13
> Nov 04 21:09:31 feedtest[23658]: Starting Up
>         $Revision: 1.68 $ built Aug 13 1999 11:23:12
> Nov 04 21:09:31 feedtest[23658]: TERMIOS "/dev/ttyC13": 9600 baud, none parity
>         garbage encountered while searching for header
> Nov 04 21:10:03 feedtest[23658]: Expanding input buffer size to 20480
> Nov 04 21:10:13 feedtest[23658]: Expanding input buffer size to 24576
> Nov 04 21:10:16 feedtest[23658]: Interrupt
> Nov 04 21:10:16 feedtest[23658]: Exiting
> leslie 47:
> 
> So I looked through the Unidata searchable web pages using 'afos'.  I found 
> some email from
> this past February between Unidata and David Copley.  He appeared to have the 
> same problem
> as I am only he was using 5.0.6.
> 
> I called David and he said that Unidata noticed a bug in pqing.c and that 
> Unidata provided a
> patch and then LDM ingested AFOS
> data just fine.  Looking at some other email it appears pqing was updated to 
> support the
> CONDUIT project.  Can you provide me the patch necessary for pqing to accept 
> AFOS data
> instead of the CONDUIT NMC2 data?
> 
> Thanks,
> Gregg
> 
> 
> Robb Kambic wrote:
> 
> > Gregory,
> >
> > I don't have any good solutions to your problem.  I created an source code
> > release for ldm-5.0.2 in the standard ldm source ftp directory.  You could
> > compile the source, test it and then modify the PIL fixes to the code. One
> > word of caution, save all the other working releases so no overwrites
> > occur.  At this time, I can't really expend any more time because AFOS
> > stream is not really used in the IDD. If I think of any new ideas about
> > the problems, I'll let you know.  I had numerous problems with HPUX, if
> > fact I can't get it compiled on HPUX 11.0 because of a bug in the rpc
> > library. For HPUX 11.0 users, they have to use the version compiled on
> > HPUX 10.2   I sympathize with your situation, it's frustrating.
> >
> > Robb...
> >
> > On Thu, 4 Nov 1999, Gregory Grosshans wrote:
> >
> > > I'm running LDM 5.0.2, and trying to get 5.0.8 to work on an HP 712 HPUX 
> > > 10.20
> > > machine.  Up until this past spring the LDM was running on an HP 715 with 
> > > HPUX 9.x.
> > > The feeds into this machine are AFOS, DDS and PPS via serial connections.
> > >
> > > I just grabbed the HPUX 10.2 LDM 5.0.8 binaries off the web site and 
> > > installed them and
> > > set the queue at 100 MB.  Again I received
> > > the "Expanding input buffer size" problem.
> > >
> > > I've attached ldmd.log.1 and ldmd.log.2.  ldmd.log.1 was created when I 
> > > tried to run
> > > the 5.0.8 binaries and you will notice the
> > > expanding buffer size messages.  The interesting item at the end of this 
> > > log is under
> > > afos it lists 'nregions    35'.  While the PPS and
> > > DDS feeds don't list an nregions at all.  Then if you look at the end of 
> > > ldmd.log.2
> > > (created from 5.0.2) under AFOS it doesn't list
> > > 'nregions...'.    Do you know what causes LDM to generate this message?
> > >
> > > Outside of just a few HP's with Mux ports the other alternatives are 
> > > configuring a
> > > linux box.   Operations is just an HP shop.
> > >
> > > The router configuration would involve NCEP HQ so I'd like to wait until 
> > > absolutely
> > > necessary to go that route.
> > >
> > > Gregg
> > >
> > >
> > >
> > > Robb Kambic wrote:
> > >
> > > > Gregory,
> > > >
> > > > At this point, I'm really starting to speculate. Is it possible to 
> > > > install
> > > > the ldm on another box that's not IRIX?  My thinking is some library/etc
> > > > has changed to make the ldm behave abnormal. If we could duplicate this 
> > > > on
> > > > another platform then it's definitely a bug in the ldm.  Could be OS
> > > > specific.
> > > >
> > > > Robb...
> > > >
> > > > On Thu, 4 Nov 1999, Gregory Grosshans wrote:
> > > >
> > > > > On the machine in question the only feeds are DDS, PPS and AFOS.  On 
> > > > > this machine
> > > > > I've had a LDM queue of 25 MB for three years and it worked fine with 
> > > > > all three
> > > > > feed types.
> > > > >
> > > > > There is a seperate machine with LDM ingesting NOAAPORT and on this 
> > > > > machine the
> > > > > queue is 700+ MB since I'm ingesting three
> > > > > channels of NOAAPORT data.
> > > > >
> > > > > I've actually set the size of the ldm queue in the 
> > > > > $LDMHOME/bin/ldmadmin script to
> > > > > 250000000.  I've verified several times that the queue is of 
> > > > > sufficient length.
> > > > > When I switch back to 5.0.2 I change the queue back to 25 MB and 
> > > > > everything works
> > > > > fine.
> > > > >
> > > > > I tried commenting out the PPS and DDS feed types, leaving only AFOS 
> > > > > on and
> > > > > starting 5.0.8 and still received the 'expanding input buffer size'.  
> > > > > While LDM
> > > > > was running with just the AFOS feed I tried pqcat.  PQCAT started and 
> > > > > exited right
> > > > > away and indicated the number of products were 0.
> > > > >
> > > > > Gilbert, If I were able to have the routers and firewall changed to 
> > > > > allow your LDM
> > > > > access to this machine I don't understand how the AFOS
> > > > > data would be able to get to you.  Right now there are no AFOS 
> > > > > products entering
> > > > > the queue, the only thing occurring is the 'Expanding input buffer 
> > > > > size.'  If
> > > > > there were AFOS products entering the queue they would also show up 
> > > > > in one of the
> > > > > three other LDM machines here.
> > > > >
> > > > > Actually, that may be a key, the buffer is expanding.  Robb, do you 
> > > > > know how the
> > > > > buffer plays into the LDM?   Does LDM load a product
> > > > > into a new dynamic buffer prior to placing it in the queue and then 
> > > > > does it free
> > > > > the memory?\
> > > > >
> > > > > Thanks for your timely responses.
> > > > >
> > > > > Gregg
> > > > >
> > > > >
> > > > > Gilbert Sebenste wrote:
> > > > >
> > > > > > On Thu, 4 Nov 1999, Robb Kambic wrote:
> > > > > >
> > > > > > > Gregory,
> > > > > > >
> > > > > > > After looking at the log messages, it doesn't appear your queue 
> > > > > > > is 250
> > > > > > > megabytes.  Look in the data dir, what's the:
> > > > > > >
> > > > > > > % ls -l ldm.pq
> > > > > > >
> > > > > > > If it's 250 megabytes then up it again. There might be other 
> > > > > > > products
> > > > > > > coming in on your afos stream, etc.  If it isn't then change it 
> > > > > > > in the
> > > > > > > bin/ldmadmin file to 250.  Then the standard
> > > > > > >
> > > > > > > % ldmadmin stop
> > > > > > > % ldmadmin delqueue
> > > > > > > % ldmadmin mkqueue
> > > > > > > % ldmadmin start
> > > > > > >
> > > > > > > I just put the commands here for my sanity sake.
> > > > > >
> > > > > > DUH! I forgot he was also ingesting NOAAPORT. Whoa, that's 175 MB 
> > > > > > right
> > > > > > there. Up, up and away! Use 400 MB at least.
> > > > > >
> > > > > > > Also, if you comment out the dds and pps does it make any 
> > > > > > > difference?
> > > > > > >
> > > > > > > Robb...
> > > > > >
> > > > > > I think that's the key here. NOAAPORT takes up 150 MB of my 
> > > > > > queue/hour,
> > > > > > and of course, we filter out scrambled NIDS and other stuff. Give 
> > > > > > it a go,
> > > > > > and then let us know!
> > > > > >
> > > > > > *******************************************************************************
> > > > > > Gilbert Sebenste                                                    
> > > > > >  ********
> > > > > > Internet: address@hidden    (My opinions only!)                     
> > > > > > ******
> > > > > > Staff Meteorologist, Northern Illinois University                   
> > > > > >    ****
> > > > > > Work phone: 815-753-5492                                            
> > > > > >     ***
> > > > > > *******************************************************************************
> > > > > >
> > > > >
> > > >
> > > > ===============================================================================
> > > > Robb Kambic                                Unidata Program Center
> > > > Software Engineer III                      Univ. Corp for Atmospheric 
> > > > Research
> > > > address@hidden                   WWW: http://www.unidata.ucar.edu/
> > > > ===============================================================================
> > >
> >
> > ===============================================================================
> > Robb Kambic                                Unidata Program Center
> > Software Engineer III                      Univ. Corp for Atmospheric 
> > Research
> > address@hidden                   WWW: http://www.unidata.ucar.edu/
> > ===============================================================================
> 

===============================================================================
Robb Kambic                                Unidata Program Center
Software Engineer III                      Univ. Corp for Atmospheric Research
address@hidden             WWW: http://www.unidata.ucar.edu/
===============================================================================
Index: pqing.c
===================================================================
RCS file: /upc/share/CVS/ldm5/pqing/pqing.c,v
retrieving revision 1.65
retrieving revision 1.66
diff -c -r1.65 -r1.66
*** pqing.c     1998/10/16 19:28:49     1.65
--- pqing.c     1999/02/09 03:17:09     1.66
***************
*** 2,10 ****
   *   Copyright 1993, University Corporation for Atmospheric Research
   *   See ../COPYRIGHT file for copying and redistribution conditions.
   */
! /* $Id: pqing.c,v 1.65 1998/10/16 19:28:49 steve Exp $ */
  static char version[] =
! "$Revision: 1.65 $ built "__DATE__" "__TIME__;
  
  #include <ldmconfig.h>
  #include <stdio.h>
--- 2,10 ----
   *   Copyright 1993, University Corporation for Atmospheric Research
   *   See ../COPYRIGHT file for copying and redistribution conditions.
   */
! /* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */
  static char version[] =
! "$Revision: 1.66 $ built "__DATE__" "__TIME__;
  
  #include <ldmconfig.h>
  #include <stdio.h>
***************
*** 512,525 ****
                else
                        setTheScanner(scan_wmo_binary);
        }
-       else if (feedtype & NMC)
-       {
-               setTheScanner(scan_wmo_binary);
-       }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
        }
        else if (feedtype == FAA604)
        {
--- 512,525 ----
                else
                        setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
+       }
+       else if (feedtype == NMC2) /* CONDUIT */
+       {
+               setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == FAA604)
        {
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */
static char version[] =
"$Revision: 1.66 $ built "__DATE__" "__TIME__;

#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <limits.h> /* PATH_MAX */
#ifndef PATH_MAX
#define PATH_MAX 255
#endif /* !PATH_MAX */
#include <sys/types.h>
#include <sys/time.h>
#include <rpc/rpc.h>
#include <errno.h>

#include "ulog.h"
#include "inetutil.h"
#include "ldm.h"
#include "feed.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "wmo_message.h"
#include "afos_message.h"
#include "faa604_message.h"
#include "paths.h"
#include "pq.h"
#include "md5.h"

#ifdef NO_ATEXIT
#include "atexit.h"
#endif

#if NET
#define RETRY_DELAY     (10)            /* delay factor between retries */
#define MAX_RETRIES     (30)
#endif /* NET */

/* set by command line */
char *baud = NULL;
char *rawfname = NULL;
char *parity = NULL;
enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET;
static feedtypet feedtype = NONE; /* deduce from av[0]  */
static const char *progname = NULL;
static char *logfname = "";
static char feedfname[PATH_MAX];
static char myname[HOSTNAMESIZE];
static const char *pqfname = DEFAULT_QUEUE;

static pqueue *pq = NULL;
static int ifd = -1; 

static volatile int done = 0;
static volatile int intr = 0;
static volatile int stats_req = 0;

static void (*prod_stats)(void) = wmo_stats;
static unsigned long ndups = 0;

static MD5_CTX *md5ctxp = NULL;

#if NET
static int port_error = 0;      /* indicate sigpipe condition was raised */
#       ifndef DEFAULT_RESET_SECS
#       define DEFAULT_RESET_SECS 600
#       endif
static int reset_secs = DEFAULT_RESET_SECS;
#endif

/*
 * called at exit
 */
static void
cleanup(void)
{
        unotice("Exiting"); 
        if(!intr)
        {
                /* We are not in the interrupt context */

                if(md5ctxp != NULL)
                {
                        free_MD5_CTX(md5ctxp);  
                }

                if(pq != NULL)
                {
                        off_t highwater = 0;
                        size_t maxregions = 0;
                        (void) pq_highwater(pq, &highwater, &maxregions);
                        (void) pq_close(pq);
                        pq = NULL;

                        if(feed_close)
                                (*feed_close)(ifd);
                        ifd = -1;
                        unotice("  Queue usage (bytes):%8ld",
                                                (long)highwater);
                        unotice("           (nregions):%8ld",
                                                (long)maxregions);
                        unotice("  Duplicates rejected:%8lu", ndups);
                }
                (*prod_stats)();
                (*feed_stats)();
        }
        (void) closeulog();
}


/*
 * called upon receipt of signals
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
        /* 
         * Some systems reset handler to SIG_DFL upon entry to handler.
         * In that case, we reregister our handler.
         */
        (void) signal(sig, signal_handler);
#endif
        switch(sig) {
        case SIGINT :
                unotice("Interrupt");
                intr = !0;
                exit(0);
        case SIGTERM :
                udebug("SIGTERM");
                done = !0;
                return;
        case SIGPIPE :
#if NET
                if(INPUT_IS_SOCKET)
                {
                        unotice("SIGPIPE");
                        port_error = !0;
                }
#endif
                return;
        case SIGUSR1 :
                udebug("SIGUSR1");
                stats_req = !0;
                return;
        case SIGUSR2 :
                udebug("SIGUSR2");
                if (toggleulogpri(LOG_INFO))
                        unotice("Going verbose");
                else
                        unotice("Going silent");
                return;
        }
        udebug("signal_handler: unhandled signal: %d", sig);
}


/*
 * register the signal_handler
 */
static void
set_sigactions(void)
{
        struct sigaction sigact;

        sigemptyset(&sigact.sa_mask);
        sigact.sa_flags = 0;

        /* Ignore these */
        sigact.sa_handler = SIG_IGN;
        (void) sigaction(SIGHUP, &sigact, NULL);
        (void) sigaction(SIGALRM, &sigact, NULL);
        (void) sigaction(SIGCHLD, &sigact, NULL);

        /* Handle these */
#ifdef SA_RESTART       /* SVR4, 4.3+ BSD */
        /* usually, restart system calls */
        sigact.sa_flags |= SA_RESTART;
#endif
        sigact.sa_handler = signal_handler;
        (void) sigaction(SIGTERM, &sigact, NULL);
        (void) sigaction(SIGUSR1, &sigact, NULL);
        (void) sigaction(SIGUSR2, &sigact, NULL);

        /* Don't restart after interrupt */
        sigact.sa_flags = 0;
#ifdef SA_INTERRUPT     /* SunOS 4.x */
        sigact.sa_flags |= SA_INTERRUPT;
#endif
        (void) sigaction(SIGINT, &sigact, NULL);
        (void) sigaction(SIGPIPE, &sigact, NULL);
}


static void
usage(
        char *av0 /*  id string */
)
{
        (void)fprintf(stderr,
                "Usage: %s [options] feedname\t\nOptions:\n", av0);
        (void)fprintf(stderr,
                "\t-v           Verbose, tell me about each product\n");
        (void)fprintf(stderr,
                "\t-r rawfile   Stash 'raw' data in \"rawfile\"\n");
        (void)fprintf(stderr,
                "\t-l logfile   Default logs to syslogd\n");
        (void)fprintf(stderr,
                "\t-f type      Claim to be feedtype \"type\", one of \"hds\", 
\"ddplus\", ...\n");
        (void)fprintf(stderr,
                "\t-b baud      Set baudrate for tty feed.\n");
        (void)fprintf(stderr,
                "\t-q queue     default \"%s\"\n", DEFAULT_QUEUE);
        (void)fprintf(stderr,
                "\t-p [even|odd|none]  Set parity for tty feed.\n");
        (void)fprintf(stderr,
                "\t-c           Enable checksum or parity check on non tty 
feed\n");
        (void)fprintf(stderr,
                "\t-n           Disable checksum or parity check on tty 
feed\n");
#if NET
        (void)fprintf(stderr,
                "\t-P port      Get input via TCP connection to host 
\"feedname\" at \"port\"\n");
        (void)fprintf(stderr,
                "\t-T timeout   Idle timeout before TCP reconnect, in 
seconds\n");
        (void)fprintf(stderr,
                "\t             (defaults to %d, 0 disables timeout)\n",
                        DEFAULT_RESET_SECS);
#endif
        exit(1);
}


void
toClients(timestampt arrival,
        unsigned seqno,
        const char *ident,
        unsigned len,
        const char *buf)
{
        static struct product prod;
        int status;

        prod.info.arrival = arrival;
        MD5Init(md5ctxp);
        MD5Update(md5ctxp, (const unsigned char *)buf, len);
        MD5Final(prod.info.signature, md5ctxp);
        prod.info.origin = myname;
        prod.info.feedtype = feedtype;
        prod.info.seqno = seqno;
        prod.info.ident = (char *)ident; /* cast away const */
        prod.info.sz = len;
        prod.data = (void *)buf; /* cast away const */

        if(ulogIsVerbose())
                uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug()));

        if(pq == NULL)          /* if we are "feedtest", do nothing else */
                return;

        status = pq_insert(pq, &prod);
        if(status == ENOERR)
                return; /* Normal return */

        /* else */
        if(status == PQUEUE_DUP)
        {
                ndups++;
                uinfo("Product already in queue");
                return;
        }

        /* else, error */
        uerror("pq_insert: %s\n",
                status > 0 ? strerror(status) : "Internal error");
        exit(1); /* ??? */
}


static void
setFeedDefaults(feedtypet type)
{
        /* set up defaults for feed according to type */
        switch (type) {
        case DDPLUS :
                baud = "19200";
                parity = "even";
                break;
        case PPS :
        case DDS :
        case IDS :
                baud = "9600";
                parity = "even";
                break;
        case HDS :
                baud = "19200";
                parity = "none";
                break;
        case AFOS :
                baud = "4800"; /* ??? */
                parity = "none";
                break;
        case FAA604 :
                baud = "1200";
                parity = "even";
                break;
        }
}


static feedtypet 
whatami(const char *av0)
{
        feedtypet type;
#define SEP     '/' /* separates components of path */
        /* strip off leading path */
        if ((progname = strrchr(av0, SEP)) == NULL)
                progname = av0;
        else
            progname++;
        
        type = atofeedtypet(progname);
        if(type == NONE)
                type = WMO; /* default for wmo ingestd */
        setFeedDefaults(type);
        return type;    
}


int
main(int ac, char *av[])
{

        int logfd;
        int width;
        int ready;
        unsigned long idle;
        fd_set readfds;
        fd_set exceptfds;
        struct timeval timeo;

        feedtype = whatami(av[0]);

        /*
         * Check the environment for some options.
         * May be overridden by command line switches below.
         */
        {
                const char *ldmpqfname = getenv("LDMPQFNAME");
                if(ldmpqfname != NULL)
                        pqfname = ldmpqfname;
        }

        {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));

        opterr = 1;

        while ((ch = getopt(ac, av, "vxcnl:b:p:P:T:q:r:f:")) != EOF)
                switch (ch) {
                case 'v':
                        logmask |= LOG_MASK(LOG_INFO);
                        break;
                case 'x':
                        logmask |= LOG_MASK(LOG_DEBUG);
                        break;
                case 'c':
                        chkflag = CHK_CHECK;
                        break;
                case 'n':
                        chkflag = CHK_DONT;
                        break;
                case 'l':
                        if(optarg[0] == '-' && optarg[1] != 0)
                        {
                                fprintf(stderr, "logfile \"%s\" ??\n",
                                        optarg);
                                usage(av[0]);
                        }
                        /* else */
                        logfname = optarg;
                        break;
                case 'b':
                        baud = optarg;
                        break;
                case 'p':
                        parity = optarg;
                        break;
#if NET
                case 'P':
                        *((int *)&server_port) = atoi(optarg); /* cast away 
const */
                        if(server_port <= 0 || server_port > 65536)
                        {
                                fprintf(stderr, "invalid server_port %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
                case 'T':
                        reset_secs = atoi(optarg);
                        if(reset_secs < 0)
                        {
                                fprintf(stderr, "invalid timeout %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
#endif /* NET */
                case 'q':
                        pqfname = optarg;
                        break;
                case 'r':
                        rawfname = optarg;
                        break;
                case 'f':
                        {
                                feedtypet type;
                                type = atofeedtypet(optarg);
                                if(type != NONE)
                                {
                                        feedtype = type;
                                        if(!parity && !baud)
                                                setFeedDefaults(type);
                                }
                        }
                        break;
                case '?':
                        usage(av[0]);
                        break;
                }

        /* last arg, feedfname, is required */
        if(ac - optind <= 0)
                usage(av[0]);
        strncat(feedfname,av[optind], sizeof(feedfname) - 6 );

        (void) setulogmask(logmask);
        }


        /*
         * initialize logger
         */
        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
                (void) fclose(stderr);
        logfd = openulog(ubasename(av[0]),
                (LOG_CONS|LOG_PID), LOG_LDM, logfname);
        unotice("Starting Up");
        udebug(version);

        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
        {
                setbuf(fdopen(logfd, "a"), NULL);
        }       

        /*
         * register exit handler
         */
        if(atexit(cleanup) != 0)
        {
                serror("atexit");
                return 1;
        }

        /*
         * set up signal handlers
         */
        set_sigactions();

        /*
         * open the product queue, unless we were invoked as "feedtest"
         */
        if(strcmp(progname, "feedtest") != 0)
        {
                if(ready = pq_open(pqfname, PQ_DEFAULT, &pq))
                {
                        uerror("pq_open: \"%s\" failed: %s",
                                pqfname, strerror(ready));
                        return 1;
                }
        }

        /*
         * who am i, anyway
         */
        (void) strcpy(myname, ghostname());

        /*
         * open the feed
         */
        if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0)
                (void) close(0);

        if(open_feed(feedfname, &ifd) != ENOERR)
                return 1;

        if (feedtype & HDS)
        {
                if(chkflag == CHK_CHECK
                                || (isatty(ifd) && chkflag != CHK_DONT))
                        setTheScanner(scan_wmo_binary_crc);
                else
                        setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
        }
        else if (feedtype == NMC2) /* CONDUIT */
        {
                setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == FAA604)
        {
                prod_stats = faa604_stats;
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_faa604_parity);
                }
                else
                {
                        setTheScanner(scan_faa604);
                }
        }
        else
        {
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_wmo_parity);
                }
                else
                {
                        setTheScanner(scan_wmo);
                }
        }

        /*
         * Allocate an MD5 context
         */
        md5ctxp = new_MD5_CTX();
        if(md5ctxp == NULL)
        {
                serror("new_md5_CTX failed");
                return 1;
        }


        /*
         * Main Loop
         */
        idle = 0;
        while(!done)
        {
#if NET
if (INPUT_IS_SOCKET)
{
                if (port_error)
                {
                        /*
                         * lost connection => close
                         */
                        if (ifd >= 0)
                        {
                                if(feed_close)
                                        (*feed_close)(ifd);
                                ifd = -1;
                        }
                        port_error = 0;
                        sleep (2);      /* allow things to settle down */
                        continue;
                }
}
#endif
                if(stats_req)
                {
                        unotice("Statistics Request"); 
                        if(pq != NULL)
                        {
                                off_t highwater = 0;
                                size_t maxregions = 0;
                                (void) pq_highwater(pq, &highwater,
                                         &maxregions);
                                unotice("  Queue usage (bytes):%8ld",
                                                        (long)highwater);
                                unotice("           (nregions):%8ld",
                                                        (long)maxregions);
                        }
                        unotice("       Idle: %8lu seconds", idle);
#if NET
if (INPUT_IS_SOCKET)
{
                        unotice("    Timeout: %8d", reset_secs);
}
#endif
                        unotice("%21s: %s", "Status",
                                (ifd < 0) ?
                                "Not connected or input not open." :
                                "Connected.");
                        (*prod_stats)();
                        (*feed_stats)();
                        stats_req = 0;
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if (ifd < 0)
                {
                        /* Attempt reconnect */
                        static int retries = 0;
                        if (retries > MAX_RETRIES)
                        {
                                uerror ("maximum retry attempts %d, aborting",
                                        MAX_RETRIES);
                                done = !0;
                                continue;
                        }
                        /* Try to reopen on tcp read errors */
                        unotice("Trying to re-open connection on port %d", 
                                server_port);
                        ++retries;
                        if(open_feed(feedfname, &ifd) != ENOERR)
                        {
                                unotice ("sleeping %d seconds before retry %d",
                                         retries * RETRY_DELAY, retries+1);
                                sleep (retries * RETRY_DELAY);
                                continue;
                        }
                        retries = 0;
                }
}
#endif /* NET */
                timeo.tv_sec = 3;
                timeo.tv_usec = 0;
                FD_ZERO(&readfds);
                FD_ZERO(&exceptfds);
                FD_SET(ifd, &readfds);
                FD_SET(ifd, &exceptfds);
                width =  ifd + 1;
                ready = select(width, &readfds, 0, &exceptfds, &timeo);
                if(ready < 0 )
                {
                        /* handle EINTR as a special case */
                        if(errno == EINTR)
                        {
                                errno = 0;
                                continue;
                        }
                        serror("select");
                        return 1;
                }
                /* else */
#if 0
                if (FD_ISSET(ifd, &exceptfds))
                {
                        uerror("Exception on input fd %d, select returned %d",
                               ifd, ready);
                }
#endif
                if(ready > 0)
                {
                        /* do some work */
                        if(FD_ISSET(ifd, &readfds) || 
                           FD_ISSET(ifd, &exceptfds))
                        {
                                idle = 0;
                                if(feedTheXbuf(ifd) != ENOERR)
                                {
#if NET
if (INPUT_IS_SOCKET)
{
                                        port_error = !0;
                                        continue;
}                                       /* else */
#endif /* NET */
                                        done = !0;
                                }
                                FD_CLR(ifd, &readfds);
                                FD_CLR(ifd, &exceptfds);
                        }
                        else
                        {
                                uerror("select returned %d but ifd not set",
                                        ready);
                                idle += timeo.tv_sec;
                        }
                }
                else    /* ready == 0 */
                {
                        idle += timeo.tv_sec;
#if NET
if (INPUT_IS_SOCKET)
{
                        /* VOODOO
                         * This is necessary to stimulate
                         * 'Connection reset by peer'
                         * when the Portmaster goes down and comes
                         * back up.
                         */
                        static char zed[1] = {0};
                        if(write(ifd, zed, sizeof(zed)) < 0)
                        {
                                port_error = !0;
                                continue;
                        }

}
#endif
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if ((reset_secs > 0) && (idle >= reset_secs))
                {
                        unotice("Idle for %ld seconds, reconnecting",
                                idle);
                        /* force reconnect */
                        port_error = !0;
                        idle = 0;
                        continue;
                }
}
#endif /* NET */
                (void) scanTheXbuf();
        }

        return 0;
}