Diff for /servers/slapd/overlays/syncprov.c between versions 1.26 and 1.27

version 1.26, 2004/11/27 09:49:47 version 1.27, 2004/11/27 13:52:28
Line 94  typedef struct syncmatches { Line 94  typedef struct syncmatches {
 typedef struct syncprov_info_t {  typedef struct syncprov_info_t {
         syncops         *si_ops;          syncops         *si_ops;
         struct berval   si_ctxcsn;      /* ldapsync context */          struct berval   si_ctxcsn;      /* ldapsync context */
         int             si_gotcsn;      /* is our ctxcsn up to date? */          int             si_chkops;      /* checkpointing info */
           int             si_chktime;
           int             si_numops;      /* number of ops since last checkpoint */
           time_t  si_chklast;     /* time of last checkpoint */
         Avlnode *si_mods;       /* entries being modified */          Avlnode *si_mods;       /* entries being modified */
         ldap_pvt_thread_mutex_t si_csn_mutex;          ldap_pvt_thread_mutex_t si_csn_mutex;
         ldap_pvt_thread_mutex_t si_ops_mutex;          ldap_pvt_thread_mutex_t si_ops_mutex;
Line 471  syncprov_findbase( Operation *op, fbase_ Line 474  syncprov_findbase( Operation *op, fbase_
 }  }
   
 /* syncprov_findcsn:  /* syncprov_findcsn:
  *   This function has three different purposes, but they all use a search   *   This function has two different purposes, but they both use a search
  * that filters on entryCSN so they're combined here.   * that filters on entryCSN so they're combined here.
  * 1: when the current contextCSN is unknown (i.e., at server start time)   * 1: when the current contextCSN is known and we have a sync cookie, we search
  * and a syncrepl search has arrived with a cookie, we search for all entries  
  * with CSN >= the cookie CSN, and store the maximum as our contextCSN. Also,  
  * we expect to find the cookie CSN in the search results, and note if we did  
  * or not. If not, we assume the cookie is stale. (This may be too restrictive,  
  * notice case 2.)  
  *  
  * 2: when the current contextCSN is known and we have a sync cookie, we search  
  * for one entry with CSN <= the cookie CSN. (Used to search for =.) If an   * for one entry with CSN <= the cookie CSN. (Used to search for =.) If an
  * entry is found, the cookie CSN is valid, otherwise it is stale. Case 1 is   * entry is found, the cookie CSN is valid, otherwise it is stale.
  * considered a special case of case 2, and both are generally called the  
  * "find CSN" task.  
  *   *
  * 3: during a refresh phase, we search for all entries with CSN <= the cookie   * 2: during a refresh phase, we search for all entries with CSN <= the cookie
  * CSN, and generate Present records for them. We always collect this result   * CSN, and generate Present records for them. We always collect this result
  * in SyncID sets, even if there's only one match.   * in SyncID sets, even if there's only one match.
  */   */
 #define FIND_CSN        1  #define FIND_CSN        1
 #define FIND_PRESENT    2  #define FIND_PRESENT    2
   
 typedef struct fcsn_cookie {  
         struct berval maxcsn;  
         int gotmatch;  
 } fcsn_cookie;  
   
 static int  static int
 findcsn_cb( Operation *op, SlapReply *rs )  findcsn_cb( Operation *op, SlapReply *rs )
 {  {
         slap_callback *sc = op->o_callback;          slap_callback *sc = op->o_callback;
   
         if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {          if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
                 /* If the private pointer is set, it points to an fcsn_cookie                  sc->sc_private = (void *)1;
                  * and we want to record the maxcsn and match state.  
                  */  
                 if ( sc->sc_private ) {  
                         int i;  
                         fcsn_cookie *fc = sc->sc_private;  
                         sync_control *srs = op->o_controls[sync_cid];  
                         Attribute *a = attr_find(rs->sr_entry->e_attrs,  
                                 slap_schema.si_ad_entryCSN );  
                         i = ber_bvcmp( &a->a_vals[0], srs->sr_state.ctxcsn );  
                         if ( i == 0 ) fc->gotmatch = 1;  
                         i = ber_bvcmp( &a->a_vals[0], &fc->maxcsn );  
                         if ( i > 0 ) {  
                                 fc->maxcsn.bv_len = a->a_vals[0].bv_len;  
                                 strcpy(fc->maxcsn.bv_val, a->a_vals[0].bv_val );  
                         }  
                 } else {  
                 /* Otherwise, if the private pointer is not set, we just  
                  * want to know if any entry matched the filter.  
                  */  
                         sc->sc_private = (void *)1;  
                 }  
         }          }
         return LDAP_SUCCESS;          return LDAP_SUCCESS;
 }  }
Line 588  syncprov_findcsn( Operation *op, int mod Line 556  syncprov_findcsn( Operation *op, int mod
         Filter cf;          Filter cf;
         AttributeAssertion eq;          AttributeAssertion eq;
         int rc;          int rc;
         fcsn_cookie fcookie;  
         fpres_cookie pcookie;          fpres_cookie pcookie;
         int locked = 0;          int locked = 0;
         sync_control *srs = op->o_controls[sync_cid];          sync_control *srs = op->o_controls[sync_cid];
Line 602  syncprov_findcsn( Operation *op, int mod Line 569  syncprov_findcsn( Operation *op, int mod
   
         fbuf.bv_val = buf;          fbuf.bv_val = buf;
         if ( mode == FIND_CSN ) {          if ( mode == FIND_CSN ) {
                 if ( !si->si_gotcsn ) {                  fop.ors_attrsonly = 1;
                         /* If we don't know the current ctxcsn, find it */                  fop.ors_attrs = slap_anlist_no_attrs;
                         ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );                  fop.ors_slimit = 1;
                         locked = 1;                  cb.sc_private = NULL;
                 }  
                 if ( !si->si_gotcsn ) {  
                         cf.f_choice = LDAP_FILTER_GE;  
                         fop.ors_attrsonly = 0;  
                         fop.ors_attrs = csn_anlist;  
                         fop.ors_slimit = SLAP_NO_LIMIT;  
                         cb.sc_private = &fcookie;  
                         fcookie.maxcsn.bv_val = cbuf;  
                         fcookie.maxcsn.bv_len = 0;  
                         fcookie.gotmatch = 0;  
                         fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)", srs->sr_state.ctxcsn->bv_val );  
                 } else {  
                         if ( locked ) {  
                                 ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );  
                                 locked = 0;  
                         }  
                         cf.f_choice = LDAP_FILTER_LE;  
                         fop.ors_attrsonly = 1;  
                         fop.ors_attrs = slap_anlist_no_attrs;  
                         fop.ors_slimit = 1;  
                         cb.sc_private = NULL;  
                         fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );  
                 }  
                 cb.sc_response = findcsn_cb;                  cb.sc_response = findcsn_cb;
   
         } else if ( mode == FIND_PRESENT ) {          } else if ( mode == FIND_PRESENT ) {
                 cf.f_choice = LDAP_FILTER_LE;  
                 fop.ors_attrsonly = 0;                  fop.ors_attrsonly = 0;
                 fop.ors_attrs = uuid_anlist;                  fop.ors_attrs = uuid_anlist;
                 fop.ors_slimit = SLAP_NO_LIMIT;                  fop.ors_slimit = SLAP_NO_LIMIT;
Line 642  syncprov_findcsn( Operation *op, int mod Line 585  syncprov_findcsn( Operation *op, int mod
                 cb.sc_response = findpres_cb;                  cb.sc_response = findpres_cb;
                 pcookie.num = 0;                  pcookie.num = 0;
                 pcookie.uuids = NULL;                  pcookie.uuids = NULL;
                 fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );  
         }          }
           cf.f_choice = LDAP_FILTER_LE;
         cf.f_ava = &eq;          cf.f_ava = &eq;
         cf.f_av_desc = slap_schema.si_ad_entryCSN;          cf.f_av_desc = slap_schema.si_ad_entryCSN;
         cf.f_av_value = *srs->sr_state.ctxcsn;          cf.f_av_value = *srs->sr_state.ctxcsn;
         cf.f_next = NULL;          cf.f_next = NULL;
           fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
                   srs->sr_state.ctxcsn->bv_val );
   
         fop.o_callback = &cb;          fop.o_callback = &cb;
         fop.ors_tlimit = SLAP_NO_LIMIT;          fop.ors_tlimit = SLAP_NO_LIMIT;
Line 659  syncprov_findcsn( Operation *op, int mod Line 604  syncprov_findcsn( Operation *op, int mod
         fop.o_bd->bd_info = (BackendInfo *)on;          fop.o_bd->bd_info = (BackendInfo *)on;
   
         if ( mode == FIND_CSN ) {          if ( mode == FIND_CSN ) {
                 if ( !si->si_gotcsn ) {                  if ( cb.sc_private ) return LDAP_SUCCESS;
                         strcpy(si->si_ctxcsnbuf, fcookie.maxcsn.bv_val);  
                         si->si_ctxcsn.bv_len = fcookie.maxcsn.bv_len;  
                         si->si_gotcsn = 1;  
                         ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );  
                         if ( fcookie.gotmatch ) return LDAP_SUCCESS;  
                           
                 } else {  
                         if ( cb.sc_private ) return LDAP_SUCCESS;  
                 }  
         } else if ( mode == FIND_PRESENT ) {          } else if ( mode == FIND_PRESENT ) {
                 return LDAP_SUCCESS;                  return LDAP_SUCCESS;
         }          }
Line 994  syncprov_op_cleanup( Operation *op, Slap Line 930  syncprov_op_cleanup( Operation *op, Slap
         op->o_tmpfree(cb, op->o_tmpmemctx);          op->o_tmpfree(cb, op->o_tmpmemctx);
 }  }
   
   static void
   syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
   {
           syncprov_info_t         *si = on->on_bi.bi_private;
           Modifications mod;
           Operation opm;
           struct berval bv[2];
           BackendInfo *orig;
           slap_callback cb = {0};
   
           mod.sml_values = bv;
           bv[1].bv_val = NULL;
           bv[0] = si->si_ctxcsn;
           mod.sml_nvalues = NULL;
           mod.sml_desc = slap_schema.si_ad_contextCSN;
           mod.sml_op = LDAP_MOD_REPLACE;
           mod.sml_next = NULL;
   
           cb.sc_response = slap_null_cb;
           opm = *op;
           opm.o_tag = LDAP_REQ_MODIFY;
           opm.o_callback = &cb;
           opm.orm_modlist = &mod;
           opm.o_req_dn = op->o_bd->be_suffix[0];
           opm.o_req_ndn = op->o_bd->be_nsuffix[0];
           orig = opm.o_bd->bd_info;
           opm.o_bd->bd_info = on->on_info->oi_orig;
           opm.o_bd->be_modify( &opm, rs );
   }
   
 static int  static int
 syncprov_op_response( Operation *op, SlapReply *rs )  syncprov_op_response( Operation *op, SlapReply *rs )
 {  {
Line 1017  syncprov_op_response( Operation *op, Sla Line 983  syncprov_op_response( Operation *op, Sla
                                 strcpy( si->si_ctxcsnbuf, cbuf );                                  strcpy( si->si_ctxcsnbuf, cbuf );
                                 si->si_ctxcsn.bv_len = maxcsn.bv_len;                                  si->si_ctxcsn.bv_len = maxcsn.bv_len;
                         }                          }
                         si->si_gotcsn = 1;                  }
   
                   si->si_numops++;
                   if ( si->si_chkops || si->si_chktime ) {
                           int do_check=0;
                           if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
                                   do_check = 1;
                                   si->si_numops = 0;
                           }
                           if ( si->si_chktime && 
                                   (op->o_time - si->si_chklast >= si->si_chktime )) {
                                   do_check = 1;
                                   si->si_chklast = op->o_time;
                           }
                           if ( do_check ) {
                                   syncprov_checkpoint( op, rs, on );
                           }
                 }                  }
                 ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );                  ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
   
Line 1467  syncprov_op_search( Operation *op, SlapR Line 1449  syncprov_op_search( Operation *op, SlapR
                 }                  }
         }          }
   
         /* If we didn't get a cookie and we don't know our contextcsn, try to  
          * find it anyway.  
          */  
         if ( !gotstate && !si->si_gotcsn ) {  
                 struct berval bv = BER_BVC("1"), *old;  
                   
                 old = srs->sr_state.ctxcsn;  
                 srs->sr_state.ctxcsn = &bv;  
                 syncprov_findcsn( op, FIND_CSN );  
                 srs->sr_state.ctxcsn = old;  
         }  
   
         /* Append CSN range to search filter, save original filter          /* Append CSN range to search filter, save original filter
          * for persistent search evaluation           * for persistent search evaluation
          */           */
