--- servers/slapd/overlays/syncprov.c 2004/12/10 02:18:33 1.56
+++ servers/slapd/overlays/syncprov.c 2005/05/10 16:07:04 1.56.2.7
@@ -1,8 +1,8 @@
-/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.55 2004/12/10 01:51:34 hyc Exp $ */
+/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.56.2.6 2005/04/29 21:29:10 kurt Exp $ */
/* syncprov.c - syncrepl provider */
/* This work is part of OpenLDAP Software .
*
- * Copyright 2004 The OpenLDAP Foundation.
+ * Copyright 2004-2005 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -25,6 +25,7 @@
#include
#include "lutil.h"
#include "slap.h"
+#include "config.h"
/* A modify request on a particular entry */
typedef struct modinst {
@@ -56,7 +57,6 @@ typedef struct syncops {
struct berval s_base; /* ndn of search base */
ID s_eid; /* entryID of search base */
Operation *s_op; /* search op */
- int s_sid;
int s_rid;
struct berval s_filterstr;
int s_flags; /* search status */
@@ -84,6 +84,7 @@ typedef struct sync_control {
#define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<o_tmpmemctx );
-
- ctrls[num_ctrls] = ch_malloc ( sizeof ( LDAPControl ) );
-
- entryuuid_bv = slog_e->sl_uuid;
-
- if ( send_cookie && cookie ) {
- ber_printf( ber, "{eOON}",
- entry_sync_state, &entryuuid_bv, cookie );
- } else {
- ber_printf( ber, "{eON}",
- entry_sync_state, &entryuuid_bv );
- }
-
- ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE;
- ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
- ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 );
-
- ber_free_buf( ber );
-
- if ( ret < 0 ) {
- Debug( LDAP_DEBUG_TRACE,
- "slap_build_sync_ctrl: ber_flatten2 failed\n",
- 0, 0, 0 );
- send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
- return ret;
- }
-
- return LDAP_SUCCESS;
-}
-#endif
-
static int
syncprov_sendinfo(
Operation *op,
@@ -472,6 +415,7 @@ syncprov_findbase( Operation *op, fbase_
cb.sc_private = fc;
fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync mode */
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
fop.o_callback = &cb;
fop.o_tag = LDAP_REQ_SEARCH;
fop.ors_scope = LDAP_SCOPE_BASE;
@@ -525,7 +469,7 @@ findmax_cb( Operation *op, SlapReply *rs
Attribute *a = attr_find( rs->sr_entry->e_attrs,
slap_schema.si_ad_entryCSN );
- if ( a && ber_bvcmp( &a->a_vals[0], maxcsn )) {
+ if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 ) {
maxcsn->bv_len = a->a_vals[0].bv_len;
strcpy( maxcsn->bv_val, a->a_vals[0].bv_val );
}
@@ -606,21 +550,27 @@ syncprov_findcsn( Operation *op, int mod
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
struct berval fbuf, maxcsn;
Filter cf, af;
- AttributeAssertion eq;
+#ifdef LDAP_COMP_MATCH
+ AttributeAssertion eq = { NULL, BER_BVNULL, NULL };
+#else
+ AttributeAssertion eq = { NULL, BER_BVNULL };
+#endif
int i, rc = LDAP_SUCCESS;
fpres_cookie pcookie;
- sync_control *srs;
+ sync_control *srs = NULL;
if ( mode != FIND_MAXCSN ) {
srs = op->o_controls[slap_cids.sc_LDAPsync];
- if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) {
+ if ( srs->sr_state.ctxcsn.bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) {
return LDAP_OTHER;
}
}
fop = *op;
fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */
+ /* We want pure entries, not referrals */
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
fbuf.bv_val = buf;
cf.f_ava = &eq;
@@ -644,12 +594,13 @@ syncprov_findcsn( Operation *op, int mod
fop.ors_slimit = SLAP_NO_LIMIT;
cb.sc_private = &maxcsn;
cb.sc_response = findmax_cb;
+ strcpy( cbuf, si->si_ctxcsn.bv_val );
maxcsn.bv_val = cbuf;
- maxcsn.bv_len = 0;
+ maxcsn.bv_len = si->si_ctxcsn.bv_len;
break;
case FIND_CSN:
cf.f_choice = LDAP_FILTER_LE;
- cf.f_av_value = *srs->sr_state.ctxcsn;
+ cf.f_av_value = srs->sr_state.ctxcsn;
fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
cf.f_av_value.bv_val );
fop.ors_attrsonly = 1;
@@ -663,15 +614,13 @@ syncprov_findcsn( Operation *op, int mod
af.f_next = NULL;
af.f_and = &cf;
cf.f_choice = LDAP_FILTER_LE;
- cf.f_av_value = *srs->sr_state.ctxcsn;
+ cf.f_av_value = srs->sr_state.ctxcsn;
cf.f_next = op->ors_filter;
fop.ors_filter = ⁡
filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr );
fop.ors_attrsonly = 0;
fop.ors_attrs = uuid_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
- /* We want pure entries, not referrals */
- fop.o_managedsait = SLAP_CONTROL_CRITICAL;
cb.sc_private = &pcookie;
cb.sc_response = findpres_cb;
pcookie.num = 0;
@@ -696,10 +645,8 @@ syncprov_findcsn( Operation *op, int mod
switch( mode ) {
case FIND_MAXCSN:
- if ( maxcsn.bv_len ) {
- strcpy( si->si_ctxcsnbuf, maxcsn.bv_val );
- si->si_ctxcsn.bv_len = maxcsn.bv_len;
- }
+ strcpy( si->si_ctxcsnbuf, maxcsn.bv_val );
+ si->si_ctxcsn.bv_len = maxcsn.bv_len;
break;
case FIND_CSN:
/* If matching CSN was not found, invalidate the context. */
@@ -714,7 +661,7 @@ syncprov_findcsn( Operation *op, int mod
return rc;
}
-/* Queue a persistent search response if still in Refresh stage */
+/* Queue a persistent search response */
static int
syncprov_qresp( opcookie *opc, syncops *so, int mode )
{
@@ -724,13 +671,18 @@ syncprov_qresp( opcookie *opc, syncops *
opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
sr->s_next = NULL;
sr->s_dn.bv_val = (char *)(sr + 1);
+ sr->s_dn.bv_len = opc->sdn.bv_len;
sr->s_mode = mode;
sr->s_isreference = opc->sreference;
sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
+ sr->s_ndn.bv_len = opc->sndn.bv_len;
*(sr->s_ndn.bv_val++) = '\0';
sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
+ sr->s_uuid.bv_len = opc->suuid.bv_len;
*(sr->s_uuid.bv_val++) = '\0';
sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
+ sr->s_csn.bv_len = opc->sctxcsn.bv_len;
+ strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
if ( !so->s_res ) {
so->s_res = sr;
@@ -742,9 +694,57 @@ syncprov_qresp( opcookie *opc, syncops *
return LDAP_SUCCESS;
}
+/* Play back queued responses */
+static int
+syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue );
+
+static int
+syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
+{
+ syncres *sr, *srnext;
+ Entry *e;
+ opcookie opc;
+ int rc;
+
+ opc.son = on;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
+ for (sr = so->s_res; sr; sr=srnext) {
+ srnext = sr->s_next;
+ opc.sdn = sr->s_dn;
+ opc.sndn = sr->s_ndn;
+ opc.suuid = sr->s_uuid;
+ opc.sctxcsn = sr->s_csn;
+ opc.sreference = sr->s_isreference;
+ e = NULL;
+
+ if ( sr->s_mode != LDAP_SYNC_DELETE ) {
+ rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+ if ( rc ) {
+ ch_free( sr );
+ so->s_res = srnext;
+ continue;
+ }
+ }
+ rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 );
+
+ if ( e ) {
+ be_entry_release_rw( op, e, 0 );
+ }
+ if ( rc )
+ break;
+
+ ch_free( sr );
+ so->s_res = srnext;
+ }
+ op->o_bd->bd_info = (BackendInfo *)on;
+ if ( !so->s_res )
+ so->s_restail = NULL;
+ return rc;
+}
+
/* Send a persistent search response */
static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode, int queue )
+syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue )
{
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
@@ -757,22 +757,47 @@ syncprov_sendresp( Operation *op, opcook
Operation sop = *so->s_op;
Opheader ohdr;
+ if ( so->s_op->o_abandon )
+ return SLAPD_ABANDON;
+
ohdr = *sop.o_hdr;
sop.o_hdr = &ohdr;
sop.o_tmpmemctx = op->o_tmpmemctx;
sop.o_bd = op->o_bd;
sop.o_controls = op->o_controls;
+ sop.o_private = op->o_private;
- if ( queue && (so->s_flags & PS_IS_REFRESHING) ) {
+ /* If queueing is allowed */
+ if ( queue ) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
- if ( so->s_flags & PS_IS_REFRESHING )
+ /* If we're still in refresh mode, must queue */
+ if (so->s_flags & PS_IS_REFRESHING) {
+ return syncprov_qresp( opc, so, mode );
+ }
+ /* If connection is free but queue is non-empty,
+ * try to flush the queue.
+ */
+ if ( so->s_res ) {
+ rs.sr_err = syncprov_qplay( &sop, on, so );
+ }
+ /* If the connection is busy, must queue */
+ if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) {
return syncprov_qresp( opc, so, mode );
+ }
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+
+ /* If syncprov_qplay returned any other error, bail out. */
+ if ( rs.sr_err ) {
+ return rs.sr_err;
+ }
+ } else {
+ /* Queueing not allowed and conn is busy, give up */
+ if ( sop.o_conn->c_writewaiter )
+ return LDAP_BUSY;
}
ctrls[1] = NULL;
- slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn,
- so->s_sid, so->s_rid );
+ slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
@@ -780,20 +805,30 @@ syncprov_sendresp( Operation *op, opcook
rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
- rs.sr_entry = e;
rs.sr_ctrls = ctrls;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
switch( mode ) {
case LDAP_SYNC_ADD:
+ rs.sr_entry = *e;
+ if ( rs.sr_entry->e_private )
+ rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
- rs.sr_ref = get_entry_referrals( &sop, e );
+ rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry );
send_search_reference( &sop, &rs );
ber_bvarray_free( rs.sr_ref );
+ if ( !rs.sr_entry )
+ *e = NULL;
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
+ rs.sr_entry = *e;
+ if ( rs.sr_entry->e_private )
+ rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_attrs = sop.ors_attrs;
send_search_entry( &sop, &rs );
+ if ( !rs.sr_entry )
+ *e = NULL;
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
@@ -812,7 +847,20 @@ syncprov_sendresp( Operation *op, opcook
assert(0);
}
op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
+ op->o_private = sop.o_private;
rs.sr_ctrls = NULL;
+ /* Check queue again here; if we were hanging in a send and eventually
+ * recovered, there may be more to send now. But don't check if the
+ * original psearch has been abandoned.
+ */
+ if ( so->s_op->o_abandon )
+ return SLAPD_ABANDON;
+
+ if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) {
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ rs.sr_err = syncprov_qplay( &sop, on, so );
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ }
return rs.sr_err;
}
@@ -829,12 +877,14 @@ syncprov_free_syncop( syncops *so )
return;
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- filter_free( so->s_op->ors_filter );
- for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
- gnext = ga->ga_next;
- ch_free( ga );
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ filter_free( so->s_op->ors_filter );
+ for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
+ gnext = ga->ga_next;
+ ch_free( ga );
+ }
+ ch_free( so->s_op );
}
- ch_free( so->s_op );
ch_free( so->s_base.bv_val );
for ( sr=so->s_res; sr; sr=srnext ) {
srnext = sr->s_next;
@@ -847,15 +897,29 @@ syncprov_free_syncop( syncops *so )
static int
syncprov_drop_psearch( syncops *so, int lock )
{
- if ( lock )
- ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
- so->s_op->o_conn->c_n_ops_executing--;
- so->s_op->o_conn->c_n_ops_completed++;
- LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, slap_op,
- o_next );
- if ( lock )
- ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ if ( lock )
+ ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
+ so->s_op->o_conn->c_n_ops_executing--;
+ so->s_op->o_conn->c_n_ops_completed++;
+ LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, slap_op,
+ o_next );
+ if ( lock )
+ ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
+ }
syncprov_free_syncop( so );
+
+ return 0;
+}
+
+static int
+syncprov_ab_cleanup( Operation *op, SlapReply *rs )
+{
+ slap_callback *sc = op->o_callback;
+ op->o_callback = sc->sc_next;
+ syncprov_drop_psearch( op->o_callback->sc_private, 0 );
+ op->o_tmpfree( sc, op->o_tmpmemctx );
+ return 0;
}
static int
@@ -879,15 +943,19 @@ syncprov_op_abandon( Operation *op, Slap
if ( so ) {
/* Is this really a Cancel exop? */
if ( op->o_tag != LDAP_REQ_ABANDON ) {
+ so->s_op->o_cancel = SLAP_CANCEL_ACK;
rs->sr_err = LDAP_CANCELLED;
send_ldap_result( so->s_op, rs );
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ slap_callback *cb;
+ cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
+ cb->sc_cleanup = syncprov_ab_cleanup;
+ cb->sc_next = op->o_callback;
+ cb->sc_private = so;
+ return SLAP_CB_CONTINUE;
+ }
}
- /* Our cloned searches have no ctrls set.
- * we don't want to muck with real search ops
- * from the frontend.
- */
- if ( ! so->s_op->o_sync )
- syncprov_drop_psearch( so, 0 );
+ syncprov_drop_psearch( so, 0 );
}
return SLAP_CB_CONTINUE;
}
@@ -920,16 +988,18 @@ syncprov_matchops( Operation *op, opcook
if ( op->o_tag != LDAP_REQ_ADD ) {
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e );
+ /* If we're sending responses now, make a copy and unlock the DB */
+ if ( e && !saveit ) {
+ Entry *e2 = entry_dup( e );
+ be_entry_release_rw( op, e, 0 );
+ e = e2;
+ }
op->o_bd->bd_info = (BackendInfo *)on;
if ( rc ) return;
} else {
e = op->ora_e;
}
- /* Never replicate these */
- if ( is_entry_syncConsumerSubentry( e )) {
- goto done;
- }
if ( saveit ) {
ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
@@ -990,19 +1060,25 @@ syncprov_matchops( Operation *op, opcook
opc->smatches = sm;
} else {
/* if found send UPDATE else send ADD */
- syncprov_sendresp( op, opc, ss, e,
+ ss->s_inuse++;
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+ syncprov_sendresp( op, opc, ss, &e,
found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 );
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ ss->s_inuse--;
}
} else if ( !saveit && found ) {
/* send DELETE */
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 );
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
}
}
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
done:
- if ( op->o_tag != LDAP_REQ_ADD ) {
+ if ( op->o_tag != LDAP_REQ_ADD && e ) {
op->o_bd->bd_info = (BackendInfo *)on->on_info;
- be_entry_release_r( op, e );
+ be_entry_release_rw( op, e, 0 );
op->o_bd->bd_info = (BackendInfo *)on;
}
if ( freefdn ) {
@@ -1030,7 +1106,6 @@ syncprov_op_cleanup( Operation *op, Slap
mtdummy.mt_op = op;
ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
- ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
if ( mt ) {
modinst *mi = mt->mt_mods;
@@ -1041,14 +1116,13 @@ syncprov_op_cleanup( Operation *op, Slap
mt->mt_op = mt->mt_mods->mi_op;
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
} else {
- ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
avl_delete( &si->si_mods, mt, sp_avl_cmp );
- ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
ch_free( mt );
}
}
+ ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
if ( !BER_BVISNULL( &opc->suuid ))
op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
if ( !BER_BVISNULL( &opc->sndn ))
@@ -1057,6 +1131,8 @@ syncprov_op_cleanup( Operation *op, Slap
op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
op->o_callback = cb->sc_next;
op->o_tmpfree(cb, op->o_tmpmemctx);
+
+ return 0;
}
static void
@@ -1066,8 +1142,8 @@ syncprov_checkpoint( Operation *op, Slap
Modifications mod;
Operation opm;
struct berval bv[2];
- BackendInfo *orig;
slap_callback cb = {0};
+ int manage = get_manageDSAit(op);
mod.sml_values = bv;
bv[1].bv_val = NULL;
@@ -1084,9 +1160,10 @@ syncprov_checkpoint( Operation *op, Slap
opm.orm_modlist = &mod;
opm.o_req_dn = op->o_bd->be_suffix[0];
opm.o_req_ndn = op->o_bd->be_nsuffix[0];
- orig = opm.o_bd->bd_info;
opm.o_bd->bd_info = on->on_info->oi_orig;
+ opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
opm.o_bd->be_modify( &opm, rs );
+ opm.o_managedsait = manage;
}
static void
@@ -1098,7 +1175,8 @@ syncprov_add_slog( Operation *op, struct
sessionlog *sl;
slog_entry *se;
- for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
+ sl = si->si_logs;
+ {
/* Allocate a record. UUIDs are not NUL-terminated. */
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
csn->bv_len + 1 );
@@ -1194,6 +1272,10 @@ syncprov_playlog( Operation *op, SlapRep
ndel = i;
+ /* Zero out unused slots */
+ for ( i=ndel; i < num - nmods; i++ )
+ uuids[i].bv_len = 0;
+
/* Mods must be validated to see if they belong in this delete set.
*/
@@ -1222,7 +1304,11 @@ syncprov_playlog( Operation *op, SlapRep
SlapReply frs = { REP_RESULT };
int rc;
Filter mf, af;
+#ifdef LDAP_COMP_MATCH
+ AttributeAssertion eq = { NULL, BER_BVNULL, NULL };
+#else
AttributeAssertion eq;
+#endif
slap_callback cb = {0};
fop = *op;
@@ -1294,6 +1380,13 @@ syncprov_op_response( Operation *op, Sla
}
}
+ /* Don't do any processing for consumer contextCSN updates */
+ if ( SLAP_SYNC_SHADOW( op->o_bd ) &&
+ op->o_msgid == SLAP_SYNC_UPDATE_MSGID ) {
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ return SLAP_CB_CONTINUE;
+ }
+
si->si_numops++;
if ( si->si_chkops || si->si_chktime ) {
int do_check=0;
@@ -1328,14 +1421,12 @@ syncprov_op_response( Operation *op, Sla
/* for each match in opc->smatches:
* send DELETE msg
*/
- ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
if ( sm->sm_op->s_op->o_abandon )
continue;
syncprov_sendresp( op, opc, sm->sm_op, NULL,
LDAP_SYNC_DELETE, 1 );
}
- ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
break;
}
}
@@ -1461,6 +1552,18 @@ syncprov_op_mod( Operation *op, SlapRepl
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
ldap_pvt_thread_yield();
ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+
+ /* clean up if the caller is giving up */
+ if ( op->o_abandon ) {
+ modinst *m2;
+ for ( m2 = mt->mt_mods; m2->mi_next != mi;
+ m2 = m2->mi_next );
+ m2->mi_next = mi->mi_next;
+ if ( mt->mt_tail == mi ) mt->mt_tail = m2;
+ op->o_tmpfree( cb, op->o_tmpmemctx );
+ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+ return SLAPD_ABANDON;
+ }
}
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
} else {
@@ -1477,7 +1580,6 @@ syncprov_op_mod( Operation *op, SlapRepl
if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
syncprov_matchops( op, opc, 1 );
-
return SLAP_CB_CONTINUE;
}
@@ -1581,6 +1683,7 @@ syncprov_detach_op( Operation *op, synco
op->o_conn->c_n_ops_executing++;
op->o_conn->c_n_ops_completed--;
LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
+ so->s_flags |= PS_IS_DETACHED;
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
}
@@ -1604,11 +1707,12 @@ syncprov_search_response( Operation *op,
Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 );
return SLAP_CB_CONTINUE;
}
- if ( srs->sr_state.ctxcsn ) {
+ if ( !BER_BVISNULL( &srs->sr_state.ctxcsn )) {
Attribute *a = attr_find( rs->sr_entry->e_attrs,
slap_schema.si_ad_entryCSN );
+
/* Don't send the ctx entry twice */
- if ( bvmatch( &a->a_nvals[0], srs->sr_state.ctxcsn ))
+ if ( a && bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) )
return LDAP_SUCCESS;
}
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
@@ -1621,7 +1725,7 @@ syncprov_search_response( Operation *op,
slap_compose_sync_cookie( op, &cookie,
&op->ors_filter->f_and->f_ava->aa_value,
- srs->sr_state.sid, srs->sr_state.rid );
+ srs->sr_state.rid );
/* Is this a regular refresh? */
if ( !ss->ss_so ) {
@@ -1639,41 +1743,9 @@ syncprov_search_response( Operation *op,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
if ( ss->ss_so->s_res ) {
- syncres *sr, *srnext;
- Entry *e;
- opcookie opc;
-
- opc.son = on;
ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
locked = 1;
- for (sr = ss->ss_so->s_res; sr; sr=srnext) {
- int rc = LDAP_SUCCESS;
- srnext = sr->s_next;
- opc.sdn = sr->s_dn;
- opc.sndn = sr->s_ndn;
- opc.suuid = sr->s_uuid;
- opc.sctxcsn = sr->s_csn;
- opc.sreference = sr->s_isreference;
- e = NULL;
-
- if ( sr->s_mode != LDAP_SYNC_DELETE ) {
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
- op->o_bd->bd_info = (BackendInfo *)on;
- }
- if ( rc == LDAP_SUCCESS )
- syncprov_sendresp( op, &opc, ss->ss_so, e,
- sr->s_mode, 0 );
-
- if ( e ) {
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- be_entry_release_r( op, e );
- op->o_bd->bd_info = (BackendInfo *)on;
- }
- ch_free( sr );
- }
- ss->ss_so->s_res = NULL;
- ss->ss_so->s_restail = NULL;
+ syncprov_qplay( op, on, ss->ss_so );
}
/* Turn off the refreshing flag */
@@ -1713,6 +1785,7 @@ syncprov_op_search( Operation *op, SlapR
}
srs = op->o_controls[slap_cids.sc_LDAPsync];
+ op->o_managedsait = SLAP_CONTROL_NONCRITICAL;
/* If this is a persistent search, set it up right away */
if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) {
@@ -1741,7 +1814,6 @@ syncprov_op_search( Operation *op, SlapR
sop = ch_malloc( sizeof( syncops ));
*sop = so;
ldap_pvt_thread_mutex_init( &sop->s_mutex );
- sop->s_sid = srs->sr_state.sid;
sop->s_rid = srs->sr_state.rid;
sop->s_inuse = 1;
@@ -1759,13 +1831,13 @@ syncprov_op_search( Operation *op, SlapR
ctxcsn.bv_val = csnbuf;
/* If we have a cookie, handle the PRESENT lookups */
- if ( srs->sr_state.ctxcsn ) {
+ if ( !BER_BVISNULL( &srs->sr_state.ctxcsn )) {
sessionlog *sl;
/* The cookie was validated when it was parsed, just use it */
/* If just Refreshing and nothing has changed, shortcut it */
- if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) {
+ if ( bvmatch( &srs->sr_state.ctxcsn, &ctxcsn )) {
nochange = 1;
if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
LDAPControl *ctrls[2];
@@ -1783,14 +1855,13 @@ syncprov_op_search( Operation *op, SlapR
goto shortcut;
}
/* Do we have a sessionlog for this search? */
- for ( sl=si->si_logs; sl; sl=sl->sl_next )
- if ( sl->sl_sid == srs->sr_state.sid ) break;
+ sl=si->si_logs;
if ( sl ) {
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
- if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
+ if ( ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
do_present = 0;
/* mutex is unlocked in playlog */
- syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn );
+ syncprov_playlog( op, rs, sl, &srs->sr_state.ctxcsn, &ctxcsn );
} else {
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
}
@@ -1830,6 +1901,9 @@ shortcut:
fava->f_choice = LDAP_FILTER_LE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
+#ifdef LDAP_COMP_MATCH
+ fava->f_ava->aa_cf = NULL;
+#endif
ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx );
fand->f_and = fava;
if ( gotstate ) {
@@ -1838,7 +1912,10 @@ shortcut:
fava->f_choice = LDAP_FILTER_GE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
- ber_dupbv_x( &fava->f_ava->aa_value, srs->sr_state.ctxcsn, op->o_tmpmemctx );
+#ifdef LDAP_COMP_MATCH
+ fava->f_ava->aa_cf = NULL;
+#endif
+ ber_dupbv_x( &fava->f_ava->aa_value, &srs->sr_state.ctxcsn, op->o_tmpmemctx );
}
fava->f_next = op->ors_filter;
op->ors_filter = fand;
@@ -1918,72 +1995,109 @@ syncprov_operational(
return SLAP_CB_CONTINUE;
}
+enum {
+ SP_CHKPT = 1,
+ SP_SESSL
+};
+
+static ConfigDriver sp_cf_gen;
+
+static ConfigTable spcfg[] = {
+ { "syncprov-checkpoint", "ops> bd_info;
+ slap_overinst *on = (slap_overinst *)c->bi;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+ int rc = 0;
- if ( strcasecmp( argv[ 0 ], "syncprov-checkpoint" ) == 0 ) {
- if ( argc != 3 ) {
- fprintf( stderr, "%s: line %d: wrong number of arguments in "
- "\"syncprov-checkpoint \"\n", fname, lineno );
- return -1;
+ if ( c->op == SLAP_CONFIG_EMIT ) {
+ switch ( c->type ) {
+ case SP_CHKPT:
+ if ( si->si_chkops || si->si_chktime ) {
+ struct berval bv;
+ bv.bv_len = sprintf( c->msg, "%d %d",
+ si->si_chkops, si->si_chktime );
+ bv.bv_val = c->msg;
+ value_add_one( &c->rvalue_vals, &bv );
+ } else {
+ rc = 1;
+ }
+ break;
+ case SP_SESSL:
+ if ( si->si_logs ) {
+ c->value_int = si->si_logs->sl_size;
+ } else {
+ rc = 1;
+ }
+ break;
}
- si->si_chkops = atoi( argv[1] );
- si->si_chktime = atoi( argv[2] ) * 60;
- return 0;
-
- } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) {
- sessionlog *sl;
- int sid, size;
- if ( argc != 3 ) {
- fprintf( stderr, "%s: line %d: wrong number of arguments in "
- "\"syncprov-sessionlog \"\n", fname, lineno );
- return -1;
- }
- sid = atoi( argv[1] );
- if ( sid < 0 || sid > 999 ) {
- fprintf( stderr,
- "%s: line %d: session log id %d is out of range [0..999]\n",
- fname, lineno, sid );
- return -1;
+ return rc;
+ } else if ( c->op == LDAP_MOD_DELETE ) {
+ switch ( c->type ) {
+ case SP_CHKPT:
+ si->si_chkops = 0;
+ si->si_chktime = 0;
+ break;
+ case SP_SESSL:
+ if ( si->si_logs )
+ si->si_logs->sl_size = 0;
+ else
+ rc = LDAP_NO_SUCH_ATTRIBUTE;
+ break;
}
- size = atoi( argv[2] );
+ return rc;
+ }
+ switch ( c->type ) {
+ case SP_CHKPT:
+ si->si_chkops = atoi( c->argv[1] );
+ si->si_chktime = atoi( c->argv[2] ) * 60;
+ break;
+ case SP_SESSL: {
+ sessionlog *sl;
+ int size = c->value_int;
+
if ( size < 0 ) {
- fprintf( stderr,
- "%s: line %d: session log size %d is negative\n",
- fname, lineno, size );
- return -1;
- }
- for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
- if ( sl->sl_sid == sid ) {
- sl->sl_size = size;
- break;
- }
+ sprintf( c->msg, "%s size %d is negative",
+ c->argv[0], size );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
}
+ sl = si->si_logs;
if ( !sl ) {
sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
sl->sl_mincsn.bv_val = (char *)(sl+1);
sl->sl_mincsn.bv_len = 0;
- sl->sl_sid = sid;
- sl->sl_size = size;
sl->sl_num = 0;
sl->sl_head = sl->sl_tail = NULL;
- sl->sl_next = si->si_logs;
ldap_pvt_thread_mutex_init( &sl->sl_mutex );
si->si_logs = sl;
}
- return 0;
+ sl->sl_size = size;
+ }
+ break;
}
-
- return SLAP_CONF_UNKNOWN;
+ return rc;
}
/* Cheating - we have no thread pool context for these functions,
@@ -2019,6 +2133,15 @@ syncprov_db_open(
Attribute *a;
int rc;
+ if ( slapMode & SLAP_TOOL_MODE ) {
+ return 0;
+ }
+
+ rc = overlay_register_control( be, LDAP_CONTROL_SYNC );
+ if ( rc ) {
+ return rc;
+ }
+
connection_fake_init( &conn, op, thrctx );
op->o_bd = be;
op->o_dn = be->be_rootdn;
@@ -2041,12 +2164,18 @@ syncprov_db_open(
si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
strcpy( ctxcsnbuf, si->si_ctxcsnbuf );
}
- be_entry_release_r( op, e );
+ be_entry_release_rw( op, e, 0 );
op->o_bd->bd_info = (BackendInfo *)on;
op->o_req_dn = be->be_suffix[0];
op->o_req_ndn = be->be_nsuffix[0];
op->ors_scope = LDAP_SCOPE_SUBTREE;
syncprov_findcsn( op, FIND_MAXCSN );
+ } else if ( SLAP_SYNC_SHADOW( op->o_bd )) {
+ /* If we're also a consumer, and we didn't find the context entry,
+ * then don't generate anything, wait for our provider to send it
+ * to us.
+ */
+ goto out;
}
if ( BER_BVISEMPTY( &si->si_ctxcsn ) ) {
@@ -2062,6 +2191,7 @@ syncprov_db_open(
syncprov_checkpoint( op, &rs, on );
}
+out:
op->o_bd->bd_info = (BackendInfo *)on;
return 0;
}
@@ -2077,6 +2207,9 @@ syncprov_db_close(
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
int i;
+ if ( slapMode & SLAP_TOOL_MODE ) {
+ return 0;
+ }
if ( si->si_numops ) {
Connection conn;
char opbuf[OPERATION_BUFFER_SIZE];
@@ -2224,7 +2357,7 @@ static int syncprov_parseCtrl (
sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx );
sr->sr_rhint = rhint;
if (!BER_BVISNULL(&cookie)) {
- ber_bvarray_add( &sr->sr_state.octet_str, &cookie );
+ ber_dupbv( &sr->sr_state.octet_str, &cookie );
slap_parse_sync_cookie( &sr->sr_state );
}
@@ -2257,13 +2390,13 @@ syncprov_init()
SLAP_CTRL_HIDE|SLAP_CTRL_SEARCH, NULL,
syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
if ( rc != LDAP_SUCCESS ) {
- fprintf( stderr, "Failed to register control %d\n", rc );
+ Debug( LDAP_DEBUG_ANY,
+ "syncprov_init: Failed to register control %d\n", rc, 0, 0 );
return rc;
}
syncprov.on_bi.bi_type = "syncprov";
syncprov.on_bi.bi_db_init = syncprov_db_init;
- syncprov.on_bi.bi_db_config = syncprov_db_config;
syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
syncprov.on_bi.bi_db_open = syncprov_db_open;
syncprov.on_bi.bi_db_close = syncprov_db_close;
@@ -2280,6 +2413,11 @@ syncprov_init()
syncprov.on_bi.bi_extended = syncprov_op_extended;
syncprov.on_bi.bi_operational = syncprov_operational;
+ syncprov.on_bi.bi_cf_ocs = spocs;
+
+ rc = config_register_schema( spcfg, spocs );
+ if ( rc ) return rc;
+
return overlay_register( &syncprov );
}