libmqmsgque 9.0
Loading...
Searching...
No Matches
Example: Filter6

Documentation of the Filter6 tool used for trans2.test.

INTRODUCTION

The Filter6 tool is used to test the filter feature of libmqmsgque.

To run the filter test, a first client, one or more filters, and a final server are created. All of them are connected using the libmqmsgque protocol.

The trans2.test carries out common filter tests and special stress tests. A stress test is performed by exiting one or more filters or servers and observing the response and behavior when reconnecting.

The GOAL for this test is:

CODE filter

/**
 *   @file         example/c/Filter6.c
 *   @brief        Filter6.c - 02 May 2023 - aotto1968
 *   @copyright    (C) NHI - #1 - Project - Group
 *                 This software has NO permissions to copy,
 *                 please contact AUTHOR for additional information
 *   @version      d2cd8f6ec2179f5e5583ae98faed11359996631d
 *   @date         Tue May 2 21:28:46 2023 +0200
 *   @author       aotto1968 <aotto1968@t-online.de>
 */

/* VAR-START */

#define NHI1_FILE_NAME "Filter6.c"

/* VAR-END */

#include <regex.h>

#include "common.h"

/* View the variable named `mqctx` that is in scope at the point of use as this
 * tool's extended context (struct FilterCtxS). */
#define TRANSCTX ((struct FilterCtxS*const)mqctx)
/* Expands to the identifier `mqctx`; not used directly in this file —
 * presumably consumed by macros from the included headers (TODO confirm). */
#define META_CONTEXT_S mqctx
/* Declares a local `ctx` pointing at the extended context of `mqctx`. */
#define SETUP_ctx struct FilterCtxS*const ctx = TRANSCTX
/* Argument guard for service callbacks: when MqReadGetNumItems(mqctx) is
 * non-zero, return from the enclosing function with a "usage" error that names
 * the function and the expected argument description `s`. */
#define CHECK_MQ_CALLBACK_SERVICE_CALL_ARGS(s) \
  if (MqReadGetNumItems(mqctx))  { \
    return MkErrorSetV_2M (mqctx, "usage: %s (%s)\n", __func__, s); \
  }
/* Statement prefix: `check_MqError(call) <statement>` runs <statement> only
 * when MkErrorCheckI(call) is true. Deliberately a bare `if`. */
#define check_MqError(E) if (MkErrorCheckI(E))
/* Cast an arbitrary context pointer expression to the extended context type.
 * The argument is parenthesized so that the cast applies to the whole
 * expression (e.g. `ftrctx(p + n)`), not only to its first operand. */
#define ftrctx(x) ((struct FilterCtxS*const)(x))

/* Extended context of the Filter6 tool. Instances are reached by casting a
 * context pointer (see TRANSCTX / ftrctx), which is why the base context is
 * the first member; main() sets the type's objsize to sizeof this struct. */
struct FilterCtxS {
  struct MqContextS   mqctx;      // base context; must stay the first member for the pointer casts
  struct MkLogFileS   *FH;        // log-file handle: NULL after FilterSetup, opened in LOGF, closed in FilterCleanup
  regex_t             rgx;        // compiled in FilterSetup, matched against the context name in LOGF, freed in FilterCleanup
  bool                isWriter;   // set to true by LOGF when the context name matches rgx; read by WRIT
};

// Retry limit: FilterEvent compares the per-item error count against this
// value. main() reads it from the "--retryCnt" option (default 3).
static MK_INT retryCnt = 3;

// Type descriptor of the Filter6 context; NULL until main() stores the result
// of MkTypeDup2() here. Filter6Factory passes it to MqContextCreate().
static MK_TYP Filter6TT = NULL;
// Filter6TT cast to the type of MqContextC_T; main() uses it to set `fHelp`.
#define Filter6T  ((typeof(MqContextC_T)) Filter6TT)

/*****************************************************************************/
/*                                                                           */
/*                               Request Handler                             */
/*                                                                           */
/*****************************************************************************/

/**
 * Print the usage text of this tool to stderr and terminate the process.
 *
 * @param base  program name that is inserted into the usage lines
 *
 * The text consists of a synopsis, the generic help returned by MqHelp(NULL)
 * and the tool-specific options. The function never returns; it finishes with
 * exit(EXIT_SUCCESS).
 */
static void __attribute__ ((noreturn))
FilterHelp (const char * base)
{
  FILE * const out = stderr;

  // synopsis
  fputc('\n', out);
  fprintf(out, "usage: %s [OPTION]... [ARGUMENT]...\n", base);
  fputc('\n', out);
  fputs("  This tool is database (MqStorage...) test tool of NHI1.\n", out);
  fputc('\n', out);

  // position of this tool inside the client ... server chain
  fprintf(out, "  %s [ARGUMENT]... syntax:\n", base);
  fprintf(out, "    aclient... %c %s ... %c aserver\n", MK_ALFA, base, MK_ALFA);
  fputc('\n', out);

  // generic help text provided by the library
  fputs(MqHelp (NULL), out);
  fputc('\n', out);

  // tool-specific options
  fprintf(out, "  %s [OPTION]:\n", base);
  fputs("    -h, --help       print this help\n", out);
  fputc('\n', out);
  fputs("    --retryCnt       how often an item should be send on error (default: 3)\n", out);
  fputc('\n', out);

  exit(EXIT_SUCCESS);
}

/**
 * Write the current error text as an "ERROR: ..." line to the log file of the
 * filter context and then reset the error on `mqctx`.
 *
 * @param mqctx  context whose extended data holds the log-file handle (FH)
 */
static void ErrorWrite (MK_RT_ARGS MQ_CTX const mqctx)
{
  struct FilterCtxS * const self = TRANSCTX;

  // the text comes from MkErrorGetText_0E(); the handle is used as-is
  MkLogFileWriteV(self->FH, "ERROR: %s\n", MkErrorGetText_0E());

  // clear the error so that processing can continue
  MkErrorReset_1X (mqctx);
}

/**
 * Event callback (installed as setup.Event.fCall in Filter6Factory): take one
 * item from the storage and forward it to the filter target.
 *
 * Flow, as visible in this function:
 *  - storage empty      : set the CONTINUE error and return its code
 *  - forward succeeded  : delete the item from the storage, return MK_OK
 *  - forward failed     : read the item's error count; while it is
 *                         <= retryCnt the reference is decremented, the error
 *                         is reset and MK_OK is returned, otherwise the error
 *                         is logged and the item is deleted ("error2")
 *
 * The label "error" is not jumped to explicitly; presumably the `_E` / `_e`
 * macro variants used below jump there on failure (TODO confirm against the
 * library headers).
 */
static enum MkErrorE FilterEvent ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  //X0(mqctx)
  // storage id of the imported item; 0 means "nothing imported yet" (see error2)
  MQ_LTR Id = 0LL;
//MqStorageLog(mqctx,__func__);

  // check if an item is available
  if (MqStorageCount_e(mqctx) == 0) {
    // no data available
    MkRtSetup_XN(mqctx);
    MkErrorSetCONTINUE_0E();
    return MkErrorGetCode_0E();
  } else {
    //MQ_LTR Id = 0LL;
    // forwarding target of this context
    MQ_CTX ftr;
    MqSlaveGetFilter_E (mqctx, &ftr);

    // if connection is down -> connect again
    //MkErrorCheck1 (MqLinkConnect (ftr));

    // fill the read-buffer from storage
    check_MkErrorE (MqStorageImport (mqctx, &Id)) goto error2;
//printXV(mqctx, "START: tok<%s>, Id<%i>\n", MqServiceGetToken(mqctx), Id);
//MqStorageLog(mqctx,"FilterEvent");

    // send BDY data to the link-target, on error write message but do not stop processing
    // (the last argument 10 is passed as the timeout; unit not visible here)
    if (MkErrorCheckI (MqProxyForward(mqctx, ftr, NULL, 10))) {
      // how often did this item fail so far?
      MK_INT errCnt;
      check_MqError (MqStorageErrCnt(mqctx,Id,&errCnt)) goto error2;
      if (errCnt <= retryCnt) {
        // retry limit not exceeded: give the reference back and drop the error,
        // the item is NOT deleted on this path
        check_MqError (MqStorageDecrRef(mqctx,Id)) goto error2;
        MkErrorReset_1X(mqctx);
//printXV(mqctx, "CATCH: reset → %i\n", Id);
        return MK_OK;
      } else {
        // retry limit exceeded: log the error and delete the item
        goto error2;
      }
    }
//printXV(mqctx, "END: delete → %i\n", Id);
    // forward succeeded: the item is done
    MqStorageDelete_E (mqctx, &Id);
    return MK_OK;
error2:
//printXV(mqctx,"ERR: error2 → %i\n", Id);
    // log + reset the error, then delete the item if one was imported;
    // the event itself still reports MK_OK so that processing continues
    ErrorWrite ( MK_RT_CALL mqctx);
    if (Id != 0LL) MqStorageDelete_E (mqctx, &Id);
    return MK_OK;
  }
  // NOTE(review): both branches above end in `return`, so this statement is
  // not reachable by normal control flow.
  return MK_OK;
error:
//printXV(mqctx,"ERR: error → %i\n", Id);
  return MkErrorStack_1X(mqctx);
}

static enum MkErrorE LOGF ( MQ_CALLBACK_SERVICE_CALL_ARGS ) {
  SETUP_ctx;
  if (  regexec(&ctx->rgx, MqConfigGetName (mqctx), 0, NULL, 0) == 0 ) {
    ctx->isWriter = true;
    MK_STRN file = MqReadC_e (mqctx);
    ctx->FH = MkLogFileOpen_e(MkOBJ(mqctx),file);
  } else {
    MQ_CTX ftr = MqSlaveGetFilter_e (mqctx);
    MqProxyForward_E (mqctx, ftr, NULL,MK_TIMEOUT_DEFAULT);
    ctx->FH = MkLogFileOpen_e(MkOBJ(mqctx),"stderr");
  }
error:
  return MqSendRETURN(mqctx);
}

/**
 * Service handler "WRIT": write one line to the log or pass the request on.
 *
 * The decision is taken on the extended context of the master returned by
 * MqSlaveGetMaster(): if that context is the writer, the string argument of
 * the request is written to its log file followed by a newline; otherwise the
 * request is forwarded to the master.
 *
 * NOTE(review): the result of MqSlaveGetMaster() is dereferenced without a
 * check — confirm that it cannot be NULL for the contexts this handler is
 * registered on.
 */
static enum MkErrorE
WRIT ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  MQ_CTX owner = MqSlaveGetMaster (mqctx);
  struct FilterCtxS *ownerCtx = ftrctx(owner);

  if (!ownerCtx->isWriter) {
    // not the writer: hand the request over to the master
    MqProxyForward_E (mqctx, owner, NULL, MK_TIMEOUT_DEFAULT);
  } else {
    // writer: append the received string as one line
    MkLogFileWriteV_E (ownerCtx->FH, "%s\n", MqReadC_e(mqctx));
  }

error:
  return MqSendRETURN(mqctx);
}

/**
 * Service handler "EXIT": call MqExit_1() on the receiving context.
 *
 * @return MK_OK (only relevant if MqExit_1() returns to the caller)
 */
static enum MkErrorE
EXIT ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  MqExit_1(mqctx);

  return MK_OK;
}

/**
 * Service handler "SOEX": mark the receiving context with the EXIT error via
 * MkErrorSetEXIT_1M().
 *
 * @return always MK_OK; the EXIT state is set on the context, not returned
 */
static enum MkErrorE
SOEX ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  MkErrorSetEXIT_1M(mqctx);

  return MK_OK;
}

/**
 * Service handler registered for "+ALL": export the incoming request into the
 * storage and answer the caller.
 *
 * The storage id is not requested (NULL is passed) and the result of
 * MqStorageExport() is not inspected here; the function returns whatever
 * MqSendRETURN() reports.
 */
static enum MkErrorE
FiIn ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  // store the request; FilterEvent imports and forwards stored items later
  MqStorageExport (mqctx, NULL);

  return MqSendRETURN(mqctx);
}

/**
 * Service handler "CLTI": answer with the value of
 * `link.protect.rmtTransLId` of the receiving context.
 *
 * The value is sent through MqSend() with the strings "R" and "I" as given
 * below; the result of MqSend() is returned unchanged.
 */
static enum MkErrorE
CLTI ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  return MqSend(
    mqctx,
    "R",
    "I",
    mqctx->link.protect.rmtTransLId
  );
}

/*****************************************************************************/
/*                                                                           */
/*                                context_init                               */
/*                                                                           */
/*****************************************************************************/

static enum MkErrorE
FilterCleanup ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  SETUP_ctx;
  MkLogFileClose(ctx->FH);
  regfree(&ctx->rgx);
  return MK_OK;
}

