Diff for /servers/slapd/overlays/syncprov.c between versions 1.147.2.50 and 1.147.2.53

version 1.147.2.50, 2009/03/13 19:47:32 version 1.147.2.53, 2009/03/17 16:26:43
Line 1 Line 1
 /* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.49 2009/03/12 23:28:52 quanah Exp $ */  /* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.52 2009/03/17 16:25:00 quanah Exp $ */
 /* syncprov.c - syncrepl provider */  /* syncprov.c - syncrepl provider */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.  /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *   *
Line 71  typedef struct syncops { Line 71  typedef struct syncops {
 #define PS_WROTE_BASE           0x04  #define PS_WROTE_BASE           0x04
 #define PS_FIND_BASE            0x08  #define PS_FIND_BASE            0x08
 #define PS_FIX_FILTER           0x10  #define PS_FIX_FILTER           0x10
   #define PS_TASK_QUEUED          0x20
   
         int             s_inuse;        /* reference count */          int             s_inuse;        /* reference count */
         struct syncres *s_res;          struct syncres *s_res;
         struct syncres *s_restail;          struct syncres *s_restail;
         struct re_s     *s_qtask;       /* task for playing psearch responses */  
 #define RUNQ_INTERVAL   36000   /* a long time */  
         ldap_pvt_thread_mutex_t s_mutex;          ldap_pvt_thread_mutex_t s_mutex;
 } syncops;  } syncops;
   
Line 143  typedef struct syncprov_info_t { Line 142  typedef struct syncprov_info_t {
 typedef struct opcookie {  typedef struct opcookie {
         slap_overinst *son;          slap_overinst *son;
         syncmatches *smatches;          syncmatches *smatches;
           modtarget *smt;
         struct berval sdn;      /* DN of entry, for deletes */          struct berval sdn;      /* DN of entry, for deletes */
         struct berval sndn;          struct berval sndn;
         struct berval suuid;    /* UUID of entry */          struct berval suuid;    /* UUID of entry */
Line 748  syncprov_free_syncop( syncops *so ) Line 748  syncprov_free_syncop( syncops *so )
                 ldap_pvt_thread_mutex_unlock( &so->s_mutex );                  ldap_pvt_thread_mutex_unlock( &so->s_mutex );
                 return;                  return;
         }          }
         if ( so->s_qtask ) {  
                 ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );  
                 if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) )  
                         ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );  
                 ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask );  
                 ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );  
         }  
         ldap_pvt_thread_mutex_unlock( &so->s_mutex );          ldap_pvt_thread_mutex_unlock( &so->s_mutex );
         if ( so->s_flags & PS_IS_DETACHED ) {          if ( so->s_flags & PS_IS_DETACHED ) {
                 filter_free( so->s_op->ors_filter );                  filter_free( so->s_op->ors_filter );
Line 861  syncprov_sendresp( Operation *op, opcook Line 854  syncprov_sendresp( Operation *op, opcook
         return rs.sr_err;          return rs.sr_err;
 }  }
   
   static void
   syncprov_qstart( syncops *so );
   
 /* Play back queued responses */  /* Play back queued responses */
 static int  static int
 syncprov_qplay( Operation *op, struct re_s *rtask )  syncprov_qplay( Operation *op, syncops *so )
 {  {
         syncops *so = rtask->arg;  
         slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;          slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
         syncres *sr;          syncres *sr;
         Entry *e;          Entry *e;
Line 874  syncprov_qplay( Operation *op, struct re Line 869  syncprov_qplay( Operation *op, struct re
   
         opc.son = on;          opc.son = on;
   
         for (;;) {          do {
                 ldap_pvt_thread_mutex_lock( &so->s_mutex );                  ldap_pvt_thread_mutex_lock( &so->s_mutex );
                 sr = so->s_res;                  sr = so->s_res;
                 if ( sr )                  if ( sr )
Line 918  syncprov_qplay( Operation *op, struct re Line 913  syncprov_qplay( Operation *op, struct re
   
                 ch_free( sr );                  ch_free( sr );
   
                 if ( rc ) {                  /* Exit loop with mutex held */
                         /* Exit loop with mutex held */                  ldap_pvt_thread_mutex_lock( &so->s_mutex );
                         ldap_pvt_thread_mutex_lock( &so->s_mutex );  
                         break;  
                 }  
         }  
   
         /* wait until we get explicitly scheduled again */          } while (0);
         ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );  
         ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );          /* We now only send one change at a time, to prevent one
         if ( rc == 0 ) {           * psearch from hogging all the CPU. Resubmit this task if
                 ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );           * there are more responses queued and no errors occurred.
         } else {           */
                 /* bail out on any error */  
                 ldap_pvt_runqueue_remove( &slapd_rq, rtask );  
   
                 /* Prevent duplicate remove */          if ( rc == 0 && so->s_res ) {
                 if ( so->s_qtask == rtask )                  syncprov_qstart( so );
                         so->s_qtask = NULL;          } else {
                   so->s_flags ^= PS_TASK_QUEUED;
         }          }
         ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );  
         ldap_pvt_thread_mutex_unlock( &so->s_mutex );          ldap_pvt_thread_mutex_unlock( &so->s_mutex );
         return rc;          return rc;
 }  }
   
 /* runqueue task for playing back queued responses */  /* task for playing back queued responses */
 static void *  static void *
 syncprov_qtask( void *ctx, void *arg )  syncprov_qtask( void *ctx, void *arg )
 {  {
         struct re_s *rtask = arg;          syncops *so = arg;
         syncops *so = rtask->arg;  
         OperationBuffer opbuf;          OperationBuffer opbuf;
         Operation *op;          Operation *op;
         BackendDB be;          BackendDB be;
Line 973  syncprov_qtask( void *ctx, void *arg ) Line 962  syncprov_qtask( void *ctx, void *arg )
         LDAP_SLIST_FIRST(&op->o_extra) = NULL;          LDAP_SLIST_FIRST(&op->o_extra) = NULL;
         op->o_callback = NULL;          op->o_callback = NULL;
   
         rc = syncprov_qplay( op, rtask );          rc = syncprov_qplay( op, so );
   
         /* decrement use count... */          /* decrement use count... */
         syncprov_free_syncop( so );          syncprov_free_syncop( so );
   
 #if 0   /* FIXME: connection_close isn't exported from slapd.  
                  * should it be?  
                  */  
         if ( rc ) {  
                 ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );  
                 if ( connection_state_closing( op->o_conn )) {  
                         connection_close( op->o_conn );  
                 }  
                 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );  
         }  
 #endif  
         return NULL;          return NULL;
 }  }
   
