Diff for /servers/slapd/overlays/syncprov.c between versions 1.56.2.16 and 1.110

version 1.56.2.16, 2005/10/05 17:42:20 version 1.110, 2005/10/02 10:26:02
Line 1 Line 1
 /* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.114 2005/10/04 00:29:25 hyc Exp $ */  /* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.109 2005/10/02 08:48:28 hyc Exp $ */
 /* syncprov.c - syncrepl provider */  /* syncprov.c - syncrepl provider */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.  /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *   *
Line 70  typedef struct syncops { Line 70  typedef struct syncops {
         struct syncres *s_res;          struct syncres *s_res;
         struct syncres *s_restail;          struct syncres *s_restail;
         struct re_s     *s_qtask;       /* task for playing psearch responses */          struct re_s     *s_qtask;       /* task for playing psearch responses */
 #define RUNQ_INTERVAL   36000   /* a long time */  
         ldap_pvt_thread_mutex_t s_mutex;          ldap_pvt_thread_mutex_t s_mutex;
 } syncops;  } syncops;
   
Line 697  again: Line 696  again:
         return rc;          return rc;
 }  }
   
 static void  
 syncprov_free_syncop( syncops *so )  
 {  
         syncres *sr, *srnext;  
         GroupAssertion *ga, *gnext;  
   
         ldap_pvt_thread_mutex_lock( &so->s_mutex );  
         if ( --so->s_inuse > 0 ) {  
                 ldap_pvt_thread_mutex_unlock( &so->s_mutex );  
                 return;  
         }  
         ldap_pvt_thread_mutex_unlock( &so->s_mutex );  
         if ( so->s_flags & PS_IS_DETACHED ) {  
                 filter_free( so->s_op->ors_filter );  
                 for ( ga = so->s_op->o_groups; ga; ga=gnext ) {  
                         gnext = ga->ga_next;  
                         ch_free( ga );  
                 }  
                 ch_free( so->s_op );  
         }  
         ch_free( so->s_base.bv_val );  
         for ( sr=so->s_res; sr; sr=srnext ) {  
                 srnext = sr->s_next;  
                 ch_free( sr );  
         }  
         ldap_pvt_thread_mutex_destroy( &so->s_mutex );  
         ch_free( so );  
 }  
   
 /* Send a persistent search response */  /* Send a persistent search response */
 static int  static int
 syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)  syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)
Line 791  syncprov_sendresp( Operation *op, opcook Line 761  syncprov_sendresp( Operation *op, opcook
         default:          default:
                 assert(0);                  assert(0);
         }          }
         /* In case someone else freed it already? */          op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
         if ( rs.sr_ctrls ) {          rs.sr_ctrls = NULL;
                 op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );  
                 rs.sr_ctrls = NULL;  
         }  
   
         return rs.sr_err;          return rs.sr_err;
 }  }
Line 821  syncprov_qplay( Operation *op, slap_over Line 788  syncprov_qplay( Operation *op, slap_over
                         so->s_restail = NULL;                          so->s_restail = NULL;
                 ldap_pvt_thread_mutex_unlock( &so->s_mutex );                  ldap_pvt_thread_mutex_unlock( &so->s_mutex );
   
                 if ( !sr || so->s_op->o_abandon )                  if ( !sr )
                         break;                          break;
   
                 opc.sdn = sr->s_dn;                  if ( !so->s_op->o_abandon ) {
                 opc.sndn = sr->s_ndn;                          opc.sdn = sr->s_dn;
                 opc.suuid = sr->s_uuid;                          opc.sndn = sr->s_ndn;
                 opc.sctxcsn = sr->s_csn;                          opc.suuid = sr->s_uuid;
                 opc.sreference = sr->s_isreference;                          opc.sctxcsn = sr->s_csn;
                 e = NULL;                          opc.sreference = sr->s_isreference;
                           e = NULL;
                 if ( sr->s_mode != LDAP_SYNC_DELETE ) {  
                         rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );                          if ( sr->s_mode != LDAP_SYNC_DELETE ) {
                         if ( rc ) {                                  rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
                                 ch_free( sr );                                  if ( rc ) {
                                 continue;                                          ch_free( sr );
                                           continue;
                                   }
                         }                          }
                 }                          rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
                 rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );  
   
                 if ( e ) {                          if ( e ) {
                         be_entry_release_rw( op, e, 0 );                                  be_entry_release_rw( op, e, 0 );
                           }
                           if ( rc )
                                   break;
                 }                  }
   
                 ch_free( sr );                  ch_free( sr );
   
                 if ( rc )  
                         break;  
         }          }
         op->o_bd->bd_info = (BackendInfo *)on;          op->o_bd->bd_info = (BackendInfo *)on;
         return rc;          return rc;
Line 865  syncprov_qtask( void *ctx, void *arg ) Line 833  syncprov_qtask( void *ctx, void *arg )
         BackendDB be;          BackendDB be;
   
         op = (Operation *)opbuf;          op = (Operation *)opbuf;
         *op = *so->s_op;          memset( op, 0, sizeof(opbuf));
         op->o_hdr = (Opheader *)(op+1);          op->o_hdr = (Opheader *)(op+1);
         op->o_controls = (void **)(op->o_hdr+1);          op->o_controls = (void **)(op->o_hdr+1);
         memset( op->o_controls, 0, SLAP_MAX_CIDS * sizeof(void *));  
   
         *op->o_hdr = *so->s_op->o_hdr;          *op->o_hdr = *so->s_op->o_hdr;
   