static enum MkErrorE
FilterSetup ( MQ_CALLBACK_SERVICE_CALL_ARGS )
{
  register SETUP_ctx;
  MQ_CTX ftr;
  MqSlaveGetFilter_E (mqctx, &ftr);

  // [filter_service_example]
  MqServiceCreate_E   (mqctx, "LOGF", LOGF, NULL, NULL, NULL);
  MqServiceCreate_E   (mqctx, "EXIT", EXIT, NULL, NULL, NULL);
  MqServiceCreate_E   (mqctx, "SOEX", SOEX, NULL, NULL, NULL);
  MqServiceStorage_E  (mqctx, "PRNT");
  MqServiceStorage_E  (mqctx, "PRN2");
  MqServiceCreate_E   (mqctx, "+ALL", FiIn, NULL, NULL, NULL);
  MqServiceCreate_E   (ftr,   "WRIT", WRIT, NULL, NULL, NULL);
  MqServiceCreate_E   (mqctx, "WRIT", WRIT, NULL, NULL, NULL);
  MqServiceCreate_E   (mqctx, "CLTI", CLTI, NULL, NULL, NULL);
  MqServiceProxy_E    (ftr,   "WRT2", MQ_SLAVE_MASTER);
  // [filter_service_example]

  ctx->FH = NULL;

  int ret = regcomp(&ctx->rgx, "^(Filter6-1|Filter6|Filter6E|fs1.*)$", REG_EXTENDED);
//printXP(mqctx,&ftrctx(mqctx)->rgx)

  if (ret != 0) {
    return MkErrorSetC_2M(mqctx,"compiling regular expression failed");
  }

  return MK_OK;
error:
  return MkErrorStack_1X(mqctx);
}

enum MkErrorE
Filter6Factory ( MQ_CALLBACK_FACTORY_CTOR_ARGS )
{ 
  MQ_CTX const mqctx = *contextP = MqContextCreate(Filter6TT,tmpl);
  
  mqctx->setup.isServer		    = true;
  mqctx->setup.ServerSetup.fCall    = FilterSetup;
  mqctx->setup.ServerCleanup.fCall  = FilterCleanup;
  mqctx->setup.Event.fCall          = FilterEvent;
  mqctx->setup.EventIsServer        = true;
  mqctx->setup.ignoreExit	    = true;

  return MK_OK;
}

/*****************************************************************************/
/*                                                                           */
/*                                  main                                     */
/*                                                                           */
/*****************************************************************************/

/// [error_example]
/**
 * Entry point of the Filter6 tool.
 *
 * Steps: register the "Filter6" type and factory, read the "--retryCnt"
 * option from the command line, create the filter context, create the link
 * and process events with MQ_WAIT_FOREVER.
 *
 * Normal completion and control reaching the `error` label end the same way:
 * the argument list is deleted and MqExit_1() is called.
 */
int
main (
  const int argc,
  MK_STRN argv[]
)
{
  MqRtSetup_NULL;

  MQ_CTX mqctx = NULL;

  // derive the Filter6 type from the context type: larger object, own help
  Filter6TT = MkTypeDup2(MqContextC_TT,"Filter6");
  Filter6TT->objsize = sizeof(struct FilterCtxS);
  Filter6T->fHelp = FilterHelp;

  // register the factory that builds Filter6 contexts
  MQ_FCT mqfct = MqFactoryAdd(
    MK_ERROR_PANIC, Filter6Factory, NULL,NULL,NULL,NULL,NULL,NULL,NULL,"Filter6"
  );

#ifdef BUG1
  // for BUG testing in 'trans2-2-T1aE-...'
  MqFactoryInitial(mqfct);
#endif

  // wrap the command line into a buffer list
  MK_BFL args = MkBufferListCreateVC (argc, argv);

  // pick up "--retryCnt" (default 3) into the file-scope retry limit
  MkBufferListCheckOptionI_E (args, "--retryCnt", 3, true, &retryCnt);

  // build the filter context through the factory
  MqFactoryNew_E (mqfct, NULL, &mqctx);

  // connect the context using the remaining arguments
  MqLinkCreate_E (mqctx, args);

  // run the event loop
  MqProcessEvent_E (mqctx, MQ_WAIT_FOREVER, MK_TIMEOUT_USER);

  // common exit path
error:
  MkBufferListDelete(args);
  MqExit_1 (mqctx);
}
/// [error_example]