--- servers/slapd/overlays/syncprov.c 2009/03/13 02:44:51 1.272 +++ servers/slapd/overlays/syncprov.c 2009/11/14 20:53:06 1.296 @@ -1,4 +1,4 @@ -/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.271 2009/03/13 00:07:45 hyc Exp $ */ +/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.295 2009/11/14 08:35:23 hyc Exp $ */ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * @@ -124,6 +124,7 @@ typedef struct sessionlog { typedef struct syncprov_info_t { syncops *si_ops; BerVarray si_ctxcsn; /* ldapsync context */ + struct berval si_contextdn; int *si_sids; int si_numcsns; int si_chkops; /* checkpointing info */ @@ -137,16 +138,19 @@ typedef struct syncprov_info_t { ldap_pvt_thread_rdwr_t si_csn_rwlock; ldap_pvt_thread_mutex_t si_ops_mutex; ldap_pvt_thread_mutex_t si_mods_mutex; + ldap_pvt_thread_mutex_t si_resp_mutex; } syncprov_info_t; typedef struct opcookie { slap_overinst *son; syncmatches *smatches; + modtarget *smt; struct berval sdn; /* DN of entry, for deletes */ struct berval sndn; struct berval suuid; /* UUID of entry */ struct berval sctxcsn; - short ssid; /* sid of op csn */ + short osid; /* sid of op csn */ + short rsid; /* sid of relay */ short sreference; /* Is the entry a reference? */ } opcookie; @@ -580,8 +584,8 @@ syncprov_findcsn( Operation *op, find_cs slap_callback cb = {0}; Operation fop; SlapReply frs = { REP_RESULT }; - char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; - char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; + char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; + char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; struct berval maxcsn; Filter cf; AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; @@ -773,7 +777,7 @@ syncprov_sendresp( Operation *op, opcook SlapReply rs = { REP_SEARCH }; LDAPControl *ctrls[2]; - struct berval cookie, csns[2]; + struct berval cookie = BER_BVNULL, csns[2]; Entry e_uuid = {0}; Attribute a_uuid = {0}; @@ -781,18 +785,22 @@ syncprov_sendresp( Operation *op, opcook return SLAPD_ABANDON; ctrls[1] = NULL; - csns[0] = opc->sctxcsn; - BER_BVZERO( &csns[1] ); - slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); + if ( !BER_BVISNULL( &opc->sctxcsn )) { + csns[0] = opc->sctxcsn; + BER_BVZERO( &csns[1] ); + slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); + } #ifdef LDAP_DEBUG - if ( so->s_sid > 0 ) { - Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n", - so->s_sid, cookie.bv_val, 0 ); - } else { - Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n", - cookie.bv_val, 0, 0 ); - } + if ( !BER_BVISNULL( &cookie )) { + if ( so->s_sid > 0 ) { + Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n", + so->s_sid, cookie.bv_val , 0 ); + } else { + Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n", + cookie.bv_val, 0, 0 ); + } + } #endif e_uuid.e_attrs = &a_uuid; @@ -800,7 +808,9 @@ syncprov_sendresp( Operation *op, opcook a_uuid.a_nvals = &opc->suuid; rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, mode, ctrls, 0, 1, &cookie ); - op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); + if ( !BER_BVISNULL( &cookie )) { + op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); + } rs.sr_ctrls = ctrls; op->o_bd->bd_info = (BackendInfo *)on->on_info; @@ -845,7 +855,14 @@ syncprov_sendresp( Operation *op, opcook } /* In case someone else freed it already? */ if ( rs.sr_ctrls ) { - op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx ); + int i; + for ( i=0; rs.sr_ctrls[i]; i++ ) { + if ( rs.sr_ctrls[i] == ctrls[0] ) { + op->o_tmpfree( ctrls[0]->ldctl_value.bv_val, op->o_tmpmemctx ); + ctrls[0]->ldctl_value.bv_val = NULL; + break; + } + } rs.sr_ctrls = NULL; } @@ -1117,7 +1134,6 @@ syncprov_matchops( Operation *op, opcook fbase_cookie fc; syncops *ss, *sprev, *snext; - struct sync_cookie *scook; Entry *e = NULL; Attribute *a; int rc; @@ -1169,7 +1185,6 @@ syncprov_matchops( Operation *op, opcook ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); } - scook = op->o_controls ? op->o_controls[slap_cids.sc_LDAPsync] : NULL; ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss; sprev = ss, ss=snext) @@ -1183,18 +1198,22 @@ syncprov_matchops( Operation *op, opcook if ( ss->s_op->o_abandon ) continue; - /* Don't send ops back to the originator */ - if ( opc->ssid > 0 && opc->ssid == ss->s_sid ) { - Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n", - opc->ssid, 0, 0 ); - continue; - } + /* First time thru, check for possible skips */ + if ( saveit || op->o_tag == LDAP_REQ_ADD ) { - /* Don't send ops back to the messenger */ - if ( scook && scook->sid > 0 && scook->sid == ss->s_sid ) { - Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n", - scook->sid, 0, 0 ); - continue; + /* Don't send ops back to the originator */ + if ( opc->osid > 0 && opc->osid == ss->s_sid ) { + Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n", + opc->osid, 0, 0 ); + continue; + } + + /* Don't send ops back to the messenger */ + if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) { + Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n", + opc->rsid, 0, 0 ); + continue; + } } /* validate base */ @@ -1242,13 +1261,18 @@ syncprov_matchops( Operation *op, opcook oh = *op->o_hdr; oh.oh_conn = ss->s_op->o_conn; oh.oh_connid = ss->s_op->o_connid; + op2.o_bd = op->o_bd->bd_self; op2.o_hdr = &oh; op2.o_extra = op->o_extra; + op2.o_callback = NULL; + rc = test_filter( &op2, e, ss->s_op->ors_filter ); } + Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n", + ss->s_sid, fc.fscope, rc ); + /* check if current o_req_dn is in scope and matches filter */ - if ( fc.fscope && test_filter( &op2, e, ss->s_op->ors_filter ) == - LDAP_COMPARE_TRUE ) { + if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) { if ( saveit ) { sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx ); sm->sm_next = opc->smatches; @@ -1307,26 +1331,23 @@ syncprov_op_cleanup( Operation *op, Slap } /* Remove op from lock table */ - mtdummy.mt_op = op; - ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); - mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); + mt = opc->smt; if ( mt ) { - modinst *mi = mt->mt_mods; - - /* If there are more, promote the next one */ ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); - if ( mi->mi_next ) { - mt->mt_mods = mi->mi_next; + mt->mt_mods = mt->mt_mods->mi_next; + /* If there are more, promote the next one */ + if ( mt->mt_mods ) { mt->mt_op = mt->mt_mods->mi_op; ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); } else { - avl_delete( &si->si_mods, mt, sp_avl_cmp ); ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); + avl_delete( &si->si_mods, mt, sp_avl_cmp ); + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); ch_free( mt ); } } - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); if ( !BER_BVISNULL( &opc->suuid )) op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx ); if ( !BER_BVISNULL( &opc->sndn )) @@ -1348,6 +1369,7 @@ syncprov_checkpoint( Operation *op, Slap SlapReply rsm = { 0 }; slap_callback cb = {0}; BackendDB be; + #ifdef CHECK_CSN Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; @@ -1374,12 +1396,26 @@ syncprov_checkpoint( Operation *op, Slap be = *on->on_info->oi_origdb; opm.o_bd = &be; } - opm.o_req_dn = opm.o_bd->be_suffix[0]; - opm.o_req_ndn = opm.o_bd->be_nsuffix[0]; + opm.o_req_dn = si->si_contextdn; + opm.o_req_ndn = si->si_contextdn; opm.o_bd->bd_info = on->on_info->oi_orig; opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; opm.o_no_schema_check = 1; opm.o_bd->be_modify( &opm, &rsm ); + + if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && + SLAP_SYNC_SUBENTRY( opm.o_bd )) { + const char *text; + char txtbuf[SLAP_TEXT_BUFLEN]; + size_t textlen = sizeof txtbuf; + Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL ); + slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen); + opm.ora_e = e; + opm.o_bd->be_add( &opm, &rsm ); + if ( e == opm.ora_e ) + be_entry_release_w( &opm, opm.ora_e ); + } + if ( mod.sml_next != NULL ) { slap_mods_free( mod.sml_next, 1 ); } @@ -1455,7 +1491,7 @@ syncprov_playlog( Operation *op, SlapRep slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; slog_entry *se; int i, j, ndel, num, nmods, mmods; - char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; + char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; BerVarray uuids; struct berval delcsn[2]; @@ -1630,17 +1666,18 @@ syncprov_op_response( Operation *op, Sla if ( rs->sr_err == LDAP_SUCCESS ) { struct berval maxcsn; - char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; + char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; int do_check = 0, have_psearches, foundit, csn_changed = 0; + ldap_pvt_thread_mutex_lock( &si->si_resp_mutex ); + /* Update our context CSN */ cbuf[0] = '\0'; maxcsn.bv_val = cbuf; maxcsn.bv_len = sizeof(cbuf); ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock ); -#if 0 - if ( op->o_dont_replicate && + if ( op->o_dont_replicate && op->o_tag == LDAP_REQ_MODIFY && op->orm_modlist->sml_op == LDAP_MOD_REPLACE && op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { /* Catch contextCSN updates from syncrepl. We have to look at @@ -1687,9 +1724,8 @@ syncprov_op_response( Operation *op, Sla } } } - return SLAP_CB_CONTINUE; + goto leave; } -#endif slap_get_commit_csn( op, &maxcsn, &foundit ); if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) { @@ -1730,31 +1766,39 @@ syncprov_op_response( Operation *op, Sla sizeof(int)); si->si_sids[i] = sid; } +#if 0 } else if ( !foundit ) { /* internal ops that aren't meant to be replicated */ ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); return SLAP_CB_CONTINUE; +#endif } /* Don't do any processing for consumer contextCSN updates */ if ( op->o_dont_replicate ) { ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); - return SLAP_CB_CONTINUE; + goto leave; } si->si_numops++; if ( si->si_chkops || si->si_chktime ) { - if ( si->si_chkops && si->si_numops >= si->si_chkops ) { - do_check = 1; - si->si_numops = 0; - } - if ( si->si_chktime && - (op->o_time - si->si_chklast >= si->si_chktime )) { - if ( si->si_chklast ) { + /* Never checkpoint adding the context entry, + * it will deadlock + */ + if ( op->o_tag != LDAP_REQ_ADD || + !dn_match( &op->o_req_ndn, &si->si_contextdn )) { + if ( si->si_chkops && si->si_numops >= si->si_chkops ) { do_check = 1; - si->si_chklast = op->o_time; - } else { - si->si_chklast = 1; + si->si_numops = 0; + } + if ( si->si_chktime && + (op->o_time - si->si_chklast >= si->si_chktime )) { + if ( si->si_chklast ) { + do_check = 1; + si->si_chklast = op->o_time; + } else { + si->si_chklast = 1; + } } } } @@ -1800,7 +1844,7 @@ syncprov_op_response( Operation *op, Sla if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) { syncprov_add_slog( op ); } - +leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex ); } return SLAP_CB_CONTINUE; } @@ -1816,14 +1860,14 @@ syncprov_op_compare( Operation *op, Slap syncprov_info_t *si = on->on_bi.bi_private; int rc = SLAP_CB_CONTINUE; - if ( dn_match( &op->o_req_ndn, op->o_bd->be_nsuffix ) && + if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) && op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN ) { Entry e = {0}; Attribute a = {0}; - e.e_name = op->o_bd->be_suffix[0]; - e.e_nname = op->o_bd->be_nsuffix[0]; + e.e_name = si->si_contextdn; + e.e_nname = si->si_contextdn; e.e_attrs = &a; a.a_desc = slap_schema.si_ad_contextCSN; @@ -1899,10 +1943,16 @@ syncprov_op_mod( Operation *op, SlapRepl cb->sc_next = op->o_callback; op->o_callback = cb; + opc->osid = -1; + opc->rsid = -1; if ( op->o_csn.bv_val ) { - opc->ssid = slap_parse_csn_sid( &op->o_csn ); - } else { - opc->ssid = -1; + opc->osid = slap_parse_csn_sid( &op->o_csn ); + } + if ( op->o_controls ) { + struct sync_cookie *scook = + op->o_controls[slap_cids.sc_LDAPsync]; + if ( scook ) + opc->rsid = scook->sid; } /* If there are active persistent searches, lock this operation. @@ -1921,6 +1971,15 @@ syncprov_op_mod( Operation *op, SlapRepl mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); if ( mt ) { ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); + if ( mt->mt_mods == NULL ) { + /* Cannot reuse this mt, as another thread is about + * to release it in syncprov_op_cleanup. + */ + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + mt = NULL; + } + } + if ( mt ) { ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); mt->mt_tail->mi_next = mi; mt->mt_tail = mi; @@ -1932,6 +1991,9 @@ syncprov_op_mod( Operation *op, SlapRepl * Currently it's not an issue because there are * no dynamic config deletes... */ + if ( slapd_shutdown ) + return SLAPD_ABANDON; + if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool )) ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); @@ -1959,6 +2021,7 @@ syncprov_op_mod( Operation *op, SlapRepl avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error ); ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); } + opc->smt = mt; } if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) @@ -2182,9 +2245,10 @@ syncprov_search_response( Operation *op, LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL ); } } else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) { - struct berval cookie; + struct berval cookie = BER_BVNULL; - if ( ss->ss_flags & SS_CHANGED ) { + if ( ( ss->ss_flags & SS_CHANGED ) && + ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) { slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn, srs->sr_state.rid, slap_serverID ? slap_serverID : -1 ); @@ -2208,7 +2272,7 @@ syncprov_search_response( Operation *op, LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 1, NULL, 0 ); - if ( ss->ss_flags & SS_CHANGED ) + if ( !BER_BVISNULL( &cookie )) op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); /* Detach this Op from frontend control */ @@ -2362,13 +2426,32 @@ syncprov_op_search( Operation *op, SlapR /* If nothing has changed, shortcut it */ if ( srs->sr_state.numcsns == numcsns ) { - int i, j; + int i, j, newer; for ( i=0; isr_state.numcsns; i++ ) { for ( j=0; jsr_state.sids[i] != sids[j] ) continue; - if ( !bvmatch( &srs->sr_state.ctxcsn[i], &ctxcsn[j] )) + newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ); + /* If our state is newer, tell consumer about changes */ + if ( newer < 0 ) changed = SS_CHANGED; + else if ( newer > 0 ) { + /* our state is older, tell consumer nothing */ + if ( sop ) { + syncops **sp = &si->si_ops; + + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); + while ( *sp != sop ) + sp = &(*sp)->s_next; + *sp = sop->s_next; + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + ch_free( sop ); + } + rs->sr_err = LDAP_SUCCESS; + rs->sr_ctrls = NULL; + send_ldap_result( op, rs ); + return rs->sr_err; + } break; } if ( changed ) @@ -2523,7 +2606,7 @@ syncprov_operational( return SLAP_CB_CONTINUE; if ( rs->sr_entry && - dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) { + dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) { if ( SLAP_OPATTRS( rs->sr_attr_flags ) || ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { @@ -2630,8 +2713,11 @@ sp_cf_gen(ConfigArgs *c) case SP_CHKPT: if ( si->si_chkops || si->si_chktime ) { struct berval bv; + /* we assume si_chktime is a multiple of 60 + * because the parsed value was originally + * multiplied by 60 */ bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ), - "%d %d", si->si_chkops, si->si_chktime ); + "%d %d", si->si_chkops, si->si_chktime/60 ); if ( bv.bv_len >= sizeof( c->cr_msg ) ) { rc = 1; } else { @@ -2737,7 +2823,7 @@ sp_cf_gen(ConfigArgs *c) } sl = si->si_logs; if ( !sl ) { - sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE ); + sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE ); sl->sl_mincsn.bv_val = (char *)(sl+1); sl->sl_mincsn.bv_len = 0; sl->sl_num = 0; @@ -2813,7 +2899,13 @@ syncprov_db_open( op->o_dn = be->be_rootdn; op->o_ndn = be->be_rootndn; - rc = overlay_entry_get_ov( op, be->be_nsuffix, NULL, + if ( SLAP_SYNC_SUBENTRY( be )) { + build_new_dn( &si->si_contextdn, be->be_nsuffix, + (struct berval *)&slap_ldapsync_cn_bv, NULL ); + } else { + si->si_contextdn = be->be_nsuffix[0]; + } + rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL, slap_schema.si_ad_contextCSN, 0, &e, on ); if ( e ) { @@ -2837,7 +2929,7 @@ syncprov_db_open( /* Didn't find a contextCSN, should we generate one? */ if ( !si->si_ctxcsn ) { - char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ]; + char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; struct berval csn; if ( SLAP_SYNC_SHADOW( op->o_bd )) { @@ -2918,6 +3010,7 @@ syncprov_db_init( ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock ); ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); + ldap_pvt_thread_mutex_init( &si->si_resp_mutex ); csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN; csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname; @@ -2955,6 +3048,7 @@ syncprov_db_destroy( ber_bvarray_free( si->si_ctxcsn ); if ( si->si_sids ) ch_free( si->si_sids ); + ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex ); ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );