[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Forwarded mail....



Chad,

UPC made some mods to pqing.c so that there are PILs on the HDS feed.  The
share of products that SSEC ingests into the IDD can reach about 30%, so
those products are missing the PILs.  I'll attach the pqing.c code so you
can make the update, and then SSEC products will not be missing the PILs.

I installed the latest SDI code last week and was impressed by how easily the
upgrade went.


Chiz noticed that some of the RCM binary products are
still being distributed under IDS instead of HDS.  Is there a fix for this
problem?

Robb...

===============================================================================
Robb Kambic                                Unidata Program Center
Software Engineer III                      Univ. Corp for Atmospheric Research
address@hidden             WWW: http://www.unidata.ucar.edu/
===============================================================================

---------- Forwarded message ----------
Date: Mon, 13 Mar 2000 09:44:52 -0700
From: Steve Chiswell <address@hidden>
To: address@hidden


SDUS40 PHFO is an RCM from Hawaii. Should be in HDS with everything else.

Mar 13 16:35:54 notifyme[867256]:     1364 20000313162557.674     HDS 191
SDUS41 KRLX 131639 /pRCMRLX
Mar 13 16:35:54 notifyme[867256]:     1364 20000313162557.689     HDS 192
SDUS41 KBOX 131625 /pRCMBOX
Mar 13 16:35:54 notifyme[867256]:       59 20000313162601.218 IDS|DDPLUS 178
SDUS40 PHFO 131609 /pRCMHWA
Mar 13 16:35:54 notifyme[867256]:     1581 20000313162602.957     HDS 199
SDUS46 KPQR 131610 /pRCMRTX
Mar 13 16:35:54 notifyme[867256]:     1294 20000313162602.973     HDS 200
SDUS45 KREV 131623 /pRCMRGX

/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: pqing.c,v 1.69 2000/01/28 20:41:04 rkambic Exp $ */
static char version[] =
"$Revision: 1.69 $ built "__DATE__" "__TIME__;

#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <limits.h> /* PATH_MAX */
#ifndef PATH_MAX
#define PATH_MAX 255
#endif /* !PATH_MAX */
#include <sys/types.h>
#include <sys/time.h>
#include <rpc/rpc.h>
#include <errno.h>

#include "ulog.h"
#include "inetutil.h"
#include "ldm.h"
#include "feed.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "wmo_message.h"
#include "afos_message.h"
#include "faa604_message.h"
#include "paths.h"
#include "pq.h"
#include "md5.h"

#ifdef NO_ATEXIT
#include "atexit.h"
#endif

#if NET
#define RETRY_DELAY     (10)            /* delay factor between retries */
#define MAX_RETRIES     (30)
#endif /* NET */

/*
 * Program state.
 *
 * The first group is set by command line (or by setFeedDefaults() before
 * the command line is parsed).  baud, rawfname, parity and chkflag have
 * external linkage; presumably they are read by the feed modules -- confirm
 * against feed.h before renaming or making them static.
 */
/* -b: baud rate for a tty feed; default chosen by setFeedDefaults() */
char *baud = NULL;
/* -r: name of file in which to stash the raw input */
char *rawfname = NULL;
/* -p: parity for a tty feed; default chosen by setFeedDefaults() */
char *parity = NULL;
/* -c selects CHK_CHECK, -n selects CHK_DONT; otherwise CHK_UNSET */
enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET;
/* feed type stamped on every product; -f overrides the deduced value */
static feedtypet feedtype = NONE; /* deduce from av[0]  */
/* last path component of av[0], set by whatami() */
static const char *progname = NULL;
/* -l: log file name; "-" keeps stderr open for logging */
static char *logfname = "";
/* required last argument: the feed to open */
static char feedfname[PATH_MAX];
/* local host name, used as the origin of inserted products */
static char myname[HOSTNAMESIZE];
/* product queue path: DEFAULT_QUEUE, then $LDMPQFNAME, then -q */
static const char *pqfname = DEFAULT_QUEUE;
extern int usePil;  /* 1/0 flag to signal use of AFOS like pil identifier */

/* open product queue; stays NULL when invoked as "feedtest" */
static pqueue *pq = NULL;
/* input feed descriptor; -1 while the feed is not open */
static int ifd = -1; 

/* Flags written by signal_handler() and polled by main()/cleanup() */
/* SIGTERM (or a fatal feed error): leave the main loop */
static volatile int done = 0;
/* SIGINT: we are exiting from the handler, so cleanup() does less */
static volatile int intr = 0;
/* SIGUSR1: print statistics on the next pass through the main loop */
static volatile int stats_req = 0;

/* per-format statistics reporter; main() swaps it for AFOS and FAA604 */
static void (*prod_stats)(void) = wmo_stats;
/* number of products rejected by the queue as duplicates */
static unsigned long ndups = 0;

/* MD5 context used to compute product signatures; allocated in main() */
static MD5_CTX *md5ctxp = NULL;

#if NET
/*
 * Nonzero requests that the socket feed be closed and reopened.  Set on
 * SIGPIPE, and by main() on read errors, write errors and idle timeout.
 */
static int port_error = 0;      /* indicate sigpipe condition was raised */
#       ifndef DEFAULT_RESET_SECS
#       define DEFAULT_RESET_SECS 600
#       endif
/* -T: idle seconds before forcing a reconnect; 0 disables the timeout */
static int reset_secs = DEFAULT_RESET_SECS;
#endif

/*
 * Exit handler, registered with atexit() by main().
 *
 * Always logs "Exiting" and closes the logger.  Unless the exit was
 * triggered from the SIGINT handler (intr set), it also releases the MD5
 * context, closes the product queue and the feed, and reports queue,
 * product and feed statistics.
 */
static void
cleanup(void)
{
        unotice("Exiting"); 
        if(!intr)
        {
                /* We are not in the interrupt context */

                if(md5ctxp != NULL)
                {
                        free_MD5_CTX(md5ctxp);  
                        md5ctxp = NULL;
                }

                if(pq != NULL)
                {
                        off_t highwater = 0;
                        size_t maxregions = 0;

                        /* Sample usage before the queue handle goes away */
                        (void) pq_highwater(pq, &highwater, &maxregions);
                        (void) pq_close(pq);
                        pq = NULL;

                        if(feed_close)
                                (*feed_close)(ifd);
                        ifd = -1;
                        unotice("  Queue usage (bytes):%8ld",
                                                (long)highwater);
                        unotice("           (nregions):%8ld",
                                                (long)maxregions);
                        unotice("  Duplicates rejected:%8lu", ndups);
                }
                (*prod_stats)();
                /*
                 * main() can exit before open_feed() has run (for example
                 * when pq_open() fails), so guard this hook the same way
                 * feed_close is guarded above.
                 */
                if(feed_stats)
                        (*feed_stats)();
        }
        (void) closeulog();
}


/*
 * Signal handler installed by set_sigactions().
 *
 *   SIGINT   log, mark the interrupt context and exit(0)
 *   SIGTERM  ask the main loop to finish
 *   SIGPIPE  on a socket feed, flag the connection for re-opening
 *   SIGUSR1  ask the main loop to print statistics
 *   SIGUSR2  toggle verbose logging
 *
 * Any other signal is only reported at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
        /* 
         * The handler may have been reset to SIG_DFL on entry;
         * install it again.
         */
        (void) signal(sig, signal_handler);
#endif
        if(sig == SIGINT)
        {
                unotice("Interrupt");
                intr = 1;       /* tells cleanup() to skip the heavy work */
                exit(0);
        }

        if(sig == SIGTERM)
        {
                udebug("SIGTERM");
                done = 1;
                return;
        }

        if(sig == SIGPIPE)
        {
#if NET
                if(INPUT_IS_SOCKET)
                {
                        unotice("SIGPIPE");
                        port_error = 1;
                }
#endif
                return;
        }

        if(sig == SIGUSR1)
        {
                udebug("SIGUSR1");
                stats_req = 1;
                return;
        }

        if(sig == SIGUSR2)
        {
                udebug("SIGUSR2");
                if (toggleulogpri(LOG_INFO))
                        unotice("Going verbose");
                else
                        unotice("Going silent");
                return;
        }

        udebug("signal_handler: unhandled signal: %d", sig);
}


