ccmqmsgque 9.0
Loading...
Searching...
No Matches
Example: MpGrep

mpgrep is a massive parallel grep tool to search for strings in large blobs

INTRODUCTION

The aim of the tool is to search for strings , this sounds simple but become problematic if large blobs are involved.

  • This is typical of spy or data recovery companies.

This tool divides the blobs into blocks and starts a separate process or thread for each block.

  • Best results are reached if the # of workers is equal to the # of real (not hyperthread) processors.

A couple of features from ccmqmsgque are used to archiev this aim:

FACTORIES

Add client, server and worker code into a single executable.

Example from mpgrep.cc using multiple application-entry-points in a single executable

  // configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
  MqFactoryCT<GrepClient>::Add("GrepClient")->Default();
  MqFactoryCT<GrepServer>::Add("GrepServer");
  MqFactoryCT<GrepWorker>::Add("GrepWorker");

SERVER

Useable as client or as server, local or remote.

client & local
The client start a server with the @ pipe and the server start the workers
  • mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
server & remote
The server is started with --tcp or --file and waits for a client to establish a connection.
  • mpgrep GrepServer --tcp --port 2345 --thread
The client establishes a connection to the server and transfers the working-parameters.
  • mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world

JOBS

Distribute work in parallel.

Example from mpgrep.cc using an asynchronous-service-call to distribute jobs to multiple workers

      // setup worker
      for ( auto job : jobL) {
        SlaveWorker(
          job.id, "GrepWorker", 
            args->Dup()->AppendVA("--prefix", "wk-cl-", "--postfix", int2str(job.id), 
              "@", "--prefix", "wk-sv-", NULL)
        );
      }

      // call the "GREP" service on "Worker" using an ASYNCRONE service call 
      for ( auto job : jobL) {
        SlaveGet(job.id)->Send("C",CallbackF(&GrepServer::callback),"GREP:CWWC", fn, job.startB, job.endB, str);
      }

      // wait for all jobs finished
      for (i=0; i < jobs; i++) {
        ProcessEvent(MQ_WAIT_ONCE);
      }


CODE client & server & worker

/**
 *   @file         example/cc/mpgrep.cc
 *   @brief        mpgrep.cc - 08 May 2023 - aotto1968
 *   @copyright    (C) NHI - #1 - Project - Group
 *                 This software has NO permissions to copy,
 *                 please contact AUTHOR for additional information
 *   @version      134ea1246fd561683c50e72add2d665212149a8b
 *   @date         Mon May 8 00:08:38 2023 +0200
 *   @author       aotto1968 <aotto1968@t-online.de>
 */

/* VAR-START */

#include  "debug_mq.h"
#include  "LibMqMsgque_cc.hh"

/* VAR-END */

// example:
// Nhi1Exec mpgrep.cc --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread

#include <list>
#include <stdexcept>
#include <iostream>
#include <fstream>
#include <cstring>

#include <stdio.h>
#include <sys/stat.h>

using namespace ccmqmsgque;

// "worker" application
class GrepWorker : public MqContextC, public IServerSetup {
  friend class MqFactoryCT<GrepWorker>;

  // define the factory constructor
  GrepWorker(MK_TYP const typ, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) {
    //printTxt("GrepWorker CTOR");
  };

  private:

    static char* read_from_file_open(MK_STRN filename, const std::streamoff start, const std::streamsize size)
    {
      std::ifstream fd(filename, std::ios_base::in | std::ios_base::binary);
      char *buffer= (char*)malloc(size);
      fd.seekg(start,std::ios_base::beg);
      fd.read(buffer,size);
      return buffer;
    }

    // service to serve all incoming requests for token "HLWO"
    void GREP_Service () {
      // get job configuration
      auto  fnP     =  ReadC()  ;
      auto  startZ  =  ReadW()  ;
      auto  endZ    =  ReadW()  ;
      auto  stP     =  ReadC()  ;
      LogV(__func__,0,"START: startZ=%-10ld, endZ=%-10ld, st=%-10s, fn=%s\n",startZ,endZ,stP,fnP);

      // read BLOCK of data from "fnP" 
      size_t blkZ = endZ-startZ;
      char* datP = read_from_file_open(fnP, startZ, blkZ);

      // return start INDICES as WID integer
      size_t  stZ  = strlen(stP);
      char*   endP = datP + blkZ;
      char*   curP = datP;

      SendSTART();
      while (curP < endP) {

//LogV(__func__,0,"curP<%p>, endP<%p>, todo<%zi>\n",curP,endP,endP-curP);

        curP = (char*) memmem(curP,endP-curP,stP,stZ);
        if (curP == NULL) break;
        SendW(startZ+(curP-datP));
        curP += stZ;
      }

      SendRETURN();
      free(datP);
    }

    // define a service as link between the token "GREP" and the callback "GrepWorker::GREP_Service"
    void ServerSetup() {
      ServiceCreate("GREP", CallbackF(&GrepWorker::GREP_Service));
    }
};

// cammand "server" application
class GrepServer : public MqContextC, public IServerSetup {
  friend class MqFactoryCT<GrepServer>;

  public:
    // define the factory constructor
    GrepServer(MK_TYP const typ=NULL, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) {
      if (!ConfigGetIsServer()) ConfigSetName("GrepServer");
    };

  public:
    MkBufferListC RESULTS;

  private:
    // callback used to process the WORKER results 
    void callback() {
      auto master = static_cast<GrepServer*>(SlaveGetMaster());
      master->RESULTS.AppendLP(ReadALL());
      LogV(__func__,0,"END: num=%-10i\n", master->RESULTS.Size());
    }

    static MK_STRN int2str (MK_INT i) {
      static char buffer[30];
      snprintf(buffer,20,"%i", i);
      return buffer;
    }

