yorick banner

Home

Manual

Packages

Global Index

Keywords

Quick Reference


/*
   MPY.I
   Message passing extensions to Yorick.

   $Id$
 */

/* MAKE-INSTRUCTIONS
SRCS = mpy.c
LIB = mpy
DEPLIBS =
*/

/* mp_debug= 1; */
/* system,"/bin/hostname"; */

local mp_rank ;
local mp_size ;
/* DOCUMENT mp_rank, mp_size

     set to the rank of this instance of Yorick in a message passing
     group and the size (total number of processes) of the group at
     startup.  DO NOT CHANGE THESE VALUES!
     The rank is between 0 and size-1 inclusive; note that mp_rank
     and mp_size will be nil if multiple processes are not present
     (mp_size==1 is impossible).

     The underlying notions are defined in the MPI (Message Passing
     Interface) standard.

   SEE ALSO: mp_send, mp_recv, mp_task, mp_include
 */

extern mp_send ;
/* DOCUMENT mp_send, to, msg
         or mp_send, to, msg1, msg2, ...
         or mp_send, to_list, msg1, msg2, ...

     send MSG, MSG1, MSG2, ... to process whose rank is TO.  Each
     MSG must be an array (or scalar) of type char, short, int, long,
     float, double, or complex, or a scalar string.

     If the object is to send a message to all other processes, the
     binary tree broadcaster mp_bcast will probably be faster than
     mp_send.

     If TO_LIST is an array of rank numbers, then each MSG may be an
     equal length array of pointers to send a different message to
     each process in the TO_LIST, or one of the basic data types to
     send the same message to each process in TO_LIST.

     The mp_send operation blocks until the matching mp_recv operation
     begins (for every message if multiple MSG or a TO_LIST are
     specified).  However, within each mp_send, many messages may be
     launched simultaneously (but never more than one at a time to
     each process) -- the underlying MPI call is the non-blocking
     synchronous send routine MPI_Issend.

     See mp_task for how to synchronize all the processes in order to
     cleanly begin a parallel calculation.

   SEE ALSO: mp_recv, mp_bcast, mp_rank, mp_task, mp_include
 */

extern mp_recv ;
/* DOCUMENT msg= mp_recv()
         or msg= mp_recv(dimlist)

     receive the next message sent by mp_send from some other process.
     The operation blocks until the next message arrives.  The mp_from
     function can be used to determine who sent the message, or to
     determine whether any message at all has arrived.

     If DIMLIST is specified, it has the same format as for the array
     function.  The arriving message must have the correct number of
     elements for the DIMLIST, or a multiple of that number; the
     result will have either those dimensions, or with an extra
     dimension tacked on the end if the arriving message is a multiple.
     (That is, you are really specifying the dimensions of the "cells"
     of which the message is to be composed.)  By default (and as a
     special case), the result of mp_recv will be either a scalar value,
     or a 1D array of the same type as the matching send.

     Each mp_recv gets one MSG from an mp_send.  Several messages from
     one process to another are guaranteed to arrive in the order of
     the mp_send calls, and within each call in the order of the
     arguments to mp_send.  However, you are responsible for considering
     how to handle the order of messages arriving from several processes.

     If you wish to process message in a different order from that in
     which they are received, you must implement the necessary queuing
     functions yourself in interpreted code.

     See mp_task for how to synchronize all the processes in order to
     cleanly begin a parallel calculation.

   SEE ALSO: mp_send, mp_rank, mp_from, mp_task, mp_include, array
 */

