--- servers/slapd/overlays/syncprov.c 2007/06/08 07:08:39 1.56.2.43 +++ servers/slapd/overlays/syncprov.c 2006/07/27 08:44:23 1.153 @@ -1,8 +1,8 @@ -/* $OpenLDAP$ */ +/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.152 2006/07/23 22:32:27 hyc Exp $ */ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004-2007 The OpenLDAP Foundation. + * Copyright 2004-2006 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -65,7 +65,6 @@ typedef struct syncops { #define PS_IS_DETACHED 0x02 #define PS_WROTE_BASE 0x04 #define PS_FIND_BASE 0x08 -#define PS_FIX_FILTER 0x10 int s_inuse; /* reference count */ struct syncres *s_res; @@ -127,7 +126,7 @@ typedef struct syncprov_info_t { time_t si_chklast; /* time of last checkpoint */ Avlnode *si_mods; /* entries being modified */ sessionlog *si_logs; - ldap_pvt_thread_rdwr_t si_csn_rwlock; + ldap_pvt_thread_mutex_t si_csn_mutex; ldap_pvt_thread_mutex_t si_ops_mutex; ldap_pvt_thread_mutex_t si_mods_mutex; char si_ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; @@ -580,7 +579,7 @@ syncprov_findcsn( Operation *op, find_cs char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; struct berval maxcsn; - Filter cf; + Filter cf, af; #ifdef LDAP_COMP_MATCH AttributeAssertion eq = { NULL, BER_BVNULL, NULL }; #else @@ -652,8 +651,14 @@ again: cb.sc_response = findcsn_cb; break; case FIND_PRESENT: - fop.ors_filter = op->ors_filter; - fop.ors_filterstr = op->ors_filterstr; + af.f_choice = LDAP_FILTER_AND; + af.f_next = NULL; + af.f_and = &cf; + cf.f_choice = LDAP_FILTER_LE; + cf.f_av_value = srs->sr_state.ctxcsn; + cf.f_next = op->ors_filter; + fop.ors_filter = ⁡ + filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr ); fop.ors_attrsonly = 0; fop.ors_attrs = uuid_anlist; fop.ors_slimit = SLAP_NO_LIMIT; @@ -697,6 +702,7 @@ again: break; case FIND_PRESENT: op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); + op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx ); break; } @@ -767,7 +773,7 @@ syncprov_sendresp( Operation *op, opcook rs.sr_flags = REP_ENTRY_MUSTRELEASE; if ( opc->sreference ) { rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); - rs.sr_err = send_search_reference( op, &rs ); + send_search_reference( op, &rs ); ber_bvarray_free( rs.sr_ref ); if ( !rs.sr_entry ) *e = NULL; @@ -779,7 +785,7 @@ syncprov_sendresp( Operation *op, opcook if ( rs.sr_entry->e_private ) rs.sr_flags = REP_ENTRY_MUSTRELEASE; rs.sr_attrs = op->ors_attrs; - rs.sr_err = send_search_entry( op, &rs ); + send_search_entry( op, &rs ); if ( !rs.sr_entry ) *e = NULL; break; @@ -791,9 +797,9 @@ syncprov_sendresp( Operation *op, opcook if ( opc->sreference ) { struct berval bv = BER_BVNULL; rs.sr_ref = &bv; - rs.sr_err = send_search_reference( op, &rs ); + send_search_reference( op, &rs ); } else { - rs.sr_err = send_search_entry( op, &rs ); + send_search_entry( op, &rs ); } break; default: @@ -842,10 +848,7 @@ syncprov_qplay( Operation *op, slap_over if ( sr->s_mode != LDAP_SYNC_DELETE ) { rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); if ( rc ) { - Debug( LDAP_DEBUG_SYNC, "syncprov_qplay: failed to get %s, " - "error (%d), ignoring...\n", opc.sndn.bv_val, rc, 0 ); ch_free( sr ); - rc = 0; continue; } } @@ -874,7 +877,6 @@ syncprov_qtask( void *ctx, void *arg ) OperationBuffer opbuf; Operation *op; BackendDB be; - int rc; op = (Operation *) &opbuf; *op = *so->s_op; @@ -895,52 +897,20 @@ syncprov_qtask( void *ctx, void *arg ) op->o_private = NULL; op->o_callback = NULL; - rc = syncprov_qplay( op, on, so ); + (void)syncprov_qplay( op, on, so ); /* decrement use count... */ syncprov_free_syncop( so ); /* wait until we get explicitly scheduled again */ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - ldap_pvt_runqueue_stoptask( &slapd_rq, rtask ); - if ( rc == 0 ) { - ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 ); - } else { - /* bail out on any error */ - ldap_pvt_runqueue_remove( &slapd_rq, rtask ); - } + ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); + ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 ); ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); return NULL; } -/* Start the task to play back queued psearch responses */ -static void -syncprov_qstart( syncops *so ) -{ - int wake=0; - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - if ( !so->s_qtask ) { - so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, - syncprov_qtask, so, "syncprov_qtask", - so->s_op->o_conn->c_peer_name.bv_val ); - ++so->s_inuse; - wake = 1; - } else { - if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && - !so->s_qtask->next_sched.tv_sec ) { - so->s_qtask->interval.tv_sec = 0; - ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); - so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; - ++so->s_inuse; - wake = 1; - } - } - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - if ( wake ) - slap_wake_listener(); -} - /* Queue a persistent search response */ static int syncprov_qresp( opcookie *opc, syncops *so, int mode ) @@ -979,7 +949,27 @@ syncprov_qresp( opcookie *opc, syncops * so->s_flags |= PS_FIND_BASE; } if ( so->s_flags & PS_IS_DETACHED ) { - syncprov_qstart( so ); + int wake=0; + ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); + if ( !so->s_qtask ) { + so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, + syncprov_qtask, so, "syncprov_qtask", + so->s_op->o_conn->c_peer_name.bv_val ); + ++so->s_inuse; + wake = 1; + } else { + if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && + !so->s_qtask->next_sched.tv_sec ) { + so->s_qtask->interval.tv_sec = 0; + ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); + so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; + ++so->s_inuse; + wake = 1; + } + } + ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); + if ( wake ) + slap_wake_listener(); } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return LDAP_SUCCESS; @@ -1286,7 +1276,7 @@ syncprov_checkpoint( Operation *op, Slap } static void -syncprov_add_slog( Operation *op ) +syncprov_add_slog( Operation *op, struct berval *csn ) { opcookie *opc = op->o_callback->sc_private; slap_overinst *on = opc->son; @@ -1298,7 +1288,7 @@ syncprov_add_slog( Operation *op ) { /* Allocate a record. UUIDs are not NUL-terminated. */ se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + - op->o_csn.bv_len + 1 ); + csn->bv_len + 1 ); se->se_next = NULL; se->se_tag = op->o_tag; @@ -1307,9 +1297,9 @@ syncprov_add_slog( Operation *op ) se->se_uuid.bv_len = opc->suuid.bv_len; se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len; - AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len ); - se->se_csn.bv_val[op->o_csn.bv_len] = '\0'; - se->se_csn.bv_len = op->o_csn.bv_len; + AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len ); + se->se_csn.bv_val[csn->bv_len] = '\0'; + se->se_csn.bv_len = csn->bv_len; ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); if ( sl->sl_head ) { @@ -1373,19 +1363,9 @@ syncprov_playlog( Operation *op, SlapRep * and everything else at the end. Do this first so we can * unlock the list mutex. */ - Debug( LDAP_DEBUG_SYNC, "srs csn %s\n", srs->sr_state.ctxcsn.bv_val, 0, 0 ); for ( se=sl->sl_head; se; se=se->se_next ) { - Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 ); - ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn ); - if ( ndel <= 0 ) { - Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 ); - continue; - } - ndel = ber_bvcmp( &se->se_csn, ctxcsn ); - if ( ndel > 0 ) { - Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 ); - break; - } + if ( ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn ) <= 0 ) continue; + if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break; if ( se->se_tag == LDAP_REQ_DELETE ) { j = i; i++; @@ -1503,14 +1483,13 @@ syncprov_op_response( Operation *op, Sla if ( rs->sr_err == LDAP_SUCCESS ) { - struct berval maxcsn = BER_BVNULL; + struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL; char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; - int do_check=0; /* Update our context CSN */ cbuf[0] = '\0'; - ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock ); - slap_get_commit_csn( op, &maxcsn ); + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); + slap_get_commit_csn( op, &maxcsn, &curcsn ); if ( !BER_BVISNULL( &maxcsn ) ) { strcpy( cbuf, maxcsn.bv_val ); if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) { @@ -1522,12 +1501,13 @@ syncprov_op_response( Operation *op, Sla /* Don't do any processing for consumer contextCSN updates */ if ( SLAP_SYNC_SHADOW( op->o_bd ) && op->o_msgid == SLAP_SYNC_UPDATE_MSGID ) { - ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); return SLAP_CB_CONTINUE; } si->si_numops++; if ( si->si_chkops || si->si_chktime ) { + int do_check=0; if ( si->si_chkops && si->si_numops >= si->si_chkops ) { do_check = 1; si->si_numops = 0; @@ -1537,14 +1517,11 @@ syncprov_op_response( Operation *op, Sla do_check = 1; si->si_chklast = op->o_time; } + if ( do_check ) { + syncprov_checkpoint( op, rs, on ); + } } - ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); - - if ( do_check ) { - ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); - syncprov_checkpoint( op, rs, on ); - ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); - } + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); opc->sctxcsn.bv_len = maxcsn.bv_len; opc->sctxcsn.bv_val = cbuf; @@ -1573,7 +1550,7 @@ syncprov_op_response( Operation *op, Sla /* Add any log records */ if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) { - syncprov_add_slog( op ); + syncprov_add_slog( op, &curcsn ); } } @@ -1608,7 +1585,7 @@ syncprov_op_compare( Operation *op, Slap a.a_vals = bv; a.a_nvals = a.a_vals; - ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc, &op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL ); @@ -1637,7 +1614,7 @@ syncprov_op_compare( Operation *op, Slap return_results:; - ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); send_ldap_result( op, rs ); @@ -1808,15 +1785,7 @@ syncprov_detach_op( Operation *op, synco op2->ors_filterstr.bv_val = ptr; strcpy( ptr, so->s_filterstr.bv_val ); op2->ors_filterstr.bv_len = so->s_filterstr.bv_len; - - /* Skip the AND/GE clause that we stuck on in front */ - if ( so->s_flags & PS_FIX_FILTER ) { - op2->ors_filter = op->ors_filter->f_and->f_next; - so->s_flags ^= PS_FIX_FILTER; - } else { - op2->ors_filter = op->ors_filter; - } - op2->ors_filter = filter_dup( op2->ors_filter, NULL ); + op2->ors_filter = filter_dup( op->ors_filter, NULL ); so->s_op = op2; /* Copy any cached group ACLs individually */ @@ -1908,7 +1877,7 @@ syncprov_search_response( Operation *op, op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); } else { /* It's RefreshAndPersist, transition to Persist phase */ - syncprov_sendinfo( op, rs, ss->ss_present ? + syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ? LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, &cookie, 1, NULL, 0 ); op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); @@ -1920,10 +1889,6 @@ syncprov_search_response( Operation *op, ss->ss_so->s_flags ^= PS_IS_REFRESHING; syncprov_detach_op( op, ss->ss_so, on ); - - /* If there are queued responses, fire them off */ - if ( ss->ss_so->s_res ) - syncprov_qstart( ss->ss_so ); ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); return LDAP_SUCCESS; @@ -1997,10 +1962,10 @@ syncprov_op_search( Operation *op, SlapR } /* snapshot the ctxcsn */ - ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); strcpy( csnbuf, si->si_ctxcsnbuf ); ctxcsn.bv_len = si->si_ctxcsn.bv_len; - ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); ctxcsn.bv_val = csnbuf; /* If we have a cookie, handle the PRESENT lookups */ @@ -2031,10 +1996,7 @@ syncprov_op_search( Operation *op, SlapR sl=si->si_logs; if ( sl ) { ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); - /* Are there any log entries, and is the consumer state - * present in the session log? - */ - if ( sl->sl_num > 0 && ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) { + if ( ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) { do_present = 0; /* mutex is unlocked in playlog */ syncprov_playlog( op, rs, sl, srs, &ctxcsn ); @@ -2088,8 +2050,6 @@ shortcut: fava->f_next = op->ors_filter; op->ors_filter = fand; filter2bv_x( op, op->ors_filter, &op->ors_filterstr ); - if ( sop ) - sop->s_flags |= PS_FIX_FILTER; } /* Let our callback add needed info to returned entries */ @@ -2157,13 +2117,13 @@ syncprov_operational( *ap = a; } - ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); if ( !ap ) { strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf ); } else { ber_dupbv( &a->a_vals[0], &si->si_ctxcsn ); } - ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); } } return SLAP_CB_CONTINUE; @@ -2281,31 +2241,27 @@ sp_cf_gen(ConfigArgs *c) switch ( c->type ) { case SP_CHKPT: if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) { - snprintf( c->msg, sizeof( c->msg ), "%s unable to parse checkpoint ops # \"%s\"", + sprintf( c->msg, "%s unable to parse checkpoint ops # \"%s\"", c->argv[0], c->argv[1] ); - Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, - "%s: %s\n", c->log, c->msg, 0 ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); return ARG_BAD_CONF; } if ( si->si_chkops <= 0 ) { - snprintf( c->msg, sizeof( c->msg ), "%s invalid checkpoint ops # \"%d\"", + sprintf( c->msg, "%s invalid checkpoint ops # \"%d\"", c->argv[0], si->si_chkops ); - Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, - "%s: %s\n", c->log, c->msg, 0 ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); return ARG_BAD_CONF; } if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) { - snprintf( c->msg, sizeof( c->msg ), "%s unable to parse checkpoint time \"%s\"", + sprintf( c->msg, "%s unable to parse checkpoint time \"%s\"", c->argv[0], c->argv[1] ); - Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, - "%s: %s\n", c->log, c->msg, 0 ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); return ARG_BAD_CONF; } if ( si->si_chktime <= 0 ) { - snprintf( c->msg, sizeof( c->msg ), "%s invalid checkpoint time \"%d\"", + sprintf( c->msg, "%s invalid checkpoint time \"%d\"", c->argv[0], si->si_chkops ); - Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, - "%s: %s\n", c->log, c->msg, 0 ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); return ARG_BAD_CONF; } si->si_chktime *= 60; @@ -2315,10 +2271,9 @@ sp_cf_gen(ConfigArgs *c) int size = c->value_int; if ( size < 0 ) { - snprintf( c->msg, sizeof( c->msg ), "%s size %d is negative", + sprintf( c->msg, "%s size %d is negative", c->argv[0], size ); - Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, - "%s: %s\n", c->log, c->msg, 0 ); + Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 ); return ARG_BAD_CONF; } sl = si->si_logs; @@ -2449,6 +2404,7 @@ syncprov_db_open( out: op->o_bd->bd_info = (BackendInfo *)on; + ldap_pvt_thread_pool_context_reset( thrctx ); return 0; } @@ -2478,6 +2434,7 @@ syncprov_db_close( op->o_dn = be->be_rootdn; op->o_ndn = be->be_rootndn; syncprov_checkpoint( op, &rs, on ); + ldap_pvt_thread_pool_context_reset( thrctx ); } return 0; @@ -2491,16 +2448,9 @@ syncprov_db_init( slap_overinst *on = (slap_overinst *)be->bd_info; syncprov_info_t *si; - if ( SLAP_ISGLOBALOVERLAY( be ) ) { - Debug( LDAP_DEBUG_ANY, - "syncprov must be instantiated within a database.\n", - 0, 0, 0 ); - return 1; - } - si = ch_calloc(1, sizeof(syncprov_info_t)); on->on_bi.bi_private = si; - ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_init( &si->si_csn_mutex ); ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); si->si_ctxcsn.bv_val = si->si_ctxcsnbuf; @@ -2538,7 +2488,7 @@ syncprov_db_destroy( } ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); - ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock ); + ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex ); ch_free( si ); } @@ -2628,8 +2578,8 @@ static int syncprov_parseCtrl ( sr->sr_rhint = rhint; if (!BER_BVISNULL(&cookie)) { ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx ); - if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) || - sr->sr_state.rid == -1 ) { + slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ); + if ( sr->sr_state.rid == -1 ) { rs->sr_text = "Sync control : cookie parsing error"; return LDAP_PROTOCOL_ERROR; }