ccmqmsgque 9.0
Loading...
Searching...
No Matches
Example: Filter6

Documentation of the Filter6 tool used for trans2.test.

INTRODUCTION

The Filer6 tool is used to test the filter-feature of ccmqmsgque.

To run the filter test, a first client, one or more filters and a final server are created. All are connected to the ccmqmsgque protocol.

The trans2.test carries out common filter tests and special stress tests. A stress test is performed by exiting one or more filters or servers and observing the response and behavior when reconnecting.

The GOAL for this test is:

CODE filter

/**
 *   @file         example/cc/Filter6.cc
 *   @brief        Filter6.cc - 02 May 2023 - aotto1968
 *   @copyright    (C) NHI - #1 - Project - Group
 *                 This software has NO permissions to copy,
 *                 please contact AUTHOR for additional information
 *   @version      d2cd8f6ec2179f5e5583ae98faed11359996631d
 *   @date         Tue May 2 21:28:46 2023 +0200
 *   @author       aotto1968 <aotto1968@t-online.de>
 */

/* VAR-START */

#include  "debug_mq.h"
#include  "LibMqMsgque_cc.hh"

/* VAR-END */

#define META_CONTEXT_S &context

#include <stdexcept>
#include <stdlib.h>
#include <stdio.h>
#include <regex>

using namespace std;
using namespace ccmqmsgque;

static MK_INT retryCnt=3;

class Filter6 : public MqContextC, public IServerSetup, public IServerCleanup, public IEvent {
  friend class MqFactoryCT<Filter6>; 

  private:
    Filter6(MK_TYP const typ, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) {
      ConfigSetIgnoreExit(true);
      rgx = "^(?:Filter6-1|Filter6|fs1.*)$";
    }

  private:
    FILE *FH;
    regex rgx;

    void ErrorWrite () {
      auto err = ErrorDEFAULT();
      fprintf(FH, "ERROR: %s\n", err->GetText());
      fflush(FH);
      err->Reset();
    }

    void LOGF () {
      // get the "link-target"
      MqContextC *ftr = SlaveGetFilter();
      // check "Ident" from the "link-target"
      if (std::regex_match(ftr->ConfigGetName(),rgx)) {
	// if "Ident" is not "transFilter" use the data as file-name to open a file 
	FH = fopen (ReadC(), "a");
      } else {
	// if "Ident" is "transFilter" send the data to the "link-target"
	ProxyForward(ftr);
      }
      SendRETURN();
    }

    static void WRIT (MqContextC *ctx) {
      // get the "master"
      Filter6 *master = dynamic_cast<Filter6*>(ctx->SlaveGetMaster());
      if (std::regex_match(master->ConfigGetName(),master->rgx)) {
        fprintf (master->FH, "%s\n", ctx->ReadC());
        fflush (master->FH);
      } else {
        ctx->ProxyForward(master);
      }
      ctx->SendRETURN();
    }

    void EXIT () {
      Exit();
    }

    void SOEX () {
      ErrorDEFAULT()->SetEXIT();
    }

    void FilterIn () {
      StorageExport();
      SendRETURN();
    }

    void Event () {
      // check if data is available
      if (StorageCount() == 0LL) {
	// no data available, set error-code to CONTINUE
	ErrorDEFAULT()->SetCONTINUE();
      } else {
	MQ_LTR Id = 0LL;
	// an item is available, try to send the data
	try {
	  // get the filter-context
	  MqContextC *ftr = SlaveGetFilter();
	  // read package from storage
	  Id = StorageImport();
	  // forward the entire BDY data to the ftr-target
          try {
            ProxyForward(ftr);
	  } catch (const MkExceptionC& e) {
            if (StorageErrCnt(Id) <= retryCnt) {
              StorageDecrRef(Id);
              return;
            } else {
              throw;
            }
          } catch (...) {
            throw;
          }
	} catch (const exception& e) {
	  ErrorCatch (e);
          // on error write the error-text and "forget" the data
          ErrorWrite();
	}
	// on "success" or on "error" delete item from storage
        if (Id != 0LL) StorageDelete(&Id);
      }
    }

    void ServerCleanup() {
      if (FH != NULL) fclose (FH);
    }

    // [filter_service_example]
    void ServerSetup() {
      MqContextC *ftr = SlaveGetFilter();
      FH = NULL;
      // SERVER: listen on every token (+ALL)
      ServiceCreate       ("LOGF", CallbackF(&Filter6::LOGF));
      ServiceCreate       ("EXIT", CallbackF(&Filter6::EXIT));
      ServiceCreate       ("SOEX", CallbackF(&Filter6::SOEX));
      ServiceCreate       ("+ALL", CallbackF(&Filter6::FilterIn));
      ServiceStorage      ("PRNT");
      ServiceStorage      ("PRN2");
      ftr->ServiceCreate  ("WRIT", StaticCallbackF(&Filter6::WRIT));
      ServiceCreate       ("WRIT", StaticCallbackF(&Filter6::WRIT));
      ftr->ServiceProxy   ("WRT2", MQ_SLAVE_MASTER);

      if (isParent()) {
        LinkCheckOptionI  ("--retryCnt", &retryCnt);
      }
    }
    // [filter_service_example]

  public:

    static void __attribute__ ((noreturn))
    Help (MK_STRN base)
    {
      fputs("\n", stderr);
      fprintf(stderr, "usage: %s [OPTION]... [ARGUMENT]...\n", base);
      fputs("\n", stderr);
      fputs("  This tool is database (MqStorage...) test tool of NHI1.\n", stderr);
      fputs("\n", stderr);
      fprintf(stderr, "  %s [ARGUMENT]... syntax:\n", base);
      fprintf(stderr, "    aclient... %c %s ... %c aserver\n", MK_ALFA, base, MK_ALFA);
      fputs("\n", stderr);
      fputs(MqHelp (NULL), stderr);
      fputs("\n", stderr);
      fprintf(stderr, "  %s [OPTION]:\n", base);
      fputs(          "    -h, --help       print this help\n", stderr);
      fputs("\n", stderr);
      fputs(          "    --retryCnt       how often an item should be send on error (default: 3)\n", stderr);
      fputs("\n", stderr);

      exit(EXIT_SUCCESS);
    }
};

/*****************************************************************************/
/*                                                                           */
/*                               M A I N                                     */
/*                                                                           */
/*****************************************************************************/

/// [error_example]
int MK_CDECL main (int argc, MK_STRN argv[])
{
  MqMsgque::Setup();
  // define factory
  auto Filter6F = MqFactoryCT<Filter6>::Add("Filter6");
  // modify default type
  Filter6F->Type()->fHelp = Filter6::Help;
  // create object from factory
  Filter6 *filter = Filter6F->New();
  try {
    filter->LinkCreate (MkBufferListC {argc, argv});
    filter->ProcessEvent (MQ_WAIT_FOREVER);
  } catch (const exception& e) {
    filter->ErrorCatch(e);
  }
  filter->Exit();
}
/// [error_example]