classdescMP.h
/*
  @copyright Russell Standish 2000-2013
  @author Russell Standish
  This file is part of Classdesc

  Open source licensed under the MIT license. See LICENSE for details.
*/

#ifndef CLASSDESCMP_H
#define CLASSDESCMP_H

#include "pack_base.h"

#undef HAVE_MPI_CPP
//#undef SEEK_SET
//#undef SEEK_CUR
//#undef SEEK_END
#include <mpi.h>
#include <stdexcept>
#include <stdio.h>
#include <vector>

namespace classdesc
{

#ifdef HETERO
  /* Use XDR machine-independent packing if the cluster is heterogeneous */
  typedef xdr_pack MPIbuf_base;
#else
  typedef pack_t MPIbuf_base;
#endif

  /// MPIbuf manipulator to send the MPIbuf's contents to a remote process
  class send
  {
    send();
  public:
    int proc, tag;
    send(int proc, int tag=0): proc(proc), tag(tag) {}
  };


  /// MPIbuf manipulator to asynchronously send the MPIbuf's contents to a remote process
  class isend
  {
    isend();
  public:
    int proc, tag;
    /// \a proc is the MPI taskID to send the message to
    isend(int proc, int tag=0): proc(proc), tag(tag) {}
  };


  /// MPIbuf manipulator to broadcast the MPIbuf's contents to all processes
  class bcast
  {
    bcast();
  public:
    int root;
    /// \a root is the taskID of the source data
    bcast(int root): root(root) {}
  };

  /// a manipulator to mark a processor boundary for scatterv
  class mark {};

  class MPIbuf_array;

  /// buffer object providing MPI functionality
  class MPIbuf: public MPIbuf_base
  {
    int *offsets;
    unsigned offsctr;
    /* MPI_Finalized only available in MPI-2 standard */
    bool MPI_running()
    {
      int fi, ff=0; MPI_Initialized(&fi);
#if (defined(MPI_VERSION) && MPI_VERSION>1 || defined(MPICH_NAME))
      MPI_Finalized(&ff);
#endif
      return fi&&!ff;
    }
    MPI_Request request;
    friend class MPIbuf_array;
  public:
    /// the MPI communicator to be used for subsequent communications
    MPI_Comm Communicator;

    unsigned myid();   ///< the current process's taskID
    unsigned nprocs(); ///< number of processes in the communicator

    /// buffer size is the same on all processes in a collective communication
    bool const_buffer;
    int proc, tag; /* store status of receives */

    MPIbuf()
    {
      request=MPI_REQUEST_NULL;
      Communicator=MPI_COMM_WORLD; const_buffer=0;
      offsets=new int[nprocs()+1]; offsctr=1; offsets[0]=0;
    }
    MPIbuf(const MPIbuf& x): offsets(NULL) {*this=x;}
    ~MPIbuf()
    {
      if (request!=MPI_REQUEST_NULL) wait(); //MPI_Request_free(&request);
      delete [] offsets;
    }
    const MPIbuf& operator=(const MPIbuf& x)
    {
      Communicator=x.Communicator;
      delete [] offsets;
      offsets=new int[nprocs()+1];
      offsctr=x.offsctr;
      for (unsigned i=0; i<offsctr; i++) offsets[i]=x.offsets[i];
      request=MPI_REQUEST_NULL;
      packraw(x.data(),x.size());
      return *this;
    }

    /// returns true if the previous asynchronous call has completed
    bool sent() {int is_sent; MPI_Status s; MPI_Test(&request,&is_sent,&s); return is_sent;}
    /// wait for the previous asynchronous call to complete
    void wait() {MPI_Status s; MPI_Wait(&request,&s);}

    /// send data to \a dest with tag \a tag
    void send(unsigned dest, int tag);
    /// asynchronously send data to \a dest with tag \a tag
    void isend(unsigned dest, int tag);

    /// receive a message from \a p (default MPI_ANY_SOURCE) with tag \a t (default MPI_ANY_TAG)
    MPIbuf& get(int p=MPI_ANY_SOURCE, int t=MPI_ANY_TAG);

    /// perform a simultaneous send and receive between a pair of processes
    void send_recv(unsigned dest, int sendtag, int source, int recvtag);

    /// broadcast data from \a root
    MPIbuf& bcast(unsigned root);

    /// gather data (concatenated) into \a root's buffer
    MPIbuf& gather(unsigned root);
    /// scatter \a root's data (that has been marked with \c mark)
    MPIbuf& scatter(unsigned root);
    /// reset the buffer to send or receive a new message
    MPIbuf& reset() {reseti(); reseto(); tag=1; return *this;}
    /// is there a message waiting to be received into the buffer?
    bool msg_waiting(int source=MPI_ANY_SOURCE, int tag=MPI_ANY_TAG);

    template <class T> MPIbuf& operator<<(const T& x);

    /* Manipulators */
    MPIbuf& operator<<(classdesc::send s)
    {send(s.proc,s.tag); return *this;}
    MPIbuf& operator<<(classdesc::isend s)
    {isend(s.proc,s.tag); return *this;}
    MPIbuf& operator<<(classdesc::bcast s)
    {bcast(s.root); return *this;}
    /* Mark a processor boundary for scatterv */
    MPIbuf& operator<<(mark s)
    {offsets[offsctr++]=size(); return *this;}

    // template <class T> inline MPIbuf& operator<<(const T& x);
    // {pack(this,string(),x); return *this;}

  };
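
  /** Usage sketch (illustrative, not part of the original header): a minimal
      point-to-point exchange using the \c send manipulator, assuming a
      hypothetical classdesc-serialisable object \c myObject and the usual
      pack_base operator>> for unpacking:
      \code
      MPIbuf buf;
      if (buf.myid()==0)
        buf << myObject << classdesc::send(1); // pack, then ship to task 1
      else if (buf.myid()==1)
        buf.get() >> myObject;                 // block until received, then unpack
      \endcode
  */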

  /// used for managing groups of messages
  class MPIbuf_array
  {
    std::vector<MPIbuf> bufs;
    std::vector<MPI_Request> requests;
  public:

    MPIbuf_array(unsigned n): bufs(n), requests(n) {}

    MPIbuf& operator[](unsigned i) {return bufs[i];}

    /// return true if all messages have been completed
    bool testall()
    {
      int flag;
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      MPI_Testall(bufs.size(),&requests[0],&flag,MPI_STATUSES_IGNORE);
      return flag;
    }
    /// return the index of any request that has completed, or MPI_UNDEFINED if none
    int testany()
    {
      int flag,index;
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      MPI_Testany(bufs.size(),&requests[0],&index,&flag,MPI_STATUS_IGNORE);
      return index;
    }
    /// return the indices of the requests that have completed
    std::vector<int> testsome()
    {
      int count;
      std::vector<int> index(bufs.size());
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      MPI_Testsome(bufs.size(),&requests[0],&count,&index[0],MPI_STATUSES_IGNORE);
      return std::vector<int>(index.begin(),index.begin()+count);
    }
    /// wait for all outstanding requests to complete
    void waitall()
    {
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      MPI_Waitall(bufs.size(),&requests[0],MPI_STATUSES_IGNORE);
    }
    /// wait for any outstanding request to complete, returning the index of the completed request
    int waitany()
    {
      int index;
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      // MPI_Waitany fills a single status, so MPI_STATUS_IGNORE is the correct constant here
      MPI_Waitany(bufs.size(),&requests[0],&index,MPI_STATUS_IGNORE);
      return index;
    }
    /// wait for some outstanding requests to complete, returning an array of request indices
    std::vector<int> waitsome()
    {
      int count;
      std::vector<int> index(bufs.size());
      for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
      // MPI_Waitsome fills an array of statuses, so MPI_STATUSES_IGNORE (not MPI_STATUS_IGNORE) is required
      MPI_Waitsome(bufs.size(),&requests[0],&count,&index[0],MPI_STATUSES_IGNORE);
      return std::vector<int>(index.begin(),index.begin()+count);
    }
  };
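
  /** Usage sketch (illustrative): one buffer per destination lets the sends
      proceed asynchronously; assumes \c nprocs holds the task count and
      \c msg is a hypothetical serialisable value. MPI_Waitall treats the
      untouched buffers' MPI_REQUEST_NULL entries as already complete:
      \code
      MPIbuf_array bufs(nprocs);
      for (unsigned p=1; p<nprocs; p++)
        bufs[p] << msg << classdesc::isend(p);
      bufs.waitall(); // block until every isend has gone out
      \endcode
  */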


  inline unsigned MPIbuf::myid()
  {
    int m;
    if (MPI_running()) MPI_Comm_rank(Communicator,&m);
    else m=0;
    return m;
  }

  inline unsigned MPIbuf::nprocs()
  {
    int m;
    if (MPI_running()) MPI_Comm_size(Communicator,&m);
    else m=1;
    return m;
  }

  inline void MPIbuf::send(unsigned dest, int tag=0)
  {
    if (dest==myid()) return; /* nothing to be done */
    MPI_Send(data(),size(),MPI_CHAR,dest,tag,Communicator); reseti();
  }

  inline void MPIbuf::isend(unsigned dest, int tag=0)
  {
    if (dest==myid()) return; /* nothing to be done */
    MPI_Isend(data(),size(),MPI_CHAR,dest,tag,Communicator,&request); reseti();
  }

  inline MPIbuf& MPIbuf::get(int p, int t)
  {
    MPI_Status status;
    MPI_Probe(p,t,Communicator,&status);
    int sz;
    MPI_Get_count(&status,MPI_CHAR,&sz); //this is berserk, but MPI must have ints!
    m_size=sz;
    realloc(m_size);
    proc=status.MPI_SOURCE;
    tag=status.MPI_TAG;
    MPI_Recv(data(),size(),MPI_CHAR,proc,tag,Communicator,&status);
    reseto();
    return *this;
  }

  // 2002-05-16 - asynchronous send and receive modification.
  // 2002-05-22 - use MPI_Isend, MPI_Probe and MPI_Recv
  //
  inline void MPIbuf::send_recv(unsigned dest, int sendtag,
                                int source=MPI_ANY_SOURCE,
                                int recvtag=MPI_ANY_TAG)
  {
    if (dest==myid()) return; /* nothing to be done */
    /* send sizes first */
    int tempsize;
    MPI_Status status, sendstatus;
    MPI_Request r1;

    MPI_Isend(data(), size(), MPI_CHAR, dest, sendtag, Communicator, &r1);
    // probe with the receive tag, so the probed message matches the MPI_Recv below
    MPI_Probe(source, recvtag, Communicator, &status);
    MPI_Get_count(&status, MPI_CHAR, &tempsize);

    char *tempdata=realloc(NULL, tempsize);
    MPI_Recv(tempdata,tempsize, MPI_CHAR,source,recvtag,Communicator,&status);

    // ensure data is actually sent before deleting storage; a separate status
    // keeps the receive's source/tag (read below) from being clobbered
    MPI_Wait(&r1,&sendstatus);
    realloc(0); m_data=tempdata; m_size=tempsize;
    proc=status.MPI_SOURCE;
    tag=status.MPI_TAG;
    reseto();
  }

  inline MPIbuf& MPIbuf::bcast(unsigned root)
  {
    int myid;
    if (!const_buffer)
      {
        int sz=size();
        MPI_Bcast(&sz,1,MPI_INT,root,Communicator);
        m_size=sz;
      }
    MPI_Comm_rank(Communicator,&myid);
    if (myid!=int(root)) realloc(m_size);
    MPI_Bcast(data(),size(),MPI_CHAR,root,Communicator);
    reseto();
    return *this;
  }
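
  /** Usage sketch (illustrative): every task calls bcast with the same root;
      afterwards each local buffer holds root's bytes, ready for unpacking.
      \c config is a hypothetical serialisable object:
      \code
      MPIbuf b;
      if (b.myid()==0) b << config; // only root's contents matter
      b.bcast(0) >> config;         // collective: all tasks participate
      \endcode
  */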

  inline MPIbuf& MPIbuf::gather(unsigned root)
  {
    int rootsz=0;
    unsigned i;
    char *rootdata=NULL;
    int *sizes=NULL, *offsets=NULL;
    if (!const_buffer)
      {
        if (myid()==root)
          {
            sizes=new int[nprocs()];
            offsets=new int[nprocs()+1];
          }
        int sz=m_size;
        MPI_Gather(&sz,1,MPI_INT,sizes,1,MPI_INT,root,Communicator);
        if (myid()==root)
          {
            for (offsets[0]=0, i=0; i<nprocs(); i++)
              offsets[i+1]=offsets[i]+sizes[i];
            rootsz=offsets[nprocs()];
            rootdata=realloc(NULL,rootsz);
          }
        MPI_Gatherv(data(),size(),MPI_CHAR,rootdata,sizes,offsets,
                    MPI_CHAR,root,Communicator);
        if (myid()==root)
          {delete [] sizes; delete [] offsets;}
      }
    else
      {
        rootsz=size();
        if (myid()==root) rootdata=realloc(NULL,size()*nprocs());
        MPI_Gather(data(),size(),MPI_CHAR,rootdata,size(),
                   MPI_CHAR,root,Communicator);
      }
    if (myid()==root) {free(m_data); m_data=rootdata; reseto(); m_size=rootsz;}
    else reseti();
    return *this;
  }

  inline MPIbuf& MPIbuf::scatter(unsigned root)
  {
    int *sizes=NULL, np=nprocs(), amroot=myid()==root;
    char *rootdata=NULL;
    if (amroot)
      {
        rootdata=m_data;
        sizes=new int[np];
        for (int i=0; i<np; i++) sizes[i]=offsets[i+1]-offsets[i];
        m_data=realloc(NULL,sizes[root]);
      }
    /* scatter the sizes array to the other processes */
    int sz;
    MPI_Scatter(sizes,1,MPI_INT,&sz,1,MPI_INT,root,Communicator);
    if (sz>=0)
      {
        m_size=sz;
        realloc(m_size);
        MPI_Scatterv(rootdata,sizes,offsets,MPI_CHAR,data(),size(),MPI_CHAR,
                     root,Communicator);
      }
    if (amroot)
      {
        free(rootdata); delete [] sizes;
        for (int i=0; i<np; offsets[i++]=0);
      }
    return *this;
  }
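
  /** Usage sketch (illustrative): root packs one chunk per task, closing each
      with a \c mark; scatter then delivers the i'th chunk to task i (root
      keeps chunk 0 here). \c chunk and \c myChunk are hypothetical
      serialisable values:
      \code
      MPIbuf b;
      if (b.myid()==0)
        for (unsigned p=0; p<b.nprocs(); p++)
          b << chunk[p] << mark();
      b.scatter(0) >> myChunk; // collective: all tasks participate
      \endcode
  */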

  inline bool MPIbuf::msg_waiting(int source,int tag)
  {
    MPI_Status status;
    int waiting;
    MPI_Iprobe(source,tag,Communicator,&waiting,&status);
    return waiting;
  }

  /// Master-slave manager
  template<class S>
  class MPIslave: public MPIbuf
  {
    S slave;
    void method(MPIbuf& buffer);
  public:
    /// list of waiting slaves, valid on master
    std::vector<int> idle;
    MPIslave() {init();}
    ~MPIslave() {finalize();}
    void init();
    void finalize();
    MPIbuf& operator<<(void (S::*m)(MPIbuf&))
    {
      reseti();
      pack(*this,string(),m);
      // ::pack(cmd,string(),is_array(),*(char*)&m,1,sizeof(m));
      return *this;
    }
    template <class T> MPIbuf& operator<<(const T& x)
    {reseti(); return static_cast<MPIbuf&>(*this) << x;} // delegate to MPIbuf's operator<<; calling (*this)<<x here would recurse forever
    /// send a request to the next available slave
    void exec(MPIbuf& x) {x.send(idle.back(),0); idle.pop_back();}
    /// process a return value
    MPIbuf& get_returnv() {get(); idle.push_back(proc); return *this;}
    /// true if all slaves are idle
    bool all_idle() {return idle.size()==nprocs()-1;}
    /// wait until all slaves are idle
    void wait_all_idle() {while (!all_idle()) get_returnv();}
    /// broadcast a request to all slaves
    void bcast(MPIbuf& c);
  };
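
  /** Usage sketch (illustrative): \c Worker, \c doWork and \c someArg are
      hypothetical. Constructing the MPIslave sends slave tasks into their
      service loop; only the master returns and dispatches work:
      \code
      struct Worker {
        void doWork(MPIbuf& args); // unpack args here, optionally pack a reply
      };

      MPIslave<Worker> master;
      master << &Worker::doWork << someArg; // pack method pointer, then arguments
      master.exec(master);                  // ship to the next idle slave
      master.wait_all_idle();               // drain outstanding return values
      \endcode
  */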

  template <class S>
  inline void MPIslave<S>::init()
  {
#if MPI_DEBUG
    /* enable this piece of code for debugging under gdb */
    char buf[1];
    if (myid()==0) gets(buf); // myid is a member function here, so it must be called
    MPI_Barrier(MPI_COMM_WORLD);
#endif
    if (myid()>0)
      {
        /* slave loop */
        MPIbuf buffer;
        for (buffer.get(); buffer.tag==0; buffer.get())
          method(buffer);
        MPI_Finalize();
        exit(0);
      }
    else
      for (unsigned i=1; i<nprocs(); i++) idle.push_back(i);
  }

  template <class S>
  void MPIslave<S>::finalize()
  {
    if (myid()==0)
      for (unsigned i=1; i<nprocs(); i++) send(i,1);
  }

  template <class S>
  inline void MPIslave<S>::method(MPIbuf& buffer)
  {
    void (S::*m)(MPIbuf&);
    // buffer.unpackraw((char*)&m,sizeof(m));
    unpack(buffer,string(),m);
    buffer.tag=0;
    (slave.*m)(buffer);
    if (buffer.tag) buffer.send(buffer.proc,0);
  }

  template <class S>
  inline void MPIslave<S>::bcast(MPIbuf& c)
  {
    for (unsigned i=1; i<nprocs(); i++)
      {MPI_Send(c.data(),c.size(),MPI_CHAR,i,0,c.Communicator);} // send c's contents, consistent with the c.reseti() below
    c.reseti();
  }

  /// RAII class to set up and tear down MPI. Must be instantiated in main()
  class MPISPMD
  {
  public:
    int nprocs, myid;
    MPISPMD() {nprocs=1; myid=0;}
    MPISPMD(int& argc, char**& argv) {init(argc,argv);}
    ~MPISPMD() {finalize();}
    void init(int& argc, char**& argv);
    void finalize() {MPI_Finalize();}
  };
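
  /** Usage sketch (illustrative): instantiate MPISPMD at the top of main() so
      MPI_Finalize is reliably called when it goes out of scope:
      \code
      int main(int argc, char** argv)
      {
        classdesc::MPISPMD mpi(argc, argv);
        // ... SPMD work partitioned by mpi.myid over mpi.nprocs tasks ...
        return 0; // ~MPISPMD runs MPI_Finalize
      }
      \endcode
  */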

  inline void MPISPMD::init(int& argc, char**& argv)
  {
    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
#if MPI_DEBUG
    /* enable this piece of code for debugging under gdb */
    char buf[1];
    if (myid==0) gets(buf);
    MPI_Barrier(MPI_COMM_WORLD);
#endif
  }

}

#endif /* CLASSDESCMP_H */