31 typedef xdr_pack MPIbuf_base;
33 typedef pack_t MPIbuf_base;
42 send(int proc, int tag=0): proc(proc), tag(tag) {}

53 isend(int proc, int tag=0): proc(proc), tag(tag) {}
82 int fi, ff=0; MPI_Initialized(&fi);
83 #if (defined(MPI_VERSION) && MPI_VERSION>1 || defined(MPICH_NAME))

104 request=MPI_REQUEST_NULL;
105 Communicator=MPI_COMM_WORLD; const_buffer=0;
106 offsets=new int[nprocs()+1]; offsctr=1; offsets[0]=0;
111 if (request!=MPI_REQUEST_NULL) wait();
118 offsets=new int[nprocs()+1];
120 for (unsigned i=0; i<offsctr; i++) offsets[i]=x.offsets[i];
121 request=MPI_REQUEST_NULL;
128 bool sent() {int is_sent; MPI_Status s; MPI_Test(&request,&is_sent,&s); return is_sent;}
130 void wait() {MPI_Status s; MPI_Wait(&request,&s);}
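The sent()/wait() pair lets a caller overlap computation with an asynchronous send started by isend(). A minimal sketch of that pattern, assuming the classdesc namespace and the classdescMP.h header named in the reference entries below (payload and tag are illustrative):

#include "classdescMP.h"

void async_send_example(unsigned dest, double payload)
{
  classdesc::MPIbuf b;
  b << payload;               // serialise the payload into the buffer
  b.isend(dest, /*tag=*/1);   // start the asynchronous send
  // ... unrelated work can proceed while the message is in flight ...
  if (!b.sent())              // poll: has the isend completed yet?
    b.wait();                 // otherwise block until it has
}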
133 void send(unsigned dest, int tag);
135 void isend(unsigned dest, int tag);
138 MPIbuf& get(int p=MPI_ANY_SOURCE, int t=MPI_ANY_TAG);
141 void send_recv(unsigned dest, int sendtag, int source, int recvtag);
147 MPIbuf& gather(unsigned root);
149 MPIbuf& scatter(unsigned root);
153 bool msg_waiting(int source=MPI_ANY_SOURCE, int tag=MPI_ANY_TAG);
155 template <class T> MPIbuf& operator<<(const T& x);
159 {send(s.proc,s.tag); return *this;}
161 {isend(s.proc,s.tag); return *this;}
163 {bcast(s.root); return *this;}
166 {offsets[offsctr++]=size(); return *this;}
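Because these overloads return the buffer itself, a message can be assembled and dispatched in a single insertion chain. A sketch of that style, assuming the manipulators live in the classdesc namespace like the rest of the header, and assuming the usual classdesc serialisation of standard containers (destination, tag and payload are illustrative):

#include "classdescMP.h"
#include <vector>

void chained_send_example()
{
  classdesc::MPIbuf b;
  int iteration=42;
  std::vector<double> results(100, 1.0);
  // pack two fields, then hand the buffer to task 0 with tag 3;
  // the send manipulator triggers MPIbuf::send(0,3) at the end of the chain
  b << iteration << results << classdesc::send(0, /*tag=*/3);
}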
176 std::vector<MPIbuf> bufs;
177 std::vector<MPI_Request> requests;
182 MPIbuf& operator[](unsigned i) {return bufs[i];}
188 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
189 MPI_Testall(bufs.size(),&requests[0],&flag,MPI_STATUSES_IGNORE);
196 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
197 MPI_Testany(bufs.size(),&requests[0],&index,&flag,MPI_STATUS_IGNORE);
204 std::vector<int> index(bufs.size());
205 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
206 MPI_Testsome(bufs.size(),&requests[0],&count,&index[0],MPI_STATUSES_IGNORE);
207 return std::vector<int>(index.begin(),index.begin()+count);
212 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
213 MPI_Waitall(bufs.size(),&requests[0],MPI_STATUSES_IGNORE);
219 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
220 MPI_Waitany(bufs.size(),&requests[0],&index,MPI_STATUSES_IGNORE);
227 std::vector<int> index(bufs.size());
228 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
229 MPI_Waitsome(bufs.size(),&requests[0],&count,&index[0],MPI_STATUSES_IGNORE);
230 return std::vector<int>(index.begin(),index.begin()+count);
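The test/wait family above works on the whole group at once: it copies each member's request handle into the requests array and then calls the matching MPI_Test*/MPI_Wait* routine. A sketch of fanning one message out per worker and waiting for all the sends to finish; the class name MPIbuf_array and its size constructor are assumptions, since the listing does not show the class's name or constructors:

#include "classdescMP.h"

void fan_out(unsigned nworkers)
{
  classdesc::MPIbuf_array batch(nworkers);   // assumed: one MPIbuf per outstanding message
  for (unsigned i=0; i<nworkers; ++i)
  {
    batch[i] << int(i);                      // per-worker payload (illustrative)
    batch[i].isend(i+1, /*tag=*/1);          // workers assumed to be tasks 1..nworkers
  }
  batch.waitall();                           // MPI_Waitall over all stored requests
}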
235 inline unsigned MPIbuf::myid()
238 if (MPI_running()) MPI_Comm_rank(Communicator,&m);
246 if (MPI_running()) MPI_Comm_size(Communicator,&m);
253 if (dest==myid()) return;
254 MPI_Send(data(),size(),MPI_CHAR,dest,tag,Communicator); reseti();
259 if (dest==myid()) return;
260 MPI_Isend(data(),size(),MPI_CHAR,dest,tag,Communicator,&request); reseti();
266 MPI_Probe(p,t,Communicator,&status);
268 MPI_Get_count(&status,MPI_CHAR,&sz);
271 proc=status.MPI_SOURCE;
273 MPI_Recv(data(),size(),MPI_CHAR,proc,tag,Communicator,&status);
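On the receiving side, get() probes for a matching message, sizes the buffer to fit, receives it, and records the sender and tag in the proc and tag members. A small sketch of draining one message, assuming the pack_t base supplies operator>> for unpacking as classdesc buffers normally do:

#include "classdescMP.h"
#include <iostream>

void receive_one()
{
  classdesc::MPIbuf b;
  b.get();                     // any source, any tag
  double payload;
  b >> payload;                // unpack what the sender packed
  std::cout << "message from task " << b.proc << " with tag " << b.tag << "\n";
}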
282 int source=MPI_ANY_SOURCE,
283 int recvtag=MPI_ANY_TAG)
285 if (dest==myid()) return;
291 MPI_Isend(data(), size(), MPI_CHAR, dest, sendtag, Communicator, &r1);
292 MPI_Probe(source, sendtag, Communicator, &status);
293 MPI_Get_count(&status, MPI_CHAR, &tempsize);
295 char *tempdata=realloc(NULL, tempsize);
296 MPI_Recv(tempdata,tempsize, MPI_CHAR,source,recvtag,Communicator,&status);
298 MPI_Wait(&r1,&status);
299 realloc(0); m_data=tempdata; m_size=tempsize;
300 proc=status.MPI_SOURCE;
311 MPI_Bcast(&sz,1,MPI_INT,root,Communicator);
314 MPI_Comm_rank(Communicator,&myid);
315 if (myid!=int(root)) realloc(m_size);
316 MPI_Bcast(data(),size(),MPI_CHAR,root,Communicator);
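bcast() first broadcasts the buffer size so that non-root tasks can resize, then broadcasts the contents, so every task must make the same call and only the root needs to pack beforehand. A sketch, with the parameter vector purely illustrative and operator>> again assumed from the pack_t base:

#include "classdescMP.h"
#include <vector>

void broadcast_params(std::vector<double>& params)
{
  classdesc::MPIbuf b;
  if (b.myid()==0) b << params;   // only the root has data to start with
  b.bcast(0);                     // collective: everyone ends up with the root's bytes
  if (b.myid()!=0) b >> params;   // the other tasks unpack their copy
}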
326 int *sizes=NULL, *offsets=NULL;
331 sizes=new int[nprocs()];
332 offsets=new int[nprocs()+1];
335 MPI_Gather(&sz,1,MPI_INT,sizes,1,MPI_INT,root,Communicator);
338 for (offsets[0]=0, i=0; i<nprocs(); i++)
339 offsets[i+1]=offsets[i]+sizes[i];
340 rootsz=offsets[nprocs()];
341 rootdata=realloc(NULL,rootsz);
343 MPI_Gatherv(data(),size(),MPI_CHAR,rootdata,sizes,offsets,
344 MPI_CHAR,root,Communicator);
346 {delete [] sizes; delete [] offsets;}
351 if (myid()==root) rootdata=realloc(NULL,size()*nprocs());
352 MPI_Gather(data(),size(),MPI_CHAR,rootdata,size(),
353 MPI_CHAR,root,Communicator);
355 if (myid()==root) {free(m_data); m_data=rootdata; reseto(); m_size=rootsz;}
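gather() sends every task's buffer to the root, which ends up with all contributions concatenated in rank order (via MPI_Gatherv for variable sizes, or plain MPI_Gather when const_buffer is set). A sketch of collecting one value per task, with operator>> once more assumed from the pack_t base:

#include "classdescMP.h"
#include <iostream>

void gather_results(double local)
{
  classdesc::MPIbuf b;
  b << local;                       // every task contributes its own value
  b.gather(0);                      // collective: root receives the concatenation
  if (b.myid()==0)
    for (unsigned i=0; i<b.nprocs(); ++i)
    {
      double r; b >> r;             // unpack one contribution per task, in rank order
      std::cout << "task " << i << ": " << r << "\n";
    }
}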
362 int *sizes=NULL, np=nprocs(), amroot=myid()==root;
368 for (int i=0; i<np; i++) sizes[i]=offsets[i+1]-offsets[i];
369 m_data=realloc(NULL,sizes[root]);
373 MPI_Scatter(sizes,1,MPI_INT,&sz,1,MPI_INT,root,Communicator);
378 MPI_Scatterv(rootdata,sizes,offsets,MPI_CHAR,data(),size(),MPI_CHAR,root,Communicator);
383 free(rootdata); delete [] sizes;
384 for (int i=0; i<np; offsets[i++]=0);
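scatter() is the inverse: the root packs one chunk per task, recording the boundary after each chunk with the mark manipulator (whose operator<< stores the current size() into the offsets table), and every task receives just its own slice. A sketch only; the exact spelling of mark (default-constructed and streamed like the other manipulators) is an assumption based on the "marked with mark" wording in the reference entries, not something the listing shows:

#include "classdescMP.h"
#include <vector>

void scatter_work(std::vector<double>& my_chunk)
{
  classdesc::MPIbuf b;
  if (b.myid()==0)
    for (unsigned p=0; p<b.nprocs(); ++p)
      b << std::vector<double>(10, double(p)) << classdesc::mark();  // chunk for task p, then a boundary
  b.scatter(0);      // collective: each task's buffer now holds only its own chunk
  b >> my_chunk;
}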
393 MPI_Iprobe(source,tag,Communicator,&waiting,&status);
402 void method(MPIbuf& buffer);
412 pack(*this,string(),m);
416 template <class T> MPIbuf& operator<<(const T& x)
417 {reseti(); return (*this) << x;}
436 if (myid==0) gets(buf);
437 MPI_Barrier(MPI_COMM_WORLD);
443 for (buffer.get(); buffer.tag==0; buffer.get())
449 for (unsigned i=1; i<nprocs(); i++) idle.push_back(i);
456 for (unsigned i=1; i<nprocs(); i++) send(i,1);
464 unpack(buffer,string(),m);
467 if (buffer.tag) buffer.send(buffer.proc,0);
473 for (unsigned i=1; i<nprocs(); i++)
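These fragments belong to the master-slave manager, which keeps a list of idle workers and exchanges tagged MPIbuf messages with them. A hand-rolled version of the same pattern, built only from the send()/get() primitives shown earlier, makes the control flow explicit; the tag convention below is invented for this example and is not the manager's own wire format:

#include "classdescMP.h"

// Worker: loop until the master sends an (empty) message with tag 0.
void worker_loop()
{
  classdesc::MPIbuf b;
  for (b.get(0); b.tag!=0; b.get(0))   // receive only from the master, task 0
  {
    int job; b >> job;
    double result=2.0*job;             // stand-in for real work
    b.reset() << result;               // repack the answer...
    b.send(0, /*tag=*/1);              // ...and return it
  }
}

// Master: hand one job to each worker, collect the answers, then shut them down.
void master_loop()
{
  classdesc::MPIbuf b;
  for (unsigned i=1; i<b.nprocs(); ++i) { b.reset() << int(i); b.send(i, 1); }
  for (unsigned i=1; i<b.nprocs(); ++i) { b.get(); double r; b >> r; }
  for (unsigned i=1; i<b.nprocs(); ++i) { b.reset(); b.send(i, 0); }   // tag 0: stop
}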
484 MPISPMD(int& argc, char**& argv) {init(argc,argv);}
486 void init(int& argc, char**& argv);
487 void finalize() {MPI_Finalize();}
490 inline void MPISPMD::init(int& argc, char**& argv)
492 MPI_Init(&argc,&argv);
493 MPI_Comm_rank(MPI_COMM_WORLD,&myid);
494 MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
498 if (myid==0) gets(buf);
499 MPI_Barrier(MPI_COMM_WORLD);
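MPISPMD brackets MPI_Init and the rank/size queries, so a typical program only needs to construct it at the top of main(). A minimal sketch, relying on the RAII teardown described in the reference entries below (the broadcast payload is illustrative):

#include "classdescMP.h"
#include <iostream>
#include <string>

int main(int argc, char** argv)
{
  classdesc::MPISPMD spmd(argc, argv);      // MPI_Init, plus rank/size stored in myid/nprocs
  std::cout << "task " << spmd.myid << " of " << spmd.nprocs << "\n";

  classdesc::MPIbuf b;
  if (spmd.myid==0) b << std::string("hello from the root");
  b.bcast(0);                               // every task receives the root's buffer
  if (spmd.myid!=0)
  {
    std::string msg; b >> msg;
    std::cout << "task " << spmd.myid << " got: " << msg << "\n";
  }
  return 0;                                 // MPI is finalized when spmd goes out of scope
}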
Member reference:

bool msg_waiting(int source=MPI_ANY_SOURCE, int tag=MPI_ANY_TAG)
    is there a message waiting to be received into the buffer? (classdescMP.h:389)
unsigned nprocs()
    number of processes (classdescMP.h:243)
void wait_all_idle()
    wait until all slaves are idle (classdescMP.h:425)
void bcast(MPIbuf &c)
    broadcast a request to all slaves (classdescMP.h:471)
MPIbuf & gather(unsigned root)
    gather data (concatenated) into root's buffer (classdescMP.h:321)
unsigned myid
    taskID of the current process
size_t size() const
    size of buffer (pack_base.h:154)
void wait()
    wait for the previous asynchronous call to complete (classdescMP.h:130)
void exec(MPIbuf &x)
    send a request to the next available slave (classdescMP.h:419)
bcast (manipulator)
    MPIbuf manipulator to broadcast the MPIbuf's contents to all processes (classdescMP.h:58)
int waitany()
    wait for any outstanding request to complete, returning the index of the completed request (classdescMP.h:216)
send (manipulator)
    MPIbuf manipulator to send the MPIbuf's contents to a remote process (classdescMP.h:37)
std::vector< int > waitsome()
    wait for some outstanding requests to complete, returning an array of request indices (classdescMP.h:224)
Master slave manager (classdescMP.h:399)
MPI_Comm Communicator
    the MPI communicator to be used for subsequent communications (classdescMP.h:92)
isend (manipulator)
    MPIbuf manipulator to asynchronously send the MPIbuf's contents to a remote process (classdescMP.h:47)
MPIbuf
    buffer object providing MPI functionality (classdescMP.h:75)
int testany()
    return the index of any request that has completed, or MPI_UNDEFINED if none (classdescMP.h:193)
MPISPMD
    RAII class to set up and tear down MPI; must be instantiated in main (classdescMP.h:479)
bool sent()
    returns true if the previous asynchronous call has been sent (classdescMP.h:128)
void send(unsigned dest, int tag)
    send data to dest with tag tag (classdescMP.h:251)
used for managing groups of messages (classdescMP.h:174)
MPIbuf & scatter(unsigned root)
    scatter root's data (that has been marked with mark) (classdescMP.h:360)
isend(int proc, int tag=0)
    proc is the MPI taskID to send the message to (classdescMP.h:53)
void send_recv(unsigned dest, int sendtag, int source, int recvtag)
    perform a simultaneous send and receive between a pair of processes (classdescMP.h:281)
void pack(pack_t &targ, const string &desc, is_treenode dum, const T *const &arg)
    serialise a tree (or DAG) (pack_graph.h:28)
bool all_idle()
    true if all slaves are idle (classdescMP.h:423)
MPIbuf & get(int p=MPI_ANY_SOURCE, int t=MPI_ANY_TAG)
    receive a message from p (MPI_ANY_SOURCE) with tag t (MPI_ANY_TAG) (classdescMP.h:263)
void isend(unsigned dest, int tag)
    asynchronously send data to dest with tag tag (classdescMP.h:257)
const char * data() const
    actual buffer (pack_base.h:152)
mark (manipulator)
    a manipulator to mark a processor boundary for scatterv (classdescMP.h:68)
std::vector< int > testsome()
    return the indices of the requests that have completed (classdescMP.h:201)
MPIbuf & reset()
    reset the buffer to send or receive a new message (classdescMP.h:151)
bcast(int root)
    root is the taskID of the source data (classdescMP.h:64)
bool const_buffer
    buffer size is the same on all processes in a collective communication (classdescMP.h:99)
std::vector< int > idle
    list of waiting slaves, valid on master (classdescMP.h:404)
classdesc (namespace)
    contains definitions related to classdesc functionality (arrays.h:2514)
MPIbuf & bcast(unsigned root)
    broadcast data from root (classdescMP.h:305)
bool testall()
    return true if all messages have been completed (classdescMP.h:185)
MPIbuf & get_returnv()
    process a return value (classdescMP.h:421)
void waitall()
    wait for all outstanding requests to complete (classdescMP.h:210)
void unpack(unpack_t &targ, const string &desc, is_treenode dum, T *&arg)
    unserialise a tree (pack_graph.h:44)