--- servers/slapd/overlays/syncprov.c 2005/03/28 22:12:05 1.72
+++ servers/slapd/overlays/syncprov.c 2005/12/07 11:09:00 1.135
@@ -1,4 +1,4 @@
-/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.71 2005/03/13 23:12:48 hyc Exp $ */
+/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.134 2005/12/07 08:51:12 ando Exp $ */
/* syncprov.c - syncrepl provider */
/* This work is part of OpenLDAP Software .
*
@@ -25,6 +25,8 @@
#include
#include "lutil.h"
#include "slap.h"
+#include "config.h"
+#include "ldap_rq.h"
/* A modify request on a particular entry */
typedef struct modinst {
@@ -59,9 +61,16 @@ typedef struct syncops {
int s_rid;
struct berval s_filterstr;
int s_flags; /* search status */
+#define PS_IS_REFRESHING 0x01
+#define PS_IS_DETACHED 0x02
+#define PS_WROTE_BASE 0x04
+#define PS_FIND_BASE 0x08
+
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
+ struct re_s *s_qtask; /* task for playing psearch responses */
+#define RUNQ_INTERVAL 36000 /* a long time */
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
@@ -82,9 +91,6 @@ typedef struct sync_control {
#define SLAP_SYNC_PERSIST (LDAP_SYNC_RESERVED<o_tmpmemctx );
-
- ctrls[num_ctrls] = ch_malloc ( sizeof ( LDAPControl ) );
-
- entryuuid_bv = slog_e->sl_uuid;
-
- if ( send_cookie && cookie ) {
- ber_printf( ber, "{eOON}",
- entry_sync_state, &entryuuid_bv, cookie );
- } else {
- ber_printf( ber, "{eON}",
- entry_sync_state, &entryuuid_bv );
- }
-
- ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE;
- ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
- ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 );
-
- ber_free_buf( ber );
-
- if ( ret < 0 ) {
- Debug( LDAP_DEBUG_TRACE,
- "slap_build_sync_ctrl: ber_flatten2 failed\n",
- 0, 0, 0 );
- send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
- return ret;
- }
-
- return LDAP_SUCCESS;
-}
-#endif
-
static int
syncprov_sendinfo(
Operation *op,
@@ -414,36 +366,15 @@ findbase_cb( Operation *op, SlapReply *r
* Just store whatever we got.
*/
if ( fc->fss->s_eid == NOID ) {
- fc->fbase = 1;
+ fc->fbase = 2;
fc->fss->s_eid = rs->sr_entry->e_id;
ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname );
} else if ( rs->sr_entry->e_id == fc->fss->s_eid &&
dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) {
- /* OK, the DN is the same and the entryID is the same. Now
- * see if the fdn resides in the scope.
- */
+ /* OK, the DN is the same and the entryID is the same. */
fc->fbase = 1;
- switch ( fc->fss->s_op->ors_scope ) {
- case LDAP_SCOPE_BASE:
- fc->fscope = dn_match( fc->fdn, &rs->sr_entry->e_nname );
- break;
- case LDAP_SCOPE_ONELEVEL: {
- struct berval pdn;
- dnParent( fc->fdn, &pdn );
- fc->fscope = dn_match( &pdn, &rs->sr_entry->e_nname );
- break; }
- case LDAP_SCOPE_SUBTREE:
- fc->fscope = dnIsSuffix( fc->fdn, &rs->sr_entry->e_nname );
- break;
-#ifdef LDAP_SCOPE_SUBORDINATE
- case LDAP_SCOPE_SUBORDINATE:
- fc->fscope = dnIsSuffix( fc->fdn, &rs->sr_entry->e_nname ) &&
- !dn_match( fc->fdn, &rs->sr_entry->e_nname );
- break;
-#endif
- }
}
}
if ( rs->sr_err != LDAP_SUCCESS ) {
@@ -452,43 +383,82 @@ findbase_cb( Operation *op, SlapReply *r
return LDAP_SUCCESS;
}
+static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL };
+static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
+
static int
syncprov_findbase( Operation *op, fbase_cookie *fc )
{
opcookie *opc = op->o_callback->sc_private;
slap_overinst *on = opc->son;
- syncprov_info_t *si = on->on_bi.bi_private;
- slap_callback cb = {0};
- Operation fop;
- SlapReply frs = { REP_RESULT };
- int rc;
+ /* Use basic parameters from syncrepl search, but use
+ * current op's threadctx / tmpmemctx
+ */
+ ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex );
+ if ( fc->fss->s_flags & PS_FIND_BASE ) {
+ slap_callback cb = {0};
+ Operation fop;
+ SlapReply frs = { REP_RESULT };
+ int rc;
- fop = *op;
+ fc->fss->s_flags ^= PS_FIND_BASE;
+ ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
- cb.sc_response = findbase_cb;
- cb.sc_private = fc;
+ fop = *fc->fss->s_op;
- fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync mode */
- fop.o_callback = &cb;
- fop.o_tag = LDAP_REQ_SEARCH;
- fop.ors_scope = LDAP_SCOPE_BASE;
- fop.ors_deref = fc->fss->s_op->ors_deref;
- fop.ors_limit = NULL;
- fop.ors_slimit = 1;
- fop.ors_tlimit = SLAP_NO_LIMIT;
- fop.ors_attrs = slap_anlist_no_attrs;
- fop.ors_attrsonly = 1;
- fop.ors_filter = fc->fss->s_op->ors_filter;
- fop.ors_filterstr = fc->fss->s_op->ors_filterstr;
+ fop.o_hdr = op->o_hdr;
+ fop.o_bd = op->o_bd;
+ fop.o_time = op->o_time;
+ fop.o_tincr = op->o_tincr;
- fop.o_req_ndn = fc->fss->s_op->o_req_ndn;
+ cb.sc_response = findbase_cb;
+ cb.sc_private = fc;
- fop.o_bd->bd_info = on->on_info->oi_orig;
- rc = fop.o_bd->be_search( &fop, &frs );
- fop.o_bd->bd_info = (BackendInfo *)on;
+ fop.o_sync_mode = 0; /* turn off sync mode */
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
+ fop.o_callback = &cb;
+ fop.o_tag = LDAP_REQ_SEARCH;
+ fop.ors_scope = LDAP_SCOPE_BASE;
+ fop.ors_limit = NULL;
+ fop.ors_slimit = 1;
+ fop.ors_tlimit = SLAP_NO_LIMIT;
+ fop.ors_attrs = slap_anlist_no_attrs;
+ fop.ors_attrsonly = 1;
+ fop.ors_filter = &generic_filter;
+ fop.ors_filterstr = generic_filterstr;
+
+ fop.o_bd->bd_info = on->on_info->oi_orig;
+ rc = fop.o_bd->be_search( &fop, &frs );
+ fop.o_bd->bd_info = (BackendInfo *)on;
+ } else {
+ ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
+ fc->fbase = 1;
+ }
+
+ /* After the first call, see if the fdn resides in the scope */
+ if ( fc->fbase == 1 ) {
+ switch ( fc->fss->s_op->ors_scope ) {
+ case LDAP_SCOPE_BASE:
+ fc->fscope = dn_match( fc->fdn, &fc->fss->s_base );
+ break;
+ case LDAP_SCOPE_ONELEVEL: {
+ struct berval pdn;
+ dnParent( fc->fdn, &pdn );
+ fc->fscope = dn_match( &pdn, &fc->fss->s_base );
+ break; }
+ case LDAP_SCOPE_SUBTREE:
+ fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base );
+ break;
+ case LDAP_SCOPE_SUBORDINATE:
+ fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) &&
+ !dn_match( fc->fdn, &fc->fss->s_base );
+ break;
+ }
+ }
- if ( fc->fbase ) return LDAP_SUCCESS;
+ if ( fc->fbase )
+ return LDAP_SUCCESS;
/* If entryID has changed, then the base of this search has
* changed. Invalidate the psearch.
@@ -504,16 +474,18 @@ syncprov_findbase( Operation *op, fbase_
* was not checkpointed at the previous shutdown.
*
* 2: when the current contextCSN is known and we have a sync cookie, we search
- * for one entry with CSN <= the cookie CSN. (Used to search for =.) If an
- * entry is found, the cookie CSN is valid, otherwise it is stale.
+ * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN.
+ * If an entry is found, the cookie CSN is valid, otherwise it is stale.
*
* 3: during a refresh phase, we search for all entries with CSN <= the cookie
* CSN, and generate Present records for them. We always collect this result
* in SyncID sets, even if there's only one match.
*/
-#define FIND_MAXCSN 1
-#define FIND_CSN 2
-#define FIND_PRESENT 3
+typedef enum find_csn_t {
+ FIND_MAXCSN = 1,
+ FIND_CSN = 2,
+ FIND_PRESENT = 3
+} find_csn_t;
static int
findmax_cb( Operation *op, SlapReply *rs )
@@ -536,7 +508,11 @@ findcsn_cb( Operation *op, SlapReply *rs
{
slap_callback *sc = op->o_callback;
- if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
+ /* We just want to know that at least one exists, so it's OK if
+ * we exceed the unchecked limit.
+ */
+ if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED ||
+ (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) {
sc->sc_private = (void *)1;
}
return LDAP_SUCCESS;
@@ -592,7 +568,7 @@ findpres_cb( Operation *op, SlapReply *r
}
static int
-syncprov_findcsn( Operation *op, int mode )
+syncprov_findcsn( Operation *op, find_csn_t mode )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
@@ -602,16 +578,17 @@ syncprov_findcsn( Operation *op, int mod
SlapReply frs = { REP_RESULT };
char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
- struct berval fbuf, maxcsn;
+ struct berval maxcsn;
Filter cf, af;
#ifdef LDAP_COMP_MATCH
AttributeAssertion eq = { NULL, BER_BVNULL, NULL };
#else
AttributeAssertion eq = { NULL, BER_BVNULL };
#endif
- int i, rc = LDAP_SUCCESS;
fpres_cookie pcookie;
sync_control *srs = NULL;
+ struct slap_limits_set fc_limits;
+ int i, rc = LDAP_SUCCESS, findcsn_retry = 1;
if ( mode != FIND_MAXCSN ) {
srs = op->o_controls[slap_cids.sc_LDAPsync];
@@ -623,8 +600,9 @@ syncprov_findcsn( Operation *op, int mod
fop = *op;
fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */
+ /* We want pure entries, not referrals */
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
- fbuf.bv_val = buf;
cf.f_ava = &eq;
cf.f_av_desc = slap_schema.si_ad_entryCSN;
cf.f_next = NULL;
@@ -633,13 +611,14 @@ syncprov_findcsn( Operation *op, int mod
fop.ors_limit = NULL;
fop.ors_tlimit = SLAP_NO_LIMIT;
fop.ors_filter = &cf;
- fop.ors_filterstr = fbuf;
+ fop.ors_filterstr.bv_val = buf;
+again:
switch( mode ) {
case FIND_MAXCSN:
cf.f_choice = LDAP_FILTER_GE;
cf.f_av_value = si->si_ctxcsn;
- fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)",
+ fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN>=%s)",
cf.f_av_value.bv_val );
fop.ors_attrsonly = 0;
fop.ors_attrs = csn_anlist;
@@ -651,10 +630,20 @@ syncprov_findcsn( Operation *op, int mod
maxcsn.bv_len = si->si_ctxcsn.bv_len;
break;
case FIND_CSN:
- cf.f_choice = LDAP_FILTER_LE;
cf.f_av_value = srs->sr_state.ctxcsn;
- fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
- cf.f_av_value.bv_val );
+ /* Look for exact match the first time */
+ if ( findcsn_retry ) {
+ cf.f_choice = LDAP_FILTER_EQUALITY;
+ fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN=%s)",
+ cf.f_av_value.bv_val );
+ /* On retry, look for <= */
+ } else {
+ cf.f_choice = LDAP_FILTER_LE;
+ fop.ors_limit = &fc_limits;
+ fc_limits.lms_s_unchecked = 1;
+ fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN<=%s)",
+ cf.f_av_value.bv_val );
+ }
fop.ors_attrsonly = 1;
fop.ors_attrs = slap_anlist_no_attrs;
fop.ors_slimit = 1;
@@ -673,8 +662,6 @@ syncprov_findcsn( Operation *op, int mod
fop.ors_attrsonly = 0;
fop.ors_attrs = uuid_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
- /* We want pure entries, not referrals */
- fop.o_managedsait = SLAP_CONTROL_CRITICAL;
cb.sc_private = &pcookie;
cb.sc_response = findpres_cb;
pcookie.num = 0;
@@ -704,7 +691,14 @@ syncprov_findcsn( Operation *op, int mod
break;
case FIND_CSN:
/* If matching CSN was not found, invalidate the context. */
- if ( !cb.sc_private ) rc = LDAP_NO_SUCH_OBJECT;
+ if ( !cb.sc_private ) {
+ /* If we didn't find an exact match, then try for <= */
+ if ( findcsn_retry ) {
+ findcsn_retry = 0;
+ goto again;
+ }
+ rc = LDAP_NO_SUCH_OBJECT;
+ }
break;
case FIND_PRESENT:
op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx );
@@ -715,61 +709,50 @@ syncprov_findcsn( Operation *op, int mod
return rc;
}
-/* Queue a persistent search response if still in Refresh stage */
-static int
-syncprov_qresp( opcookie *opc, syncops *so, int mode )
+static void
+syncprov_free_syncop( syncops *so )
{
- syncres *sr;
-
- sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
- opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
- sr->s_next = NULL;
- sr->s_dn.bv_val = (char *)(sr + 1);
- sr->s_mode = mode;
- sr->s_isreference = opc->sreference;
- sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
- *(sr->s_ndn.bv_val++) = '\0';
- sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
- *(sr->s_uuid.bv_val++) = '\0';
- sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
+ syncres *sr, *srnext;
+ GroupAssertion *ga, *gnext;
- if ( !so->s_res ) {
- so->s_res = sr;
- } else {
- so->s_restail->s_next = sr;
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ if ( --so->s_inuse > 0 ) {
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ return;
}
- so->s_restail = sr;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- return LDAP_SUCCESS;
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ filter_free( so->s_op->ors_filter );
+ for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
+ gnext = ga->ga_next;
+ ch_free( ga );
+ }
+ ch_free( so->s_op );
+ }
+ ch_free( so->s_base.bv_val );
+ for ( sr=so->s_res; sr; sr=srnext ) {
+ srnext = sr->s_next;
+ ch_free( sr );
+ }
+ ldap_pvt_thread_mutex_destroy( &so->s_mutex );
+ ch_free( so );
}
/* Send a persistent search response */
static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode, int queue )
+syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
+ Entry **e, int mode )
{
slap_overinst *on = opc->son;
- syncprov_info_t *si = on->on_bi.bi_private;
SlapReply rs = { REP_SEARCH };
LDAPControl *ctrls[2];
struct berval cookie;
Entry e_uuid = {0};
Attribute a_uuid = {0};
- Operation sop = *so->s_op;
- Opheader ohdr;
- ohdr = *sop.o_hdr;
- sop.o_hdr = &ohdr;
- sop.o_tmpmemctx = op->o_tmpmemctx;
- sop.o_bd = op->o_bd;
- sop.o_controls = op->o_controls;
-
- if ( queue && (so->s_flags & PS_IS_REFRESHING) ) {
- ldap_pvt_thread_mutex_lock( &so->s_mutex );
- if ( so->s_flags & PS_IS_REFRESHING )
- return syncprov_qresp( opc, so, mode );
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- }
+ if ( so->s_op->o_abandon )
+ return SLAPD_ABANDON;
ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
@@ -777,23 +760,34 @@ syncprov_sendresp( Operation *op, opcook
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
a_uuid.a_nvals = &opc->suuid;
- rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid,
+ rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
+ op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
- rs.sr_entry = e;
rs.sr_ctrls = ctrls;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
switch( mode ) {
case LDAP_SYNC_ADD:
+ rs.sr_entry = *e;
+ if ( rs.sr_entry->e_private )
+ rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
- rs.sr_ref = get_entry_referrals( &sop, e );
- send_search_reference( &sop, &rs );
+ rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
+ send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
+ if ( !rs.sr_entry )
+ *e = NULL;
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
- rs.sr_attrs = sop.ors_attrs;
- send_search_entry( &sop, &rs );
+ rs.sr_entry = *e;
+ if ( rs.sr_entry->e_private )
+ rs.sr_flags = REP_ENTRY_MUSTRELEASE;
+ rs.sr_attrs = op->ors_attrs;
+ send_search_entry( op, &rs );
+ if ( !rs.sr_entry )
+ *e = NULL;
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
@@ -803,47 +797,177 @@ syncprov_sendresp( Operation *op, opcook
if ( opc->sreference ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
- send_search_reference( &sop, &rs );
+ send_search_reference( op, &rs );
} else {
- send_search_entry( &sop, &rs );
+ send_search_entry( op, &rs );
}
break;
default:
assert(0);
}
- op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
- rs.sr_ctrls = NULL;
+ /* In case someone else freed it already? */
+ if ( rs.sr_ctrls ) {
+ op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
+ rs.sr_ctrls = NULL;
+ }
+
return rs.sr_err;
}
-static void
-syncprov_free_syncop( syncops *so )
+/* Play back queued responses */
+static int
+syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
{
- syncres *sr, *srnext;
- GroupAssertion *ga, *gnext;
+ syncres *sr;
+ Entry *e;
+ opcookie opc;
+ int rc;
- ldap_pvt_thread_mutex_lock( &so->s_mutex );
- so->s_inuse--;
- if ( so->s_inuse > 0 ) {
+ opc.son = on;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
+
+ for (;;) {
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ sr = so->s_res;
+ if ( sr )
+ so->s_res = sr->s_next;
+ if ( !so->s_res )
+ so->s_restail = NULL;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- return;
+
+ if ( !sr || so->s_op->o_abandon )
+ break;
+
+ opc.sdn = sr->s_dn;
+ opc.sndn = sr->s_ndn;
+ opc.suuid = sr->s_uuid;
+ opc.sctxcsn = sr->s_csn;
+ opc.sreference = sr->s_isreference;
+ e = NULL;
+
+ if ( sr->s_mode != LDAP_SYNC_DELETE ) {
+ rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+ if ( rc ) {
+ ch_free( sr );
+ continue;
+ }
+ }
+ rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
+
+ if ( e ) {
+ be_entry_release_rw( op, e, 0 );
+ }
+
+ ch_free( sr );
+
+ if ( rc )
+ break;
+ }
+ op->o_bd->bd_info = (BackendInfo *)on;
+ return rc;
+}
+
+/* runqueue task for playing back queued responses */
+static void *
+syncprov_qtask( void *ctx, void *arg )
+{
+ struct re_s *rtask = arg;
+ syncops *so = rtask->arg;
+ slap_overinst *on = so->s_op->o_private;
+ OperationBuffer opbuf;
+ Operation *op;
+ BackendDB be;
+
+ op = (Operation *) &opbuf;
+ *op = *so->s_op;
+ op->o_hdr = (Opheader *)(op+1);
+ op->o_controls = (void **)(op->o_hdr+1);
+ memset( op->o_controls, 0, SLAP_MAX_CIDS * sizeof(void *));
+
+ *op->o_hdr = *so->s_op->o_hdr;
+
+ op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx);
+ op->o_tmpmfuncs = &slap_sl_mfuncs;
+ op->o_threadctx = ctx;
+
+ /* syncprov_qplay expects a fake db */
+ be = *so->s_op->o_bd;
+ be.be_flags |= SLAP_DBFLAG_OVERLAY;
+ op->o_bd = &be;
+ op->o_private = NULL;
+ op->o_callback = NULL;
+
+ syncprov_qplay( op, on, so );
+
+ /* decrement use count... */
+ syncprov_free_syncop( so );
+
+ /* wait until we get explicitly scheduled again */
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+ return NULL;
+}
+
+/* Queue a persistent search response */
+static int
+syncprov_qresp( opcookie *opc, syncops *so, int mode )
+{
+ syncres *sr;
+
+ sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
+ opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
+ sr->s_next = NULL;
+ sr->s_dn.bv_val = (char *)(sr + 1);
+ sr->s_dn.bv_len = opc->sdn.bv_len;
+ sr->s_mode = mode;
+ sr->s_isreference = opc->sreference;
+ sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val,
+ opc->sdn.bv_val ) + 1;
+ sr->s_ndn.bv_len = opc->sndn.bv_len;
+ sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val,
+ opc->sndn.bv_val ) + 1;
+ sr->s_uuid.bv_len = opc->suuid.bv_len;
+ AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
+ sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1;
+ sr->s_csn.bv_len = opc->sctxcsn.bv_len;
+ strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
+
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ if ( !so->s_res ) {
+ so->s_res = sr;
+ } else {
+ so->s_restail->s_next = sr;
+ }
+ so->s_restail = sr;
+
+ /* If the base of the psearch was modified, check it next time round */
+ if ( so->s_flags & PS_WROTE_BASE ) {
+ so->s_flags ^= PS_WROTE_BASE;
+ so->s_flags |= PS_FIND_BASE;
}
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( so->s_flags & PS_IS_DETACHED ) {
- filter_free( so->s_op->ors_filter );
- for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
- gnext = ga->ga_next;
- ch_free( ga );
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ if ( !so->s_qtask ) {
+ so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
+ syncprov_qtask, so, "syncprov_qtask",
+ so->s_op->o_conn->c_peer_name.bv_val );
+ ++so->s_inuse;
+ } else {
+ if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
+ !so->s_qtask->next_sched.tv_sec ) {
+ so->s_qtask->interval.tv_sec = 0;
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+ so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
+ ++so->s_inuse;
+ }
}
- ch_free( so->s_op );
- }
- ch_free( so->s_base.bv_val );
- for ( sr=so->s_res; sr; sr=srnext ) {
- srnext = sr->s_next;
- ch_free( sr );
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
- ldap_pvt_thread_mutex_destroy( &so->s_mutex );
- ch_free( so );
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ return LDAP_SUCCESS;
}
static int
@@ -940,21 +1064,30 @@ syncprov_matchops( Operation *op, opcook
if ( op->o_tag != LDAP_REQ_ADD ) {
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e );
+ /* If we're sending responses now, make a copy and unlock the DB */
+ if ( e && !saveit ) {
+ Entry *e2 = entry_dup( e );
+ be_entry_release_rw( op, e, 0 );
+ e = e2;
+ }
op->o_bd->bd_info = (BackendInfo *)on;
if ( rc ) return;
} else {
e = op->ora_e;
}
- if ( saveit ) {
+ if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
opc->sreference = is_entry_referral( e );
- }
- if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a )
ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
+ } else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
+ op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
+ op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
+ ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
+ ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
}
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
@@ -978,12 +1111,22 @@ syncprov_matchops( Operation *op, opcook
"search base has changed" );
sprev->s_next = snext;
syncprov_drop_psearch( ss, 1 );
+ ss = sprev;
continue;
}
+
/* If we're sending results now, look for this op in old matches */
if ( !saveit ) {
syncmatches *old;
+
+ /* Did we modify the search base? */
+ if ( dn_match( &op->o_req_ndn, &ss->s_base )) {
+ ldap_pvt_thread_mutex_lock( &ss->s_mutex );
+ ss->s_flags |= PS_WROTE_BASE;
+ ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
+ }
+
for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm;
old=sm, sm=sm->sm_next ) {
if ( sm->sm_op == ss ) {
@@ -1002,21 +1145,23 @@ syncprov_matchops( Operation *op, opcook
sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
sm->sm_next = opc->smatches;
sm->sm_op = ss;
- ss->s_inuse++;
+ ldap_pvt_thread_mutex_lock( &ss->s_mutex );
+ ++ss->s_inuse;
+ ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
opc->smatches = sm;
} else {
/* if found send UPDATE else send ADD */
- syncprov_sendresp( op, opc, ss, e,
- found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 );
+ syncprov_qresp( opc, ss,
+ found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
}
} else if ( !saveit && found ) {
/* send DELETE */
- syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 );
+ syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
}
}
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
-done:
- if ( op->o_tag != LDAP_REQ_ADD ) {
+
+ if ( op->o_tag != LDAP_REQ_ADD && e ) {
op->o_bd->bd_info = (BackendInfo *)on->on_info;
be_entry_release_rw( op, e, 0 );
op->o_bd->bd_info = (BackendInfo *)on;
@@ -1046,7 +1191,6 @@ syncprov_op_cleanup( Operation *op, Slap
mtdummy.mt_op = op;
ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
- ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
if ( mt ) {
modinst *mi = mt->mt_mods;
@@ -1057,14 +1201,13 @@ syncprov_op_cleanup( Operation *op, Slap
mt->mt_op = mt->mt_mods->mi_op;
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
} else {
- ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
avl_delete( &si->si_mods, mt, sp_avl_cmp );
- ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
ch_free( mt );
}
}
+ ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
if ( !BER_BVISNULL( &opc->suuid ))
op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
if ( !BER_BVISNULL( &opc->sndn ))
@@ -1083,8 +1226,8 @@ syncprov_checkpoint( Operation *op, Slap
syncprov_info_t *si = on->on_bi.bi_private;
Modifications mod;
Operation opm;
+ SlapReply rsm = { 0 };
struct berval bv[2];
- BackendInfo *orig;
slap_callback cb = {0};
mod.sml_values = bv;
@@ -1093,6 +1236,7 @@ syncprov_checkpoint( Operation *op, Slap
mod.sml_nvalues = NULL;
mod.sml_desc = slap_schema.si_ad_contextCSN;
mod.sml_op = LDAP_MOD_REPLACE;
+ mod.sml_flags = 0;
mod.sml_next = NULL;
cb.sc_response = slap_null_cb;
@@ -1102,9 +1246,14 @@ syncprov_checkpoint( Operation *op, Slap
opm.orm_modlist = &mod;
opm.o_req_dn = op->o_bd->be_suffix[0];
opm.o_req_ndn = op->o_bd->be_nsuffix[0];
- orig = opm.o_bd->bd_info;
opm.o_bd->bd_info = on->on_info->oi_orig;
- opm.o_bd->be_modify( &opm, rs );
+ opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
+ SLAP_DBFLAGS( opm.o_bd ) |= SLAP_DBFLAG_NOLASTMOD;
+ opm.o_bd->be_modify( &opm, &rsm );
+ SLAP_DBFLAGS( opm.o_bd ) ^= SLAP_DBFLAG_NOLASTMOD;
+ if ( mod.sml_next != NULL ) {
+ slap_mods_free( mod.sml_next, 1 );
+ }
}
static void
@@ -1125,10 +1274,10 @@ syncprov_add_slog( Operation *op, struct
se->se_tag = op->o_tag;
se->se_uuid.bv_val = (char *)(se+1);
- se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len + 1;
AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
se->se_uuid.bv_len = opc->suuid.bv_len;
+ se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len;
AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len );
se->se_csn.bv_val[csn->bv_len] = '\0';
se->se_csn.bv_len = csn->bv_len;
@@ -1172,7 +1321,6 @@ syncprov_playlog( Operation *op, SlapRep
struct berval *oldcsn, struct berval *ctxcsn )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
- syncprov_info_t *si = on->on_bi.bi_private;
slog_entry *se;
int i, j, ndel, num, nmods, mmods;
BerVarray uuids;
@@ -1362,14 +1510,11 @@ syncprov_op_response( Operation *op, Sla
/* for each match in opc->smatches:
* send DELETE msg
*/
- ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
if ( sm->sm_op->s_op->o_abandon )
continue;
- syncprov_sendresp( op, opc, sm->sm_op, NULL,
- LDAP_SYNC_DELETE, 1 );
+ syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
}
- ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
break;
}
}
@@ -1523,7 +1668,6 @@ syncprov_op_mod( Operation *op, SlapRepl
if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
syncprov_matchops( op, opc, 1 );
-
return SLAP_CB_CONTINUE;
}
@@ -1541,6 +1685,8 @@ typedef struct searchstate {
slap_overinst *ss_on;
syncops *ss_so;
int ss_present;
+ struct berval ss_ctxcsn;
+ char ss_csnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
} searchstate;
static int
@@ -1555,7 +1701,7 @@ syncprov_search_cleanup( Operation *op,
}
static void
-syncprov_detach_op( Operation *op, syncops *so )
+syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
{
Operation *op2;
int i, alen = 0;
@@ -1581,8 +1727,9 @@ syncprov_detach_op( Operation *op, synco
*op2->o_hdr = *op->o_hdr;
op2->o_tag = op->o_tag;
op2->o_time = op->o_time;
- op2->o_bd = op->o_bd;
+ op2->o_bd = on->on_info->oi_origdb;
op2->o_request = op->o_request;
+ op2->o_private = on;
if ( i ) {
op2->ors_attrs = (AttributeName *)(op2->o_hdr + 1);
@@ -1621,6 +1768,8 @@ syncprov_detach_op( Operation *op, synco
g2->ga_next = op2->o_groups;
op2->o_groups = g2;
}
+ /* Don't allow any further group caching */
+ op2->o_do_not_cache = 1;
/* Add op2 to conn so abandon will find us */
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
@@ -1636,27 +1785,32 @@ syncprov_search_response( Operation *op,
{
searchstate *ss = op->o_callback->sc_private;
slap_overinst *on = ss->ss_on;
- syncprov_info_t *si = on->on_bi.bi_private;
sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];
if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
- int i;
+ Attribute *a;
/* If we got a referral without a referral object, there's
* something missing that we cannot replicate. Just ignore it.
* The consumer will abort because we didn't send the expected
* control.
*/
if ( !rs->sr_entry ) {
- assert( rs->sr_entry );
+ assert( rs->sr_entry != NULL );
Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 );
return SLAP_CB_CONTINUE;
}
- if ( !BER_BVISNULL( &srs->sr_state.ctxcsn )) {
- Attribute *a = attr_find( rs->sr_entry->e_attrs,
- slap_schema.si_ad_entryCSN );
-
+ a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
+ if ( a == NULL && rs->sr_operational_attrs != NULL ) {
+ a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
+ }
+ if ( a ) {
+ /* Make sure entry is less than the snaphot'd contextCSN */
+ if ( ber_bvcmp( &a->a_nvals[0], &ss->ss_ctxcsn ) > 0 )
+ return LDAP_SUCCESS;
+
/* Don't send the ctx entry twice */
- if ( a && bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) )
+ if ( !BER_BVISNULL( &srs->sr_state.ctxcsn ) &&
+ bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) )
return LDAP_SUCCESS;
}
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
@@ -1667,8 +1821,7 @@ syncprov_search_response( Operation *op,
} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
struct berval cookie;
- slap_compose_sync_cookie( op, &cookie,
- &op->ors_filter->f_and->f_ava->aa_value,
+ slap_compose_sync_cookie( op, &cookie, &ss->ss_ctxcsn,
srs->sr_state.rid );
/* Is this a regular refresh? */
@@ -1679,58 +1832,22 @@ syncprov_search_response( Operation *op,
rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS :
LDAP_SYNC_REFRESH_DELETES );
+ op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
} else {
- int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
- /* Flush any queued persist messages */
- if ( ss->ss_so->s_res ) {
- syncres *sr, *srnext;
- Entry *e;
- opcookie opc;
-
- opc.son = on;
- ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
- locked = 1;
- for (sr = ss->ss_so->s_res; sr; sr=srnext) {
- int rc = LDAP_SUCCESS;
- srnext = sr->s_next;
- opc.sdn = sr->s_dn;
- opc.sndn = sr->s_ndn;
- opc.suuid = sr->s_uuid;
- opc.sctxcsn = sr->s_csn;
- opc.sreference = sr->s_isreference;
- e = NULL;
-
- if ( sr->s_mode != LDAP_SYNC_DELETE ) {
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
- op->o_bd->bd_info = (BackendInfo *)on;
- }
- if ( rc == LDAP_SUCCESS )
- syncprov_sendresp( op, &opc, ss->ss_so, e,
- sr->s_mode, 0 );
-
- if ( e ) {
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- be_entry_release_rw( op, e, 0 );
- op->o_bd->bd_info = (BackendInfo *)on;
- }
- ch_free( sr );
- }
- ss->ss_so->s_res = NULL;
- ss->ss_so->s_restail = NULL;
- }
+ op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
+
+ /* Detach this Op from frontend control */
+ ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
/* Turn off the refreshing flag */
ss->ss_so->s_flags ^= PS_IS_REFRESHING;
- if ( locked )
- ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
- /* Detach this Op from frontend control */
- syncprov_detach_op( op, ss->ss_so );
+ syncprov_detach_op( op, ss->ss_so, on );
+ ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
return LDAP_SUCCESS;
}
@@ -1745,8 +1862,7 @@ syncprov_op_search( Operation *op, SlapR
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
slap_callback *cb;
- int gotstate = 0, nochange = 0, do_present = 1;
- Filter *fand, *fava;
+ int gotstate = 0, nochange = 0, do_present;
syncops *sop = NULL;
searchstate *ss;
sync_control *srs;
@@ -1760,7 +1876,10 @@ syncprov_op_search( Operation *op, SlapR
return rs->sr_err;
}
+ do_present = si->si_nopres ? 0 : 1;
+
srs = op->o_controls[slap_cids.sc_LDAPsync];
+ op->o_managedsait = SLAP_CONTROL_NONCRITICAL;
/* If this is a persistent search, set it up right away */
if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) {
@@ -1773,7 +1892,7 @@ syncprov_op_search( Operation *op, SlapR
fc.fbase = 0;
so.s_eid = NOID;
so.s_op = op;
- so.s_flags = PS_IS_REFRESHING;
+ so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE;
/* syncprov_findbase expects to be called as a callback... */
sc.sc_private = &opc;
opc.son = on;
@@ -1844,12 +1963,11 @@ syncprov_op_search( Operation *op, SlapR
/* Is the CSN still present in the database? */
if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
/* No, so a reload is required */
-#if 0 /* the consumer doesn't seem to send this hint */
- if ( op->o_sync_rhint == 0 ) {
+ /* the 2.2 consumer doesn't send this hint */
+ if ( si->si_usehint && srs->sr_rhint == 0 ) {
send_ldap_error( op, rs, LDAP_SYNC_REFRESH_REQUIRED, "sync cookie is stale" );
return rs->sr_err;
}
-#endif
} else {
gotstate = 1;
/* If changed and doing Present lookup, send Present UUIDs */
@@ -1869,21 +1987,15 @@ shortcut:
sop->s_filterstr= op->ors_filterstr;
}
- fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
- fand->f_choice = LDAP_FILTER_AND;
- fand->f_next = NULL;
- fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
- fava->f_choice = LDAP_FILTER_LE;
- fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
- fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
-#ifdef LDAP_COMP_MATCH
- fava->f_ava->aa_cf = NULL;
-#endif
- ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx );
- fand->f_and = fava;
- if ( gotstate ) {
- fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
- fava = fava->f_next;
+ /* If something changed, find the changes */
+ if ( gotstate && !nochange ) {
+ Filter *fand, *fava;
+
+ fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
+ fand->f_choice = LDAP_FILTER_AND;
+ fand->f_next = NULL;
+ fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
+ fand->f_and = fava;
fava->f_choice = LDAP_FILTER_GE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
@@ -1891,10 +2003,10 @@ shortcut:
fava->f_ava->aa_cf = NULL;
#endif
ber_dupbv_x( &fava->f_ava->aa_value, &srs->sr_state.ctxcsn, op->o_tmpmemctx );
+ fava->f_next = op->ors_filter;
+ op->ors_filter = fand;
+ filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
}
- fava->f_next = op->ors_filter;
- op->ors_filter = fand;
- filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
/* Let our callback add needed info to returned entries */
cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx);
@@ -1902,6 +2014,9 @@ shortcut:
ss->ss_on = on;
ss->ss_so = sop;
ss->ss_present = do_present;
+ ss->ss_ctxcsn.bv_len = ctxcsn.bv_len;
+ ss->ss_ctxcsn.bv_val = ss->ss_csnbuf;
+ strcpy( ss->ss_ctxcsn.bv_val, ctxcsn.bv_val );
cb->sc_response = syncprov_search_response;
cb->sc_cleanup = syncprov_search_cleanup;
cb->sc_private = ss;
@@ -1970,42 +2085,152 @@ syncprov_operational(
return SLAP_CB_CONTINUE;
}
+enum {
+ SP_CHKPT = 1,
+ SP_SESSL,
+ SP_NOPRES,
+ SP_USEHINT
+};
+
+static ConfigDriver sp_cf_gen;
+
+static ConfigTable spcfg[] = {
+ { "syncprov-checkpoint", "ops> bd_info;
+ slap_overinst *on = (slap_overinst *)c->bi;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+ int rc = 0;
- if ( strcasecmp( argv[ 0 ], "syncprov-checkpoint" ) == 0 ) {
- if ( argc != 3 ) {
- fprintf( stderr, "%s: line %d: wrong number of arguments in "
- "\"syncprov-checkpoint \"\n", fname, lineno );
- return -1;
+ if ( c->op == SLAP_CONFIG_EMIT ) {
+ switch ( c->type ) {
+ case SP_CHKPT:
+ if ( si->si_chkops || si->si_chktime ) {
+ struct berval bv;
+ bv.bv_len = sprintf( c->msg, "%d %d",
+ si->si_chkops, si->si_chktime );
+ bv.bv_val = c->msg;
+ value_add_one( &c->rvalue_vals, &bv );
+ } else {
+ rc = 1;
+ }
+ break;
+ case SP_SESSL:
+ if ( si->si_logs ) {
+ c->value_int = si->si_logs->sl_size;
+ } else {
+ rc = 1;
+ }
+ break;
+ case SP_NOPRES:
+ if ( si->si_nopres ) {
+ c->value_int = 1;
+ } else {
+ rc = 1;
+ }
+ break;
+ case SP_USEHINT:
+ if ( si->si_usehint ) {
+ c->value_int = 1;
+ } else {
+ rc = 1;
+ }
+ break;
}
- si->si_chkops = atoi( argv[1] );
- si->si_chktime = atoi( argv[2] ) * 60;
- return 0;
-
- } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) {
- sessionlog *sl;
- int size;
- if ( argc != 2 ) {
- fprintf( stderr, "%s: line %d: wrong number of arguments in "
- "\"syncprov-sessionlog \"\n", fname, lineno );
- return -1;
+ return rc;
+ } else if ( c->op == LDAP_MOD_DELETE ) {
+ switch ( c->type ) {
+ case SP_CHKPT:
+ si->si_chkops = 0;
+ si->si_chktime = 0;
+ break;
+ case SP_SESSL:
+ if ( si->si_logs )
+ si->si_logs->sl_size = 0;
+ else
+ rc = LDAP_NO_SUCH_ATTRIBUTE;
+ break;
+ case SP_NOPRES:
+ if ( si->si_nopres )
+ si->si_nopres = 0;
+ else
+ rc = LDAP_NO_SUCH_ATTRIBUTE;
+ break;
+ case SP_USEHINT:
+ if ( si->si_usehint )
+ si->si_usehint = 0;
+ else
+ rc = LDAP_NO_SUCH_ATTRIBUTE;
+ break;
}
- size = atoi( argv[1] );
+ return rc;
+ }
+ switch ( c->type ) {
+ case SP_CHKPT:
+ if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) {
+ sprintf( c->msg, "%s unable to parse checkpoint ops # \"%s\"",
+ c->argv[0], c->argv[1] );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
+ }
+ if ( si->si_chkops <= 0 ) {
+ sprintf( c->msg, "%s invalid checkpoint ops # \"%d\"",
+ c->argv[0], si->si_chkops );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
+ }
+ if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) {
+ sprintf( c->msg, "%s unable to parse checkpoint time \"%s\"",
+ c->argv[0], c->argv[1] );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
+ }
+ if ( si->si_chktime <= 0 ) {
+ sprintf( c->msg, "%s invalid checkpoint time \"%d\"",
+ c->argv[0], si->si_chkops );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
+ }
+ si->si_chktime *= 60;
+ break;
+ case SP_SESSL: {
+ sessionlog *sl;
+ int size = c->value_int;
+
if ( size < 0 ) {
- fprintf( stderr,
- "%s: line %d: session log size %d is negative\n",
- fname, lineno, size );
- return -1;
+ sprintf( c->msg, "%s size %d is negative",
+ c->argv[0], size );
+ Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ return ARG_BAD_CONF;
}
sl = si->si_logs;
if ( !sl ) {
@@ -2018,24 +2243,29 @@ syncprov_db_config(
si->si_logs = sl;
}
sl->sl_size = size;
- return 0;
+ }
+ break;
+ case SP_NOPRES:
+ si->si_nopres = c->value_int;
+ break;
+ case SP_USEHINT:
+ si->si_usehint = c->value_int;
+ break;
}
-
- return SLAP_CONF_UNKNOWN;
+ return rc;
}
-/* Cheating - we have no thread pool context for these functions,
- * so make one.
+/* ITS#3456 we cannot run this search on the main thread, must use a
+ * child thread in order to insure we have a big enough stack.
*/
-typedef struct thread_keys {
- void *key;
- void *data;
- ldap_pvt_thread_pool_keyfree_t *xfree;
-} thread_keys;
-
-#define MAXKEYS 32
-/* A fake thread context */
-static thread_keys thrctx[MAXKEYS];
+static void *
+syncprov_db_otask(
+ void *ptr
+)
+{
+ syncprov_findcsn( ptr, FIND_MAXCSN );
+ return NULL;
+}
/* Read any existing contextCSN from the underlying db.
* Then search for any entries newer than that. If no value exists,
@@ -2050,18 +2280,24 @@ syncprov_db_open(
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
Connection conn;
- char opbuf[OPERATION_BUFFER_SIZE];
+ OperationBuffer opbuf;
char ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
- Operation *op = (Operation *)opbuf;
+ Operation *op = (Operation *) &opbuf;
Entry *e;
Attribute *a;
int rc;
+ void *thrctx = NULL;
+
+ if ( slapMode & SLAP_TOOL_MODE ) {
+ return 0;
+ }
rc = overlay_register_control( be, LDAP_CONTROL_SYNC );
if ( rc ) {
return rc;
}
+ thrctx = ldap_pvt_thread_pool_context();
connection_fake_init( &conn, op, thrctx );
op->o_bd = be;
op->o_dn = be->be_rootdn;
@@ -2074,6 +2310,8 @@ syncprov_db_open(
slap_schema.si_ad_contextCSN, 0, &e );
if ( e ) {
+ ldap_pvt_thread_t tid;
+
a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
if ( a ) {
si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len;
@@ -2085,27 +2323,37 @@ syncprov_db_open(
strcpy( ctxcsnbuf, si->si_ctxcsnbuf );
}
be_entry_release_rw( op, e, 0 );
- op->o_bd->bd_info = (BackendInfo *)on;
- op->o_req_dn = be->be_suffix[0];
- op->o_req_ndn = be->be_nsuffix[0];
- op->ors_scope = LDAP_SCOPE_SUBTREE;
- syncprov_findcsn( op, FIND_MAXCSN );
+ if ( !BER_BVISEMPTY( &si->si_ctxcsn ) ) {
+ op->o_bd->bd_info = (BackendInfo *)on;
+ op->o_req_dn = be->be_suffix[0];
+ op->o_req_ndn = be->be_nsuffix[0];
+ op->ors_scope = LDAP_SCOPE_SUBTREE;
+ ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
+ ldap_pvt_thread_join( tid, NULL );
+ }
+ } else if ( SLAP_SYNC_SHADOW( op->o_bd )) {
+ /* If we're also a consumer, and we didn't find the context entry,
+ * then don't generate anything, wait for our provider to send it
+ * to us.
+ */
+ goto out;
}
if ( BER_BVISEMPTY( &si->si_ctxcsn ) ) {
- slap_get_csn( op, si->si_ctxcsnbuf, sizeof(si->si_ctxcsnbuf),
- &si->si_ctxcsn, 0 );
+ si->si_ctxcsn.bv_len = sizeof( si->si_ctxcsnbuf );
+ slap_get_csn( op, &si->si_ctxcsn, 0 );
}
/* If our ctxcsn is different from what was read from the root
- * entry, write the new value out.
+ * entry, make sure we do a checkpoint on close
*/
if ( strcmp( si->si_ctxcsnbuf, ctxcsnbuf )) {
- SlapReply rs = {REP_RESULT};
- syncprov_checkpoint( op, &rs, on );
+ si->si_numops++;
}
+out:
op->o_bd->bd_info = (BackendInfo *)on;
+ ldap_pvt_thread_pool_context_reset( thrctx );
return 0;
}
@@ -2118,24 +2366,24 @@ syncprov_db_close(
{
slap_overinst *on = (slap_overinst *) be->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
- int i;
+ if ( slapMode & SLAP_TOOL_MODE ) {
+ return 0;
+ }
if ( si->si_numops ) {
Connection conn;
- char opbuf[OPERATION_BUFFER_SIZE];
- Operation *op = (Operation *)opbuf;
+ OperationBuffer opbuf;
+ Operation *op = (Operation *) &opbuf;
SlapReply rs = {REP_RESULT};
+ void *thrctx;
+ thrctx = ldap_pvt_thread_pool_context();
connection_fake_init( &conn, op, thrctx );
op->o_bd = be;
op->o_dn = be->be_rootdn;
op->o_ndn = be->be_rootndn;
syncprov_checkpoint( op, &rs, on );
- }
- for ( i=0; thrctx[i].key; i++) {
- if ( thrctx[i].xfree )
- thrctx[i].xfree( thrctx[i].key, thrctx[i].data );
- thrctx[i].key = NULL;
+ ldap_pvt_thread_pool_context_reset( thrctx );
}
return 0;
@@ -2158,6 +2406,8 @@ syncprov_db_init(
csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname;
+ csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID;
+ csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname;
uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID;
uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname;
@@ -2174,6 +2424,9 @@ syncprov_db_destroy(
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
if ( si ) {
+ if ( si->si_logs ) {
+ ch_free( si->si_logs );
+ }
ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
@@ -2189,7 +2442,8 @@ static int syncprov_parseCtrl (
LDAPControl *ctrl )
{
ber_tag_t tag;
- BerElement *ber;
+ BerElementBuffer berbuf;
+ BerElement *ber = (BerElement *)&berbuf;
ber_int_t mode;
ber_len_t len;
struct berval cookie = BER_BVNULL;
@@ -2223,11 +2477,7 @@ static int syncprov_parseCtrl (
* }
*/
- ber = ber_init( &ctrl->ldctl_value );
- if( ber == NULL ) {
- rs->sr_text = "internal error";
- return LDAP_OTHER;
- }
+ ber_init2( ber, &ctrl->ldctl_value, 0 );
if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) {
rs->sr_text = "Sync control : mode decoding error";
@@ -2249,10 +2499,11 @@ static int syncprov_parseCtrl (
tag = ber_peek_tag( ber, &len );
if ( tag == LDAP_TAG_SYNC_COOKIE ) {
- if (( ber_scanf( ber, /*{*/ "o", &cookie )) == LBER_ERROR ) {
+ if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) {
rs->sr_text = "Sync control : cookie decoding error";
return LDAP_PROTOCOL_ERROR;
}
+ tag = ber_peek_tag( ber, &len );
}
if ( tag == LDAP_TAG_RELOAD_HINT ) {
if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) {
@@ -2267,14 +2518,16 @@ static int syncprov_parseCtrl (
sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx );
sr->sr_rhint = rhint;
if (!BER_BVISNULL(&cookie)) {
- ber_dupbv( &sr->sr_state.octet_str, &cookie );
- slap_parse_sync_cookie( &sr->sr_state );
+ ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );
+ slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx );
+ if ( sr->sr_state.rid == -1 ) {
+ rs->sr_text = "Sync control : cookie parsing error";
+ return LDAP_PROTOCOL_ERROR;
+ }
}
op->o_controls[slap_cids.sc_LDAPsync] = sr;
- (void) ber_free( ber, 1 );
-
op->o_sync = ctrl->ldctl_iscritical
? SLAP_CONTROL_CRITICAL
: SLAP_CONTROL_NONCRITICAL;
@@ -2292,7 +2545,7 @@ static int syncprov_parseCtrl (
static slap_overinst syncprov;
int
-syncprov_init()
+syncprov_initialize()
{
int rc;
@@ -2300,13 +2553,13 @@ syncprov_init()
SLAP_CTRL_HIDE|SLAP_CTRL_SEARCH, NULL,
syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
if ( rc != LDAP_SUCCESS ) {
- fprintf( stderr, "Failed to register control %d\n", rc );
+ Debug( LDAP_DEBUG_ANY,
+ "syncprov_init: Failed to register control %d\n", rc, 0, 0 );
return rc;
}
syncprov.on_bi.bi_type = "syncprov";
syncprov.on_bi.bi_db_init = syncprov_db_init;
- syncprov.on_bi.bi_db_config = syncprov_db_config;
syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
syncprov.on_bi.bi_db_open = syncprov_db_open;
syncprov.on_bi.bi_db_close = syncprov_db_close;
@@ -2323,6 +2576,13 @@ syncprov_init()
syncprov.on_bi.bi_extended = syncprov_op_extended;
syncprov.on_bi.bi_operational = syncprov_operational;
+ syncprov.on_bi.bi_cf_ocs = spocs;
+
+ generic_filter.f_desc = slap_schema.si_ad_objectClass;
+
+ rc = config_register_schema( spcfg, spocs );
+ if ( rc ) return rc;
+
return overlay_register( &syncprov );
}
@@ -2330,7 +2590,7 @@ syncprov_init()
int
init_module( int argc, char *argv[] )
{
- return syncprov_init();
+ return syncprov_initialize();
}
#endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */