diff --git a/ompi/mca/btl/openib/btl_openib_component.c b/ompi/mca/btl/openib/btl_openib_component.c index 5bca8f6..dd7a62a 100644 --- a/ompi/mca/btl/openib/btl_openib_component.c +++ b/ompi/mca/btl/openib/btl_openib_component.c @@ -148,6 +148,7 @@ int btl_openib_component_open(void) mca_btl_openib_component.eager_limit = mca_btl_openib_module.super.btl_eager_limit; + srand48(getpid() * time(NULL)); return ret; } diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.c b/ompi/mca/btl/openib/btl_openib_endpoint.c index 1370164..d898dea 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.c +++ b/ompi/mca/btl/openib/btl_openib_endpoint.c @@ -39,6 +39,10 @@ #include #include +#define ENDPOINT_CONNECT_REQ 0 +#define ENDPOINT_CONNECT_RSP 1 +#define ENDPOINT_CONNECT_ACK 2 + static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint); static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint); @@ -270,6 +274,7 @@ static void mca_btl_openib_endpoint_construct_qp( endpoint->sd_credits[prio] = mca_btl_openib_component.rd_num; endpoint->rem_info.rem_qp_num[prio] = 0; endpoint->rem_info.rem_psn[prio] = 0; + endpoint->lcl_qp[prio] = NULL; } static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint) @@ -377,8 +382,7 @@ static void mca_btl_openib_endpoint_send_cb( OBJ_RELEASE(buffer); } - -static int mca_btl_openib_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint) +static int mca_btl_openib_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint, uint8_t message_type) { orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t); int rc; @@ -389,57 +393,83 @@ static int mca_btl_openib_endpoint_send_connect_data(mca_btl_base_endpoint_t* en } /* pack the info in the send buffer */ - - rc = orte_dss.pack(buffer, &endpoint->lcl_qp[BTL_OPENIB_HP_QP]->qp_num, 1, ORTE_UINT32); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } - - rc = orte_dss.pack(buffer, &endpoint->lcl_qp[BTL_OPENIB_LP_QP]->qp_num, 1, ORTE_UINT32); + rc = orte_dss.pack(buffer, &message_type, 1, ORTE_UINT8); if(rc != ORTE_SUCCESS) { ORTE_ERROR_LOG(rc); return rc; } - rc = orte_dss.pack(buffer, &endpoint->lcl_psn[BTL_OPENIB_HP_QP], 1, - ORTE_UINT32); + rc = orte_dss.pack(buffer, &endpoint->subnet_id, 1, ORTE_UINT64); if(rc != ORTE_SUCCESS) { ORTE_ERROR_LOG(rc); return rc; } - - rc = orte_dss.pack(buffer, &endpoint->lcl_psn[BTL_OPENIB_LP_QP], 1, - ORTE_UINT32); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; + + if(message_type != ENDPOINT_CONNECT_REQ) { + /* send the QP connect request info we respond to */ + rc = orte_dss.pack(buffer, + &endpoint->rem_info.rem_qp_num[BTL_OPENIB_HP_QP], 1, + ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + rc = orte_dss.pack(buffer, &endpoint->rem_info.rem_lid, 1, ORTE_UINT16); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } } + + if(message_type != ENDPOINT_CONNECT_ACK) { + rc = orte_dss.pack(buffer, &endpoint->lcl_qp[BTL_OPENIB_HP_QP]->qp_num, + 1, ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } - rc = orte_dss.pack(buffer, &endpoint->endpoint_btl->lid, 1, ORTE_UINT16); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } + rc = orte_dss.pack(buffer, &endpoint->lcl_qp[BTL_OPENIB_LP_QP]->qp_num, + 1, ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } - rc = orte_dss.pack(buffer, &endpoint->subnet_id, 1, ORTE_UINT64); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } + rc = orte_dss.pack(buffer, &endpoint->lcl_psn[BTL_OPENIB_HP_QP], 1, + ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + + rc = orte_dss.pack(buffer, &endpoint->lcl_psn[BTL_OPENIB_LP_QP], 1, + ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + + rc = orte_dss.pack(buffer, &endpoint->endpoint_btl->lid, 1, ORTE_UINT16); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + rc = orte_dss.pack(buffer, &endpoint->endpoint_btl->hca->mtu, 1, + ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } - rc = orte_dss.pack(buffer, &endpoint->endpoint_btl->hca->mtu, 1, ORTE_UINT32); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; + rc = orte_dss.pack(buffer, &endpoint->index, 1, ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } } - rc = orte_dss.pack(buffer, &endpoint->index, 1, ORTE_UINT32); - if(rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } /* send to endpoint */ rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0, mca_btl_openib_endpoint_send_cb, NULL); @@ -515,8 +545,7 @@ static int mca_btl_openib_endpoint_start_connect_qp( static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoint) { int rc; - - srand48(getpid() * time(NULL)); + rc = mca_btl_openib_endpoint_start_connect_qp(endpoint, BTL_OPENIB_HP_QP); if(rc != OMPI_SUCCESS) { return rc; @@ -534,8 +563,9 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi /* Send connection info over to remote endpoint */ endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; - if(OMPI_SUCCESS != - (rc = mca_btl_openib_endpoint_send_connect_data(endpoint))) { + + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_send_connect_data(endpoint, + ENDPOINT_CONNECT_REQ))) { BTL_ERROR(("error sending connect request, error code %d", rc)); return rc; } @@ -575,7 +605,6 @@ static int mca_btl_openib_endpoint_reply_start_connect( { int rc; - srand48(getpid() * time(NULL)); rc = mca_btl_openib_endpoint_reply_start_connect_qp(endpoint, rem_info, BTL_OPENIB_HP_QP); if(rc != OMPI_SUCCESS) { @@ -607,8 +636,8 @@ static int mca_btl_openib_endpoint_reply_start_connect( /* Send connection info over to remote endpoint */ endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK; - if(OMPI_SUCCESS != - (rc = mca_btl_openib_endpoint_send_connect_data(endpoint))) { + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_send_connect_data(endpoint, + ENDPOINT_CONNECT_RSP))) { BTL_ERROR(("error in endpoint send connect request error code is %d", rc)); return rc; @@ -616,14 +645,6 @@ static int mca_btl_openib_endpoint_reply_start_connect( return OMPI_SUCCESS; } -/* - * endpoint is waiting ack to final connection establishment.. - */ - -static void mca_btl_openib_endpoint_waiting_ack(mca_btl_openib_endpoint_t *endpoint) { - endpoint->endpoint_state = MCA_BTL_IB_WAITING_ACK; -} - /* * called when the openib has completed setup via the * OOB channel @@ -634,7 +655,7 @@ static void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoin opal_list_item_t *frag_item; mca_btl_openib_frag_t *frag; mca_btl_openib_module_t* openib_btl; - + endpoint->endpoint_state = MCA_BTL_IB_CONNECTED; endpoint->endpoint_btl->poll_cq = true; @@ -677,38 +698,17 @@ static void mca_btl_openib_endpoint_recv( mca_btl_openib_endpoint_t *ib_endpoint = NULL; int endpoint_state; int rc; - uint32_t i; - int32_t cnt = 1; - mca_btl_openib_rem_info_t rem_info; - + uint32_t i, lcl_qp = 0; + uint16_t lcl_lid = 0; + int32_t cnt = 1; + mca_btl_openib_rem_info_t rem_info = {{0, }, }; + uint8_t message_type; + bool master; + /* start by unpacking data first so we know who is knocking at our door */ - rc = orte_dss.unpack(buffer, &rem_info.rem_qp_num[BTL_OPENIB_HP_QP], &cnt, - ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; - } - rc = orte_dss.unpack(buffer, &rem_info.rem_qp_num[BTL_OPENIB_LP_QP], &cnt, - ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; - } - rc = orte_dss.unpack(buffer, &rem_info.rem_psn[BTL_OPENIB_HP_QP], &cnt, - ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; - } - rc = orte_dss.unpack(buffer, &rem_info.rem_psn[BTL_OPENIB_LP_QP], &cnt, - ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; - } - rc = orte_dss.unpack(buffer, &rem_info.rem_lid, &cnt, ORTE_UINT16); + rc = orte_dss.unpack(buffer, &message_type, &cnt, ORTE_UINT8); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return; @@ -718,15 +718,58 @@ static void mca_btl_openib_endpoint_recv( ORTE_ERROR_LOG(rc); return; } - rc = orte_dss.unpack(buffer, &rem_info.rem_mtu, &cnt, ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; + if(message_type != ENDPOINT_CONNECT_REQ) { + rc = orte_dss.unpack(buffer, &lcl_qp, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &lcl_lid, &cnt, ORTE_UINT16); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } } - rc = orte_dss.unpack(buffer, &rem_info.rem_index, &cnt, ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return; + if(message_type != ENDPOINT_CONNECT_ACK) { + rc = orte_dss.unpack(buffer, &rem_info.rem_qp_num[BTL_OPENIB_HP_QP], + &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_qp_num[BTL_OPENIB_LP_QP], + &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_psn[BTL_OPENIB_HP_QP], &cnt, + ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_psn[BTL_OPENIB_LP_QP], &cnt, + ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_lid, &cnt, ORTE_UINT16); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_mtu, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dss.unpack(buffer, &rem_info.rem_index, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } } BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d, SUBNET = %016x\n", @@ -734,106 +777,124 @@ static void mca_btl_openib_endpoint_recv( rem_info.rem_qp_num[BTL_OPENIB_LP_QP], rem_info.rem_lid, rem_info.rem_subnet_id)); - + + master = orte_ns.compare_fields(ORTE_NS_CMP_ALL, orte_process_info.my_name, + process_name) > 0 ? true : false; + for(ib_proc = (mca_btl_openib_proc_t*) opal_list_get_first(&mca_btl_openib_component.ib_procs); ib_proc != (mca_btl_openib_proc_t*) opal_list_get_end(&mca_btl_openib_component.ib_procs); ib_proc = (mca_btl_openib_proc_t*)opal_list_get_next(ib_proc)) { + bool found = false; - if(orte_ns.compare_fields(ORTE_NS_CMP_ALL, &ib_proc->proc_guid, process_name) == ORTE_EQUAL) { - bool found = false; - - /* Try to get the endpoint instance of this proc */ + if(orte_ns.compare_fields(ORTE_NS_CMP_ALL, + &ib_proc->proc_guid, process_name) != ORTE_EQUAL) + continue; + if(message_type != ENDPOINT_CONNECT_REQ) { + /* This is a reply message. Try to get the endpoint instance the + * reply belongs to */ for(i = 0; i < ib_proc->proc_endpoint_count; i++) { - mca_btl_openib_port_info_t port_info; - port_info = ib_proc->proc_ports[i]; - ib_endpoint = ib_proc->proc_endpoints[i]; - if(ib_endpoint->rem_info.rem_lid && - (ib_endpoint->rem_info.rem_lid == rem_info.rem_lid && - ib_endpoint->rem_info.rem_qp_num[BTL_OPENIB_HP_QP] == - rem_info.rem_qp_num[BTL_OPENIB_HP_QP])) { - /* we've seen them before! */ - found = true; + ib_endpoint = ib_proc->proc_endpoints[i]; + if(ib_endpoint->lcl_qp[BTL_OPENIB_HP_QP] != NULL && + lcl_lid == ib_endpoint->endpoint_btl->lid && + lcl_qp == ib_endpoint->lcl_qp[BTL_OPENIB_HP_QP]->qp_num && + rem_info.rem_subnet_id == ib_endpoint->subnet_id) { + found = true; break; } } - /* If we haven't seen this remote lid before then try to match on - endpoint */ - for(i = 0; !found && i < ib_proc->proc_endpoint_count; i++) { - mca_btl_openib_port_info_t port_info; - port_info = ib_proc->proc_ports[i]; - ib_endpoint = ib_proc->proc_endpoints[i]; - if(!ib_endpoint->rem_info.rem_lid && - ib_endpoint->subnet_id == rem_info.rem_subnet_id) { - /* found a match based on subnet! */ - found = true; - break; - } + } else { + /* This is new connection request. If this is master try to find + * endpoint in a connecting state. If this is slave try to find + * endpoint in closed state and initiate connection back */ + mca_btl_openib_endpoint_t *ib_endpoint_found = NULL; + for(i = 0; i < ib_proc->proc_endpoint_count; i++) { + ib_endpoint = ib_proc->proc_endpoints[i]; + if(ib_endpoint->subnet_id != rem_info.rem_subnet_id || + (ib_endpoint->endpoint_state != MCA_BTL_IB_CONNECTING + && ib_endpoint->endpoint_state != MCA_BTL_IB_CLOSED)) + continue; + found = true; + ib_endpoint_found = ib_endpoint; + if((master && + ib_endpoint->endpoint_state == MCA_BTL_IB_CONNECTING) || + (!master && + ib_endpoint->endpoint_state == MCA_BTL_IB_CLOSED)) + break; /* Found one. No point to continue */ } - - - if(!found) { - BTL_ERROR(("can't find suitable endpoint for this peer\n")); - return; + ib_endpoint = ib_endpoint_found; + + /* if this is slave and there is no endpoints in closed state + * then all connection are already in progress so just ignore + * this connection request */ + if(!master && ib_endpoint->endpoint_state != MCA_BTL_IB_CLOSED) { + return; } - - OPAL_THREAD_LOCK(&ib_endpoint->endpoint_lock); - endpoint_state = ib_endpoint->endpoint_state; - - /* Update status */ - switch(endpoint_state) { - case MCA_BTL_IB_CLOSED : - /* We had this connection closed before. - * The endpoint is trying to connect. Move the - * status of this connection to CONNECTING, - * and then reply with our QP information */ - - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_reply_start_connect(ib_endpoint, &rem_info))) { - BTL_ERROR(("error in endpoint reply start connect")); - break; - } - - /** As long as we expect a message from the peer (in order to setup the connection) - * let the event engine pool the OOB events. Note: we increment it once peer active - * connection. - */ - opal_progress_event_users_increment(); - break; - - case MCA_BTL_IB_CONNECTING : + } - mca_btl_openib_endpoint_set_remote_info(ib_endpoint, &rem_info); - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_connect(ib_endpoint))) { - BTL_ERROR(("endpoint connect error: %d", rc)); - break; - } - - /* Setup state as awaiting ack from peer */ - mca_btl_openib_endpoint_waiting_ack(ib_endpoint); + if(!found) { + BTL_ERROR(("can't find suitable endpoint for this peer\n")); + return; + } - /* Send him an ack */ - mca_btl_openib_endpoint_send_connect_data(ib_endpoint); - break; - - case MCA_BTL_IB_WAITING_ACK: - mca_btl_openib_endpoint_connected(ib_endpoint); - break; - - case MCA_BTL_IB_CONNECT_ACK: - mca_btl_openib_endpoint_send_connect_data(ib_endpoint); - mca_btl_openib_endpoint_connected(ib_endpoint); + endpoint_state = ib_endpoint->endpoint_state; + + /* Update status */ + switch(endpoint_state) { + case MCA_BTL_IB_CLOSED : + /* We had this connection closed before. + * The endpoint is trying to connect. Move the + * status of this connection to CONNECTING, + * and then reply with our QP information */ + + if(master) { + rc = mca_btl_openib_endpoint_reply_start_connect(ib_endpoint, + &rem_info); + } else { + rc = mca_btl_openib_endpoint_start_connect(ib_endpoint); + } + + if(OMPI_SUCCESS != rc) { + BTL_ERROR(("error in endpoint reply start connect")); break; + } - case MCA_BTL_IB_CONNECTED : + /** As long as we expect a message from the peer (in order to setup the connection) + * let the event engine pool the OOB events. Note: we increment it once peer active + * connection. + */ + opal_progress_event_users_increment(); + break; + + case MCA_BTL_IB_CONNECTING : + + mca_btl_openib_endpoint_set_remote_info(ib_endpoint, &rem_info); + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_connect(ib_endpoint))) { + BTL_ERROR(("endpoint connect error: %d", rc)); break; - default : - BTL_ERROR(("Invalid endpoint state %d", endpoint_state)); } - OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock); + + /* Send him an ack */ + mca_btl_openib_endpoint_send_connect_data(ib_endpoint, + ENDPOINT_CONNECT_RSP); + mca_btl_openib_endpoint_connected(ib_endpoint); + break; + + case MCA_BTL_IB_CONNECT_ACK: + mca_btl_openib_endpoint_send_connect_data(ib_endpoint, + ENDPOINT_CONNECT_ACK); + mca_btl_openib_endpoint_connected(ib_endpoint); + break; + + case MCA_BTL_IB_CONNECTED : break; + default : + BTL_ERROR(("Invalid endpoint state %d", endpoint_state)); } + OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock); + break; } } @@ -879,7 +940,6 @@ int mca_btl_openib_endpoint_send( break; case MCA_BTL_IB_CONNECT_ACK: - case MCA_BTL_IB_WAITING_ACK: BTL_VERBOSE(("Queuing because waiting for ack")); opal_list_append(&endpoint->pending_send_frags, diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.h b/ompi/mca/btl/openib/btl_openib_endpoint.h index 0ab5222..c95ce43 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.h +++ b/ompi/mca/btl/openib/btl_openib_endpoint.h @@ -48,9 +48,6 @@ typedef enum { /* Waiting for ack from endpoint */ MCA_BTL_IB_CONNECT_ACK, - /*Waiting for final connection ACK from endpoint */ - MCA_BTL_IB_WAITING_ACK, - /* Connected ... both sender & receiver have * buffers associated with this connection */ MCA_BTL_IB_CONNECTED,