[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: FXUS43 continued



Greg,

I didn't send the latest code; here's the code that will work.  Use these
files in the src/pqsurf directory.


Robb...

Output:

zero.unidata.ucar.edu.rkambic> m 00030617.wmo
METAR^M
KHOX 1720Z 061719Z AUTO 20012KT 16/00 A2982 RMK A01 T0161 PCPN 000=^M
METAR^M
KCBK 1729Z 061726Z AUTO 18016G26KT 16/M01 A2979 RMK A01 T0161 PCPN 000=^M
METAR^M
KCBK 1744Z 061743Z AUTO 20017G24KT 17/01 A2979 RMK A01 T0167 PCPN 000=^M
METAR^M
KHOX 1750Z 061749Z AUTO 18012KT 18/M02 A2983 RMK A01 T0178 PCPN 000=^M
METAR^M
KTRB 1756Z 061752Z 22016KT 18/M02 RMK AO1 T01831017=^M
METAR^M
KYMA 1757Z 061753Z 23008KT 14/M06 RMK AO1 T01441061=^M
METAR^M
KGNL 1758Z 061758Z AUTO 27008KT 18/01 A2988 RMK A01 T01800010 PCPN000=^M
METAR^M
KNRN 1758Z 061758Z AUTO 16010KT 17/00 A2986 RMK A01 T017000000 PCPN000=^M
METAR^M
KSTR 1758Z 061758Z AUTO 14012KT 16/M01 A2978 RMK A01 T01601010 PCPN000=^M
METAR^M
KCBK 1759Z 061758Z AUTO 20019KT 17/01 A2980 RMK A01 T0172 PCPN 000=^M





On Fri, 3 Mar 2000, Greg Thompson wrote:

> 
> Robb,
> 
> thanks so much for helping with the FXUS43 issue.  I compiled your latest
> code into our version of pqsurf but I'm still not getting any data with
> those headers into my "metar_gdb" files.  Here's what I'm using in the
> ldmd.conf file:
> 
> exec    "pqsurf -l /data/ldm/logs/pqsurf.log -f WMO -p ^(S[AP]|FXUS43) -Q 
> /data/ldm...
> 
> and here's the important line from pqsurf.conf:
> 
> WMO ^metar (....) ([0-3][0-9])([0-2][0-9])  DBFILE  /data/ldm/ddp........
> 
> Also, there are new METARs coming out of Cheyenne, WY that look like:
> 
> 344 ^M^M
> SAUS45 KCYS 032002^M^M
> MTRCTD^M^M
> METAR KCTD 031945Z AUTO 23010G13KT 07/M06 RMK AO1 RH% = 42 ^M^M
> ^M^M
> ^M^M
> ^M^M
> ^C
> 
> Even though these clearly have "SA" as their starting header, they are
> also not appearing in my "metar_gdb" files.  I'd very much like to get
> these into our system appropriately.  Can you help?
> 
> Thanks!
> 
> -- 
> +--------------------------------------------------------------+
> | Greg Thompson       http://www.rap.ucar.edu/staff/gthompsn/  |
> |                     Research Applications Program            |
> | (303) 497-2805      National Center for Atmospheric Research |
> |    (fax) -8401      P.O. Box 3000  Boulder, CO 80307-3000    |
> +--------------------------------------------------------------+
> 

===============================================================================
Robb Kambic                                Unidata Program Center
Software Engineer III                      Univ. Corp for Atmospheric Research
address@hidden             WWW: http://www.unidata.ucar.edu/
===============================================================================
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: wmo_header.h,v 1.11 1999/06/03 21:13:26 rkambic Exp $ */
#ifndef _WMO_HEADER_H_
#define _WMO_HEADER_H_

#include "xbuf.h"
#include "dtime.h"

/*
 * Kind of surface bulletin. decode_type() returns one of these and
 * sMessage_type() maps it to a printable name.
 */
typedef enum {
        MESSAGE_TYPE_UNKNOWN = 0,       /* heading not recognized */
        SYNOP = 1,                      /* FM 12 */
        SHIP = 2,                       /* FM 13 */
        METAR = 3,                      /* FM 15 or SAO!! */
        SPECI = 4                       /* FM 16 */
} message_type_t;


/*
 * Retransmission indicator decoded from the BBB group that may follow
 * the date-time group of the heading.
 */
typedef enum {
        ORIGINAL = 0,   /* no BBB indicator */
        RTD = 1,        /* delayed */
        COR = 2,        /* correction */
        AMD = 3,        /* amended */
        PIE = 4         /* pieces */
} retransmit_t;


/*
 * abbreviated heading, 2.3.2
 *
 * Decoded form of the "TTAAii CCCC YYGGgg [BBB]" line. The character
 * arrays are NUL terminated strings; each is one byte larger than the
 * number of characters get_wmo_header() reads into it.
 */
typedef struct {
        char TT[3];     /* Tsub1Tsub2 : Data type and/or form */
        char AA[3];     /* Asub1Asub2 : Geograph. and/or time */
        int ii;         /* two digit number after TTAA; stored as 0 when read as -1 */
        char CCCC[5];  /* station of origin or compilation */
        char PIL[10];   /* " /p<id>" text, set only when usePil is 1; otherwise empty */
        dtime *time;    /* YYGGgg of the heading; storage is supplied by the caller */
        retransmit_t retransmit; /* BBB delay, correction or amendment ind */
        int retrans_seq; /* the sequence from BBB */
} wmo_header_t;

/* Frees 'hdr' and the dtime it points to; a NULL 'hdr' is ignored. */
extern void free_wmo_header(wmo_header_t *hdr);
/* NOTE(review): no definition of new_wmo_header() is visible alongside this header - confirm it still exists. */
extern wmo_header_t *new_wmo_header(char **line);
/*
 * Parses the abbreviated heading at the read position of 'buf' into
 * 'hdr'. Returns 'hdr', or NULL when the buffer ends before the
 * heading is complete.
 */
extern wmo_header_t *get_wmo_header(xbuf *buf, wmo_header_t *hdr);

/*
 * Formats 'hdr' as text. The result lives in a static buffer that is
 * overwritten by the next call.
 */
extern char *
s_wmo_header(wmo_header_t *hdr);

/* 
extern int
fprint_wmo_header(FILE *fp , const wmo_header_t *hdr);
*/

/*
 * Returns the three character BBB text for 'hdr' (static storage or a
 * string constant), or NULL when hdr->retransmit is ORIGINAL.
 */
extern char *sRetransmit(wmo_header_t *hdr );
/* Returns the name of 'type' as a string constant, NULL if unknown. */
extern char *sMessage_type(message_type_t type);
/* Classifies a bulletin from its TT and AA strings and the PIL text. */
extern message_type_t decode_type(char *tt, char *aa, char *PIL);

#endif /* _WMO_HEADER_H_ */
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: wmo_header.c,v 1.30 1999/06/21 22:15:21 rkambic Exp $ */
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "wmo_header.h"
#include "alloc.h"
#include "tokens.h"
#include "xbuf.h"

/*
 * When set to 1, get_wmo_header() also examines the line that follows
 * the heading for an AFOS like PIL and records it in hdr->PIL.
 */
int usePil=0;


/*
 * Release a header together with the time record it refers to.
 * Passing NULL is allowed and does nothing.
 */
void
free_wmo_header(wmo_header_t *hdr)
{
        if(hdr != NULL)
        {
                free_dtime(hdr->time);
                free(hdr);
        }
}


/*
 * Reset every field of 'hdr' to its empty state before a new parse.
 * The time record is cleared in place, not released.
 */
static void
clear_wmo_header(wmo_header_t *hdr)
{
        /* empty strings */
        hdr->PIL[0] = 0;
        hdr->CCCC[0] = 0;
        hdr->AA[0] = 0;
        hdr->TT[0] = 0;

        hdr->ii = 0;
        hdr->retransmit = ORIGINAL;
        hdr->retrans_seq = 0;   /* 0 means "no sequence letter seen" */

        clear_dtime(hdr->time);
}


/*
 * Parse a WMO abbreviated heading, "TTAAii CCCC YYGGgg [BBB]", from the
 * current read position of 'buf' into 'hdr'.
 *
 * 'hdr' is cleared first; hdr->time must already point at storage for
 * the time record. The remainder of the heading line is scanned for a
 * BBB indicator (RRx/RTD, CCx/COR, AAx/AMD, Pxx). When the global
 * 'usePil' is 1 the next token is also read and, if it looks like a
 * 5 or 6 character AFOS style PIL on a line of its own, it is stored
 * in hdr->PIL as " /p<pil>".
 *
 * Returns 'hdr', or NULL if the buffer ends (EOB) before the heading
 * line has been read.
 */
wmo_header_t *
get_wmo_header(xbuf *buf, wmo_header_t *hdr)
{

        clear_wmo_header(hdr);

        if( get_wstr(buf, hdr->TT, 2) == EOB ) return NULL;
        if( get_wstr(buf, hdr->AA, 2) == EOB ) return NULL;
        if( get_wnum(buf, &hdr->ii, 2) == EOB ) return NULL;
        if(hdr->ii == -1) hdr->ii = 0;
        if( get_wstr(buf, hdr->CCCC, 4) == EOB ) return NULL;

        {
        int YY, GG, gg;
        if( get_wnum(buf, &YY, 2) == EOB ) return NULL;
        if( get_wnum(buf, &GG, 2) == EOB ) return NULL;
        if( get_wnum(buf, &gg, 2) == EOB ) return NULL;
        /* uses current time on parse errors */
        hdr->time = set_dtime(hdr->time, YY,GG,gg);
        }


        /* decode BBB field */
        { /* inline */
        char line[16];
        char pilstr[16];

        if( get_wline(buf, line, sizeof(line)) == EOB ) return NULL;

        { /* inline inner */
        const char *cp = &line[0];
        const char *const end = &line[strlen(line)];


        /*
         * N.B. twisted flow here: the loop body runs to its end (and
         * leaves the loop) as soon as one of the letters R, C, A or P
         * is met; every other character just advances the scan.
         * NOTE(review): an R, C or A that is not followed by a valid
         * second letter also ends the scan, leaving 'retransmit' as
         * ORIGINAL - confirm that this is intended.
         */
        for(cp = &line[0]; *cp != 0 && *cp != CR && cp + 2 < end; cp++)
        {
                switch( cp[0] ) {
                case 'R' :
                        if(cp[1] == 'R')
                        {
                                hdr->retransmit = RTD;
                                hdr->retrans_seq = cp[2];
                        }
                        else if(cp[1] == 'T')
                        {
                                hdr->retransmit = RTD;
                        }
                        break;
                case 'C' :
                        if(cp[1] == 'C')
                        {
                                hdr->retransmit = COR;
                                hdr->retrans_seq = cp[2];
                        }
                        else if(cp[1] == 'O')
                        {
                                hdr->retransmit = COR;
                        }
                        break;
                case 'A' :
                        if(cp[1] == 'A')
                        {
                                hdr->retransmit = AMD;
                                hdr->retrans_seq = cp[2];
                        }
                        else if(cp[1] == 'M')
                        {
                                hdr->retransmit = AMD;
                        }
                        break;
                case 'P' :
                        hdr->retransmit = PIE;
                        /* two letters encode a base 26 piece number */
                        hdr->retrans_seq = (cp[1] - 'A') * 26
                                 + (cp[2] - 'A');
                        goto done;
                case 0 :
                        break;
                default :
                        continue; /* loop, look some more */
                }
                /* only arrive here if we got an acceptable BBB string */
                if(hdr->retrans_seq == 0)
                {
                        int tmp;
                        /* see if they used old style, eg RTD01 */
                        cp += 3; /* at most 'end', since cp + 2 < end held */
                        tmp = atoi(cp);
                        /* Attachment II-14 */
                        if(tmp >= 0 && tmp < 24 )
                                hdr->retrans_seq = tmp + 'A';
                        else
                                hdr->retrans_seq = 'Y';
                }
done:
                break;
        } /* end for */
        } /* end inline inner */

        /*
         * Optional PIL. Every other get_wstr() caller in this file passes
         * one less than the destination size, so do the same here to
         * leave room for the terminator.
         */
        if((usePil == 1)
                && ( get_wstr(buf, pilstr, sizeof(pilstr) - 1) != EOB ))
        {
                if((pilstr[0] != '\0')
                        && ( get_wline(buf, line, sizeof(line)) != EOB ))
                {
                        const size_t pillen = strlen(pilstr);

                        /* the PIL must be alone on its line */
                        if((line[0] == '\0') && (pillen > 4) && (pillen < 7))
                        {
                                int nonalph = 0;
                                int allnum = 1;
                                size_t ich;

                                for(ich = 0; ich < pillen; ich++)
                                {
                                        /* ctype functions need an unsigned char value */
                                        const unsigned char ch =
                                                (unsigned char)pilstr[ich];
                                        if(isalnum(ch) == 0) nonalph = 1;
                                        if(isdigit(ch) == 0) allnum = 0;
                                }
                                /*
                                 * Alphanumeric but not purely numeric.
                                 * " /p" plus at most 6 characters plus the
                                 * terminator exactly fills PIL[10].
                                 */
                                if((nonalph == 0)&&(allnum == 0))
                                        sprintf(hdr->PIL, " /p%s", pilstr);
                        }
                        else if((line[0] == '\0')
                                && (strncmp(pilstr,"^NMC",4) == 0)
                                && (strlen(pilstr+4) > 4)
                                && (strlen(pilstr+4) < 7))
                        {
                                /* PIL carried behind a "^NMC" prefix */
                                sprintf(hdr->PIL, " /p%s", pilstr+4);
                        }
                }
        }

        } /* end inline */

        return hdr;
}


/*
 * Build the three character BBB text for 'hdr'.
 *
 * PIE gives "Pxx", where the two letters encode hdr->retrans_seq in
 * base 26 (values outside 0..675 are treated as 675). RTD, COR and AMD
 * give "RRx", "CCx" or "AAx" when the sequence is a letter 'A'..'Z',
 * otherwise the plain word "RTD", "COR" or "AMD". Any other value of
 * hdr->retransmit, including ORIGINAL, gives NULL.
 *
 * Composed text is kept in a static buffer shared by all calls.
 */
char *
sRetransmit(wmo_header_t *hdr)
{
        static char text[4];
        int seq = hdr->retrans_seq;
        const char *seqFormat;
        char *plainWord;

        if(hdr->retransmit == PIE)
        {
                char hi, lo;
                if(seq < 0 || seq > 675) /* 26 * 26 -1 */
                        seq = 675;
                hi = seq/26 + 'A';
                lo = seq%26 + 'A';
                sprintf(text,"P%c%c", hi, lo);
                return text;
        }

        /* anything that is not a capital letter means "no sequence" */
        if(seq < 'A' || seq > 'Z')
                seq = 0;

        switch ( hdr->retransmit ) {
        case RTD :
                seqFormat = "RR%c";
                plainWord = "RTD";
                break;
        case COR :
                seqFormat = "CC%c";
                plainWord = "COR";
                break;
        case AMD :
                seqFormat = "AA%c";
                plainWord = "AMD";
                break;
        default :
                return NULL;
        }

        if(seq == 0)
                return plainWord;

        sprintf(text, seqFormat, seq);
        return text;
}


/*
 * Format 'hdr' as "TTAAii CCCC DDHHMM[ BBB][PIL]".
 *
 * The day/hour/minute group is the literal text "DDHHMM" when
 * hdr->time is NULL. The BBB text is added only for a non ORIGINAL
 * header and hdr->PIL is appended verbatim when it is not empty.
 *
 * Returns a pointer to a static buffer that the next call overwrites.
 */
char *
s_wmo_header(wmo_header_t *hdr)
{
#ifndef KEYSIZE
#define KEYSIZE 255
#endif
        static char sp[KEYSIZE+1];
        char *cp = sp;

        (void) memset(sp,0,sizeof(sp));

        sprintf(cp,
                "%2s%2s%02d %4s",
                hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
        /*
         * Advance by what was actually written rather than by a fixed 11,
         * so a field that prints wider than expected is not overwritten.
         */
        cp += strlen(cp);
        if(hdr->time != NULL)
        {
                sprintf(cp,
                        " %02d%02d%02d",
                        hdr->time->mday, hdr->time->hour,
                        hdr->time->min );
        }
        else
        {
                sprintf(cp, " DDHHMM");
        }
        cp += strlen(cp);

        if(hdr->retransmit != ORIGINAL)
        {
                /* sRetransmit() returns NULL for values it does not know */
                const char *bbb = sRetransmit(hdr);
                if(bbb != NULL)
                        sprintf(cp, " %s", bbb);
        }

        /* PIL[0] is a character, so test it against '\0', not NULL */
        if(hdr->PIL[0] != '\0')
           strcat(sp,hdr->PIL);

        return sp;
}


/*
 * Compiled out by "#if 0"; the prototype in wmo_header.h is commented
 * out as well. Writes the header to 'fp' in the same layout that
 * s_wmo_header() produces (without the PIL) and returns ferror(fp).
 */
#if 0
int
fprint_wmo_header(FILE *fp, const wmo_header_t *hdr)
{
#if 1
        fprintf(fp,
                "%2s%2s%02d %4s",
                hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
        if(hdr->time != NULL)
        {
                fprintf(fp,
                        " %02d%02d%02d",
                        hdr->time->mday, hdr->time->hour,
                        hdr->time->min );
        } 
        else
        {
                
                fprintf(fp, " DDHHMM");
        }

        if(hdr->retransmit != ORIGINAL)
                fprintf(fp, " %s", sRetransmit(hdr));
#else
        /* alternative: reuse the string formatter */
        fputs( s_wmo_header(hdr) , fp );
#endif

        return ferror(fp);
}
#endif


/*
 * Alternative clear_wmo_header() that releases the time record instead
 * of clearing it in place. It is only compiled when USED is non-zero
 * and has the same name as the static function defined earlier in this
 * file, so the two cannot be enabled together.
 */
#if USED
static void 
clear_wmo_header(wmo_header_t *hdr)
{
        hdr->TT[0]  = hdr->AA[0] = hdr->ii = 
                hdr->CCCC[0] = hdr->retransmit = hdr->retrans_seq  = 0;
        free_dtime(hdr->time);
        hdr->time = NULL;
}
#endif /* USED */


/*
 * Printable name of a message type.
 * Returns a string constant, or NULL for a value outside the enum.
 */
char *
sMessage_type(message_type_t type)
{
        char *name = NULL;

        switch(type) {
        case MESSAGE_TYPE_UNKNOWN :
                name = "MESSAGE_TYPE_UNKNOWN";
                break;
        case SYNOP :
                name = "SYNOP";
                break;
        case SHIP :
                name = "SHIP";
                break;
        case METAR :
                name = "METAR";
                break;
        case SPECI :
                name = "SPECI";
                break;
        }

        return name;
}


/*
 * Classify a bulletin from its heading.
 *
 *   pil starting "/pMTR" or "/pMTT"            -> METAR (and logged)
 *   TT = S? or F?, second letter I, M or N     -> SHIP when AA is
 *        W? or V? with ? one of A B C D E F J X, otherwise SYNOP
 *   second letter A                            -> METAR
 *   second letter X and AA = "US"              -> METAR
 *   second letter P                            -> SPECI
 *   anything else                              -> MESSAGE_TYPE_UNKNOWN
 *
 * NOTE(review): get_wmo_header() in this file formats the PIL with a
 * leading blank (" /p..."), in which case pil[0] is ' ' and the first
 * test cannot match - confirm which layout is intended.
 */
message_type_t
decode_type(char *tt, char *aa, char *pil)
{
        char form;

        if( pil[0] == '/' && pil[1] == 'p' && pil[2] == 'M' && pil[3] == 'T'
                        && (pil[4] == 'R' || pil[4] == 'T'))
        {
                uerror("HDR + PIL: %s%s %s", tt, aa, pil ) ;
                return METAR;
        }

        if( tt[0] != 'S' && tt[0] != 'F')
                return MESSAGE_TYPE_UNKNOWN;

        /* Table B1 */
        form = tt[1];

        if(form == 'A')
                return METAR; /* might be an 'sao' */

        if(form == 'P')
                return SPECI;

        if(form == 'X')
        {
                if(aa[0] == 'U' && aa[1] == 'S')
                        return METAR; /* 'sao' */
                return MESSAGE_TYPE_UNKNOWN;
        }

        if(form != 'I' && form != 'M' && form != 'N')
                return MESSAGE_TYPE_UNKNOWN;

        /* SI, SM or SN: ship areas are singled out, the rest is land */
        if(aa[0] == 'W' || aa[0] == 'V')
        {
                /* table C2 */
                switch( aa[1] ) {
                case 'A' :
                case 'B' :
                case 'C' :
                case 'D' :
                case 'E' :
                case 'F' :
                case 'J' :
                case 'X' :
                        return SHIP;
                default :
                        break;
                }
        }

        return SYNOP;
}
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: surf_split.c,v 1.30 1999/12/02 23:21:26 rkambic Exp $   */

#include <ldmconfig.h>
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ldm.h"
#include "ulog.h"
#include "wmo_header.h"
#include "tokens.h"
#include "xbuf.h"
#include "surface.h" /* wind_units_t, CALL_SIGN_LEN */

#include "md5.h"

/*
 * Storage for the MD5 context that surf_split() uses to sign each new
 * product. It is declared as an array of double and accessed through
 * the MD5_CTX pointer below.
 * NOTE(review): presumably double is used to obtain suitable alignment
 * and the array is assumed to be at least sizeof(MD5_CTX) - confirm.
 */
static double md5ctx[16]; /* 88 would be big enough */
static MD5_CTX *md5ctxp = (MD5_CTX *)md5ctx;

/*
 * Read a two digit day (YY) and a two digit hour (GG) from 'buf' and
 * store them, with minute 0, in 'time'.
 * Returns a negative status from the failing read, in which case
 * 'time' is left untouched; otherwise the status of the hour read.
 */
static int
get_yygg(xbuf *buf, dtime *time)
{
        int day = -1;
        int hour = -1;
        int status;

        status = dget_wnum(buf, &day, 2);
        if(status < 0)
                return status;

        status = dget_num(buf, &hour, 2);
        if(status < 0)
                return status;

        set_dtime(time, day, hour, 0);
        return status;
}

/* For METAR, check if the HHMMZ time string is present */
/*
 * For METAR: after skipping non-graphic ASCII characters, report
 * whether the next five characters of 'buf' are four digits followed
 * by 'Z'. The digits themselves are not consumed.
 * Returns 1 if so, 0 otherwise.
 */
static int
whas_yyggZ(xbuf *buf)
{
        int ch;
        int pos;

        /* skip white space, then put back the character that ended it */
        for(;;)
        {
                ch = nextc(buf);
                if(!isascii(ch) || isgraph(ch))
                        break;
        }
        unnextc(buf,ch);

        if(buf->cnt < 5)
                return 0; /* not enough characters */

        if(buf->get[4] != 'Z')
                return 0;

        for(pos = 3; pos >= 0; pos--)
        {
                if(!isdigit(buf->get[pos]))
                        return 0;
        }

        return 1; /* passed */
}

/* For METAR, check if "NIL" */
/*
 * For METAR: report whether the last three bytes of the buffer are
 * "NIL".
 * Returns 1 if they are, 0 otherwise. A buffer shorter than three
 * bytes cannot hold "NIL" and gives 0; without that check the start
 * index would lie before the beginning of the buffer.
 */
static int
has_NIL(xbuf *buf)
{
        static const char nilstr[]  = "NIL";
        const size_t nillen = sizeof(nilstr) - 1;
        const char *np;

        if((size_t)buf->bufsiz < nillen)
                return 0;

        np = (const char *)&buf->base[buf->bufsiz - nillen];

        if(strncmp(np, nilstr, nillen) == 0)
                return 1;
        return 0;
}

/* For METAR, get the bulletin time, if possible */
static void
get_wyyggZ(xbuf *buf, dtime *time)
{
        int ch;
        if(!whas_yyggZ(buf))
                return;
        (void)get_yygg(buf, time);
        ch = nextc(buf); /* eat the 'Z' */
        return;
}


/*
 *      Takes a WMO format product which is a
 *   SAO, SYNOP, SHIP, METAR, or SPECI message, splits it into
 *   individual observations. The observations are each encapsulated in a
 *   new product which inherits most of its description from the
 *   original product.
 *  The new product pkey is derived from the observation type
 *   and has the following form:
 *
 *              SAO -   "sao tt ccc ddhhmm"
 *                      where:
 *                              tt is SA, SP or RS 
 *                              ccc is the station ID like SFO, LXV, etc
 *                              ddhhmm is the time stamp.
 *
 *              SYNOP - "aaxx nnnnn ddhhmm"
 *                      where:
 *                              nnnnn is the WMO station id (5 digit number)
 *
 *              SHIP -  "bbxx c* ddhhmm"
 *                      where:
 *                              c* is the call sign
 *
 *              METAR - "metar cccc ddhhmm"
 *                      where:
 *                              cccc is the call sign
 *
 *              SPECI - "speci cccc ddhhmm" 
 *
 *  The new product sequence number is original sequence number times 1000
 *   plus the sequence of the individual observation within the product.
 *
 *      'doit' is called on each of the new products. It is presumed
 * that this function returns zero upon success.
 * 
 *  Returns the number of successful calls to 'doit', eg, the
 *  number of splits. Returns -1 on error.
 */
int
surf_split(const prod_info *infop, const void *datap,
                int (*doit)(const prod_info *, const void *))
{
        int action = -1;
        wmo_header_t hdr;
        message_type_t mtype;
        dtime dt;
        xbuf buf[1];
        unsigned char dbuf[8192]; /* TODO */
        int nsplit = 0;

        /* how the body is cut into observations; chosen from mtype below */
        enum {
                SURFACE_BOGUS ,
                AAXX,
                US_AAXX,
                BBXX,
                SAO,
                sMETAR,
                sSPECI
        } subtype = SURFACE_BOGUS;

        /* the header parser stores the heading time in the local 'dt' */
        hdr.time = &dt;

        /* products that do not fit in the local copy are rejected */
        if(infop->sz > sizeof(dbuf))
                return -1; /* TODO: too big */

        /* parse a private copy so the caller's data is never modified */
        memcpy(dbuf, datap, infop->sz);

        if( cbuftoxbuf(buf, (unsigned char *)dbuf,
                        infop->sz) == NULL)
                return -1;
        
        skipline(buf, 4); /* SOH */
        skipline(buf, 12); /* start */

        if( get_wmo_header(buf, &hdr) == NULL)
        {
                return -1;
        } 
#if DEBUG
        fputs("\t", stderr);
        fprint_wmo_header(stderr, &hdr);
        fputs("\n", stderr);
#endif

        mtype = decode_type(hdr.TT,hdr.AA,hdr.PIL);
        
        /* #### */
        {
        char cbuf[8];
        int digit;
        dtime time;
        wind_units_t wind_units = WIND_UNAVAIL;

        time = *hdr.time; /* default the ob time to the time in the header */

        /* delve into section 0 */

        switch(mtype) {
        case SYNOP :
                if(get_wstr(buf, cbuf, 1) < 0 ) return -1;
                if(cbuf[0] == 'A')
                {
                        subtype = AAXX;
                        /* read the remaining three characters of "AAXX" */
                        if(get_str(buf, &cbuf[1], 3) < 0 ) return -1;
                        if( cbuf[3] != 'X' )
                        {
                                /* punt */
                                uerror("surface_split: Unknown type: %s\n", 
cbuf);
                                return 0;
                        }
                        if(get_yygg(buf, &time) < 0 ) return -1; /* YYGG */
                        if(dget_num(buf, &digit, 1) < 0 ) return -1; /* isubw */
                        if(digit >= 0 && digit <= 4) wind_units = 
(wind_units_t)digit;
                }
                else if(isascii(cbuf[0]) && isdigit(cbuf[0])) /* US Stations 
7NNNN */
                {
                        /* the digit belongs to the first station number */
                        unnextc(buf,cbuf[0]);
                        subtype = US_AAXX;
                        /* 
                         * Some US reports leave off AAXX YYGGisubw, so we use 
the
                         * time from the wmo header. 
                         */
                        wind_units = KNOTS;
                }
                else
                {
                        unnextc(buf,cbuf[0]);
                        return 0; /* ?? */
                }
                break;
        case SHIP :
                if(get_wstr(buf, cbuf, 4) < 0 ) return -1;
                if(cbuf[0] == 'B')
                {
                        if( cbuf[3] != 'X' )
                        {
                                /* punt */
                                uerror("surface_split: Unknown type: %s\n", 
cbuf);
                                return 0;
                        }
                        subtype = BBXX;
                        /* get time below */
                }
                else
                {
                        unnextc(buf,cbuf[0]);
                        return 0;
                }
                break;
        case METAR :
                if(whasSTR(buf, "METAR"))
                {
                        subtype = sMETAR;
                        get_wyyggZ(buf, &time);
                }
                /*
                 * NOTE(review): get_wmo_header() in wmo_header.c formats
                 * hdr.PIL as " /p<pil>", in which case PIL[0] is a blank
                 * and this test cannot match - confirm the intended layout.
                 */
                else if(hdr.PIL[0] == 'M' && hdr.PIL[1] == 'T')
                {
                                uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, 
hdr.PIL ) ;
                                /* skip 6 char PIL */
                                if(get_wstr(buf, cbuf, CALL_SIGN_LEN) < 0)
                                        return 0;
                                subtype = sMETAR;
                                get_wyyggZ(buf, &time);
                }
                else
                        subtype = SAO; /* may actually be a METAR, check below 
*/
                break;  
        case SPECI :
                /* without the "SPECI" tag subtype stays SURFACE_BOGUS */
                if(whasSTR(buf, "SPECI"))
                {
                        subtype = sSPECI;
                        get_wyyggZ(buf, &time);
                }
                break;  
        default :
                uerror("surface_split: Can't handle %s", 
                        sMessage_type(mtype) );
                uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, hdr.PIL ) ;
                return -1;
        }

        { /* while block */
        static char newkey[KEYSIZE];
        xbuf subbuf[1];
        prod_info newinfo = *infop;
#define MAX_SURF_LEN 511
#undef MIN
#define MIN(a,b) ((a) <= (b) ? (a) : (b))
        char pbuf[MAX_SURF_LEN + 1];
        int l1, l2;
        static char ident[CALL_SIGN_LEN+1];
        static char type[4];
        u_int subseq = infop->seqno * 1000; 
        unsigned char *pp;

        /* one pass per observation handed back by get_weqxbuf() */
        while( get_weqxbuf(buf, subbuf) > 0 )
        {
                (void)memset(newkey,0,KEYSIZE);
                (void)memset(pbuf,0,MAX_SURF_LEN + 1);
                (void)memset(ident,0,CALL_SIGN_LEN+1);
                /*
                 * 'pp' marks where copying of the observation text starts;
                 * by default that is the start of the sub-buffer.
                 */
                pp = subbuf->base;

                /*
                 * Each case builds the start of the new key in 'newkey',
                 * any text to put in front of the observation in 'pbuf',
                 * and the station identifier in 'ident'. A 'continue'
                 * drops the observation.
                 */
                switch(subtype) {
                case AAXX :
                case US_AAXX :
                        strcpy(newkey, "aaxx ");
                        strcpy(pbuf, "AAXX");
                        sprintf(&pbuf[strlen(pbuf)], " %02d%02d%1d\r\r\n",
                                time.mday, time.hour, (int)wind_units);
                                        /* WMO station no. */
                        if(get_wstr(subbuf, ident, 5) < 0)
                                continue;
                        strcat(newkey, ident);
                        break;
                case BBXX :
                        strcpy(newkey, "bbxx ");
                        strcpy(pbuf, "BBXX\r\r\n");
                        /* call sign */
                        if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        strcat(newkey, ident);
                        if(get_yygg(subbuf, &time) < 0) continue; /* YYGG */
                        break;
                case sSPECI :
                        /* call sign */
                        if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        if(strcmp(ident, "SPECI") == 0)
                        {
                                /* They package each ob with a tag */
                                pp = (subbuf->get +1);
                                if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        }
                        if(!whas_yyggZ(subbuf))
                        {
                                /* Have to insert the date */
                                sprintf(pbuf, "SPECI\r\r\n%s %02d%02dZ ",
                                        ident, time.hour, time.min);
                                /* the identifier is already in pbuf: copy from here on */
                                pp = subbuf->get;
                        }
                        else
                                strcpy(pbuf, "SPECI\r\r\n");
                        strcpy(newkey, "speci ");
                        strcat(newkey, ident);
                        break;
                case sMETAR :
                        if(has_NIL(subbuf))
                                continue;
                        /* call sign */
                        if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        if(strcmp(ident, "METAR") == 0)
                        {
                                /* They package each ob with a tag */
                                pp = (subbuf->get +1);
                                if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        }
                        if(!whas_yyggZ(subbuf))
                        {
                                /* Have to insert the date */
                                sprintf(pbuf, "METAR\r\r\n%s %02d%02dZ ",
                                        ident, time.hour, time.min);
                                /* the identifier is already in pbuf: copy from here on */
                                pp = subbuf->get;
                        }
                        else
                                strcpy(pbuf, "METAR\r\r\n");
                        strcpy(newkey, "metar ");
                        strcat(newkey, ident);
                        break;
                case SAO :
                        /* call sign */
                        if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                continue;
                        if(hdr.AA[0] == 'U' && hdr.AA[1] == 'S'
                                        && strlen(ident) == 6)
                        {
                                /* skip 6 char US "AFOS code" */
                                if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
                                        continue;
                        }
                                
                        /* SA, SP, RS, USP or XP */
                        if(get_wstr(subbuf, type, 3) < 0)
                                continue;
                        if((type[0] == 'S'
                                         && (type[1] == 'A' || type[1] == 'P'))
                                || (type[0] == 'R' && type[1] == 'S')
                                || (type[0] == 'U' && type[1] == 'S'
                                         && type[2] == 'P')
                                || (type[0] == 'X' && type[1] == 'P')
                                || (type[0] == 'T' &&
                                         (type[1] == 'A' || type[1] == 'S'))
                                )
                        {
                                strcpy(newkey, "sao ");
                                strcat(newkey, type);
                                strcat(newkey, " ");
                                strcat(newkey, ident);
                        } 
                        else if(isdigit(type[0]) && isdigit(type[1]))
                        {
                                /* it is a METAR really */
                                /*
                                 * 'subtype' is not reset per observation, so
                                 * the rest of this product is handled by the
                                 * sMETAR case as well.
                                 */
                                subtype = sMETAR;
                                strcpy(newkey, "metar ");
                                strcat(newkey, ident);
                                strcpy(pbuf, "METAR\r\r\n");
                        }
                        else
                                continue; /* don't know what it is, "NIL=" */
                        break;
                }

                /* safety net */
                if(strlen(ident) == 0)
                {
                        continue;
                }
                /* else */

                /* complete the key with the time and, if present, the BBB text */
                sprintf(&newkey[strlen(newkey)], " %02d%02d%02d",
                        time.mday, time.hour, time.min);
                if(hdr.retransmit != ORIGINAL)
                        sprintf(&newkey[strlen(newkey)], " %s",
                                sRetransmit(&hdr));
                newinfo.ident = newkey;
                newinfo.seqno = ++subseq;

                /*
                 * Append the observation text from 'pp' to the end of the
                 * sub-buffer, leaving room for the 4 byte "=\r\r\n" trailer.
                 */
                l1 = strlen(pbuf);
                l2 = MIN(MAX_SURF_LEN - l1 - 4, subbuf->bufsiz - (pp - 
subbuf->base));
                /* N.B.: silent truncation */
                strncat(pbuf, (char *)pp, l2 );
                strcat(pbuf,"=\r\r\n");

                /*
                 * NOTE(review): the size assumes strncat() copied exactly l2
                 * bytes; it stops early at a NUL byte - confirm that the
                 * observation text cannot contain one.
                 */
                newinfo.sz = l1 + l2 + 4;

#if DEBUG
                fprintf(stderr,"\t\t%s\n", newinfo.ident);
#endif
                
#if PRINT
                {
                        char *cp = pbuf;
                        char *end =  &cp[newinfo.sz];
                        while(cp < end)
                        {
                                putc(*cp, stderr);
                                cp++;
                        }
                }
                putc('\n', stderr);
#endif

                /* the new product gets its own signature */
                MD5Init(md5ctxp);
                MD5Update(md5ctxp, (const unsigned char *)pbuf, newinfo.sz);
                MD5Final(newinfo.signature, md5ctxp);
                
                /*
                 * process the single ob in the requested fashion
                 */
                if((*doit)(&newinfo, pbuf) == 0)
                        nsplit++;

        } /* end while */

#if PRINT
                putc('\n', stderr);
#endif
        } /* end while block */
        } /* end #### block */

        return nsplit;
}
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: pqsurf.c,v 1.51 1999/04/02 23:16:35 davis Exp $ */

/* 
 * 
 */

/*
 * Need to create a queue before running this:
 * pqcreate -c -s 2M -S 13762 /usr/local/ldm/data/pqsurf.pq
 */

#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rpc/rpc.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#include <assert.h>
#include <regex.h>
#include "ldm.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "ulog.h"
#include "pq.h"
#include "paths.h"
#include "surface.h"

#ifdef NO_ATEXIT
#include "atexit.h"
#endif

extern int usePil;  /* 1/0 flag to signal use of AFOS like pil identifier */

/* NOTE(review): the unit is not stated here; DEFAULT_AGE divides it by 3600, so presumably seconds - confirm. */
#ifndef DEFAULT_INTERVAL
#define DEFAULT_INTERVAL 15
#endif

/*
 * NOTE(review): presumably set from signal handlers and polled by the
 * main loop; the code that uses them is outside this part of the file.
 */
static volatile int done = 0;
static volatile int intr = 0;
static volatile int stats_req = 0;

/* default product selection: idents starting SA, SI, SM, SN or SP */
#ifndef DEFAULT_PATTERN
#define DEFAULT_PATTERN  "^S[AIMNP]"
#endif
#ifndef DEFAULT_FEEDTYPE
#define DEFAULT_FEEDTYPE  (IDS|DDS)
#endif

/* input product queue */
static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL;

/* output product queue */
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_OUTQUEUE
#define DEFAULT_SURF_OUTQUEUE "/usr/local/ldm/data/pqsurf.pq"
#endif
static char *opqfname = DEFAULT_SURF_OUTQUEUE;
static pqueue *opq = NULL;

/* set in paths.h by configure */
#ifndef DEFAULT_SURF_CONFFILE
#define DEFAULT_SURF_CONFFILE "/usr/local/ldm/etc/pqsurf.conf"
#endif

/* falls back to DEFAULT_DATADIR when that is defined */
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_DATADIR
#ifndef DEFAULT_DATADIR
#define DEFAULT_SURF_DATADIR "/usr/local/ldm"
#else
#define DEFAULT_SURF_DATADIR DEFAULT_DATADIR
#endif
#endif

#ifndef DEFAULT_PIPE_TIMEO
#define DEFAULT_PIPE_TIMEO 60
#endif

/* 1 plus DEFAULT_INTERVAL / 3600 */
#ifndef DEFAULT_AGE
#define DEFAULT_AGE (1. + (double)(DEFAULT_INTERVAL)/3600.)
#endif

/* NOTE(review): presumably the pid of the child started by run_child() - confirm against its use. */
static pid_t act_pid;

/*
 * Give up root privilege when an unprivileged id is available.
 * Written so that it is safe to call after fork().
 * TODO: this is duplicated from ../server/priv.c
 */
static void
endpriv(void)
{
        const uid_t effective_id = geteuid();
        const uid_t real_id = getuid();

        /* the effective id wins when it is already unprivileged */
        if(effective_id > 0)
        {
                setuid(effective_id);
                return;
        }

        /* otherwise fall back to an unprivileged real id */
        if(real_id > 0)
        {
                setuid(real_id);
        }

        /* both ids are root: warn??? or set to nobody??? */
}

/*
 * Fork and exec the program named by argv[0] (looked up via PATH by
 * execvp), passing it the NULL-terminated vector argv.
 *
 * argc  number of entries in argv; only used to build the debug log line
 * argv  NULL-terminated argument vector for the child
 *
 * Returns the child's pid in the parent, or -1 if fork() failed.
 * The child never returns: it either becomes the new program or
 * calls _exit(127) when the exec fails.
 */
static pid_t
run_child(int argc, char *argv[])
{
        pid_t pid;

        if(ulogIsDebug())
        {
                char command[1024];
                size_t used = 0;
                int ii;

                command[0] = 0;

                /*
                 * Join the arguments with single blanks.  snprintf()
                 * bounds every write, so an over-long command line is
                 * truncated in the log instead of overrunning "command".
                 */
                for(ii = 0; ii < argc && used < sizeof(command) - 1; ii++)
                {
                        const int nn = snprintf(command + used,
                                sizeof(command) - used, "%s%s",
                                ii > 0 ? " " : "", argv[ii]);
                        if(nn < 0)
                                break;
                        /* on truncation "used" passes the limit and
                         * the loop condition ends the loop */
                        used += (size_t)nn;
                }
                udebug("exec'ing: \"%s\"", command);
        }

        pid = fork();
        if(pid == -1)
        {
                serror("run_child: fork failed");
                return pid;
        }

        if(pid == 0)
        {       /* child */

                /* the child must not inherit the parent's handlers */
                (void)signal(SIGCHLD, SIG_DFL);
                (void)signal(SIGTERM, SIG_DFL);

                /* keep same descriptors as parent */

                /* don't let child get real privilege */
                endpriv();

                (void) execvp(argv[0], &argv[0]);
                /* only reached when the exec failed */
                serror("run_child: execvp: %s", argv[0]);
                _exit(127);
        }
        /* else, parent */

        return pid;
}


/* Running totals, written to the log by dump_stats(). */
static int nprods = 0;  /* products passed to split_prod() */
static int nsplit = 0;  /* observations handed to doOne() */
static int ndups = 0;   /* observations already in the output queue */

/*
 * Log the current product, observation and duplicate counts
 * at NOTICE level.
 */
static void
dump_stats(void)
{
        unotice("Number of products %d",
                nprods);
        unotice("Number of observations %d",
                nsplit);
        unotice("Number of dups %d",
                ndups);
}


/* defined in surf_split.c */
extern int surf_split(const prod_info *infop, const void *datap,
                int (*doit)(const prod_info *, const void *));

static int
doOne(const prod_info *infop, const void *datap)
{
        struct product prod;
        int status = ENOERR;

        if(ulogIsDebug())
                udebug("%s", s_prod_info(NULL, 0, infop, 1));
        
        prod.info = *infop;
        prod.data = (void *)datap; /* cast away const */

        nsplit++; /* ?? Do it here on only on success ?? */

        status = pq_insertNoSig(opq, &prod);
        if(status == ENOERR)
        {
                return status; /* Normal return */
        }

        /* else */
        if(status == PQUEUE_DUP)
        {
                ndups++;
                if(ulogIsVerbose())
                        uinfo("Product already in queue: %s",
                                s_prod_info(NULL, 0, &prod.info,
                                         ulogIsDebug()));
                return status;
        }

        /* else, error */
        uerror("pq_insert: %s\n", strerror(status));

        return status;
}


/*
 * Callback passed to pq_sequence() in main: split one product from the
 * input queue into individual observations, each of which doOne()
 * inserts into the output queue, then signal the pqact child.
 *
 * infop  product information of the matched product
 * datap  the product's data
 * xprod  unused
 * size   unused
 * vp     if not NULL, points to a size_t that receives the number of
 *        observations reported by surf_split()
 *
 * Always returns 0.
 */
static int
split_prod(const prod_info *infop, const void *datap,
                void *xprod, size_t size,  void *vp)
{
        size_t *nsp = (size_t *)vp;
        int ns;

        if(ulogIsVerbose())
                uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug()));

        ns = surf_split(infop, datap, doOne);

        nprods++;

        /*
         * Send SIGCONT to the pqact child, presumably so that it looks
         * at the output queue right away.  kill() takes the process id
         * first and the signal second; with the arguments swapped the
         * child was never signalled.  Only a real child pid is
         * signalled, because 0 and -1 address whole groups of processes.
         */
        if(act_pid > 0)
                (void)kill(act_pid, SIGCONT);

        if(nsp != NULL && ns >= 0)
                *nsp = (size_t)ns;

        return 0;
}