func mp_bcast (origin, msg, .., nfan=)
/* DOCUMENT mp_bcast, mp_rank, msg
                <on originating node>
            msg= mp_bcast(origin)
         or msg= mp_bcast(origin, dimlist)
                <on receiving nodes>

     broadcast a message MSG from one process (the ORIGIN) to all other
     processes, by means of a binary tree -- that is, each process
     passes the message along to two others.  The ORIGIN argument must
     have the same value on all processes; you cannot use mp_recv to
     receive a message broadcast using mp_bcast.  The performance of
     mp_bcast will probably be better than mp_send with a long to-list.

     For receivers, the optional DIMLIST argument works as described
     for mp_recv.  The returned msg will be scalar or 1D without it.

     All processes can use the nfan= keyword (same value everywhere) to
     have each distribute to nfan others instead of the default nfan=2.
     If nfan=1, the message is passed in a daisy chain from one to the
     next; if nfan=mp_size-1, mp_bcast reduces to a single mp_send.

   SEE ALSO: mp_send, mp_recv
 */
{
  /* roll the process rank numbers so that origin becomes 0 */
  me= (mp_rank - origin + mp_size)%mp_size;

  /* with the new numbering scheme, process me passes the message
   * along to nfan*me+1, nfan*me+2, ... nfan*me+nfan
   * (these must be rolled back to get the true process numbering) */
  if (is_void(nfan)) nfan= 2;
  i= me*nfan + 1;
  if (i<mp_size)
    to= (indgen(i:min(i-1+nfan,mp_size-1)) + origin)%mp_size;

  /* if this is not the origin, must receive the message first */
  if (me) {
    /* first collect any dimlist like the array function */
    if (!is_void(msg)) {
      if (!dimsof(msg)(1)) dims= [1, msg];
      else dims= msg(1:msg(1)+1);
    }
    while (more_args()) {
      msg= next_arg();
      if (is_void(msg)) continue;
      if (is_void(dims)) dims= [0];
      if (!dimsof(msg)(1)) {
        grow, dims, msg;
        dims(1)+= 1;
      } else {
        n= msg(1);
        if (n) grow, dims, msg(2:n+1);
        dims(1)+= n;
      }
    }

    /* receive the message */
    msg= mp_recv(dims);
  }

  /* pass the message along */
  if (numberof(to)) mp_send, to, msg;

  return msg;
}

extern mp_from ;
/* DOCUMENT rank= mp_from()
         or rank= mp_from(next)

     returns the rank number of the process of the sender of the most
     recently received message (or -1 if mp_recv has not been called
     since the last mpy_sync).

     If NEXT is non-nil and non-zero, returns the rank of the sender
     of the *next* message mp_recv will produce.  If no message has
     yet been posted, returns -1 if next==1.  This feature may be used
     to test for the arrival of a message without blocking.  If next
     is a non-zero value other than 1, mp_from will block until the
     next message actually arrives.

   SEE ALSO: mp_recv, mp_send, mp_rank
 */

func mp_task (task)
/* DOCUMENT mp_task, task

     register the TASK function as a parallel processing function.
     Each process must call mp_task to declare the same set of TASK
     functions, therefore mp_task will normally be called from a
     source file included by mp_include.

     A TASK function must be have the following structure in order
     to work properly:

        func TASK(arg1, arg2, ...)
        {
          if (catch(-1)) mp_abort;
          mp_start, TASK;
          if (mp_rank) {
            ...TASK will be invoked as a subroutine with no    ...
            ...arguments in all but the rank 0 process         ...
            ...when it returns, the process becomes idle       ...
            ...so it no longer participates in the calculation ...
            ...instead of returning, a non-rank 0 task may also...
            ...choose to hang indefinitely in an mp_recv --    ...
            ...it will exit via a caught error when the next   ...
            ...registered task runs on the rank 0 process      ...
          } else {
            ...the interface to task -- it's arguments and return...
            ...value -- are relevant only to the rank 0 process  ...
            ...which must distribute startup messages and collect...
            ...results from all other processes.  when the rank 0...
            ...process decides the task is finished it returns a ...
            ...result (or has side effects) which the user wants ...
            ...the catch line is optional; if not present an     ...
            ...error in the rank 0 process will leave the other  ...
            ...processes running.  if the other processes        ...
            ...might run communicating with each other and       ...
            ...meaninglessly consuming resources, you should     ...
            ...worry about this; if the other processes will     ...
            ...halt without continuing messages from rank 0      ...
            ...you don't need to bother with this                ...
            return significant_value;
          }
        }
        mp_task, TASK;

     Only a registered task will start running on every process
     in the parallel machine.  A registered task must never be
     called during the execution of another registered task (since
     a side effect of the mp_start call is to halt and resynchronize);
     a registered task may only be called by the rank 0 process,
     invoked either interactively or as the result of some other
     interactive action (e.g.- a #include or an unregistered function
     which calls the registered TASK function).

     The mp_include, mp_cd, and mp_pool functions are initially the only
     registered tasks; other tasks are generally defined and registered
     in source files included as startup files for other packages or by
     means of mp_include.

   SEE ALSO: mp_include, mp_cd, mp_pool, mp_rank, mp_start, mp_abort, catch
 */
{
  if (!is_func(task))
    error, "cannot register non-function: "+nameof(task);
  task= nameof(task);
  if (is_void(mpy_tasks) || noneof(mpy_tasks==task))
    grow, mpy_tasks, [task];
}

