Hi Loh,
I used MPI_Init_thread(&argc,&argv, MPI_THREAD_MULTIPLE, &provided); in my program and got provided = 0 which turns out to be the MPI_THREAD_SINGLE. Does this mean that I can not use MPI_THREAD_MULTIPLE model? I write a little program to test the multithreading and it worked sometimes and failed sometimes. It also hang there sometimes. Does this only because the MPI_THREAD_MULTIPLE is not supported or there are some bugs in the program? I attached the little program as follow:
#include <iostream>
#include <pthread.h>
#include <fstream>
#include <sstream>
#include <string.h>
#include "mpi.h"
using namespace std;
#define MSG_QUERY_SIZE 16 //sizeof(MPI_query_msg) = 16
struct MPI_query_msg
{
int flag; // -1:request cell; 0:query coordinate; 1:there is no cell to grant
int x;
int y;
int ignited; // if x,y are not negative, then ignited: 0 is not ignited, 1 is ignited
};
void* backRecv(void* arg)
{
int myid, nprocs;
pthread_mutex_init(&_dealmutex2, NULL);
stringstream RANK;
MPI_Status status;
MPI_Request req2;
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
int left = (myid - 1 + nprocs - 1) % (nprocs - 1);
int right = (myid + 1) % (nprocs - 1);
MPI_query_msg rMSG;
RANK << myid;
cout << myid << " create background message recv" << endl;
int x, y;
//char c;
int m;
int count = 0;
string filename("f_");
filename += RANK.str();
filename += "_backRecv.txt";
fstream fout(filename.c_str(), ios::out);
if(!fout)
{
cout << "can not create the file " << filename << endl;
fout.close();
exit(1);
}
while(true)
{
MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 222, MPI_COMM_WORLD, &status);
//MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 222, MPI_COMM_WORLD, &req2);
//MPI_Wait(&req2, &status);
fout << "BACKREV:" << myid << " recv from " << status.MPI_SOURCE << " rMSG.flag = " << rMSG.flag << " with tag 222" << endl;
fout.flush();
if(rMSG.flag == -1)
{
fout << "*******backRecv FINISHED IN " << myid << "********" << endl;
fout.flush();
fout.close();
pthread_exit(NULL);
return 0;
}
};
}
int main(int argc, char **argv)
{
int myid = 0;
int provided;
int nprocs = 0;
pthread_t pt1 = 0;
pthread_t pt2 = 0;;
int pret1 = 0;
int pret2 = 0;
int i = 0, j = 0, m = 0;
//MPI_Status status;
MPI_Request requ1, requ2;
MPI_Status status1, status2;
MPI_Init_thread(&argc,&argv, MPI_THREAD_MULTIPLE, &provided);
//MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&myid);
pthread_mutex_init(&_dealmutex, NULL);
if(myid == nprocs - 1) // the last one
{
if(provided == MPI_THREAD_MULTIPLE)
{
cout << myid << " got MPI_THREAD_MULTIPLE " << endl;
}
else
{
cout << myid << " MPI_THREAD_MULTIPLE = " << MPI_THREAD_MULTIPLE << endl;
cout << myid << " MPI_THREAD_SINGLE = " << MPI_THREAD_SINGLE << endl;
cout << myid << " got provided = " << provided << endl;
}
MPI_query_msg sMSGqueue[50], rMSG;
for(i=0; i<50; i++)
{
sMSGqueue[i].flag = i;
sMSGqueue[i].x = i;
sMSGqueue[i].y = i;
sMSGqueue[i].ignited = i;
}
while(j < 50)
{
MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status2);
//MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &requ2);
//MPI_Wait(&requ2, &status2);
cout << "MAIN(" << j << "): " << myid << " recvs from "<< status2.MPI_SOURCE << " with tag = " << status2.MPI_TAG << " rMSG.flag = " << rMSG.flag << endl;
MPI_Send(&(sMSGqueue[j]), MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, status2.MPI_TAG, MPI_COMM_WORLD);
//MPI_Isend(&(sMSGqueue[j]), MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, status2.MPI_TAG, MPI_COMM_WORLD, &requ1);
//MPI_Wait(&requ1, &status1);
cout << "MAIN(" << j << "): " << myid << " sends to "<< status2.MPI_SOURCE << " with tag = " << status2.MPI_TAG << " sMSGqueue[j].flag = " << sMSGqueue[j].flag << endl;
j++;
};
int count = 0;
while(true)
{
MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status2);
//MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &requ2);
//MPI_Wait(&requ2, &status2);
rMSG.flag = -1;
MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, status2.MPI_TAG, MPI_COMM_WORLD);
//MPI_Isend(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, status2.MPI_TAG, MPI_COMM_WORLD, &requ1);
//MPI_Wait(&requ1, &status1);
cout << "MAIN sends termination to " << status2.MPI_SOURCE << endl;
count++;
if(count == myid)
break;
};
cout << "*******************************MAIN SUCESS!" << endl;
}
else
{
pret1 = pthread_create(&pt1, NULL, backRecv, NULL);
if(pret1 != 0)
{
cout << myid << "backRecv Thread Create Failed." << endl;
exit(1);
}
MPI_query_msg sMSG, rMSG;
rMSG.flag = myid;
rMSG.x = myid;
rMSG.y = myid;
rMSG.ignited = myid;
sMSG.flag = myid;
sMSG.x = myid;
sMSG.y = myid;
sMSG.ignited = myid;
int left = (myid - 1 + nprocs - 1) % (nprocs - 1);
int right = (myid + 1) % (nprocs - 1);
while(true)
{
MPI_Send(&sMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD);
//MPI_Isend(&sMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, &requ1);
//MPI_Wait(&requ1, &status1);
cout << "SLAVE: " << myid << " sends to "<< nprocs-1 << " sMSG.x = " << sMSG.x << endl;
MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, &status2);
//MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, &requ2);
//MPI_Wait(&requ2, &status2);
cout << "SLAVE: " << myid << " recvs from "<< nprocs-1 << " rMSG.flag = " << rMSG.flag << endl;
MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD);
//MPI_Isend(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD, &requ1);
//MPI_Wait(&requ1, &status1);
if(rMSG.flag == -1)
{
//MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD);
break;
}
for(j=0; j<(myid+1)*300; ++j)
{}
};
cout << "*******************************SLAVE" << myid << " SUCESS!" << endl;
pthread_join(pt1, NULL);
}
MPI_Finalize();
}
BTW, if I want to create a background thread which is sort of like a deamon thread, how can I achieve that in MPI programs? Thanks.
Date: Tue, 22 Sep 2009 10:32:50 -0700
From: Eugene.Loh_at_[hidden]
To: users_at_[hidden]
Subject: Re: [OMPI users] How to create multi-thread parallel program using thread-safe send and recv?
guosong wrote:
Thanks for responding. I used a linux cluster. I think I would like to create a model that is multithreaded and each thread can make MPI calls. I attached test code as follow. It has two pthreads and there are MPI calls in both of those two threads. In the main function, there are also MPI calls. Should I use a full multithreading?I guess so. It seems like the created threads are expected to make independent/concurrent message-passing calls. Do read the link I sent. You need to convert from MPI_Init to MPI_Init_thread(), asking for a full-multithreaded model and checking that you got it. Also note in main() that the MPI_Isend() calls should be matched with MPI_Wait() or similar calls. I guess the parent thread will sit in such calls while the child threads do their own message passing. Good luck.
_________________________________________________________________
Ô¼»á˵²»Ç嵨·½£¿À´ÊÔÊÔ΢ÈíµØÍ¼×îÐÂmsn»¥¶¯¹¦ÄÜ£¡
http://ditu.live.com/?form=TL&swm=1
|