--- servers/slapd/overlays/syncprov.c 2004/12/10 02:18:33 1.56 +++ servers/slapd/overlays/syncprov.c 2005/05/10 16:07:04 1.56.2.7 @@ -1,8 +1,8 @@ -/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.55 2004/12/10 01:51:34 hyc Exp $ */ +/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.56.2.6 2005/04/29 21:29:10 kurt Exp $ */ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004 The OpenLDAP Foundation. + * Copyright 2004-2005 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -25,6 +25,7 @@ #include #include "lutil.h" #include "slap.h" +#include "config.h" /* A modify request on a particular entry */ typedef struct modinst { @@ -56,7 +57,6 @@ typedef struct syncops { struct berval s_base; /* ndn of search base */ ID s_eid; /* entryID of search base */ Operation *s_op; /* search op */ - int s_sid; int s_rid; struct berval s_filterstr; int s_flags; /* search status */ @@ -84,6 +84,7 @@ typedef struct sync_control { #define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<o_tmpmemctx ); - - ctrls[num_ctrls] = ch_malloc ( sizeof ( LDAPControl ) ); - - entryuuid_bv = slog_e->sl_uuid; - - if ( send_cookie && cookie ) { - ber_printf( ber, "{eOON}", - entry_sync_state, &entryuuid_bv, cookie ); - } else { - ber_printf( ber, "{eON}", - entry_sync_state, &entryuuid_bv ); - } - - ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE; - ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); - ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 ); - - ber_free_buf( ber ); - - if ( ret < 0 ) { - Debug( LDAP_DEBUG_TRACE, - "slap_build_sync_ctrl: ber_flatten2 failed\n", - 0, 0, 0 ); - send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); - return ret; - } - - return LDAP_SUCCESS; -} -#endif - static int syncprov_sendinfo( Operation *op, @@ -472,6 +415,7 @@ syncprov_findbase( Operation *op, fbase_ cb.sc_private = fc; fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync mode */ + fop.o_managedsait = SLAP_CONTROL_CRITICAL; fop.o_callback = &cb; fop.o_tag = LDAP_REQ_SEARCH; fop.ors_scope = LDAP_SCOPE_BASE; @@ -525,7 +469,7 @@ findmax_cb( Operation *op, SlapReply *rs Attribute *a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); - if ( a && ber_bvcmp( &a->a_vals[0], maxcsn )) { + if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 ) { maxcsn->bv_len = a->a_vals[0].bv_len; strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); } @@ -606,21 +550,27 @@ syncprov_findcsn( Operation *op, int mod char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; struct berval fbuf, maxcsn; Filter cf, af; - AttributeAssertion eq; +#ifdef LDAP_COMP_MATCH + AttributeAssertion eq = { NULL, BER_BVNULL, NULL }; +#else + AttributeAssertion eq = { NULL, BER_BVNULL }; +#endif int i, rc = LDAP_SUCCESS; fpres_cookie pcookie; - sync_control *srs; + sync_control *srs = NULL; if ( mode != FIND_MAXCSN ) { srs = op->o_controls[slap_cids.sc_LDAPsync]; - if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) { + if ( srs->sr_state.ctxcsn.bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) { return LDAP_OTHER; } } fop = *op; fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ + /* We want pure entries, not referrals */ + fop.o_managedsait = SLAP_CONTROL_CRITICAL; fbuf.bv_val = buf; cf.f_ava = &eq; @@ -644,12 +594,13 @@ syncprov_findcsn( Operation *op, int mod fop.ors_slimit = SLAP_NO_LIMIT; cb.sc_private = &maxcsn; cb.sc_response = findmax_cb; + strcpy( cbuf, si->si_ctxcsn.bv_val ); maxcsn.bv_val = cbuf; - maxcsn.bv_len = 0; + maxcsn.bv_len = si->si_ctxcsn.bv_len; break; case FIND_CSN: cf.f_choice = LDAP_FILTER_LE; - cf.f_av_value = *srs->sr_state.ctxcsn; + cf.f_av_value = srs->sr_state.ctxcsn; fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", cf.f_av_value.bv_val ); fop.ors_attrsonly = 1; @@ -663,15 +614,13 @@ syncprov_findcsn( Operation *op, int mod af.f_next = NULL; af.f_and = &cf; cf.f_choice = LDAP_FILTER_LE; - cf.f_av_value = *srs->sr_state.ctxcsn; + cf.f_av_value = srs->sr_state.ctxcsn; cf.f_next = op->ors_filter; fop.ors_filter = ⁡ filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr ); fop.ors_attrsonly = 0; fop.ors_attrs = uuid_anlist; fop.ors_slimit = SLAP_NO_LIMIT; - /* We want pure entries, not referrals */ - fop.o_managedsait = SLAP_CONTROL_CRITICAL; cb.sc_private = &pcookie; cb.sc_response = findpres_cb; pcookie.num = 0; @@ -696,10 +645,8 @@ syncprov_findcsn( Operation *op, int mod switch( mode ) { case FIND_MAXCSN: - if ( maxcsn.bv_len ) { - strcpy( si->si_ctxcsnbuf, maxcsn.bv_val ); - si->si_ctxcsn.bv_len = maxcsn.bv_len; - } + strcpy( si->si_ctxcsnbuf, maxcsn.bv_val ); + si->si_ctxcsn.bv_len = maxcsn.bv_len; break; case FIND_CSN: /* If matching CSN was not found, invalidate the context. */ @@ -714,7 +661,7 @@ syncprov_findcsn( Operation *op, int mod return rc; } -/* Queue a persistent search response if still in Refresh stage */ +/* Queue a persistent search response */ static int syncprov_qresp( opcookie *opc, syncops *so, int mode ) { @@ -724,13 +671,18 @@ syncprov_qresp( opcookie *opc, syncops * opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 ); sr->s_next = NULL; sr->s_dn.bv_val = (char *)(sr + 1); + sr->s_dn.bv_len = opc->sdn.bv_len; sr->s_mode = mode; sr->s_isreference = opc->sreference; sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val ); + sr->s_ndn.bv_len = opc->sndn.bv_len; *(sr->s_ndn.bv_val++) = '\0'; sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val ); + sr->s_uuid.bv_len = opc->suuid.bv_len; *(sr->s_uuid.bv_val++) = '\0'; sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val ); + sr->s_csn.bv_len = opc->sctxcsn.bv_len; + strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val ); if ( !so->s_res ) { so->s_res = sr; @@ -742,9 +694,57 @@ syncprov_qresp( opcookie *opc, syncops * return LDAP_SUCCESS; } +/* Play back queued responses */ +static int +syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue ); + +static int +syncprov_qplay( Operation *op, slap_overinst *on, syncops *so ) +{ + syncres *sr, *srnext; + Entry *e; + opcookie opc; + int rc; + + opc.son = on; + op->o_bd->bd_info = (BackendInfo *)on->on_info; + for (sr = so->s_res; sr; sr=srnext) { + srnext = sr->s_next; + opc.sdn = sr->s_dn; + opc.sndn = sr->s_ndn; + opc.suuid = sr->s_uuid; + opc.sctxcsn = sr->s_csn; + opc.sreference = sr->s_isreference; + e = NULL; + + if ( sr->s_mode != LDAP_SYNC_DELETE ) { + rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); + if ( rc ) { + ch_free( sr ); + so->s_res = srnext; + continue; + } + } + rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 ); + + if ( e ) { + be_entry_release_rw( op, e, 0 ); + } + if ( rc ) + break; + + ch_free( sr ); + so->s_res = srnext; + } + op->o_bd->bd_info = (BackendInfo *)on; + if ( !so->s_res ) + so->s_restail = NULL; + return rc; +} + /* Send a persistent search response */ static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode, int queue ) +syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue ) { slap_overinst *on = opc->son; syncprov_info_t *si = on->on_bi.bi_private; @@ -757,22 +757,47 @@ syncprov_sendresp( Operation *op, opcook Operation sop = *so->s_op; Opheader ohdr; + if ( so->s_op->o_abandon ) + return SLAPD_ABANDON; + ohdr = *sop.o_hdr; sop.o_hdr = &ohdr; sop.o_tmpmemctx = op->o_tmpmemctx; sop.o_bd = op->o_bd; sop.o_controls = op->o_controls; + sop.o_private = op->o_private; - if ( queue && (so->s_flags & PS_IS_REFRESHING) ) { + /* If queueing is allowed */ + if ( queue ) { ldap_pvt_thread_mutex_lock( &so->s_mutex ); - if ( so->s_flags & PS_IS_REFRESHING ) + /* If we're still in refresh mode, must queue */ + if (so->s_flags & PS_IS_REFRESHING) { + return syncprov_qresp( opc, so, mode ); + } + /* If connection is free but queue is non-empty, + * try to flush the queue. + */ + if ( so->s_res ) { + rs.sr_err = syncprov_qplay( &sop, on, so ); + } + /* If the connection is busy, must queue */ + if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) { return syncprov_qresp( opc, so, mode ); + } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + + /* If syncprov_qplay returned any other error, bail out. */ + if ( rs.sr_err ) { + return rs.sr_err; + } + } else { + /* Queueing not allowed and conn is busy, give up */ + if ( sop.o_conn->c_writewaiter ) + return LDAP_BUSY; } ctrls[1] = NULL; - slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, - so->s_sid, so->s_rid ); + slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid ); e_uuid.e_attrs = &a_uuid; a_uuid.a_desc = slap_schema.si_ad_entryUUID; @@ -780,20 +805,30 @@ syncprov_sendresp( Operation *op, opcook rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid, mode, ctrls, 0, 1, &cookie ); - rs.sr_entry = e; rs.sr_ctrls = ctrls; + op->o_bd->bd_info = (BackendInfo *)on->on_info; switch( mode ) { case LDAP_SYNC_ADD: + rs.sr_entry = *e; + if ( rs.sr_entry->e_private ) + rs.sr_flags = REP_ENTRY_MUSTRELEASE; if ( opc->sreference ) { - rs.sr_ref = get_entry_referrals( &sop, e ); + rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry ); send_search_reference( &sop, &rs ); ber_bvarray_free( rs.sr_ref ); + if ( !rs.sr_entry ) + *e = NULL; break; } /* fallthru */ case LDAP_SYNC_MODIFY: + rs.sr_entry = *e; + if ( rs.sr_entry->e_private ) + rs.sr_flags = REP_ENTRY_MUSTRELEASE; rs.sr_attrs = sop.ors_attrs; send_search_entry( &sop, &rs ); + if ( !rs.sr_entry ) + *e = NULL; break; case LDAP_SYNC_DELETE: e_uuid.e_attrs = NULL; @@ -812,7 +847,20 @@ syncprov_sendresp( Operation *op, opcook assert(0); } op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx ); + op->o_private = sop.o_private; rs.sr_ctrls = NULL; + /* Check queue again here; if we were hanging in a send and eventually + * recovered, there may be more to send now. But don't check if the + * original psearch has been abandoned. + */ + if ( so->s_op->o_abandon ) + return SLAPD_ABANDON; + + if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) { + ldap_pvt_thread_mutex_lock( &so->s_mutex ); + rs.sr_err = syncprov_qplay( &sop, on, so ); + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + } return rs.sr_err; } @@ -829,12 +877,14 @@ syncprov_free_syncop( syncops *so ) return; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - filter_free( so->s_op->ors_filter ); - for ( ga = so->s_op->o_groups; ga; ga=gnext ) { - gnext = ga->ga_next; - ch_free( ga ); + if ( so->s_flags & PS_IS_DETACHED ) { + filter_free( so->s_op->ors_filter ); + for ( ga = so->s_op->o_groups; ga; ga=gnext ) { + gnext = ga->ga_next; + ch_free( ga ); + } + ch_free( so->s_op ); } - ch_free( so->s_op ); ch_free( so->s_base.bv_val ); for ( sr=so->s_res; sr; sr=srnext ) { srnext = sr->s_next; @@ -847,15 +897,29 @@ syncprov_free_syncop( syncops *so ) static int syncprov_drop_psearch( syncops *so, int lock ) { - if ( lock ) - ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); - so->s_op->o_conn->c_n_ops_executing--; - so->s_op->o_conn->c_n_ops_completed++; - LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, slap_op, - o_next ); - if ( lock ) - ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); + if ( so->s_flags & PS_IS_DETACHED ) { + if ( lock ) + ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); + so->s_op->o_conn->c_n_ops_executing--; + so->s_op->o_conn->c_n_ops_completed++; + LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, slap_op, + o_next ); + if ( lock ) + ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); + } syncprov_free_syncop( so ); + + return 0; +} + +static int +syncprov_ab_cleanup( Operation *op, SlapReply *rs ) +{ + slap_callback *sc = op->o_callback; + op->o_callback = sc->sc_next; + syncprov_drop_psearch( op->o_callback->sc_private, 0 ); + op->o_tmpfree( sc, op->o_tmpmemctx ); + return 0; } static int @@ -879,15 +943,19 @@ syncprov_op_abandon( Operation *op, Slap if ( so ) { /* Is this really a Cancel exop? */ if ( op->o_tag != LDAP_REQ_ABANDON ) { + so->s_op->o_cancel = SLAP_CANCEL_ACK; rs->sr_err = LDAP_CANCELLED; send_ldap_result( so->s_op, rs ); + if ( so->s_flags & PS_IS_DETACHED ) { + slap_callback *cb; + cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx ); + cb->sc_cleanup = syncprov_ab_cleanup; + cb->sc_next = op->o_callback; + cb->sc_private = so; + return SLAP_CB_CONTINUE; + } } - /* Our cloned searches have no ctrls set. - * we don't want to muck with real search ops - * from the frontend. - */ - if ( ! so->s_op->o_sync ) - syncprov_drop_psearch( so, 0 ); + syncprov_drop_psearch( so, 0 ); } return SLAP_CB_CONTINUE; } @@ -920,16 +988,18 @@ syncprov_matchops( Operation *op, opcook if ( op->o_tag != LDAP_REQ_ADD ) { op->o_bd->bd_info = (BackendInfo *)on->on_info; rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e ); + /* If we're sending responses now, make a copy and unlock the DB */ + if ( e && !saveit ) { + Entry *e2 = entry_dup( e ); + be_entry_release_rw( op, e, 0 ); + e = e2; + } op->o_bd->bd_info = (BackendInfo *)on; if ( rc ) return; } else { e = op->ora_e; } - /* Never replicate these */ - if ( is_entry_syncConsumerSubentry( e )) { - goto done; - } if ( saveit ) { ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); @@ -990,19 +1060,25 @@ syncprov_matchops( Operation *op, opcook opc->smatches = sm; } else { /* if found send UPDATE else send ADD */ - syncprov_sendresp( op, opc, ss, e, + ss->s_inuse++; + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + syncprov_sendresp( op, opc, ss, &e, found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 ); + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); + ss->s_inuse--; } } else if ( !saveit && found ) { /* send DELETE */ + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 ); + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } } ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); done: - if ( op->o_tag != LDAP_REQ_ADD ) { + if ( op->o_tag != LDAP_REQ_ADD && e ) { op->o_bd->bd_info = (BackendInfo *)on->on_info; - be_entry_release_r( op, e ); + be_entry_release_rw( op, e, 0 ); op->o_bd->bd_info = (BackendInfo *)on; } if ( freefdn ) { @@ -1030,7 +1106,6 @@ syncprov_op_cleanup( Operation *op, Slap mtdummy.mt_op = op; ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); if ( mt ) { modinst *mi = mt->mt_mods; @@ -1041,14 +1116,13 @@ syncprov_op_cleanup( Operation *op, Slap mt->mt_op = mt->mt_mods->mi_op; ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); } else { - ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); avl_delete( &si->si_mods, mt, sp_avl_cmp ); - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); ch_free( mt ); } } + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); if ( !BER_BVISNULL( &opc->suuid )) op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx ); if ( !BER_BVISNULL( &opc->sndn )) @@ -1057,6 +1131,8 @@ syncprov_op_cleanup( Operation *op, Slap op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); op->o_callback = cb->sc_next; op->o_tmpfree(cb, op->o_tmpmemctx); + + return 0; } static void @@ -1066,8 +1142,8 @@ syncprov_checkpoint( Operation *op, Slap Modifications mod; Operation opm; struct berval bv[2]; - BackendInfo *orig; slap_callback cb = {0}; + int manage = get_manageDSAit(op); mod.sml_values = bv; bv[1].bv_val = NULL; @@ -1084,9 +1160,10 @@ syncprov_checkpoint( Operation *op, Slap opm.orm_modlist = &mod; opm.o_req_dn = op->o_bd->be_suffix[0]; opm.o_req_ndn = op->o_bd->be_nsuffix[0]; - orig = opm.o_bd->bd_info; opm.o_bd->bd_info = on->on_info->oi_orig; + opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; opm.o_bd->be_modify( &opm, rs ); + opm.o_managedsait = manage; } static void @@ -1098,7 +1175,8 @@ syncprov_add_slog( Operation *op, struct sessionlog *sl; slog_entry *se; - for ( sl = si->si_logs; sl; sl=sl->sl_next ) { + sl = si->si_logs; + { /* Allocate a record. UUIDs are not NUL-terminated. */ se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + csn->bv_len + 1 ); @@ -1194,6 +1272,10 @@ syncprov_playlog( Operation *op, SlapRep ndel = i; + /* Zero out unused slots */ + for ( i=ndel; i < num - nmods; i++ ) + uuids[i].bv_len = 0; + /* Mods must be validated to see if they belong in this delete set. */ @@ -1222,7 +1304,11 @@ syncprov_playlog( Operation *op, SlapRep SlapReply frs = { REP_RESULT }; int rc; Filter mf, af; +#ifdef LDAP_COMP_MATCH + AttributeAssertion eq = { NULL, BER_BVNULL, NULL }; +#else AttributeAssertion eq; +#endif slap_callback cb = {0}; fop = *op; @@ -1294,6 +1380,13 @@ syncprov_op_response( Operation *op, Sla } } + /* Don't do any processing for consumer contextCSN updates */ + if ( SLAP_SYNC_SHADOW( op->o_bd ) && + op->o_msgid == SLAP_SYNC_UPDATE_MSGID ) { + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); + return SLAP_CB_CONTINUE; + } + si->si_numops++; if ( si->si_chkops || si->si_chktime ) { int do_check=0; @@ -1328,14 +1421,12 @@ syncprov_op_response( Operation *op, Sla /* for each match in opc->smatches: * send DELETE msg */ - ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); for ( sm = opc->smatches; sm; sm=sm->sm_next ) { if ( sm->sm_op->s_op->o_abandon ) continue; syncprov_sendresp( op, opc, sm->sm_op, NULL, LDAP_SYNC_DELETE, 1 ); } - ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); break; } } @@ -1461,6 +1552,18 @@ syncprov_op_mod( Operation *op, SlapRepl ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); + + /* clean up if the caller is giving up */ + if ( op->o_abandon ) { + modinst *m2; + for ( m2 = mt->mt_mods; m2->mi_next != mi; + m2 = m2->mi_next ); + m2->mi_next = mi->mi_next; + if ( mt->mt_tail == mi ) mt->mt_tail = m2; + op->o_tmpfree( cb, op->o_tmpmemctx ); + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + return SLAPD_ABANDON; + } } ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); } else { @@ -1477,7 +1580,6 @@ syncprov_op_mod( Operation *op, SlapRepl if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) syncprov_matchops( op, opc, 1 ); - return SLAP_CB_CONTINUE; } @@ -1581,6 +1683,7 @@ syncprov_detach_op( Operation *op, synco op->o_conn->c_n_ops_executing++; op->o_conn->c_n_ops_completed--; LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next ); + so->s_flags |= PS_IS_DETACHED; ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); } @@ -1604,11 +1707,12 @@ syncprov_search_response( Operation *op, Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 ); return SLAP_CB_CONTINUE; } - if ( srs->sr_state.ctxcsn ) { + if ( !BER_BVISNULL( &srs->sr_state.ctxcsn )) { Attribute *a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); + /* Don't send the ctx entry twice */ - if ( bvmatch( &a->a_nvals[0], srs->sr_state.ctxcsn )) + if ( a && bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) ) return LDAP_SUCCESS; } rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, @@ -1621,7 +1725,7 @@ syncprov_search_response( Operation *op, slap_compose_sync_cookie( op, &cookie, &op->ors_filter->f_and->f_ava->aa_value, - srs->sr_state.sid, srs->sr_state.rid ); + srs->sr_state.rid ); /* Is this a regular refresh? */ if ( !ss->ss_so ) { @@ -1639,41 +1743,9 @@ syncprov_search_response( Operation *op, &cookie, 1, NULL, 0 ); /* Flush any queued persist messages */ if ( ss->ss_so->s_res ) { - syncres *sr, *srnext; - Entry *e; - opcookie opc; - - opc.son = on; ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); locked = 1; - for (sr = ss->ss_so->s_res; sr; sr=srnext) { - int rc = LDAP_SUCCESS; - srnext = sr->s_next; - opc.sdn = sr->s_dn; - opc.sndn = sr->s_ndn; - opc.suuid = sr->s_uuid; - opc.sctxcsn = sr->s_csn; - opc.sreference = sr->s_isreference; - e = NULL; - - if ( sr->s_mode != LDAP_SYNC_DELETE ) { - op->o_bd->bd_info = (BackendInfo *)on->on_info; - rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); - op->o_bd->bd_info = (BackendInfo *)on; - } - if ( rc == LDAP_SUCCESS ) - syncprov_sendresp( op, &opc, ss->ss_so, e, - sr->s_mode, 0 ); - - if ( e ) { - op->o_bd->bd_info = (BackendInfo *)on->on_info; - be_entry_release_r( op, e ); - op->o_bd->bd_info = (BackendInfo *)on; - } - ch_free( sr ); - } - ss->ss_so->s_res = NULL; - ss->ss_so->s_restail = NULL; + syncprov_qplay( op, on, ss->ss_so ); } /* Turn off the refreshing flag */ @@ -1713,6 +1785,7 @@ syncprov_op_search( Operation *op, SlapR } srs = op->o_controls[slap_cids.sc_LDAPsync]; + op->o_managedsait = SLAP_CONTROL_NONCRITICAL; /* If this is a persistent search, set it up right away */ if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) { @@ -1741,7 +1814,6 @@ syncprov_op_search( Operation *op, SlapR sop = ch_malloc( sizeof( syncops )); *sop = so; ldap_pvt_thread_mutex_init( &sop->s_mutex ); - sop->s_sid = srs->sr_state.sid; sop->s_rid = srs->sr_state.rid; sop->s_inuse = 1; @@ -1759,13 +1831,13 @@ syncprov_op_search( Operation *op, SlapR ctxcsn.bv_val = csnbuf; /* If we have a cookie, handle the PRESENT lookups */ - if ( srs->sr_state.ctxcsn ) { + if ( !BER_BVISNULL( &srs->sr_state.ctxcsn )) { sessionlog *sl; /* The cookie was validated when it was parsed, just use it */ /* If just Refreshing and nothing has changed, shortcut it */ - if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) { + if ( bvmatch( &srs->sr_state.ctxcsn, &ctxcsn )) { nochange = 1; if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; @@ -1783,14 +1855,13 @@ syncprov_op_search( Operation *op, SlapR goto shortcut; } /* Do we have a sessionlog for this search? */ - for ( sl=si->si_logs; sl; sl=sl->sl_next ) - if ( sl->sl_sid == srs->sr_state.sid ) break; + sl=si->si_logs; if ( sl ) { ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); - if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) { + if ( ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) { do_present = 0; /* mutex is unlocked in playlog */ - syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn ); + syncprov_playlog( op, rs, sl, &srs->sr_state.ctxcsn, &ctxcsn ); } else { ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); } @@ -1830,6 +1901,9 @@ shortcut: fava->f_choice = LDAP_FILTER_LE; fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; +#ifdef LDAP_COMP_MATCH + fava->f_ava->aa_cf = NULL; +#endif ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx ); fand->f_and = fava; if ( gotstate ) { @@ -1838,7 +1912,10 @@ shortcut: fava->f_choice = LDAP_FILTER_GE; fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; - ber_dupbv_x( &fava->f_ava->aa_value, srs->sr_state.ctxcsn, op->o_tmpmemctx ); +#ifdef LDAP_COMP_MATCH + fava->f_ava->aa_cf = NULL; +#endif + ber_dupbv_x( &fava->f_ava->aa_value, &srs->sr_state.ctxcsn, op->o_tmpmemctx ); } fava->f_next = op->ors_filter; op->ors_filter = fand; @@ -1918,72 +1995,109 @@ syncprov_operational( return SLAP_CB_CONTINUE; } +enum { + SP_CHKPT = 1, + SP_SESSL +}; + +static ConfigDriver sp_cf_gen; + +static ConfigTable spcfg[] = { + { "syncprov-checkpoint", "ops> bd_info; + slap_overinst *on = (slap_overinst *)c->bi; syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + int rc = 0; - if ( strcasecmp( argv[ 0 ], "syncprov-checkpoint" ) == 0 ) { - if ( argc != 3 ) { - fprintf( stderr, "%s: line %d: wrong number of arguments in " - "\"syncprov-checkpoint \"\n", fname, lineno ); - return -1; + if ( c->op == SLAP_CONFIG_EMIT ) { + switch ( c->type ) { + case SP_CHKPT: + if ( si->si_chkops || si->si_chktime ) { + struct berval bv; + bv.bv_len = sprintf( c->msg, "%d %d", + si->si_chkops, si->si_chktime ); + bv.bv_val = c->msg; + value_add_one( &c->rvalue_vals, &bv ); + } else { + rc = 1; + } + break; + case SP_SESSL: + if ( si->si_logs ) { + c->value_int = si->si_logs->sl_size; + } else { + rc = 1; + } + break; } - si->si_chkops = atoi( argv[1] ); - si->si_chktime = atoi( argv[2] ) * 60; - return 0; - - } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) { - sessionlog *sl; - int sid, size; - if ( argc != 3 ) { - fprintf( stderr, "%s: line %d: wrong number of arguments in " - "\"syncprov-sessionlog \"\n", fname, lineno ); - return -1; - } - sid = atoi( argv[1] ); - if ( sid < 0 || sid > 999 ) { - fprintf( stderr, - "%s: line %d: session log id %d is out of range [0..999]\n", - fname, lineno, sid ); - return -1; + return rc; + } else if ( c->op == LDAP_MOD_DELETE ) { + switch ( c->type ) { + case SP_CHKPT: + si->si_chkops = 0; + si->si_chktime = 0; + break; + case SP_SESSL: + if ( si->si_logs ) + si->si_logs->sl_size = 0; + else + rc = LDAP_NO_SUCH_ATTRIBUTE; + break; } - size = atoi( argv[2] ); + return rc; + } + switch ( c->type ) { + case SP_CHKPT: + si->si_chkops = atoi( c->argv[1] ); + si->si_chktime = atoi( c->argv[2] ) * 60; + break; + case SP_SESSL: { + sessionlog *sl; + int size = c->value_int; + if ( size < 0 ) { - fprintf( stderr, - "%s: line %d: session log size %d is negative\n", - fname, lineno, size ); - return -1; - } - for ( sl = si->si_logs; sl; sl=sl->sl_next ) { - if ( sl->sl_sid == sid ) { - sl->sl_size = size; - break; - } + sprintf( c->msg, "%s size %d is negative", + c->argv[0], size ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); + return ARG_BAD_CONF; } + sl = si->si_logs; if ( !sl ) { sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE ); sl->sl_mincsn.bv_val = (char *)(sl+1); sl->sl_mincsn.bv_len = 0; - sl->sl_sid = sid; - sl->sl_size = size; sl->sl_num = 0; sl->sl_head = sl->sl_tail = NULL; - sl->sl_next = si->si_logs; ldap_pvt_thread_mutex_init( &sl->sl_mutex ); si->si_logs = sl; } - return 0; + sl->sl_size = size; + } + break; } - - return SLAP_CONF_UNKNOWN; + return rc; } /* Cheating - we have no thread pool context for these functions, @@ -2019,6 +2133,15 @@ syncprov_db_open( Attribute *a; int rc; + if ( slapMode & SLAP_TOOL_MODE ) { + return 0; + } + + rc = overlay_register_control( be, LDAP_CONTROL_SYNC ); + if ( rc ) { + return rc; + } + connection_fake_init( &conn, op, thrctx ); op->o_bd = be; op->o_dn = be->be_rootdn; @@ -2041,12 +2164,18 @@ syncprov_db_open( si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0'; strcpy( ctxcsnbuf, si->si_ctxcsnbuf ); } - be_entry_release_r( op, e ); + be_entry_release_rw( op, e, 0 ); op->o_bd->bd_info = (BackendInfo *)on; op->o_req_dn = be->be_suffix[0]; op->o_req_ndn = be->be_nsuffix[0]; op->ors_scope = LDAP_SCOPE_SUBTREE; syncprov_findcsn( op, FIND_MAXCSN ); + } else if ( SLAP_SYNC_SHADOW( op->o_bd )) { + /* If we're also a consumer, and we didn't find the context entry, + * then don't generate anything, wait for our provider to send it + * to us. + */ + goto out; } if ( BER_BVISEMPTY( &si->si_ctxcsn ) ) { @@ -2062,6 +2191,7 @@ syncprov_db_open( syncprov_checkpoint( op, &rs, on ); } +out: op->o_bd->bd_info = (BackendInfo *)on; return 0; } @@ -2077,6 +2207,9 @@ syncprov_db_close( syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; int i; + if ( slapMode & SLAP_TOOL_MODE ) { + return 0; + } if ( si->si_numops ) { Connection conn; char opbuf[OPERATION_BUFFER_SIZE]; @@ -2224,7 +2357,7 @@ static int syncprov_parseCtrl ( sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx ); sr->sr_rhint = rhint; if (!BER_BVISNULL(&cookie)) { - ber_bvarray_add( &sr->sr_state.octet_str, &cookie ); + ber_dupbv( &sr->sr_state.octet_str, &cookie ); slap_parse_sync_cookie( &sr->sr_state ); } @@ -2257,13 +2390,13 @@ syncprov_init() SLAP_CTRL_HIDE|SLAP_CTRL_SEARCH, NULL, syncprov_parseCtrl, &slap_cids.sc_LDAPsync ); if ( rc != LDAP_SUCCESS ) { - fprintf( stderr, "Failed to register control %d\n", rc ); + Debug( LDAP_DEBUG_ANY, + "syncprov_init: Failed to register control %d\n", rc, 0, 0 ); return rc; } syncprov.on_bi.bi_type = "syncprov"; syncprov.on_bi.bi_db_init = syncprov_db_init; - syncprov.on_bi.bi_db_config = syncprov_db_config; syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; syncprov.on_bi.bi_db_open = syncprov_db_open; syncprov.on_bi.bi_db_close = syncprov_db_close; @@ -2280,6 +2413,11 @@ syncprov_init() syncprov.on_bi.bi_extended = syncprov_op_extended; syncprov.on_bi.bi_operational = syncprov_operational; + syncprov.on_bi.bi_cf_ocs = spocs; + + rc = config_register_schema( spcfg, spocs ); + if ( rc ) return rc; + return overlay_register( &syncprov ); }