func mp_start (task)
/* DOCUMENT mp_start, task
     must be the second line of any function TASK which is registered
     as a parallel task interface using mp_task.  See mp_task for the
     required form of the TASK function.

     The mp_start function resynchronizes all processes participating
     in the parallel calculation.  Called from the rank 0 process,
     mp_start arranges for TASK to be started without arguments on
     all other processes; called from any other process, mp_start
     is a no-op.

   SEE ALSO: mp_task
 */
{
  if (mp_rank) return;
  if (noneof(mpy_tasks==nameof(task)))
    error, "mp_task never registered the function "+nameof(task);

  /* resynchronize to force non-rank 0 processes back to mpy_idler */
  mpy_sync;
  /* block until mpy_idler has picked up the new task
   * since this mp_start was called by task on rank 0, and mpy_idler
   * will start task on all other ranks, task should be running
   * everywhere shortly after this mp_send returns */
  mp_send, indgen(mp_size-1), nameof(task);
}

func mp_abort (void)
/* DOCUMENT if (catch(-1)) mp_abort
     must be the first line of any function registered with mp_task.
     This clears all pending messages, then blows up with an error,
     so it does not return to the calling task.
   SEE ALSO: mp_task, mp_start
 */
{
  if (mp_debug)
    write,"(**mp_abort) rank "+pr1(mp_rank)+" caught "+catch_message;
  if (catch_message!=".SYNC.") mpy_sync, catch_message;
  if (mp_debug)
    write,"(**mp_abort) rank "+pr1(mp_rank)+" done with mpy_sync";
  if (mp_rank) error, ".SYNC.";
  else error, "(bailing out of parallel task)";
}

/* The mpy_idler function is called in lieu of waiting for keyboard input
 * when Yorick is finished with all its tasks.  This idler is intended
 * for use by non-rank 0 processes.  It's purpose is to catch any errors
 * and attempt to restart the virtual machine by alerting the rank 0
 * process (which in turn will try to generate errors in the others).  */
func mpy_idler 
{
  batch, 1;
  if (catch(-1)) {
    if (mp_debug)
      write,"(**mpy_idler) rank "+pr1(mp_rank)+" caught "+catch_message;
    if (catch_message!=".SYNC.") mpy_sync, catch_message;
    if (mp_debug)
      write,"(**mpy_idler) rank "+pr1(mp_rank)+" done with mpy_sync";
    set_idler, mpy_idler;
    return;
  }
  if (mp_debug)
    write,"rank "+pr1(mp_rank)+" at sync in mpy_idler";
  mpy_sync;
  /* pick up the message generated by mp_start and start that task */
  task= mp_recv();
  if (mp_debug)
    write,"rank "+pr1(mp_rank)+" executing "+task;
  task= symbol_def(task);
  task;
  set_idler, mpy_idler;
}

func mp_include (filename)
/* DOCUMENT mp_include, filename

     include the specified FILENAME in all processes.  The FILENAME
     argument is ignored in all but the rank 0 process, which is
     assumed to be the orginal caller.  The FILENAME must be visible
     to every process (normally in Y_SITE/include, Y_SITE/contrib,
     ~/Yorick, or the current working directory - but see mp_cd in
     the latter case).

     The mp_include function is registered as a parallel task, so
     it may only be invoked interactively from the rank 0 process
     or from another file included there (see mp_task for a more
     elaborate description).

   SEE ALSO: mp_task, mp_rank, mp_cd
 */
{
  if (catch(-1)) mp_abort;
  mp_start, mp_include;
  if (mp_rank) {
    filename= mp_bcast(0);
  } else {
    if (structof(filename)!=string || strlen(filename)<1)
      error, "filename must be a non-empty string";
    mp_bcast, 0, filename;
  }
  include, filename;
}

mp_task, mp_include;

func mp_cd (dirname)
/* DOCUMENT mp_cd, dirname
         or mp_cd

     Change all processes to directory DIRNAME, or to the current
     working directory of the rank 0 process if DIRNAME is not
     specified.  Note that DIRNAME must exist for all processes.
     Note also that the processes may start in different directories.

     The mp_cd function is registered as a parallel task, so
     it may only be invoked interactively from the rank 0 process
     or from another file included there (see mp_task for a more
     elaborate description).

   SEE ALSO: mp_task, mp_rank, mp_include
 */
{
  if (catch(-1)) mp_abort;
  mp_start, mp_cd;
  if (mp_rank) {
    dirname= mp_bcast(0);
  } else {
    if (is_void(dirname)) dirname= get_cwd();
    if (structof(dirname)!=string || strlen(dirname)<1)
      error, "dirname must be a non-empty string";
    mp_bcast, 0, dirname;
  }
  cd, dirname;
}

mp_task, mp_cd;

/* ------------------------------------------------------------------------ */

func mp_pool (_p_ntasks, _mp_sow, _mp_work, _mp_reap, _mp_work0)
/* DOCUMENT mp_pool, n_tasks, sow, work, reap, work0
         or mp_pool, n_tasks, sow, work, reap

   Implement pool-of-tasks parallel calculation.  In this model,
   the master process on rank 0 has a number of tasks to be done,
   which he wants to distribute to his slaves.  The pool of tasks
   is hopefully larger than the number of slaves, so that as each
   slave finishes, it can be assigned the next task in the pool,
   achieving a simple form of load balancing.  The master can
   optionally choose to do the next task when all slaves are busy,
   or can just block until a slave finishes.

   N_TASKS is the total number of tasks in the pool; the remaining
   arguments are functions which carry out the various parts of
   the particular calculation that mp_pool is managing:

   func SOW(to, i)
     mp_send, to, <input1>, <input2>, ..., <inputQ>
     <no return value>

     The SOW function will be called by mp_pool in order to send
     the messages to a slave which will be received by the WORK
     function.  Note that SOW only runs on the master process; it
     is called by mp_pool, and has access to the local variables
     of the caller of mp_pool, which it presumably will need in
     order to figure out what to send for the i-th task.  Like
     Yorick indices, i varies from 1 to N_TASKS.  The local variables
     in the mp_pool function all have names beginning with "_p_", to
     make it very unlikely that any variables local to its caller
     will be shadowed.

   func WORK
     input1= mp_recv(dimsi1)
     input2= mp_recv(dimsi2)
     ...
     inputQ= mp_recv(dimsiQ)
     <do the task specified by the messages>
     mp_send, 0, <result1>, <result2>, ..., <resultR>
     <no return value>

     The WORK function runs on a slave, so it does NOT have access
     to any variables available to the caller of mp_pool on the
     master process.  Therefore, WORK must be completely self-contained
     so that all of the information it needs to do its job must arrive
     in the form of the messages sent by SOW (or state information
     previously sent to all slaves).  When finished, WORK sends
     its results back to the master on rank 0; it has no other return
     value.

   func REAP(i, m)
     if (m==1) {
       result1= mp_recv(dimsr1)
       <save or accumulate 1st result of i-th task>
     } else if (m==2) {
       result2= mp_recv(dimsr2)
       <save or accumulate 2nd result of i-th task>
     } else ...
     } else if (m==R) {
       resultR= mp_recv(dimsrR)
       <save or accumulate Rth result of i-th task>
     }
     return (m==R)

     The REAP function runs on the master to collect the results sent
     from a slave by a WORK function.  Like SOW, it is called by mp_pool
     on rank 0, and therefore has access to all the local variables of
     the caller of mp_pool, into which REAP must store the results.
     Because the slaves perform their tasks asynchronously, their return
     messages may be interleaved.  That is, the first argument i to
     REAP varies unpredictably in successive calls.  However, for a given
     i, the second argument m will always begin at 1 and increment by one
     on successive calls with that i.  Thus, it is possible to cope with
     situations in which one result message determines the existence of
     a subsequent message.  The return value of REAP informs mp_pool
     when that slave has completed its task and is available for another,
     by returning non-zero when m reaches the final message sent by work.
     (The mp_pool function has actually peeked at the incoming message
     to learn its origin, and therefore the corresponding task number i.
     There is no reason for REAP to call mp_from.)

   func WORK0(i)
     <do the i-th task>
     <no return value>

     If the optional WORK0 function is provided, and all slaves ever
     busy, mp_pool will call WORK0 to have the master perform a task.
     Like SOW and REAP, WORK0 always runs on rank0 and has access to the
     local variables of the caller of mp_pool.  It should combine the
     functions of SOW, WORK, and REAP without the message passing.

   SEE ALSO: mp_task, mp_partition, mp_prange
 */
{
  if (catch(-1)) mp_abort;
  mp_start, mp_pool;

  if (!mp_rank) {                                       /* MASTER */
    /* broadcast _mp_work name to slaves */
    mp_bcast, 0, nameof(_mp_work);

    /* initialize free processor list */
    _p_free= [];
    for (_p_n=mp_size-1 ; _p_n>0 ; --_p_n) _p_free= _cat(_p_n, _p_free);
    _p_working= 0;

    /* initialize the slave-->task table
     * and the number of messages counters */
    _p_table= _p_nmsg= array(0, mp_size-1);

    for (_p_n=1 ; _p_n<=_p_ntasks ; ++_p_n) {
      /* without waiting, read any results that have arrived,
       * in hopes that some slave might have finished
       * if unwilling to do any work ourself and
       * if no slaves unoccupied, wait for one to finish */
      _mp_pool_reap, is_void(_mp_work0);

      if (_p_free) {
        /* assign this task to an unoccupied slave */
        _p_to= _nxt(_p_free);
        _p_working+= 1;
        _p_table(_p_to)= _p_n;
        mp_send, _p_to, 1;    /* send start message */
        _mp_sow, _p_to, _p_n;
      } else {
        /* do this task ourselves */
        _mp_work0, _p_n;
      }
    }
    /* wait for all slaves to finish */
    _mp_pool_reap, 2;

    /* send all done message (instead of 1 message) */
    mp_send, indgen(mp_size-1), 0;

  } else {                                               /* SLAVE */
    /* get _mp_work function in original broadcast */
    _mp_work= symbol_def(mp_bcast(0));

    /* and slavishly do tasks as master dishes them out */
    while (mp_recv()) _mp_work;
  }
}