Line 996  syncprov_qtask( void *ctx, void *arg ) Line 974  syncprov_qtask( void *ctx, void *arg )
 static void  static void
 syncprov_qstart( syncops *so )  syncprov_qstart( syncops *so )
 {  {
         int wake=0;          so->s_flags |= PS_TASK_QUEUED;
         ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );          so->s_inuse++;
         if ( !so->s_qtask ) {          ldap_pvt_thread_pool_submit( &connection_pool, 
                 so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,                  syncprov_qtask, so );
                         syncprov_qtask, so, "syncprov_qtask",  
                         so->s_op->o_conn->c_peer_name.bv_val );  
                 ++so->s_inuse;  
                 wake = 1;  
         } else {  
                 if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&  
                         !so->s_qtask->next_sched.tv_sec ) {  
                         so->s_qtask->interval.tv_sec = 0;  
                         ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );  
                         so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;  
                         ++so->s_inuse;  
                         wake = 1;  
                 }  
         }  
         ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );  
         if ( wake )  
                 slap_wake_listener();  
 }  }
   
 /* Queue a persistent search response */  /* Queue a persistent search response */
Line 1076  syncprov_qresp( opcookie *opc, syncops * Line 1037  syncprov_qresp( opcookie *opc, syncops *
                 so->s_flags ^= PS_WROTE_BASE;                  so->s_flags ^= PS_WROTE_BASE;
                 so->s_flags |= PS_FIND_BASE;                  so->s_flags |= PS_FIND_BASE;
         }          }
         if ( so->s_flags & PS_IS_DETACHED ) {          if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
                 syncprov_qstart( so );                  syncprov_qstart( so );
         }          }
         ldap_pvt_thread_mutex_unlock( &so->s_mutex );          ldap_pvt_thread_mutex_unlock( &so->s_mutex );
