Open MPI Development Mailing List Archives

From: Graham E Fagg (fagg_at_[hidden])
Date: 2006-01-14 16:07:18


Hi all,
  whatever this fixed/changed, I no longer get corrupted memory in the
tuned data segment hung off each communicator! I'm still testing to
see if I get Tim P.'s error.
G

On Sat, 14 Jan 2006 bosilca_at_[hidden] wrote:

> Author: bosilca
> Date: 2006-01-14 15:21:44 -0500 (Sat, 14 Jan 2006)
> New Revision: 8692
>
> Modified:
> trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
> trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
> trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
> trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
> Log:
> A better implementation for the TCP endpoint cache + few comments.
>
>
> Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
> ===================================================================
> --- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c 2006-01-14 20:19:01 UTC (rev 8691)
> +++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c 2006-01-14 20:21:44 UTC (rev 8692)
> @@ -79,7 +79,7 @@
> endpoint->endpoint_nbo = false;
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> endpoint->endpoint_cache = NULL;
> - endpoint->endpoint_cache_pos = 0;
> + endpoint->endpoint_cache_pos = NULL;
> endpoint->endpoint_cache_length = 0;
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
> OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
> @@ -187,21 +187,20 @@
> static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint, int sd)
> {
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> - btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
> + btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
> + btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
>
> - opal_event_set(
> - &btl_endpoint->endpoint_recv_event,
> - btl_endpoint->endpoint_sd,
> - OPAL_EV_READ|OPAL_EV_PERSIST,
> - mca_btl_tcp_endpoint_recv_handler,
> - btl_endpoint);
> - opal_event_set(
> - &btl_endpoint->endpoint_send_event,
> - btl_endpoint->endpoint_sd,
> - OPAL_EV_WRITE|OPAL_EV_PERSIST,
> - mca_btl_tcp_endpoint_send_handler,
> - btl_endpoint);
> + opal_event_set( &btl_endpoint->endpoint_recv_event,
> + btl_endpoint->endpoint_sd,
> + OPAL_EV_READ|OPAL_EV_PERSIST,
> + mca_btl_tcp_endpoint_recv_handler,
> + btl_endpoint );
> + opal_event_set( &btl_endpoint->endpoint_send_event,
> + btl_endpoint->endpoint_sd,
> + OPAL_EV_WRITE|OPAL_EV_PERSIST,
> + mca_btl_tcp_endpoint_send_handler,
> + btl_endpoint);
> }
>
>
> @@ -357,7 +356,9 @@
> btl_endpoint->endpoint_sd = -1;
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> free( btl_endpoint->endpoint_cache );
> - btl_endpoint->endpoint_cache = NULL;
> + btl_endpoint->endpoint_cache = NULL;
> + btl_endpoint->endpoint_cache_pos = NULL;
> + btl_endpoint->endpoint_cache_length = 0;
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
> }
> btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
> @@ -619,13 +620,12 @@
> }
>
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> - btl_endpoint->endpoint_cache_pos = 0;
> + assert( 0 == btl_endpoint->endpoint_cache_length );
> data_still_pending_on_endpoint:
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
> /* check for completion of non-blocking recv on the current fragment */
> if(mca_btl_tcp_frag_recv(frag, sd) == false) {
> btl_endpoint->endpoint_recv_frag = frag;
> - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> } else {
> btl_endpoint->endpoint_recv_frag = NULL;
> switch(frag->hdr.type) {
> @@ -636,39 +636,37 @@
> break;
> }
> default:
> - {
> - break;
> - }
> + break;
> }
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> if( 0 != btl_endpoint->endpoint_cache_length ) {
> + /* If the cache still contains some data we can reuse the same fragment
> + * until we flush it completely.
> + */
> MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
> goto data_still_pending_on_endpoint;
> }
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
> MCA_BTL_TCP_FRAG_RETURN_MAX(frag);
> - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> }
> - break;
> - }
> - case MCA_BTL_TCP_SHUTDOWN:
> - {
> OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> + assert( 0 == btl_endpoint->endpoint_cache_length );
> break;
> }
> + case MCA_BTL_TCP_SHUTDOWN:
> + OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> + break;
> default:
> - {
> - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> - BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
> - mca_btl_tcp_endpoint_close(btl_endpoint);
> - break;
> - }
> + OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
> + BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
> + mca_btl_tcp_endpoint_close(btl_endpoint);
> + break;
> }
> }
>
>
> /*
> - * A file descriptor is available/ready for send. Check the state
> + * A file descriptor is available/ready for send. Check the state
> * of the socket and take the appropriate action.
> */
>
> @@ -680,7 +678,7 @@
> case MCA_BTL_TCP_CONNECTING:
> mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
> break;
> - case MCA_BTL_TCP_CONNECTED:
> + case MCA_BTL_TCP_CONNECTED:
> {
> /* complete the current send */
> do {
>
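
An aside on the endpoint.c hunks above: the cache cursor changes from a size_t offset into a char* pointer that is initialized right next to the malloc, and all three cache fields are now reset together when the socket closes. A minimal sketch of that lifecycle, using hypothetical trimmed-down names that mirror the diff (not the actual Open MPI structures):

    #include <stdlib.h>

    /* Hypothetical endpoint holding only the receive-cache fields. */
    struct endpoint {
        char  *cache;         /* malloc'ed receive cache        */
        char  *cache_pos;     /* cursor over the unread bytes   */
        size_t cache_length;  /* unread bytes left in the cache */
    };

    /* Connection setup (cf. mca_btl_tcp_endpoint_event_init above):
     * allocate the cache and aim the cursor at its start. */
    static void endpoint_cache_init(struct endpoint *ep, size_t size)
    {
        ep->cache        = malloc(size);
        ep->cache_pos    = ep->cache;
        ep->cache_length = 0;
    }

    /* Connection close (cf. the endpoint_sd = -1 branch above): free
     * and reset every field, so no stale cursor survives a reconnect. */
    static void endpoint_cache_fini(struct endpoint *ep)
    {
        free(ep->cache);
        ep->cache        = NULL;
        ep->cache_pos    = NULL;
        ep->cache_length = 0;
    }
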
> Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
> ===================================================================
> --- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h 2006-01-14 20:19:01 UTC (rev 8691)
> +++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h 2006-01-14 20:21:44 UTC (rev 8692)
> @@ -60,9 +60,9 @@
> struct mca_btl_tcp_addr_t* endpoint_addr; /**< address of endpoint */
> int endpoint_sd; /**< socket connection to endpoint */
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> - char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall */
> - size_t endpoint_cache_pos; /**< */
> - size_t endpoint_cache_length; /**< */
> + char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall) */
> + char* endpoint_cache_pos; /**< current position in the cache */
> + size_t endpoint_cache_length; /**< length of the data in the cache */
> #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
> struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send frag being processed */
> struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */
>
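
The three fields above describe a simple window: the unread bytes live in [endpoint_cache_pos, endpoint_cache_pos + endpoint_cache_length). A hedged sketch of a consumer of that window, reusing the hypothetical struct from the previous sketch (this helper is illustrative and not part of the commit):

    #include <string.h>

    /* Copy up to 'want' cached bytes into 'dst'; return bytes copied.
     * Rewind the cursor once the cache drains, as the frag.c hunk
     * below does. */
    static size_t cache_take(struct endpoint *ep, void *dst, size_t want)
    {
        size_t take = (want < ep->cache_length) ? want : ep->cache_length;
        memcpy(dst, ep->cache_pos, take);
        ep->cache_pos    += take;
        ep->cache_length -= take;
        if (0 == ep->cache_length)
            ep->cache_pos = ep->cache;  /* empty: back to the start */
        return take;
    }
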
> Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
> ===================================================================
> --- trunk/ompi/mca/btl/tcp/btl_tcp_frag.c 2006-01-14 20:19:01 UTC (rev 8691)
> +++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.c 2006-01-14 20:21:44 UTC (rev 8692)
> @@ -119,29 +119,36 @@
> bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
> {
> int cnt;
> - size_t i, num_vecs = frag->iov_cnt;
> + size_t i, num_vecs;
> mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
>
> repeat:
> -
> + num_vecs = frag->iov_cnt;
> #if MCA_BTL_TCP_ENDPOINT_CACHE
> if( 0 != btl_endpoint->endpoint_cache_length ) {
> size_t length = btl_endpoint->endpoint_cache_length;
> + /* It may seem strange at first glance, but cnt has to be set to the full amount of data
> + * available. After jumping to advance_iov_position we will use cnt to detect whether
> + * there is still some data pending.
> + */
> cnt = btl_endpoint->endpoint_cache_length;
> for( i = 0; i < frag->iov_cnt; i++ ) {
> if( length > frag->iov_ptr[i].iov_len )
> length = frag->iov_ptr[i].iov_len;
> - memcpy( frag->iov_ptr[i].iov_base,
> - btl_endpoint->endpoint_cache + btl_endpoint->endpoint_cache_pos,
> - length );
> + memcpy( frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length );
> btl_endpoint->endpoint_cache_pos += length;
> btl_endpoint->endpoint_cache_length -= length;
> length = btl_endpoint->endpoint_cache_length;
> - if( 0 == length ) break;
> + if( 0 == length ) {
> + btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
> + break;
> + }
> }
> goto advance_iov_position;
> }
> -
> + /* What happens if all iovecs are used by the fragment? It still works, as we reserve
> + * one iovec for the caching in the fragment structure (the +1).
> + */
> frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache;
> frag->iov_ptr[num_vecs].iov_len = mca_btl_tcp_component.tcp_endpoint_cache;
> num_vecs++;
> @@ -162,15 +169,13 @@
> frag->iov_ptr[0].iov_base, frag->iov_ptr[0].iov_len,
> strerror(ompi_socket_errno), frag->iov_cnt );
> default:
> - {
> - opal_output(0, "mca_btl_tcp_frag_send: writev failed with errno=%d",
> - ompi_socket_errno);
> - mca_btl_tcp_endpoint_close(btl_endpoint);
> - return false;
> - }
> + opal_output(0, "mca_btl_tcp_frag_send: writev failed with errno=%d",
> + ompi_socket_errno);
> + mca_btl_tcp_endpoint_close(btl_endpoint);
> + return false;
> }
> }
> - if(cnt == 0) {
> + if( cnt == 0 ) {
> mca_btl_tcp_endpoint_close(btl_endpoint);
> return false;
> }
> @@ -180,8 +185,8 @@
> advance_iov_position:
> /* if the write didn't complete - update the iovec state */
> num_vecs = frag->iov_cnt;
> - for(i=0; i<num_vecs; i++) {
> - if(cnt >= (int)frag->iov_ptr->iov_len) {
> + for( i = 0; i < num_vecs; i++ ) {
> + if( cnt >= (int)frag->iov_ptr->iov_len ) {
> cnt -= frag->iov_ptr->iov_len;
> frag->iov_idx++;
> frag->iov_ptr++;
> @@ -190,7 +195,7 @@
> frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
> (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
> frag->iov_ptr->iov_len -= cnt;
> - cnt = 0;
> + cnt = 0;
> break;
> }
> }
>
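
The frag.c hunks above are the heart of the optimization: when cached bytes are available the fragment is filled by memcpy with no syscall at all; otherwise one extra iovec aimed at the cache is appended to the fragment's list, so a single readv() fills the fragment and prefetches whatever else the kernel already holds. A standalone sketch of the pattern, reusing the hypothetical struct endpoint from the first sketch (for brevity the cache is drained into the first iovec only; the real code loops over all of them):

    #include <string.h>
    #include <sys/types.h>
    #include <sys/uio.h>

    #define CACHE_SIZE 65536   /* assumed tcp_endpoint_cache value */

    /* Receive into iov[0..iovcnt-1]; 'iov' must have room for one
     * extra entry (the frag.h "+ 1" below) aimed at the cache. */
    static ssize_t recv_with_cache(struct endpoint *ep, int sd,
                                   struct iovec *iov, int iovcnt)
    {
        size_t  frag_bytes = 0;
        ssize_t cnt;
        int     i;

        if (ep->cache_length > 0) {  /* serve from memory, no syscall */
            size_t len = (iov[0].iov_len < ep->cache_length)
                       ? iov[0].iov_len : ep->cache_length;
            memcpy(iov[0].iov_base, ep->cache_pos, len);
            ep->cache_pos    += len;
            ep->cache_length -= len;
            if (0 == ep->cache_length)
                ep->cache_pos = ep->cache;
            return (ssize_t)len;
        }

        for (i = 0; i < iovcnt; i++)
            frag_bytes += iov[i].iov_len;

        /* Append the spare iovec, then read fragment + cache at once. */
        iov[iovcnt].iov_base = ep->cache;
        iov[iovcnt].iov_len  = CACHE_SIZE;
        cnt = readv(sd, iov, iovcnt + 1);
        if (cnt > (ssize_t)frag_bytes) {
            /* The surplus was prefetched into the cache; the memcpy
             * path serves it on the next call. */
            ep->cache_length = (size_t)cnt - frag_bytes;
            cnt = (ssize_t)frag_bytes;
        }
        return cnt;
    }
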
> Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
> ===================================================================
> --- trunk/ompi/mca/btl/tcp/btl_tcp_frag.h 2006-01-14 20:19:01 UTC (rev 8691)
> +++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.h 2006-01-14 20:21:44 UTC (rev 8692)
> @@ -49,7 +49,7 @@
> struct mca_btl_base_endpoint_t *endpoint;
> struct mca_btl_tcp_module_t* btl;
> mca_btl_tcp_hdr_t hdr;
> - struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER];
> + struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
> struct iovec *iov_ptr;
> size_t iov_cnt;
> size_t iov_idx;
>
> _______________________________________________
> svn mailing list
> svn_at_[hidden]
> http://www.open-mpi.org/mailman/listinfo.cgi/svn
>

Thanks,
         Graham.
----------------------------------------------------------------------
Dr Graham E. Fagg | Distributed, Parallel and Meta-Computing
Innovative Computing Lab. PVM3.4, HARNESS, FT-MPI, SNIPE & Open MPI
Computer Science Dept | Suite 203, 1122 Volunteer Blvd,
University of Tennessee | Knoxville, Tennessee, USA. TN 37996-3450
Email: fagg_at_[hidden] | Phone:+1(865)974-5790 | Fax:+1(865)974-8296
Broken complex systems are always derived from working simple systems
----------------------------------------------------------------------