func _mp_pool_reap (wait)
{
  /* next, grab messages until one (wait==1) or all (wait==2) slaves
   * finish, or until we would block if wait==0 */
  no_free= !_p_free;
  while (!wait || ((wait==1)? no_free : _p_working)) {
    _p_n= mp_from((wait?2:1));
    if (!wait && _p_n<0) break;
    _mp_pool_handler, _p_n;
    no_free= !_p_free;
  }
}

func _mp_pool_handler (_p_n)
{
  if (!_p_n) error, "why did rank 0 send message to itself?";
  _p_nt= _p_table(_p_n);   /* translate process rank to task number */
  _p_nm= ++_p_nmsg(_p_n);  /* number of this message */
  if (_mp_reap(_p_nt, _p_nm)) {
    /* rank p slave has finished his task */
    _p_nmsg(_p_n)= _p_table(_p_n)= 0;
    _p_free= _cat(_p_n, _p_free);
    _p_working-= 1;
  }
}

mp_task, mp_pool;

func mp_partition (njobs, ntrips, master_works)
/* DOCUMENT ntasks= mp_partition(njobs, ntrips)
         or ntasks= mp_partition(njobs, ntrips, master_works)

   Partition NJOBS jobs into tasks to be distributed among slave
   processes by mp_pool.  Typically, NJOBS will be the length of an
   array of input variables, for example the number of rays to be
   traced.  Each slave is to perform a task consisting of some number
   of these individual jobs.  The idea is to pick a number of jobs
   per task large enough that the overhead of message passing and of
   carving up inputs then reassembling outputs is tolerable.
   Conversely, the number of jobs per task should be small enough
   that the pool of tasks will achieve reasonable load balancing by
   requiring each slave to come back for a new task several times.

   The NTRIPS argument is the number of tasks you would like each
   slave to perform in order to achieve load balancing, bearing in
   mind that if you choose it too large, you will increase the
   overhead by partitioning your jobs into too many tasks.  The
   minimum number of jobs for good load balancing is nslaves*NTRIPS.

   The return value is the number of tasks into which you should
   partition the NJOBS.  The mp_prange function can be used to
   compute the job index range for the i-th such task.

   SEE ALSO: mp_pool, mp_prange
 */
{
  nslaves= mp_size;
  if (!master_works) nslaves-= 1;

  /* get the maximum number of tasks any one slave should do */
  ntrips= min((njobs-1)/nslaves+1, ntrips);

  return min(nslaves*ntrips, njobs);
}