  public:
    MkBufferListC*  GrepServer_exec (
      MK_INT  jobs,
      MK_STRN  str,
      MK_STRN  fn,
      MkBufferListC  *args
    ) {

      // setup job start position
      struct jobS {
        int     id;
        size_t  startB;
        size_t  endB;
      };
      struct stat st;
      int err = stat(fn, &st);
      if (err == -1) {
        throw(std::invalid_argument("--filename = '" + std::string(fn) + "' : error = " + std::strerror(errno))); 
      }
      size_t strZ   = strlen(str);
      if (strZ == 0) {
        throw(std::invalid_argument("--string : error = size is '0'")); 
      }
      LogV(__func__,0,"START: jobs=%-4d, str=%-20s, fn=%s[%zdMB], args=%s\n",
            jobs,str,fn,st.st_size/1024/1024,args->ToString());
      size_t blk    = st.st_size / jobs;
      size_t start  = 0;
      int i;
      std::list<struct jobS> jobL;
      for (i=0; i < jobs-1; i++) {
        // overlap blocks (strZ-1) because a string maybe on the split
        jobL.push_back((struct jobS) { i+MQ_SLAVE_USER, start, start+blk+strZ-1 });
        start += blk;
      }
      // LAST block always have to match the end
      jobL.push_back((struct jobS) { i+MQ_SLAVE_USER, start, (size_t)st.st_size });

      // [jobs_example]
      // setup worker
      for ( auto job : jobL) {
        SlaveWorker(
          job.id, "GrepWorker", 
            args->Dup()->AppendVA("--prefix", "wk-cl-", "--postfix", int2str(job.id), 
              "@", "--prefix", "wk-sv-", NULL)
        );
      }

      // call the "GREP" service on "Worker" using an ASYNCRONE service call 
      for ( auto job : jobL) {
        SlaveGet(job.id)->Send("C",CallbackF(&GrepServer::callback),"GREP:CWWC", fn, job.startB, job.endB, str);
      }

      // wait for all jobs finished
      for (i=0; i < jobs; i++) {
        ProcessEvent(MQ_WAIT_ONCE);
      }
      // [jobs_example]

      return &RESULTS;
    }

  private:
    void GREP_service() {
      auto jobs = ReadI();
      auto str  = ReadC();
      auto fn   = ReadC();
      auto args = ReadL();

      // send RESULT back to client
      Send("R", "L", GrepServer_exec(jobs, str, fn, args));
    }

    // define a service as link between the token "GREP" and the callback "GrepServer::GREP"
    void ServerSetup() {
      ServiceCreate("GREP",CallbackF(&GrepServer::GREP_service));
    }
};

// "client" application
class GrepClient : public MqContextC {
  friend class MqFactoryCT<GrepClient>;

  // define the factory constructor
  GrepClient(MK_TYP const typ, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) { };

  private:
    static void PrintResults (MkBufferListC *bfl) {
      bfl->Sort();
      printf("POSITIONS --- ( num=%d ) -----------------\n", bfl->Size());
      for (MK_NUM idx=0; idx<bfl->Size(); idx++) {
        printf("%-10ld, ", bfl->IndexGetU(idx)->GetW());
        if (((idx+1) % 8) == 0) { printf("\n"); }
      }
      printf("\n");
      printf("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n");

      bfl->Delete();
    };

  public:
    void GrepClient_exec (MkBufferListC &args) {    

      // CLIENT parse arguments
      auto fn   = strdup(args.CheckOptionC("--filename"));
      auto jobs = args.CheckOptionI("--jobs",1);
      auto str  = strdup(args.CheckOptionC("--search"));

      if (*fn == '\0' or *str == '\0')
          throw std::invalid_argument( "usage: mpgrep --filename FILE --search STRING --jobs NUM:1" );

      LogV(__func__,0,"SETUP: jobs=%-4d, str=%-20s, fn=%s\n",jobs,str,fn);

      MkBufferListC *RESULTS;

      if (args.SearchC("--tcp") != -1 || args.SearchC("--uds") != -1) {
        LinkCreate(args);

        // start the GREP
        Send("W","GREP:ICC[]",jobs,str,fn);
        RESULTS = ReadL()->Dup();

      } else {
        // extract FIRST argument (the executable) from args
        // TODO: args.IndexExtract→leek
        MqMsgque::InitSetArg0VA(args.IndexExtract(0)->GetC(),NULL);
   
        // start LOCAL command-server
        RESULTS = GrepServer().GrepServer_exec(jobs,str,fn,&args)->Dup();
      }

      PrintResults(RESULTS);
    }
};

// package-main
int MK_CDECL main(int argc, MK_STRN argv[]) {
  MqMsgque::Setup();

  // setup commandline arguments for later use
  MkBufferListC args = {argc, argv};

  // [factory_example]
  // configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
  MqFactoryCT<GrepClient>::Add("GrepClient")->Default();
  MqFactoryCT<GrepServer>::Add("GrepServer");
  MqFactoryCT<GrepWorker>::Add("GrepWorker");
  // [factory_example]

  // inspect commandline-arguments for the "factory" to choose… 
  auto fct = MqFactoryCT<MqContextC>::GetCalled(args);
  // and create the initial instance
  auto ctx = fct->New();
  // depend on "ctx" start server or client
  try {
    if (ctx->ConfigGetIsServer()) {
      // SERVER enter eventloop 
      ctx->LinkCreate(args);
      ctx->ProcessEvent(MQ_WAIT_FOREVER);
    } else {
      // CLIENT call exec
      dynamic_cast<GrepClient*>(ctx)->GrepClient_exec(args);
    }
  } catch (const std::exception& e) {
    ctx->ErrorCatch(e);
  }
  ctx->Exit();
}

/// [server_example]