static void
usage(const char *av0) /*  id string */
{
        (void)fprintf(stderr,
                "Usage: %s [options] [confilename]\t\nOptions:\n",
                av0);
        (void)fprintf(stderr,
                "\t-v           Verbose, log each match (SIGUSR2 toggles)\n");
        (void)fprintf(stderr,
                "\t-x           Debug mode\n");
        (void)fprintf(stderr,
                "\t-l logfile   Send log info to file (default uses 
syslogd)\n");
        (void)fprintf(stderr,
                "\t-d datadir   cd to \"datadir\" before interpreting filenames 
in\n");
        (void)fprintf(stderr,
                "\t             conffile (default %s)\n",
                DEFAULT_SURF_DATADIR);
        (void)fprintf(stderr,
                "\t-q queue     default \"%s\"\n", DEFAULT_QUEUE);
        (void)fprintf(stderr,
                "\t-p pattern   Interested in products matching \"pattern\" 
(default \"%s\")\n", DEFAULT_PATTERN);
        (void)fprintf(stderr,
                "\t-f feedtype  Interested in products from feed \"feedtype\" 
(default %s)\n", s_feedtypet(DEFAULT_FEEDTYPE));
        (void)fprintf(stderr,
                "\t-i interval  loop, polling each \"interval\" seconds 
(default %d)\n", DEFAULT_INTERVAL);
        (void)fprintf(stderr,
                "\t-a age       Expire products older than \"age\" hours 
(default %.4f)\n", DEFAULT_AGE);
        (void)fprintf(stderr,
                "\t-t timeo     set write timeo for PIPE subprocs to \"timeo\" 
secs (default %d)\n", DEFAULT_PIPE_TIMEO);
        (void)fprintf(stderr,
                "\t-o offset    the oldest product we will consider is 
\"offset\" secs before now (default: most recent in output queue)\n");
        (void)fprintf(stderr,
                "\t-Q outQueue    default \"%s\"\n", DEFAULT_SURF_OUTQUEUE);
        (void)fprintf(stderr,
                "\t(default conffilename is %s)\n",
                DEFAULT_SURF_CONFFILE);
        exit(1);
}


/*
 * Collect the status of the pqact child (act_pid) and react to it.
 *
 * options  flags for waitpid(), e.g. 0 to block or WNOHANG to poll.
 *          When NO_WAITPID is defined, wait() is used instead and only
 *          when options is 0.
 *
 * Returns -1 if the wait failed, 0 if no status was collected, or the
 * pid that was reported.  The function does not return when the child
 * exited or was killed by one of the signals listed below: in those
 * cases it sets act_pid to -1 and calls exit().
 */
static pid_t
reap_act(int options)
{
        pid_t wpid = 0;
        int status = 0;

#ifndef NO_WAITPID
        wpid = waitpid(act_pid, &status, options);
#else
        if(options == 0)
                wpid = wait(&status);
        /* customize here for older systems, use wait3 or whatever */
#endif
        if(wpid == -1)
        {
                /* "no child" is expected once act_pid has been set to -1 */
                if(!(errno == ECHILD && act_pid == -1))
                {
                         /* Only complain if relevant */
                        serror("waitpid");
                }
                return -1;
        }
        /* else */

        if(wpid != 0) 
        {
                /* tag_pid_entry(wpid); */

#ifndef NO_WAITPID
                if(WIFSTOPPED(status))
                {
                        unotice("child %d stopped by signal %d",
                                wpid, WSTOPSIG(status));
                }
                else if(WIFSIGNALED(status))
                {
                        unotice("child %d terminated by signal %d",
                                wpid, WTERMSIG(status));
                        /* DEBUG */
                        switch(WTERMSIG(status)) {
                        /*
                         * If a child dumped core,
                         * shut everything down.
                         */
                        case SIGQUIT:
                        case SIGILL:
                        case SIGTRAP: /* ??? */
                        case SIGABRT:
#if defined(SIGEMT)
                        case SIGEMT: /* ??? */
#endif
                        case SIGFPE: /* ??? */
                        case SIGBUS:
                        case SIGSEGV:
#if defined(SIGSYS)
                        case SIGSYS: /* ??? */
#endif
#ifdef SIGXCPU
                        case SIGXCPU:
#endif
#ifdef SIGXFSZ
                        case SIGXFSZ:
#endif
                                /* mark the child as gone so that cleanup()
                                 * does not try to signal it */
                                act_pid = -1;
                                exit(1);
                                break;
                        }
                }
                else if(WIFEXITED(status))
                {
                        if(WEXITSTATUS(status) != 0)
                                unotice("child %d exited with status %d",
                                        wpid, WEXITSTATUS(status));
                        else
                                udebug("child %d exited with status %d",
                                        wpid, WEXITSTATUS(status));
                        /* the child is gone: exit with its status */
                        act_pid = -1;
                        exit(WEXITSTATUS(status));
                }
#endif
        }

        return wpid;
}


/*
 * Exit handler, registered with atexit() in main: terminate and reap
 * the pqact child, close both product queues, and log the output
 * queue's usage and the run statistics.
 */
void
cleanup(void)
{
        unotice("Exiting"); 

        /*
         * Only signal a child that was really started.  act_pid is 0
         * until run_child() has succeeded, and this handler can run
         * before that (e.g. when opening the output queue fails).
         * kill(0, SIGTERM) would signal every process in our process
         * group, so 0 must be excluded as well as -1.
         */
        if(act_pid > 0)
        {
                (void)signal(SIGCHLD, SIG_IGN);
                kill(act_pid, SIGTERM);
                (void) reap_act(0);
        }

        if(opq != NULL)
        {
                off_t highwater = 0;
                size_t maxregions = 0;
                /* read the usage figures before the queue is closed */
                (void) pq_highwater(opq, &highwater, &maxregions);
                (void) pq_close(opq);
                opq = NULL;

                unotice("  Queue usage (bytes):%8ld",
                                        (long)highwater);
                unotice("           (nregions):%8ld",
                                        (long)maxregions);
        }

        if(pq != NULL)
        {
                (void) pq_close(pq);
                pq = NULL;
        }

        dump_stats();

        (void) closeulog();
}


/*
 * Handler for the signals registered in set_sigactions().
 *
 *   SIGINT   log, set "intr" and exit(0)
 *   SIGTERM  set "done"
 *   SIGUSR1  set "stats_req"
 *   SIGUSR2  call rollulogpri()
 *   SIGCHLD  poll the pqact child via reap_act()
 *
 * Any other signal is only reported at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
        /* 
         * On systems that reset the disposition to SIG_DFL when the
         * handler is entered, install this handler again.
         */
        (void) signal(sig, signal_handler);
#endif
        if(sig == SIGINT)
        {
                unotice("Interrupt");
                intr = 1;
                exit(0);
        }
        else if(sig == SIGTERM)
        {
                udebug("SIGTERM");
                (void) sleep(0);
                done = 1;
        }
        else if(sig == SIGUSR1)
        {
                udebug("SIGUSR1");
                stats_req = 1;
        }
        else if(sig == SIGUSR2)
        {
                udebug("SIGUSR2");
                rollulogpri();
        }
        else if(sig == SIGCHLD)
        {
                /* usually calls exit */
                (void) reap_act(WNOHANG);
        }
        else
        {
                udebug("signal_handler: unhandled signal: %d", sig);
        }
}


/*
 * register the signal_handler
 */
static void
set_sigactions(void)
{
        struct sigaction sigact;

        sigemptyset(&sigact.sa_mask);
        sigact.sa_flags = 0;

        /* Ignore these */
        sigact.sa_handler = SIG_IGN;
        (void) sigaction(SIGHUP, &sigact, NULL);
        (void) sigaction(SIGPIPE, &sigact, NULL);
        (void) sigaction(SIGALRM, &sigact, NULL);

        /* Handle these */
#ifdef SA_RESTART       /* SVR4, 4.3+ BSD */
        /* usually, restart system calls */
        sigact.sa_flags |= SA_RESTART;
#endif
        sigact.sa_handler = signal_handler;
        (void) sigaction(SIGTERM, &sigact, NULL);
        (void) sigaction(SIGUSR1, &sigact, NULL);
        (void) sigaction(SIGUSR2, &sigact, NULL);
        (void) sigaction(SIGCHLD, &sigact, NULL);

        /* Don't restart after interrupt */
        sigact.sa_flags = 0;
#ifdef SA_INTERRUPT     /* SunOS 4.x */
        sigact.sa_flags |= SA_INTERRUPT;
#endif
        (void) sigaction(SIGINT, &sigact, NULL);
}


/*
 * Expire old products from the queue "epq" by repeatedly calling
 * pq_seqdel() with a product class that matches every feedtype and
 * identifier and ends "age" seconds before now.
 * NOTE(review): pq_seqdel() is presumably what removes the matching
 * products -- confirm against pq.h.
 *
 * epq       queue to work on
 * interval  minimum number of seconds between two real runs; calls
 *           made sooner return immediately
 * age       age limit in seconds (main converts the -a value from hours)
 *
 * Returns ENOERR when called too soon, otherwise the status of the
 * last pq_seqdel() call.
 */
static int
expire(pqueue *epq, const unsigned interval, const double age)
{
        int status = ENOERR;
        /* static: the class and its end time persist between calls */
        static timestampt now;
        static prod_class eclss;
        static prod_spec spec;
        timestampt ts;
        timestampt cursor;
        double diff = 0.;
        double max_latency = 0.;
        size_t nr;

        if(eclss.psa.psa_val == 0)
        {
                /* first time */
                eclss.from = TS_ZERO;
                eclss.psa.psa_len = 1;
                eclss.psa.psa_val = &spec;
                spec.feedtype = ANY;
                spec.pattern = ".*";
                regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB);
        }

        (void) set_timestamp(&now);
        /* eclss.to was set to (time of last run - age), so this is true
         * while fewer than "interval" seconds have passed since then */
        if(d_diff_timestamp(&now, &eclss.to) < interval + age)
        {
                /* only run this routine every interval seconds */
                udebug("not yet");
                return ENOERR;
        }
        /* else */
        eclss.to = now;
        eclss.to.tv_sec -= age;

if(ulogIsDebug())
{
        char cp[64];
        sprint_timestampt(cp, sizeof(cp), &eclss.to);
        udebug("to %s", cp);
}


        /* start at the beginning of the queue */
        pq_cset(epq, &TS_ZERO);

        /* stop early on a termination or statistics request */
        while(!done && !stats_req)
        {
                nr = 0;
                status = pq_seqdel(epq, TV_GT, &eclss, 0, &nr, &ts);

                switch(status) {
                case ENOERR:
                        /* track the largest difference seen between the
                         * queue cursor and the timestamp in "ts" */
                        pq_ctimestamp(epq, &cursor);
                        diff = d_diff_timestamp(&cursor, &ts);
                        if(diff > max_latency)
                        {
                                max_latency = diff;
                                udebug("max_latency %.3f", max_latency);
                        }
                        
                        if(nr == 0)
                        {
                                /* pq_seqdel() reported nothing in "nr": give
                                 * up once the cursor is more than interval +
                                 * max_latency seconds past the end time */
                                diff = d_diff_timestamp(&cursor, &eclss.to);
                                udebug("diff %.3f", diff);
                                if(diff > interval + max_latency)
                                {
                                        udebug("heuristic depth break");
                                        break;
                                }

                        }
                        continue; /* N.B., other cases break and return */
                case PQUEUE_END:
                        udebug("expire: End of Queue");
                        break;
                case EAGAIN:
                case EACCES:
                        udebug("Hit a lock");
                        break;
#if defined(EDEADLOCK) && EDEADLOCK != EDEADLK
                case EDEADLOCK:
#endif
                case EDEADLK:
                        uerror("%s", strerror(status));
                        break;
                default:
                        uerror("pq_seqdel failed: %s (errno = %d)",
                                strerror(status), status);
                        break;
                }
                /* reached by every "break" out of the switch: leave the loop */
                break;
        }
        return status;
}


/* Capacity of the pqact argument vector, including the terminating NULL. */
#define PQACT_ARGV_SIZE 32

/*
 * Append "arg" to the argument vector that is handed to pqact.
 * The last slot of the vector is never used, so that the vector stays
 * NULL-terminated as execvp() requires.  Running out of room is a
 * command line error: report it and exit.
 */
static void
add_pqact_arg(char *argv[], int *argcp, char *arg)
{
        if(*argcp >= PQACT_ARGV_SIZE - 1)
        {
                fprintf(stderr,
                        "too many arguments to pass on to pqact (limit %d)\n",
                        PQACT_ARGV_SIZE - 1);
                exit(1);
        }
        argv[(*argcp)++] = arg;
}


/*
 * pqsurf: read the products that match a feedtype and pattern from the
 * input product queue, split them into single observations which are
 * inserted into a private output queue, and run a pqact child on that
 * output queue.  Old entries of the output queue are expired between
 * polls.
 *
 * The options -v, -x, -l, -f, -o and -t are also passed on to pqact,
 * followed by "-d datadir -q outQueue conffilename".
 */
int
main(int ac, char *av[])
{
        const char *progname = ubasename(av[0]);
        char *logfname;
        prod_class clss;
        prod_spec spec;
        int status = 0;
        unsigned interval = DEFAULT_INTERVAL;
        int logoptions = (LOG_CONS|LOG_PID);
        double age = DEFAULT_AGE;
        /*
         * These are containers for the pqact args.  Every entry is
         * added with add_pqact_arg(), which checks the bounds and
         * keeps the final slot NULL.
         */
        char *argv[PQACT_ARGV_SIZE];
        int argc = 0;
        int toffset = TOFFSET_NONE;

        logfname = "";

        if(set_timestamp(&clss.from) != ENOERR) /* corrected by toffset below */
        {
                int errnum = errno;
                fprintf(stderr, "Couldn't set timestamp: %s", 
                        strerror(errnum));
                exit(1);
        }
        clss.to = TS_ENDT;
        clss.psa.psa_len = 1;
        clss.psa.psa_val = &spec;
        
        spec.feedtype = DEFAULT_FEEDTYPE;
        spec.pattern = DEFAULT_PATTERN;


        memset(argv, 0, sizeof(argv));
        argv[0] = "pqact";
        argc++;
        
        /*
         * Check the environment for some options.
         * May be overridden by command line switches below.
         */
        {
                const char *ldmpqfname = getenv("LDMPQFNAME");
                if(ldmpqfname != NULL)
                        pqfname = ldmpqfname;
        }

        {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));
        int fterr;
        char *conffilename = DEFAULT_SURF_CONFFILE;
        char *datadir = DEFAULT_SURF_DATADIR;

        usePil = 1;
        opterr = 1;

        while ((ch = getopt(ac, av, "vxl:d:f:p:q:Q:o:i:a:t:")) != EOF)
                switch (ch) {
                case 'v':
                        add_pqact_arg(argv, &argc, "-v");
                        logmask |= LOG_MASK(LOG_INFO);
                        break;
                case 'x':
                        add_pqact_arg(argv, &argc, "-x");
                        logmask |= LOG_MASK(LOG_DEBUG);
                        break;
                case 'l':
                        add_pqact_arg(argv, &argc, "-l");
                        add_pqact_arg(argv, &argc, optarg);
                        logfname = optarg;
                        break;
                case 'd':
                        datadir = optarg;
                        break;
                case 'f':
                        fterr = strfeedtypet(optarg, &spec.feedtype);
                        if(fterr != FEEDTYPE_OK)
                        {
                                fprintf(stderr, "%s: %s: \"%s\"\n",
                                        av[0], strfeederr(fterr), optarg);
                                usage(progname);        
                        }
                        add_pqact_arg(argv, &argc, "-f");
                        add_pqact_arg(argv, &argc, optarg);
                        break;
                case 'p':
                        spec.pattern = optarg;
                        /* compiled below */
                        break;
                case 'q':
                        pqfname = optarg;
                        break;
                case 'Q':
                        opqfname = optarg;
                        break;
                case 'o':
                        toffset = atoi(optarg);
                        if(toffset == 0 && *optarg != '0')
                        {
                                fprintf(stderr, "%s: invalid offset %s\n",
                                         av[0], optarg);
                                usage(av[0]);   
                        }
                        add_pqact_arg(argv, &argc, "-o");
                        add_pqact_arg(argv, &argc, optarg);
                        break;
                case 'i':
                        interval = atoi(optarg);
                        if(interval == 0 && *optarg != '0')
                        {
                                fprintf(stderr, "%s: invalid interval \"%s\"\n",
                                        av[0], optarg);
                                usage(av[0]);
                        }
                        /* N.B. -i just used for input queue. */
                        break;
                case 'a':
                        age = atof(optarg);
                        if(age < 0.)
                        {
                            (void) fprintf(stderr,
                                        "age (%s) must be non negative\n",
                                        optarg);
                                usage(av[0]);   
                        }
                        break;
                case 't':
                        /* pipe_timeo */
                        add_pqact_arg(argv, &argc, "-t");
                        add_pqact_arg(argv, &argc, optarg);
                        break;
                case '?':
                        usage(progname);
                        break;
                }

        (void) setulogmask(logmask);

        status = regcomp(&spec.rgx,
                spec.pattern,
                REG_EXTENDED|REG_NOSUB);
        if(status != 0)
        {
                fprintf(stderr, "Bad regular expression \"%s\"\n",
                        spec.pattern);
                usage(av[0]);
        }

        if(ac - optind == 1)
                conffilename = av[optind];

        /* pqact reads the output queue of this program */
        add_pqact_arg(argv, &argc, "-d");
        add_pqact_arg(argv, &argc, datadir);
        add_pqact_arg(argv, &argc, "-q");
        add_pqact_arg(argv, &argc, opqfname);
        add_pqact_arg(argv, &argc, conffilename);

        /* hours to seconds */
        age *= 3600.;

        }

        if(toffset != TOFFSET_NONE)
        {
                clss.from.tv_sec -= toffset;
        }
        else
        {
                clss.from.tv_sec -= (age - interval);
        }


        /*
         * Set up error logging.
         * N.B. log ident is the remote
         */
        (void) openulog(progname,
                logoptions, LOG_LDM, logfname);
        unotice("Starting Up (%d)", getpgrp());

        /*
         * register exit handler
         */
        if(atexit(cleanup) != 0)
        {
                serror("atexit");
                exit(1);
        }

        /*
         * set up signal handlers
         */
        set_sigactions();


        /*
         * Open the Output product que
         */
        status = pq_open(opqfname, PQ_DEFAULT, &opq);
        if(status)
        {
                uerror("pq_open failed: %s: %s\n",
                        opqfname, strerror(status));
                exit(1);
        }


        act_pid = run_child(argc, argv);
        if(act_pid == (pid_t)-1)
                exit(1);

        /*
         * Open the product que
         */
        status = pq_open(pqfname, PQ_READONLY, &pq);
        if(status)
        {
                uerror("pq_open failed: %s: %s\n",
                        pqfname, strerror(status));
                exit(1);
        }
        if(toffset == TOFFSET_NONE)
        {
                /* Jump to the end of the queue */
                timestampt sav;
                sav = clss.from;
                clss.from = TS_ZERO;
                (void) pq_last(pq, &clss, NULL);
                clss.from = sav;
        }
        else
        {
                pq_cset(pq, &clss.from);
        }

        if(ulogIsVerbose())
        {
                char buf[1984];
                uinfo("%s",
                         s_prod_class(buf, sizeof(buf), &clss));
        }

        while(!done)
        {
                if(stats_req)
                {
                        dump_stats();
                        stats_req = 0;
                }

                status = pq_sequence(pq, TV_GT, &clss, split_prod, NULL);

                switch(status) {
                case 0: /* no error */
                        continue; /* N.B., other cases sleep */
                case PQUEUE_END:
                        udebug("surf: End of Queue");
                        break;
                case EAGAIN:
                case EACCES:
                        udebug("Hit a lock");
                        break;
                default:
                        uerror("pq_sequence failed: %s (errno = %d)",
                                strerror(status), status);
                        exit(1);
                        break;
                }

                /* an interval of 0 means: one pass over the queue only */
                if(interval == 0)
                {
                        break;
                }


                (void) expire(opq, interval, age);

                pq_suspend(interval);
                        
        }

        /*
         * TODO: how can we determine that pqact has finished
         *       the work in opq?
         */
        sleep(5);

        exit(0);
}