/*
 * register the signal_handler
 */
static void
set_sigactions(void)
{
        struct sigaction sigact;

        sigemptyset(&sigact.sa_mask);
        sigact.sa_flags = 0;

        /* Ignore these */
        sigact.sa_handler = SIG_IGN;
        (void) sigaction(SIGHUP, &sigact, NULL);
        (void) sigaction(SIGALRM, &sigact, NULL);
        (void) sigaction(SIGCHLD, &sigact, NULL);

        /* Handle these */
#ifdef SA_RESTART       /* SVR4, 4.3+ BSD */
        /* usually, restart system calls */
        sigact.sa_flags |= SA_RESTART;
#endif
        sigact.sa_handler = signal_handler;
        (void) sigaction(SIGTERM, &sigact, NULL);
        (void) sigaction(SIGUSR1, &sigact, NULL);
        (void) sigaction(SIGUSR2, &sigact, NULL);

        /* Don't restart after interrupt */
        sigact.sa_flags = 0;
#ifdef SA_INTERRUPT     /* SunOS 4.x */
        sigact.sa_flags |= SA_INTERRUPT;
#endif
        (void) sigaction(SIGINT, &sigact, NULL);
        (void) sigaction(SIGPIPE, &sigact, NULL);
}


/*
 * Print the command line synopsis on stderr and exit with status 1.
 * Never returns.
 *
 * Long option descriptions are built from adjacent string literals so
 * that no literal contains a raw line break.
 */
