Open MPI User's Mailing List Archives

Subject: Re: [OMPI users] Problems with MPI_Issend and MX
From: Scott Atchley (atchley_at_[hidden])
Date: 2009-07-02 21:06:15


Hi Kris,

I have not run your code yet, but I will try to run it this weekend.

You can have MX checksum its messages if you set MX_CSUM=1 and use the
MX debug library (e.g. point LD_LIBRARY_PATH at /opt/mx/lib/debug).
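
For example, one way to run with checksums enabled (this assumes the
default MX install under /opt/mx; mpirun's -x option exports the
variables to the MPI processes):

$ export MX_CSUM=1
$ export LD_LIBRARY_PATH=/opt/mx/lib/debug:$LD_LIBRARY_PATH
$ mpirun -x MX_CSUM -x LD_LIBRARY_PATH ...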

Do you have the problem if you use the MX MTL? To test it, modify your
mpirun command as follows:

$ mpirun -mca pml cm ...

and do not specify any BTL info.
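
For example, a complete invocation might look like this (the process
count and binary name here are just placeholders for your test case):

$ mpirun -mca pml cm -np 4 ./verifycontent 1000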

Scott

On Jul 2, 2009, at 6:05 PM, 8mj6tc902_at_[hidden] wrote:

> Hi. I've now spent many, many hours tracking down a bug that was
> causing my program to die as though either its memory were getting
> corrupted or messages were getting clobbered while going through the
> network; I couldn't tell which. I really wish the checksum flag on
> btl_mx_flags were working. Anyway, I think I've managed to recreate
> the core of the problem in the small-ish test case I've attached
> (verifycontent.cc). For me it usually segfaults at MPI_Issend after
> sending about 60-90 messages, using Open MPI 1.3.2 with Myricom's
> mx-1.2.9 drivers on Linux with gcc 4.3.2. Disabling the mx btl
> (mpirun -mca btl ^mx) makes it work (likewise for my own larger
> project, Murasaki). The version using MPI_Ssend
> (verifycontent-ssend.cc) also works without problems over MX. So I
> suspect the issue lies in Open MPI 1.3.2's handling of MPI_Issend
> over MX, but it's also possible I've horribly misunderstood something
> fundamental about MPI and it's just my fault; if that's the case,
> please let me know. (Both this test case and Murasaki work over
> MPICH-MX, though, so Open MPI is definitely doing something
> different.)
>
> Here's a brief description of verifycontent.cc to make reading it
> easier:
> * given -np N, half the nodes send and the other half receive some
> number of messages (reps)
> * each message consists of buflen (50000*24) chars, each set to a
> value based on the sending node's rank and the sequence number of
> the message
> * each receiving node starts an irecv for each sending node and
> tests each request until a message arrives
> * the receiver then checks the contents of the message to make sure
> it matches what was supposed to be in there (this is where my real
> project, Murasaki, actually fails; I can't seem to replicate that
> here, however)
> * the senders meanwhile keep sending messages, dequeuing them when
> their requests test as completed
>
> Testing the current Subversion trunk (1.4a1r21594), it seems to pass
> my test case, but it also tends to show errors like
> "mca_btl_mx_init: mx_open_endpoint() failed with status 20 (Busy)"
> on startup, and Murasaki still fails (messages turn into zeros about
> 132KB in), so something still isn't right...
>
> If anyone has any ideas about this test case failing, or about my
> larger issue of messages turning into zeros after 132KB while on MX
> (though sadly it sometimes isn't at 132KB but straight from 0KB,
> which is very confusing), I'd greatly appreciate it. Even a simple
> confirmation of "Yes, MPI_Issend/Irecv with MX has issues in 1.3.2"
> would help my sanity.
> --
> Kris Popendorf
>
> Keio University
> http://murasaki................... <- (probably too cumbersome for
> most people to test, but if you feel daring, try running some
> human/mouse chromosomes through it over MX)
>
> /* verifycontent.cc */
> #include <mpi.h>
> #include <iostream>
> #include <list>
> #include <vector>
> #include <string.h>
> #include <assert.h>
> #include <unistd.h> // for getpid
>
> #include <stdlib.h> // for atoi (in case someone doesn't have boost)
>
> const int buflen = 50000*24;
>
> int numprocs, rank, namelen;
> char processor_name[MPI_MAX_PROCESSOR_NAME];
>
> using namespace std;
>
> class Message {
> public:
>   MPI_Request req;
>   MPI_Status status;
>   char buffer[buflen];
>   int count;
>   void reset(char val){
>     memset(buffer, val, sizeof(char)*buflen);
>   }
>   Message() :
>     count(0)
>   {
>     reset(rank);
>   }
>   Message(int _count) :
>     count(_count)
>   {
>     reset(count+rank+1);
>   }
>
>   // check that a freshly reset buffer still holds its fill value
>   bool preVerify(){
>     char content = rank;
>     for(int b=0; b<buflen; b++){
>       if(buffer[b] != content){
>         cout << "Pre-panic! " << rank << " has wrong pre-data (expected "
>              << (int)content << ", not " << (int)buffer[b]
>              << ") in message " << count << " at " << b << endl;
>         for(int bi=max(0,b-5); bi<min(buflen,b+5); bi++)
>           cout << rank << ">" << bi << " = " << (int)buffer[bi] << endl;
>         assert(buffer[b] == content);
>         return false;
>       }
>     }
>     return true;
>   }
>
>   // check that the buffer holds the pattern rank r should have written
>   bool verify(int r){
>     char content = count+r+1;
>     for(int b=0; b<buflen; b++){
>       if(buffer[b] != content){
>         cout << "Oh noes! " << rank << " has wrong data (expected "
>              << (int)content << ", not " << (int)buffer[b]
>              << ") in message " << count << " to/from " << r
>              << " at " << b << endl;
>         for(int bi=max(0,b-5); bi<min(buflen,b+5); bi++)
>           cout << rank << ">" << bi << " = " << (int)buffer[bi] << endl;
>         assert(buffer[b] == content);
>         return false;
>       }
>     }
>     return true;
>   }
> };
>
> int main(int argc, char *argv[]) {
>   using namespace std;
>   long reps = 1000;
>   if(argc > 1){ // optionally specify number of repeats on the command line
>     reps = atoi(argv[1]);
>   }
>
>   MPI_Init(&argc, &argv);
>   MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
>   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
>   MPI_Get_processor_name(processor_name, &namelen);
>
>   int senders = numprocs/2;
>   int receivers = numprocs-senders;
>
>   assert(senders > 0);
>   assert(receivers > 0);
>
>   cout << "Process " << rank << " (" << getpid() << ") on "
>        << processor_name << " out of " << numprocs << "\n";
>   if(rank < senders){
>     cout << "Process " << rank << " sending " << reps
>          << " messages to each of " << receivers << " nodes" << endl;
>     vector<list<Message> > sendQs(receivers);
>     vector<int> counts(receivers, 0);
>
>     for(int i=0; i<reps; i++){
>       for(int receiver=0; receiver<receivers; receiver++){
>         list<Message> &sendQ = sendQs[receiver];
>         int target = receiver+senders;
>
>         sendQ.push_back(Message(counts[receiver]++));
>         Message &msg = sendQ.back();
>
>         // confirm that everything we're sending hasn't been corrupted
>         // (the buffer should be full of msg.count+rank+1)
>         assert(msg.buffer);
>         msg.verify(rank);
>         MPI_Issend(msg.buffer, buflen, MPI_CHAR, target, 0,
>                    MPI_COMM_WORLD, &msg.req);
>
>         // if any messages have finished sending, go ahead and dequeue them
>         int f = 0;
>         for(list<Message>::iterator ite=sendQ.begin(); ite!=sendQ.end();){
>           MPI_Test(&ite->req, &f, &ite->status);
>           if(f){
>             list<Message>::iterator j = ite;
>             ++ite;
>             sendQ.erase(j);
>           }else
>             break; // packets should be received in order by the receiver,
>                    // so we can stop at the first one not yet received
>         }
>       }
>     }
>
>     // wait for remaining sends to finish...
>     int nodesLeft = 0, messageCount = 0;
>     for(int receiver=0; receiver<receivers; receiver++)
>       if(!sendQs[receiver].empty()){
>         nodesLeft++;
>         messageCount += sendQs[receiver].size();
>       }
>
>     cout << "Initial send complete. Waiting for " << nodesLeft
>          << " nodes to finish " << messageCount << " messages." << endl;
>
>     while(nodesLeft){
>       for(int receiver=0; receiver<receivers; receiver++){
>         list<Message> &sendQ = sendQs[receiver];
>         if(!sendQ.empty()){
>           int finished = 0;
>           MPI_Test(&sendQ.front().req, &finished, &sendQ.front().status);
>           if(finished)
>             sendQ.pop_front();
>
>           if(sendQ.empty()){ // with this message received we've
>             nodesLeft--;     // exhausted this queue
>           }
>         }
>       }
>     }
>
>     cout << "Process " << rank << " cleanup done." << endl;
>   }else{
>     cout << "Process " << rank << " receiving. Expecting " << reps
>          << " messages from each of " << senders << " nodes." << endl;
>
>     vector<Message> msgSenders(senders);
>     for(int si=0; si<senders; si++){ // start all of them receiving
>       msgSenders[si].preVerify();
>       MPI_Irecv(msgSenders[si].buffer, buflen, MPI_CHAR, si, 0,
>                 MPI_COMM_WORLD, &msgSenders[si].req);
>     }
>
>     int sendersLeft = senders;
>     while(sendersLeft){
>       for(int si=0; si<senders; si++){ // for each sender
>         Message &msg = msgSenders[si];
>         if(msg.count >= reps)
>           continue; // done with this one
>         // see if any messages have been received
>         int f = 0;
>         if(1){ // use tests (flip to 0 to block in MPI_Wait instead)
>           MPI_Test(&msg.req, &f, &msg.status);
>         }else{
>           cout << "Waiting on message " << msg.count << " from " << si << endl;
>           MPI_Wait(&msg.req, &msg.status);
>           f = 1;
>         }
>         if(f){ // got data back
>           // it had better have come from who we were expecting
>           if(msg.status.MPI_SOURCE != si){
>             cout << "Expected message from " << si
>                  << " but MPI reports it came from "
>                  << msg.status.MPI_SOURCE << endl;
>           }
>           assert(msg.status.MPI_SOURCE == si);
>           msg.verify(si); // buffer should be full of si+msg.count+1
>
>           // if we're still expecting more, start receiving again
>           msg.count++;
>           if(msg.count < reps){
>             msg.reset(rank);
>             MPI_Irecv(msg.buffer, buflen, MPI_CHAR, si, 0,
>                       MPI_COMM_WORLD, &msg.req);
>           }else{
>             sendersLeft--;
>           }
>         }
>       }
>     }
>
>     cout << "All messages accounted for!\n";
>   }
>
>   MPI_Barrier(MPI_COMM_WORLD);
>
>   if(rank == 0)
>     cout << "All procs done." << endl;
>
>   MPI_Finalize();
>   return 0;
> }
>
> /* verifycontent-ssend.cc */
> #include <mpi.h>
> #include <iostream>
> #include <list>
> #include <vector>
> #include <string.h>
> #include <assert.h>
> #include <unistd.h> // for getpid
>
> #include <stdlib.h> // for atoi (in case someone doesn't have boost)
>
> const int buflen = 50000*24;
>
> int numprocs, rank, namelen;
> char processor_name[MPI_MAX_PROCESSOR_NAME];
>
> using namespace std;
>
> class Message {
> public:
>   MPI_Request req;
>   MPI_Status status;
>   char buffer[buflen];
>   int count;
>   void reset(char val){
>     memset(buffer, val, sizeof(char)*buflen);
>   }
>   Message() :
>     count(0)
>   {
>     reset(rank);
>   }
>   Message(int _count) :
>     count(_count)
>   {
>     reset(count+rank+1);
>   }
>
>   // check that a freshly reset buffer still holds its fill value
>   bool preVerify(){
>     char content = rank;
>     for(int b=0; b<buflen; b++){
>       if(buffer[b] != content){
>         cout << "Pre-panic! " << rank << " has wrong pre-data (expected "
>              << (int)content << ", not " << (int)buffer[b]
>              << ") in message " << count << " at " << b << endl;
>         for(int bi=max(0,b-5); bi<min(buflen,b+5); bi++)
>           cout << rank << ">" << bi << " = " << (int)buffer[bi] << endl;
>         assert(buffer[b] == content);
>         return false;
>       }
>     }
>     return true;
>   }
>
>   // check that the buffer holds the pattern rank r should have written
>   bool verify(int r){
>     char content = count+r+1;
>     for(int b=0; b<buflen; b++){
>       if(buffer[b] != content){
>         cout << "Oh noes! " << rank << " has wrong data (expected "
>              << (int)content << ", not " << (int)buffer[b]
>              << ") in message " << count << " to/from " << r
>              << " at " << b << endl;
>         for(int bi=max(0,b-5); bi<min(buflen,b+5); bi++)
>           cout << rank << ">" << bi << " = " << (int)buffer[bi] << endl;
>         assert(buffer[b] == content);
>         return false;
>       }
>     }
>     return true;
>   }
> };
>
> int main(int argc, char *argv[]) {
>   using namespace std;
>   long reps = 1000;
>   if(argc > 1){ // optionally specify number of repeats on the command line
>     reps = atoi(argv[1]);
>   }
>
>   MPI_Init(&argc, &argv);
>   MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
>   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
>   MPI_Get_processor_name(processor_name, &namelen);
>
>   int senders = numprocs/2;
>   int receivers = numprocs-senders;
>
>   assert(senders > 0);
>   assert(receivers > 0);
>
>   cout << "Process " << rank << " (" << getpid() << ") on "
>        << processor_name << " out of " << numprocs << "\n";
>   if(rank < senders){
>     cout << "Process " << rank << " sending " << reps
>          << " messages to each of " << receivers << " nodes" << endl;
>     vector<list<Message> > sendQs(receivers);
>     vector<int> counts(receivers, 0);
>
>     for(int i=0; i<reps; i++){
>       for(int receiver=0; receiver<receivers; receiver++){
>         list<Message> &sendQ = sendQs[receiver];
>         int target = receiver+senders;
>
>         sendQ.push_back(Message(counts[receiver]++));
>         Message &msg = sendQ.back();
>
>         // confirm that everything we're sending hasn't been corrupted
>         assert(msg.buffer);
>         msg.verify(rank);
>         // blocking synchronous send; completes only once the receiver
>         // has started receiving the message
>         MPI_Ssend(msg.buffer, buflen, MPI_CHAR, target, 0, MPI_COMM_WORLD);
>       }
>     }
>
>     cout << "Process " << rank << " cleanup done." << endl;
>   }else{
>     cout << "Process " << rank << " receiving. Expecting " << reps
>          << " messages from each of " << senders << " nodes." << endl;
>
>     vector<Message> msgSenders(senders);
>     for(int si=0; si<senders; si++){ // start all of them receiving
>       msgSenders[si].preVerify();
>       MPI_Irecv(msgSenders[si].buffer, buflen, MPI_CHAR, si, 0,
>                 MPI_COMM_WORLD, &msgSenders[si].req);
>     }
>
>     int sendersLeft = senders;
>     while(sendersLeft){
>       for(int si=0; si<senders; si++){ // for each sender
>         Message &msg = msgSenders[si];
>         if(msg.count >= reps)
>           continue; // done with this one
>         // see if any messages have been received
>         int f = 0;
>         if(1){ // use tests (flip to 0 to block in MPI_Wait instead)
>           MPI_Test(&msg.req, &f, &msg.status);
>         }else{
>           cout << "Waiting on message " << msg.count << " from " << si << endl;
>           MPI_Wait(&msg.req, &msg.status);
>           f = 1;
>         }
>         if(f){ // got data back
>           // it had better have come from who we were expecting
>           if(msg.status.MPI_SOURCE != si){
>             cout << "Expected message from " << si
>                  << " but MPI reports it came from "
>                  << msg.status.MPI_SOURCE << endl;
>           }
>           assert(msg.status.MPI_SOURCE == si);
>           msg.verify(si); // buffer should be full of si+msg.count+1
>
>           // if we're still expecting more, start receiving again
>           msg.count++;
>           if(msg.count < reps){
>             msg.reset(rank);
>             MPI_Irecv(msg.buffer, buflen, MPI_CHAR, si, 0,
>                       MPI_COMM_WORLD, &msg.req);
>           }else{
>             sendersLeft--;
>           }
>         }
>       }
>     }
>
>     cout << "All messages accounted for!\n";
>   }
>
>   MPI_Barrier(MPI_COMM_WORLD);
>
>   if(rank == 0)
>     cout << "All procs done." << endl;
>
>   MPI_Finalize();
>   return 0;
> }