Evolution of a Parallel Program


Network Computation - PVM

PVM is an acronym for Parallel Virtual Machine. PVM is a common protocol used for implementing distributed computation. It is relatively straightforward, as these things go, and is extremely flexible. The flexibility PVM offers is in fact considerably more than we require for our purposes, but it merits mention nonetheless. One of PVM's strengths is that it seamlessly incorporates heterogeneous collections of machines and processors into a distributed message-passing environment, taking care of data heterogeneity with minimal fuss. These "advantages" are not necessarily advantageous in our setting, since the CIM Beowulf is a tightly coupled, homogeneous system. The overhead and extra latency of TCP/IP communication, the [unnecessary] data packaging and alignment, and the "flexibility" of machine-dependent local binary paths are not especially useful features on a Beowulf cluster, though in greater generality they make PVM an excellent mechanism for a distributed system over semi-reliable networks.
PVM is nevertheless a very useful, well-documented and nearly ideal protocol for our purposes of distributed computation, and it is well worth consideration as a distributed protocol, not least for its ease of use.

Implementation

The computation on each node is done identically to the SMP version. The focus here is on distributing the workheap across a collection of nodes and accumulating the results. This workheap partitioning is done according to what is commonly referred to as a master/slave paradigm: there is no inter-communication between the nodes and no data dependency. This is the easiest approach to clustered distributed computation.
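
To make the shape of that paradigm concrete before diving into the real code, here is a minimal, self-contained sketch. It is not part of the tutorial sources: the file name ms_demo, the slave count, and the trivial squaring "work" are all placeholders. One program plays both roles -- run with no PVM parent it acts as the master and spawns copies of itself as slaves -- and, as with the real slave discussed below, the compiled binary must sit somewhere PVM can find it:
#include <stdio.h>
#include <pvm3.h>

#define NSLAVES 4

int main(void) {
  int mytid = pvm_mytid();              /* enroll this process in PVM first */
  int ptid = pvm_parent();

  if (ptid == PvmNoParent) {            /* no parent: act as the master */
    int child[NSLAVES], i, work, result, total = 0;
    int n = pvm_spawn("ms_demo", (char **)0, PvmTaskDefault, "", NSLAVES, child);
    for (i = 0; i < n; i++) {           /* hand one work item to each slave */
      work = i + 1;
      pvm_initsend(PvmDataDefault);
      pvm_pkint(&work, 1, 1);
      pvm_send(child[i], 0);
    }
    for (i = 0; i < n; i++) {           /* gather one result per slave */
      pvm_recv(-1, -1);                 /* from whichever slave replies next */
      pvm_upkint(&result, 1, 1);
      total += result;
    }
    printf("master[%x]: total = %d\n", mytid, total);
  } else {                              /* spawned copy: act as a slave */
    int work, result;
    pvm_recv(ptid, -1);                 /* blocking wait for a work item */
    pvm_upkint(&work, 1, 1);
    result = work * work;               /* stand-in for the real computation */
    pvm_initsend(PvmDataDefault);
    pvm_pkint(&result, 1, 1);
    pvm_send(ptid, 1);
  }
  pvm_exit();
  return 0;
}

The real slave and master below have exactly this shape, with the prime counting, the hostname report, and the work-heap bookkeeping filled in.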

Slave Processes

Our threaded programs become the slaves. The difference from the SMP version lies in the manner in which the ranges for the calculation are specified to each node. The master, as shown below, coordinates the range given to each node. After each range is completed, the slave does a blocking wait to receive more data, or a signal from the master that the computation is complete and the slave may return. Here is the main loop of the slave program, complete with the PVM coordination:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <pvm3.h>

#define Num_Proc 2   /* two compute threads per node, as in the SMP version */

/* findPrimes() is the threaded worker from the SMP version, defined earlier
   in prime_thread.c; each thread returns a pointer to its count of primes. */

int main(int argc, char* argv[]) {
  pthread_t thread[Num_Proc];
  void *retval0, *retval1;
  int bounds[2], info;
  int primesfound = 0;
  int Range0[3] = {0, 0, 100};
  int Range1[3] = {1, 2, 100};
  int ptid, tid, buf, len = 0, tag = 0;
  char my_name[100];

  ptid = pvm_parent();          /* task id of the master, for all later communication */

  /* Blocking wait for the first range from the master. */
  buf = pvm_recv(ptid, -1);
  if (buf <= 0) { pvm_perror("PVM_Recv failure"); }
  info = pvm_bufinfo(buf, &len, &tag, &tid);
  if (info == PvmNoSuchBuf) { pvm_perror("PVM_BufInfo failure: PvmNoSuchBuf"); }
  if (info == PvmBadParam)  { pvm_perror("PVM_BufInfo failure: PvmBadParam"); }
  pvm_upkint(bounds, 2, 1);

  do {
    /* Fill in the two per-thread range descriptors from the received bounds,
       exactly as the SMP version does. */
    Range0[0] = 0;
    Range0[1] = bounds[0];
    Range0[2] = bounds[1];
    Range1[0] = 1;
    Range1[1] = bounds[0] + 2;
    Range1[2] = bounds[1];

    pthread_create(&thread[0], NULL, (void *)&findPrimes, (void *)Range0);
    pthread_create(&thread[1], NULL, (void *)&findPrimes, (void *)Range1);

    pthread_join(thread[0], &retval0);
    pthread_join(thread[1], &retval1);

    primesfound = *(int *)retval0 + *(int *)retval1;

    /* Report the result for this range back to the master. */
    if (pvm_initsend(PvmDataDefault) < 0) pvm_perror("initsend Error!!");
    gethostname(my_name, sizeof(my_name));
    pvm_pkstr(my_name);
    pvm_pkint(&primesfound, 1, 1);
    pvm_pkint(bounds, 2, 1);
    pvm_send(ptid, 1);

    /* Blocking wait for the next range, or for the termination signal. */
    buf = pvm_recv(ptid, -1);
    if (buf <= 0) { pvm_perror("PVM_Recv failure"); }
    info = pvm_bufinfo(buf, &len, &tag, &tid);
    if (info == PvmNoSuchBuf) { pvm_perror("PVM_BufInfo failure: PvmNoSuchBuf"); }
    if (info == PvmBadParam)  { pvm_perror("PVM_BufInfo failure: PvmBadParam"); }
    pvm_upkint(bounds, 2, 1);
  } while (bounds[1] > 0);      /* bounds of (-1,-1) mean the master is done */

  pvm_exit();
  exit(0);
}
The interesting difference here from the SMP version is that there are no command line arguments supplied (although that would be one possible way to provide such data, we don't wish to end the process at the end of each iteration). We initially read the task ID of the master node [with the pvm_parent() call], for use in communication later on. The blocking wait is achieved with pvm_recv(), initially and again at the end of the loop; "blocking" means that the computation is stalled on this call until the request is satisfied. Once the range is specified, the computation is handled identically to the SMP version, until a signal arrives indicating that the master has no more work requests. For the duration of the computation, data is only ever sent to the master process (on a different machine).
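
One thing the listing relies on without saying so is that the unpack calls on the receiving end must mirror the pack calls on the sending end, in the same order: here a hostname string, then the primes count, then the bounds pair. The little stand-alone check below is a sketch of that contract only (assuming a running pvmd; the file and variable names are ours, not part of the tutorial code). It packs the same three fields the slave sends, delivers the message to itself, and unpacks them in the same order:
#include <stdio.h>
#include <pvm3.h>

int main(void) {
  int mytid = pvm_mytid();           /* enroll this process in PVM */
  int count = 42, bounds[2] = {1, 1000};
  int count_in, bounds_in[2];
  char name_out[100] = "nodeA", name_in[100];

  pvm_initsend(PvmDataDefault);
  pvm_pkstr(name_out);               /* 1: hostname string        */
  pvm_pkint(&count, 1, 1);           /* 2: number of primes found */
  pvm_pkint(bounds, 2, 1);           /* 3: the range searched     */
  pvm_send(mytid, 1);                /* a task may send to itself */

  pvm_recv(mytid, 1);
  pvm_upkstr(name_in);               /* unpack in exactly the same order */
  pvm_upkint(&count_in, 1, 1);
  pvm_upkint(bounds_in, 2, 1);

  printf("%s reports %d primes in [%d, %d]\n",
         name_in, count_in, bounds_in[0], bounds_in[1]);
  pvm_exit();
  return 0;
}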

Instructions will be given at the end of the tutorial on how to acquire a tarball of all of the code, but for completeness the above code is found in prime_thread.c. Also note that when the code is compiled, the resulting binary must be located in a place that your installation of PVM can 'find' for use with master.c.

${PVM_ROOT}/bin/LINUX
works for me on our cluster; your mileage may vary.

Master Process

The master process is responsible for getting the range of the entire grand computation from the user and for doling out the partitioned work heaps to the slave units. The master program is replete with PVM send and receive calls and provides the logic for the workload partitioning:
#include <stdio.h>
#include <stdlib.h>
#include <pvm3.h>

int main(int argc, char* argv[]) {
  int cc = 0, tid, i, buf, len = 0, tag = 0;
  int info, numsent = 0, numrecd = 0;
  char name[100];
  int range = 1000;  /*  default minimal range for each node */
  int LIMIT = 30000; /*  default upper bound for computation */
  int primes = 0, pfound = 0;
  int bounds[64][2], chunk[2], child[32];

  if (argc > 1) {
    LIMIT = atoi(argv[1]);
  }

    /* pvm_mytid() _must_ be the first PVM function called!! */
  printf("Master task[%x]\n", pvm_mytid());

  cc = pvm_spawn("pvm_prime", (char**)0, PvmTaskArch, "LINUX", 16, child);
  if (cc <= 0) { printf("PVM_SPAWN failure\n"); pvm_exit(); return -1; }

  range = ((LIMIT/cc) < range) ? (LIMIT/cc) : range;
  printf("Starting %d PVM tasks: stride = %d\n", cc, range);

    /* Hand an initial chunk to every slave that was actually spawned. */
  for (i = 0; i < cc; i++) {
    if (pvm_initsend(PvmDataDefault) < 0) printf("initsend Error!!\n");
    chunk[0] = i * range + 1;
    chunk[1] = (i+1) * range;
    pvm_pkint(chunk, 2, 1);
    pvm_send(child[i], 0);
    numsent++;
  }

  i = 0;
  do {
      /* Receive from whichever slave reports first (-1 = any task, any tag),
         then use pvm_bufinfo() to learn which task sent the result. */
    buf = pvm_recv(-1, -1);
    if (buf <= 0) { printf("PVM_Recv failure\n"); continue; }
    info = pvm_bufinfo(buf, &len, &tag, &tid);
    if (info < 0) {
      if (info == PvmNoSuchBuf) { printf("PVM_BufInfo failure:PvmNoSuchBuf\n"); continue; }
      if (info == PvmBadParam)  { printf("PVM_BufInfo failure:PvmBadParam\n"); continue; }
    }
    primes = 0;
    pvm_upkstr(name);
    pvm_upkint(&primes, 1, 1);
    pvm_upkint(bounds[i % cc], 2, 1);
    numrecd++;
    pfound += primes;
      /* Advance to the next chunk and, if any work remains, hand it straight
         back to the slave that just reported. */
    chunk[0] += range;
    chunk[1] += range;
    if (chunk[1] <= LIMIT) {
      pvm_initsend(PvmDataDefault);
      pvm_pkint(chunk, 2, 1);
      pvm_send(tid, 0);
      numsent++;
    }
    i++;
  } while (numrecd < numsent);

    /* All work is done: send every slave the (-1,-1) termination signal. */
  chunk[0] = chunk[1] = -1;
  for (i = 0; i < cc; i++) {
    pvm_initsend(PvmDataDefault);
    pvm_pkint(chunk, 2, 1);
    pvm_send(child[i], 0);
  }

  printf("Found %d primes up to %d\n", pfound, LIMIT);
  pvm_exit();
  return 0;
}
The work heap management aspect of the master is the only new issue at hand. After the obligatory opening call to pvm_mytid(), pvm_spawn() returns the number of tasks it actually managed to start, which may be fewer than the number requested. That returned value is used in a loop to assign initial work chunks of the grand computation to each node actually available. From then on, data is packed and sent to nodes as they ask for further work, in the order they ask for it. This is actually very important functionality to provide, to avoid starvation, whereby a node is ready for more work but is denied it for some reason. In the initial implementation of this code the master erroneously assigned work in a "for" loop based on child ID. This was a mistake, as it left nodes that had already finished their computation [blocking] queued for further work until all nodes preceding them in task ID order were finished with their computations. (With sixteen slave tasks requested, a node unlucky enough to sit last in that order could idle behind fifteen slower predecessors.) The inefficiency of this method should be clear [and should have been to the programmer ;)]. The current solution [correctly] assigns work to the first node to report completion, thereby minimizing the amount of time any node spends awaiting further range data.
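
To see that first-come-first-served behaviour in isolation, here is a toy program, again not part of the tutorial sources: fcfs_demo is a made-up name and a random sleep stands in for real work. The master receives with pvm_recv(-1, -1) and identifies the sender with pvm_bufinfo(), so replies are serviced in completion order rather than task ID order:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pvm3.h>

#define NSLAVES 4

int main(void) {
  int mytid = pvm_mytid();              /* enroll this process in PVM first */
  int ptid = pvm_parent();

  if (ptid == PvmNoParent) {            /* no parent: we are the master */
    int child[NSLAVES], i, buf, len, tag, tid;
    int n = pvm_spawn("fcfs_demo", (char **)0, PvmTaskDefault, "", NSLAVES, child);
    for (i = 0; i < n; i++) {
      buf = pvm_recv(-1, -1);           /* take replies in completion order */
      pvm_bufinfo(buf, &len, &tag, &tid);
      printf("reply %d of %d came from task %x\n", i + 1, n, tid);
      /* a real master would pack the next chunk and pvm_send(tid, 0) here */
    }
  } else {                              /* spawned copy: we are a slave */
    srand((unsigned)mytid);
    sleep((unsigned)(rand() % 3 + 1));  /* stand-in for a variable amount of work */
    pvm_initsend(PvmDataDefault);
    pvm_send(ptid, 1);                  /* empty message: "I am done" */
  }
  pvm_exit();
  return 0;
}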

Instructions will be given at the end of the tutorial on how to acquire a tarball of all of the code, but for completeness the above code is found in master.c.