Line 1547  syncprov_operational( Line 1517  syncprov_operational(
         if ( rs->sr_entry &&          if ( rs->sr_entry &&
                 dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {                  dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {
   
                 Attribute **ap;  
   
                 for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ;  
   
                 if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||                  if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
                         ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {                          ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
                           Attribute *a, **ap = NULL;
   
                                                   
                         Attribute *a = ch_malloc( sizeof(Attribute));                          for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
                         a->a_desc = slap_schema.si_ad_contextCSN;                                  if ( a->a_desc == slap_schema.si_ad_contextCSN )
                         a->a_vals = ch_malloc( 2 * sizeof(struct berval));                                          break;
   
 #if 0   /* causes a deadlock */  
                         if ( !si->si_gotcsn ) {  
                                 sync_control sc, *old;  
                                 void *ctrls[SLAP_MAX_CIDS];  
                                 struct berval bv = BER_BVC("1");  
                   
                                 if ( !op->o_controls ) {  
                                         memset(ctrls, 0, sizeof(ctrls));  
                                         op->o_controls = ctrls;  
                                 } else {  
                                         old = op->o_controls[sync_cid];  
                                 }  
                                 op->o_controls[sync_cid] = &sc;  
                                 sc.sr_state.ctxcsn = &bv;  
                                 syncprov_findcsn( op, FIND_CSN );  
                                 if ( op->o_controls == ctrls ) {  
                                         op->o_controls = NULL;  
                                 } else {  
                                         op->o_controls[sync_cid] = old;  
                                 }  
                         }                          }
 #endif  
                         ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );  
                         ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );  
                         ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );  
   
                         a->a_vals[1].bv_val = NULL;                          if ( !a ) {
                         a->a_nvals = a->a_vals;                                  for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next );
                         a->a_next = NULL;  
                         a->a_flags = 0;                                  a = ch_malloc( sizeof(Attribute));
                                   a->a_desc = slap_schema.si_ad_contextCSN;
                                   a->a_vals = ch_malloc( 2 * sizeof(struct berval));
                                   a->a_vals[1].bv_val = NULL;
                                   a->a_nvals = a->a_vals;
                                   a->a_next = NULL;
                                   a->a_flags = 0;
                                   *ap = a;
                           }
   
                         *ap = a;                          ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
                           if ( !ap ) {
                                   strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf );
                           } else {
                                   ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
                           }
                           ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
                 }                  }
         }          }
         return LDAP_SUCCESS;          return LDAP_SUCCESS;