func mp_prange (i, ntasks, njobs)
/* DOCUMENT range= mp_prange(i, ntasks, njobs)
     return the job index range for the I-th of NTASKS, if the total
     number of jobs is NJOBS.  This can be used in conjunction with
     mp_partition and mp_pool.
   SEE ALSO: mp_pool, mp_partition
 */
{
  jpt= njobs/ntasks;
  rem= njobs%ntasks;
  if (rem && i<=rem) {
    jpt+= 1;
    i*= jpt;
  } else {
    i= rem + i*jpt;
  }
  return call(i-jpt+1:i);
}

/* ------------------------------------------------------------------------ */

extern mpy_sync ;
/* **DOCUMENT mpy_sync
           or mpy_sync, error_message

     sends a synchronization request to the rank 0 process (if this is
     not rank 0 itself), then discards all arriving messages until the
     rank 0 process sends back a synchronization acknowledgement, at
     which point mpy_sync returns.  If any process sends an "unexpected"
     mpy_sync, all processes will halt as if by an error eventually.
     This may take some time, since the synchronization request will not
     be detected until all pending mp_send operations have been received
     and processed.

     This function should not be called by user programs; if you want
     to abort call the error function as usual; if you want to synchronize
     before beginning a calculation, use mp_start.

   SEE ALSO: mp_start, error
 */

/* Every process except the rank 0 process pauses on startup
 * until the mpy_sync command is given from the rank 0 process.
 * Also, every non-0 process runs in batch mode with the mpy_idler
 * function installed.  */
extern mpy_rank ;
extern mpy_size ;
extern mpy_init ;

/* See process_argv - this function is called when custom.i executes. */
func get_command_line (void)
{
  { extern mp_rank, mp_size; }
  command_line= get_argv();
  if (numberof(command_line)) list= where(command_line!="-nompi");
  if (!numberof(command_line) || numberof(list)==numberof(command_line)) {
    /* Go ahead and try to start MPI.
     * The problem here is that MPI_Init calls exit if we are not running
     * as a multiprocessing task, and this error cannot be caught.
     * Furthermore, we are not guaranteed that any command line arguments
     * will be delivered to processes started by MPI, so the default must
     * be to assume that MPI_Init should be called.  */
    if (mpy_init()) {
      mp_size= mpy_size();
      mp_rank= mpy_rank();
    }
    if (mp_rank) {
      batch, 1;
      set_idler, mpy_idler;
      grow, command_line, ["-q"];
    } else if (mp_size) {
      write, "MPI started with "+pr1(mp_size)+" processes.";
    }
  } else {
    command_line= command_line(list);
  }
  /* allow for the possibility of a second startup hook function */
  if (!is_void(mp_process_argv)) return mp_process_argv(command_line);
  else return command_line;
}