Line 885  syncprov_qtask( void *ctx, void *arg ) Line 852  syncprov_qtask( void *ctx, void *arg )
   
         syncprov_qplay( op, on, so );          syncprov_qplay( op, on, so );
   
         /* decrement use count... */  
         syncprov_free_syncop( so );  
   
         /* wait until we get explicitly scheduled again */          /* wait until we get explicitly scheduled again */
         ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );          ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
         ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );          ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
Line 936  syncprov_qresp( opcookie *opc, syncops * Line 900  syncprov_qresp( opcookie *opc, syncops *
         if ( so->s_flags & PS_IS_DETACHED ) {          if ( so->s_flags & PS_IS_DETACHED ) {
                 ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );                  ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
                 if ( !so->s_qtask ) {                  if ( !so->s_qtask ) {
                         so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,                          so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, 1,
                                 syncprov_qtask, so, "syncprov_qtask",                                  syncprov_qtask, so, "syncprov_qtask",
                                 so->s_op->o_conn->c_peer_name.bv_val );                                  so->s_op->o_conn->c_peer_name.bv_val );
                         ++so->s_inuse;  
                 } else {                  } else {
                         if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&                          if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask )) {
                                 !so->s_qtask->next_sched.tv_sec ) {  
                                 so->s_qtask->interval.tv_sec = 0;                                  so->s_qtask->interval.tv_sec = 0;
                                 ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );                                  ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
                                 so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;                                  so->s_qtask->interval.tv_sec = 1;
                                 ++so->s_inuse;  
                         }                          }
                 }                  }
                 ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );                  ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
Line 955  syncprov_qresp( opcookie *opc, syncops * Line 916  syncprov_qresp( opcookie *opc, syncops *
         return LDAP_SUCCESS;          return LDAP_SUCCESS;
 }  }
   
   static void
   syncprov_free_syncop( syncops *so )
   {
           syncres *sr, *srnext;
           GroupAssertion *ga, *gnext;
   
           ldap_pvt_thread_mutex_lock( &so->s_mutex );
           so->s_inuse--;
           if ( so->s_inuse > 0 ) {
                   ldap_pvt_thread_mutex_unlock( &so->s_mutex );
                   return;
           }
           ldap_pvt_thread_mutex_unlock( &so->s_mutex );
           if ( so->s_flags & PS_IS_DETACHED ) {
                   filter_free( so->s_op->ors_filter );
                   for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
                           gnext = ga->ga_next;
                           ch_free( ga );
                   }
                   ch_free( so->s_op );
           }
           ch_free( so->s_base.bv_val );
           for ( sr=so->s_res; sr; sr=srnext ) {
                   srnext = sr->s_next;
                   ch_free( sr );
           }
           ldap_pvt_thread_mutex_destroy( &so->s_mutex );
           ch_free( so );
   }
   
 static int  static int
 syncprov_drop_psearch( syncops *so, int lock )  syncprov_drop_psearch( syncops *so, int lock )
 {  {
Line 1130  syncprov_matchops( Operation *op, opcook Line 1121  syncprov_matchops( Operation *op, opcook
                                 sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );                                  sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
                                 sm->sm_next = opc->smatches;                                  sm->sm_next = opc->smatches;
                                 sm->sm_op = ss;                                  sm->sm_op = ss;
                                 ldap_pvt_thread_mutex_lock( &ss->s_mutex );                                  ss->s_inuse++;
                                 ++ss->s_inuse;  
                                 ldap_pvt_thread_mutex_unlock( &ss->s_mutex );  
                                 opc->smatches = sm;                                  opc->smatches = sm;
                         } else {                          } else {
                                 /* if found send UPDATE else send ADD */                                  /* if found send UPDATE else send ADD */
                                   ss->s_inuse++;
                                 syncprov_qresp( opc, ss,                                  syncprov_qresp( opc, ss,
                                         found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );                                          found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
                                   ss->s_inuse--;
                         }                          }
                 } else if ( !saveit && found ) {                  } else if ( !saveit && found ) {
                         /* send DELETE */                          /* send DELETE */
Line 1815  syncprov_search_response( Operation *op, Line 1806  syncprov_search_response( Operation *op,
                         syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?                          syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
                                 LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,                                  LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
                                 &cookie, 1, NULL, 0 );                                  &cookie, 1, NULL, 0 );
                           /* Flush any queued persist messages */
                           if ( ss->ss_so->s_res ) {
                                   syncprov_qplay( op, on, ss->ss_so );
                           }
   
                         /* Detach this Op from frontend control */                          /* Detach this Op from frontend control */
                         ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );                          ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );

Removed from v.1.56.2.16  
changed lines
  Added in v.1.110


______________
© Copyright 1998-2020, OpenLDAP Foundation, info@OpenLDAP.org