static void
usage(
        char *av0 /*  id string */
)
{
        (void)fprintf(stderr,
                "Usage: %s [options] feedname\t\nOptions:\n", av0);
        (void)fprintf(stderr,
                "\t-v           Verbose, tell me about each product\n");
        (void)fprintf(stderr,
                "\t-r rawfile   Stash 'raw' data in \"rawfile\"\n");
        (void)fprintf(stderr,
                "\t-l logfile   Default logs to syslogd\n");
        (void)fprintf(stderr,
                "\t-f type      Claim to be feedtype \"type\", one of \"hds\", "
                "\"ddplus\", ...\n");
        (void)fprintf(stderr,
                "\t-b baud      Set baudrate for tty feed.\n");
        (void)fprintf(stderr,
                "\t-q queue     default \"%s\"\n", DEFAULT_QUEUE);
        (void)fprintf(stderr,
                "\t-p [even|odd|none]  Set parity for tty feed.\n");
        (void)fprintf(stderr,
                "\t-c           Enable checksum or parity check on non tty "
                "feed\n");
        (void)fprintf(stderr,
                "\t-n           Disable checksum or parity check on tty "
                "feed\n");
        (void)fprintf(stderr,
                "\t-i           Do not include a PIL-like /p identifier for "
                "suitable products\n");
#if NET
        (void)fprintf(stderr,
                "\t-P port      Get input via TCP connection to host "
                "\"feedname\" at \"port\"\n");
        (void)fprintf(stderr,
                "\t-T timeout   Idle timeout before TCP reconnect, in "
                "seconds\n");
        (void)fprintf(stderr,
                "\t             (defaults to %d, 0 disables timeout)\n",
                        DEFAULT_RESET_SECS);
#endif
        exit(1);
}


/*
 * Hand one complete product to the product queue.
 *
 *   arrival  time stamp recorded for the product
 *   seqno    sequence number recorded for the product
 *   ident    product identifier string
 *   len      number of bytes in buf
 *   buf      product data
 *
 * The product signature is the MD5 digest of the data.  In verbose mode
 * the product information is logged.  When no queue is open ("feedtest")
 * nothing is inserted.  A duplicate is counted and logged; any other
 * insertion failure is logged and terminates the program with status 1.
 */
void
toClients(timestampt arrival,
        unsigned seqno,
        const char *ident,
        unsigned len,
        const char *buf)
{
        static struct product prod;
        int rc;

        /* Signature first: MD5 over the whole product body */
        MD5Init(md5ctxp);
        MD5Update(md5ctxp, (const unsigned char *)buf, len);
        MD5Final(prod.info.signature, md5ctxp);

        /* Remaining metadata; the struct members are not const-qualified */
        prod.info.arrival = arrival;
        prod.info.origin = myname;
        prod.info.feedtype = feedtype;
        prod.info.seqno = seqno;
        prod.info.ident = (char *)ident;
        prod.info.sz = len;
        prod.data = (void *)buf;

        if(ulogIsVerbose())
        {
                uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug()));
        }

        if(pq == NULL)
        {
                /* running as "feedtest": report only, never insert */
                return;
        }

        rc = pq_insert(pq, &prod);
        if(rc == ENOERR)
        {
                return;
        }

        if(rc == PQUEUE_DUP)
        {
                ++ndups;
                uinfo("Product already in queue");
                return;
        }

        /* Anything else is fatal */
        uerror("pq_insert: %s\n",
                status_text:
                (rc > 0) ? strerror(rc) : "Internal error");
        exit(1);
}


/*
 * Set the default tty baud rate and parity for the given feed type.
 * Feed types without an entry below leave baud and parity untouched.
 */
static void
setFeedDefaults(feedtypet type)
{
        if(type == DDPLUS)
        {
                baud = "19200";
                parity = "even";
        }
        else if(type == PPS || type == DDS || type == IDS)
        {
                baud = "9600";
                parity = "even";
        }
        else if(type == HDS)
        {
                baud = "19200";
                parity = "none";
        }
        else if(type == AFOS)
        {
                baud = "4800"; /* ??? */
                parity = "none";
        }
        else if(type == FAA604)
        {
                baud = "1200";
                parity = "even";
        }
}


/*
 * Work out the feed type from the name the program was invoked under.
 *
 * Sets progname to the last path component of av0, maps it to a feed
 * type (WMO when the name is not a recognized feed type), installs the
 * tty defaults for that type and returns it.
 */
static feedtypet 
whatami(const char *av0)
{
#define SEP     '/' /* separates components of path */
        const char *lastSep = strrchr(av0, SEP);
        feedtypet deduced;

        /* keep only what follows the final separator, if there is one */
        progname = (lastSep != NULL) ? lastSep + 1 : av0;

        deduced = atofeedtypet(progname);
        if(deduced == NONE)
        {
                deduced = WMO; /* default for wmo ingestd */
        }

        setFeedDefaults(deduced);
        return deduced;
}