Line 1625  syncprov_db_config( Line 1582  syncprov_db_config(
         return SLAP_CONF_UNKNOWN;          return SLAP_CONF_UNKNOWN;
 }  }
   
   /* Cheating - we have no thread pool context for these functions,
    * so make one.
    */
   typedef struct thread_keys {
           void *key;
           void *data;
           ldap_pvt_thread_pool_keyfree_t *free;
   } thread_keys;
   
   /* A fake thread context */
   static thread_keys thrctx[8];
   
   /* Read any existing contextCSN from the underlying db.
    * Then search for any entries newer than that. If no value exists,
    * just generate it. Cache whatever result.
    */
   static int
   syncprov_db_open(
       BackendDB *be
   )
   {
       slap_overinst   *on = (slap_overinst *) be->bd_info;
       syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
   
           char opbuf[OPERATION_BUFFER_SIZE];
           Operation *op = (Operation *)opbuf;
           Entry *e;
           Attribute *a;
           int rc;
   
           memset(opbuf, 0, sizeof(opbuf));
           op->o_hdr = (Opheader *)(op+1);
           op->o_bd = be;
           op->o_dn = be->be_rootdn;
           op->o_ndn = be->be_rootndn;
           op->o_threadctx = thrctx;
           op->o_tmpmfuncs = &ch_mfuncs;
   
           op->o_bd->bd_info = on->on_info->oi_orig;
           rc = be_entry_get_rw( op, be->be_nsuffix, NULL,
                   slap_schema.si_ad_contextCSN, 0, &e );
   
           if ( e ) {
                   a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
                   if ( a ) {
                           si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len;
                           if ( si->si_ctxcsn.bv_len >= sizeof(si->si_ctxcsnbuf ))
                                   si->si_ctxcsn.bv_len = sizeof(si->si_ctxcsnbuf)-1;
                           strncpy( si->si_ctxcsnbuf, a->a_nvals[0].bv_val,
                                   si->si_ctxcsn.bv_len );
                           si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
                   }
                   be_entry_release_r( op, e );
           }
           op->o_bd->bd_info = (BackendInfo *)on;
       return 0;
   }
   
   /* Write the current contextCSN into the underlying db.
    */
   static int
   syncprov_db_close(
       BackendDB *be
   )
   {
       slap_overinst   *on = (slap_overinst *) be->bd_info;
       syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
           int i;
   
           if ( si->si_numops ) {
                   Connection conn;
                   char opbuf[OPERATION_BUFFER_SIZE];
                   Operation *op = (Operation *)opbuf;
                   SlapReply rs = {REP_RESULT};
   
                   connection_fake_init( &conn, op, thrctx );
                   op->o_bd = be;
                   op->o_dn = be->be_rootdn;
                   op->o_ndn = be->be_rootndn;
                   syncprov_checkpoint( op, &rs, on );
           }
           for ( i=0; thrctx[i].key; i++) {
                   if ( thrctx[i].free )
                           thrctx[i].free( thrctx[i].key, thrctx[i].data );
           }
   
       return 0;
   }
   
 static int  static int
 syncprov_db_init(  syncprov_db_init(
         BackendDB *be          BackendDB *be
Line 1637  syncprov_db_init( Line 1683  syncprov_db_init(
         on->on_bi.bi_private = si;          on->on_bi.bi_private = si;
         ldap_pvt_thread_mutex_init( &si->si_csn_mutex );          ldap_pvt_thread_mutex_init( &si->si_csn_mutex );
         ldap_pvt_thread_mutex_init( &si->si_ops_mutex );          ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
           ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
         si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;          si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;
   
         csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;          csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
Line 1657  syncprov_db_destroy( Line 1704  syncprov_db_destroy(
         syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;          syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
   
         if ( si ) {          if ( si ) {
                   ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
                 ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );                  ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
                 ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );                  ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
                 ch_free( si );                  ch_free( si );
Line 1790  syncprov_init() Line 1838  syncprov_init()
         syncprov.on_bi.bi_db_init = syncprov_db_init;          syncprov.on_bi.bi_db_init = syncprov_db_init;
         syncprov.on_bi.bi_db_config = syncprov_db_config;          syncprov.on_bi.bi_db_config = syncprov_db_config;
         syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;          syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
           syncprov.on_bi.bi_db_open = syncprov_db_open;
           syncprov.on_bi.bi_db_close = syncprov_db_close;
   
         syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;          syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
         syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;          syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;

Removed from v.1.26  
changed lines
  Added in v.1.27


______________
© Copyright 1998-2020, OpenLDAP Foundation, info@OpenLDAP.org