Line 1353  syncprov_op_cleanup( Operation *op, Slap Line 1314  syncprov_op_cleanup( Operation *op, Slap
         }          }
   
         /* Remove op from lock table */          /* Remove op from lock table */
         mtdummy.mt_op = op;          mt = opc->smt;
         ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );  
         mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );  
         if ( mt ) {          if ( mt ) {
                 modinst *mi = mt->mt_mods;                  modinst *mi = mt->mt_mods;
   
                 /* If there are more, promote the next one */                  /* If there are more, promote the next one */
                 ldap_pvt_thread_mutex_lock( &mt->mt_mutex );  
                 if ( mi->mi_next ) {                  if ( mi->mi_next ) {
                           ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
                         mt->mt_mods = mi->mi_next;                          mt->mt_mods = mi->mi_next;
                         mt->mt_op = mt->mt_mods->mi_op;                          mt->mt_op = mt->mt_mods->mi_op;
                         ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );                          ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
                 } else {                  } else {
                           ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
                         avl_delete( &si->si_mods, mt, sp_avl_cmp );                          avl_delete( &si->si_mods, mt, sp_avl_cmp );
                         ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );                          ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
                         ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );                          ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
                         ch_free( mt );                          ch_free( mt );
                 }                  }
         }          }
         ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );  
         if ( !BER_BVISNULL( &opc->suuid ))          if ( !BER_BVISNULL( &opc->suuid ))
                 op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );                  op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
         if ( !BER_BVISNULL( &opc->sndn ))          if ( !BER_BVISNULL( &opc->sndn ))
Line 1685  syncprov_op_response( Operation *op, Sla Line 1644  syncprov_op_response( Operation *op, Sla
                 maxcsn.bv_len = sizeof(cbuf);                  maxcsn.bv_len = sizeof(cbuf);
                 ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );                  ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );
   
   #if 0
                   if ( op->o_dont_replicate &&
                                   op->orm_modlist->sml_op == LDAP_MOD_REPLACE &&
                                   op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) {
                           /* Catch contextCSN updates from syncrepl. We have to look at
                            * all the attribute values, as there may be more than one csn
                            * that changed, and only one can be passed in the csn queue.
                            */
                           Modifications *mod = op->orm_modlist;
                           int i, j, sid;
   
                           for ( i=0; i<mod->sml_numvals; i++ ) {
                                   sid = slap_parse_csn_sid( &mod->sml_values[i] );
   
                                   for ( j=0; j<si->si_numcsns; j++ ) {
                                           if ( sid == si->si_sids[j] ) {
                                                   if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) {
                                                           ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] );
                                                           csn_changed = 1;
                                                   }
                                                   break;
                                           }
                                   }
   
                                   if ( j == si->si_numcsns ) {
                                           value_add_one( &si->si_ctxcsn, &mod->sml_values[i] );
                                           si->si_numcsns++;
                                           si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
                                                   sizeof(int));
                                           si->si_sids[j] = sid;
                                           csn_changed = 1;
                                   }
                           }
                           ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
   
                           if ( csn_changed ) {
                                   ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
                                   have_psearches = ( si->si_ops != NULL );
                                   ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
   
                                   if ( have_psearches ) {
                                           for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
                                                   if ( sm->sm_op->s_op->o_abandon )
                                                           continue;
                                                   syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_NEW_COOKIE );
                                           }
                                   }
                           }
                           return SLAP_CB_CONTINUE;
                   }
   #endif
   
                 slap_get_commit_csn( op, &maxcsn, &foundit );                  slap_get_commit_csn( op, &maxcsn, &foundit );
                 if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {                  if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {
                         /* syncrepl queues the CSN values in the db where                          /* syncrepl queues the CSN values in the db where
Line 1962  syncprov_op_mod( Operation *op, SlapRepl Line 1973  syncprov_op_mod( Operation *op, SlapRepl
                         avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );                          avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
                         ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );                          ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
                 }                  }
                   opc->smt = mt;
         }          }
   
         if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )          if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )

Removed from v.1.147.2.50  
changed lines
  Added in v.1.147.2.53


______________
© Copyright 1998-2020, OpenLDAP Foundation, info@OpenLDAP.org