int
main(int ac, char *av[])
{

        int logfd;
        int width;
        int ready;
        unsigned long idle;
        fd_set readfds;
        fd_set exceptfds;
        struct timeval timeo;

        feedtype = whatami(av[0]);

        /*
         * Check the environment for some options.
         * May be overridden by command line switches below.
         */
        {
                const char *ldmpqfname = getenv("LDMPQFNAME");
                if(ldmpqfname != NULL)
                        pqfname = ldmpqfname;
        }

        {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));

        opterr = 1;
        usePil = 1;

        while ((ch = getopt(ac, av, "vxcnil:b:p:P:T:q:r:f:")) != EOF)
                switch (ch) {
                case 'v':
                        logmask |= LOG_MASK(LOG_INFO);
                        break;
                case 'x':
                        logmask |= LOG_MASK(LOG_DEBUG);
                        break;
                case 'c':
                        chkflag = CHK_CHECK;
                        break;
                case 'n':
                        chkflag = CHK_DONT;
                        break;
                case 'i':
                        usePil = 0;
                        break;
                case 'l':
                        if(optarg[0] == '-' && optarg[1] != 0)
                        {
                                fprintf(stderr, "logfile \"%s\" ??\n",
                                        optarg);
                                usage(av[0]);
                        }
                        /* else */
                        logfname = optarg;
                        break;
                case 'b':
                        baud = optarg;
                        break;
                case 'p':
                        parity = optarg;
                        break;
#if NET
                case 'P':
                        *((int *)&server_port) = atoi(optarg); /* cast away 
const */
                        if(server_port <= 0 || server_port > 65536)
                        {
                                fprintf(stderr, "invalid server_port %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
                case 'T':
                        reset_secs = atoi(optarg);
                        if(reset_secs < 0)
                        {
                                fprintf(stderr, "invalid timeout %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
#endif /* NET */
                case 'q':
                        pqfname = optarg;
                        break;
                case 'r':
                        rawfname = optarg;
                        break;
                case 'f':
                        {
                                feedtypet type;
                                type = atofeedtypet(optarg);
                                if(type != NONE)
                                {
                                        feedtype = type;
                                        if(!parity && !baud)
                                                setFeedDefaults(type);
                                }
                        }
                        break;
                case '?':
                        usage(av[0]);
                        break;
                }

        /* last arg, feedfname, is required */
        if(ac - optind <= 0)
                usage(av[0]);
        strncat(feedfname,av[optind], sizeof(feedfname) - 6 );

        (void) setulogmask(logmask);
        }


        /*
         * initialize logger
         */
        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
                (void) fclose(stderr);
        logfd = openulog(ubasename(av[0]),
                (LOG_CONS|LOG_PID), LOG_LDM, logfname);
        unotice("Starting Up");
        udebug(version);

        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
        {
                setbuf(fdopen(logfd, "a"), NULL);
        }       

        /*
         * register exit handler
         */
        if(atexit(cleanup) != 0)
        {
                serror("atexit");
                return 1;
        }

        /*
         * set up signal handlers
         */
        set_sigactions();

        /*
         * open the product queue, unless we were invoked as "feedtest"
         */
        if(strcmp(progname, "feedtest") != 0)
        {
                if(ready = pq_open(pqfname, PQ_DEFAULT, &pq))
                {
                        uerror("pq_open: \"%s\" failed: %s",
                                pqfname, strerror(ready));
                        return 1;
                }
        }

        /*
         * who am i, anyway
         */
        (void) strcpy(myname, ghostname());

        /*
         * open the feed
         */
        if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0)
                (void) close(0);

        if(open_feed(feedfname, &ifd) != ENOERR)
                return 1;

        if(usePil == 1)
           {
           if ((feedtype & DDS)||(feedtype & PPS)||(feedtype & IDS)||
                (feedtype & HRS))
              {
              usePil = 1;
              uinfo("Creating AFOS-like pil tags\0");
              }
           else
              {
              usePil = 0;
              }
           }

        if (feedtype & HDS)
        {
                if(chkflag == CHK_CHECK
                                || (isatty(ifd) && chkflag != CHK_DONT))
                        setTheScanner(scan_wmo_binary_crc);
                else
                        setTheScanner(scan_wmo_binary);
        }
        else if (feedtype & NMC)
        {
                setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
        }
        else if (feedtype == FAA604)
        {
                prod_stats = faa604_stats;
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_faa604_parity);
                }
                else
                {
                        setTheScanner(scan_faa604);
                }
        }
        else
        {
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_wmo_parity);
                }
                else
                {
                        setTheScanner(scan_wmo);
                }
        }

        /*
         * Allocate an MD5 context
         */
        md5ctxp = new_MD5_CTX();
        if(md5ctxp == NULL)
        {
                serror("new_md5_CTX failed");
                return 1;
        }


        /*
         * Main Loop
         */
        idle = 0;
        while(!done)
        {
#if NET
if (INPUT_IS_SOCKET)
{
                if (port_error)
                {
                        /*
                         * lost connection => close
                         */
                        if (ifd >= 0)
                        {
                                if(feed_close)
                                        (*feed_close)(ifd);
                                ifd = -1;
                        }
                        port_error = 0;
                        sleep (2);      /* allow things to settle down */
                        continue;
                }
}
#endif
                if(stats_req)
                {
                        unotice("Statistics Request"); 
                        if(pq != NULL)
                        {
                                off_t highwater = 0;
                                size_t maxregions = 0;
                                (void) pq_highwater(pq, &highwater,
                                         &maxregions);
                                unotice("  Queue usage (bytes):%8ld",
                                                        (long)highwater);
                                unotice("           (nregions):%8ld",
                                                        (long)maxregions);
                        }
                        unotice("       Idle: %8lu seconds", idle);
#if NET
if (INPUT_IS_SOCKET)
{
                        unotice("    Timeout: %8d", reset_secs);
}
#endif
                        unotice("%21s: %s", "Status",
                                (ifd < 0) ?
                                "Not connected or input not open." :
                                "Connected.");
                        (*prod_stats)();
                        (*feed_stats)();
                        stats_req = 0;
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if (ifd < 0)
                {
                        /* Attempt reconnect */
                        static int retries = 0;
                        if (retries > MAX_RETRIES)
                        {
                                uerror ("maximum retry attempts %d, aborting",
                                        MAX_RETRIES);
                                done = !0;
                                continue;
                        }
                        /* Try to reopen on tcp read errors */
                        unotice("Trying to re-open connection on port %d", 
                                server_port);
                        ++retries;
                        if(open_feed(feedfname, &ifd) != ENOERR)
                        {
                                unotice ("sleeping %d seconds before retry %d",
                                         retries * RETRY_DELAY, retries+1);
                                sleep (retries * RETRY_DELAY);
                                continue;
                        }
                        retries = 0;
                }
}
#endif /* NET */
                timeo.tv_sec = 3;
                timeo.tv_usec = 0;
                FD_ZERO(&readfds);
                FD_ZERO(&exceptfds);
                FD_SET(ifd, &readfds);
                FD_SET(ifd, &exceptfds);
                width =  ifd + 1;
                ready = select(width, &readfds, 0, &exceptfds, &timeo);
                if(ready < 0 )
                {
                        /* handle EINTR as a special case */
                        if(errno == EINTR)
                        {
                                errno = 0;
                                continue;
                        }
                        serror("select");
                        return 1;
                }
                /* else */
#if 0
                if (FD_ISSET(ifd, &exceptfds))
                {
                        uerror("Exception on input fd %d, select returned %d",
                               ifd, ready);
                }
#endif
                if(ready > 0)
                {
                        /* do some work */
                        if(FD_ISSET(ifd, &readfds) || 
                           FD_ISSET(ifd, &exceptfds))
                        {
                                idle = 0;
                                if(feedTheXbuf(ifd) != ENOERR)
                                {
#if NET
if (INPUT_IS_SOCKET)
{
                                        port_error = !0;
                                        continue;
}                                       /* else */
#endif /* NET */
                                        done = !0;
                                }
                                FD_CLR(ifd, &readfds);
                                FD_CLR(ifd, &exceptfds);
                        }
                        else
                        {
                                uerror("select returned %d but ifd not set",
                                        ready);
                                idle += timeo.tv_sec;
                        }
                }
                else    /* ready == 0 */
                {
                        idle += timeo.tv_sec;
#if NET
if (INPUT_IS_SOCKET)
{
                        /* VOODOO
                         * This is necessary to stimulate
                         * 'Connection reset by peer'
                         * when the Portmaster goes down and comes
                         * back up.
                         */
                        static char zed[1] = {0};
                        if(write(ifd, zed, sizeof(zed)) < 0)
                        {
                                port_error = !0;
                                continue;
                        }

}
#endif
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if ((reset_secs > 0) && (idle >= reset_secs))
                {
                        unotice("Idle for %ld seconds, reconnecting",
                                idle);
                        /* force reconnect */
                        port_error = !0;
                        idle = 0;
                        continue;
                }
}
#endif /* NET */
                (void) scanTheXbuf();
        }

        return 0;
}