version 1.147.2.50, 2009/03/13 19:47:32
|
version 1.147.2.51, 2009/03/13 19:52:28
|
Line 1
|
Line 1
|
/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.49 2009/03/12 23:28:52 quanah Exp $ */ |
/* $OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.50 2009/03/13 19:47:32 quanah Exp $ */ |
/* syncprov.c - syncrepl provider */ |
/* syncprov.c - syncrepl provider */ |
/* This work is part of OpenLDAP Software <http://www.openldap.org/>. |
/* This work is part of OpenLDAP Software <http://www.openldap.org/>. |
* |
* |
Line 71 typedef struct syncops {
|
Line 71 typedef struct syncops {
|
#define PS_WROTE_BASE 0x04 |
#define PS_WROTE_BASE 0x04 |
#define PS_FIND_BASE 0x08 |
#define PS_FIND_BASE 0x08 |
#define PS_FIX_FILTER 0x10 |
#define PS_FIX_FILTER 0x10 |
|
#define PS_TASK_QUEUED 0x20 |
|
|
int s_inuse; /* reference count */ |
int s_inuse; /* reference count */ |
struct syncres *s_res; |
struct syncres *s_res; |
struct syncres *s_restail; |
struct syncres *s_restail; |
struct re_s *s_qtask; /* task for playing psearch responses */ |
|
#define RUNQ_INTERVAL 36000 /* a long time */ |
|
ldap_pvt_thread_mutex_t s_mutex; |
ldap_pvt_thread_mutex_t s_mutex; |
} syncops; |
} syncops; |
|
|
Line 748 syncprov_free_syncop( syncops *so )
|
Line 747 syncprov_free_syncop( syncops *so )
|
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
return; |
return; |
} |
} |
if ( so->s_qtask ) { |
|
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); |
|
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) ) |
|
ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); |
|
ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask ); |
|
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); |
|
} |
|
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
if ( so->s_flags & PS_IS_DETACHED ) { |
if ( so->s_flags & PS_IS_DETACHED ) { |
filter_free( so->s_op->ors_filter ); |
filter_free( so->s_op->ors_filter ); |
Line 861 syncprov_sendresp( Operation *op, opcook
|
Line 853 syncprov_sendresp( Operation *op, opcook
|
return rs.sr_err; |
return rs.sr_err; |
} |
} |
|
|
|
static void |
|
syncprov_qstart( syncops *so ); |
|
|
/* Play back queued responses */ |
/* Play back queued responses */ |
static int |
static int |
syncprov_qplay( Operation *op, struct re_s *rtask ) |
syncprov_qplay( Operation *op, syncops *so ) |
{ |
{ |
syncops *so = rtask->arg; |
|
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; |
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; |
syncres *sr; |
syncres *sr; |
Entry *e; |
Entry *e; |
Line 874 syncprov_qplay( Operation *op, struct re
|
Line 868 syncprov_qplay( Operation *op, struct re
|
|
|
opc.son = on; |
opc.son = on; |
|
|
for (;;) { |
do { |
ldap_pvt_thread_mutex_lock( &so->s_mutex ); |
ldap_pvt_thread_mutex_lock( &so->s_mutex ); |
sr = so->s_res; |
sr = so->s_res; |
if ( sr ) |
if ( sr ) |
Line 918 syncprov_qplay( Operation *op, struct re
|
Line 912 syncprov_qplay( Operation *op, struct re
|
|
|
ch_free( sr ); |
ch_free( sr ); |
|
|
if ( rc ) { |
/* Exit loop with mutex held */ |
/* Exit loop with mutex held */ |
ldap_pvt_thread_mutex_lock( &so->s_mutex ); |
ldap_pvt_thread_mutex_lock( &so->s_mutex ); |
|
break; |
|
} |
|
} |
|
|
|
/* wait until we get explicitly scheduled again */ |
} while (0); |
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); |
|
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask ); |
/* We now only send one change at a time, to prevent one |
if ( rc == 0 ) { |
* psearch from hogging all the CPU. Resubmit this task if |
ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 ); |
* there are more responses queued and no errors occurred. |
} else { |
*/ |
/* bail out on any error */ |
|
ldap_pvt_runqueue_remove( &slapd_rq, rtask ); |
|
|
|
/* Prevent duplicate remove */ |
if ( rc == 0 && so->s_res ) { |
if ( so->s_qtask == rtask ) |
syncprov_qstart( so ); |
so->s_qtask = NULL; |
} else { |
|
so->s_flags ^= PS_TASK_QUEUED; |
} |
} |
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); |
|
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
return rc; |
return rc; |
} |
} |
|
|
/* runqueue task for playing back queued responses */ |
/* task for playing back queued responses */ |
static void * |
static void * |
syncprov_qtask( void *ctx, void *arg ) |
syncprov_qtask( void *ctx, void *arg ) |
{ |
{ |
struct re_s *rtask = arg; |
syncops *so = arg; |
syncops *so = rtask->arg; |
|
OperationBuffer opbuf; |
OperationBuffer opbuf; |
Operation *op; |
Operation *op; |
BackendDB be; |
BackendDB be; |
Line 973 syncprov_qtask( void *ctx, void *arg )
|
Line 961 syncprov_qtask( void *ctx, void *arg )
|
LDAP_SLIST_FIRST(&op->o_extra) = NULL; |
LDAP_SLIST_FIRST(&op->o_extra) = NULL; |
op->o_callback = NULL; |
op->o_callback = NULL; |
|
|
rc = syncprov_qplay( op, rtask ); |
rc = syncprov_qplay( op, so ); |
|
|
/* decrement use count... */ |
/* decrement use count... */ |
syncprov_free_syncop( so ); |
syncprov_free_syncop( so ); |
|
|
#if 0 /* FIXME: connection_close isn't exported from slapd. |
|
* should it be? |
|
*/ |
|
if ( rc ) { |
|
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); |
|
if ( connection_state_closing( op->o_conn )) { |
|
connection_close( op->o_conn ); |
|
} |
|
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); |
|
} |
|
#endif |
|
return NULL; |
return NULL; |
} |
} |
|
|
Line 996 syncprov_qtask( void *ctx, void *arg )
|
Line 973 syncprov_qtask( void *ctx, void *arg )
|
static void |
static void |
syncprov_qstart( syncops *so ) |
syncprov_qstart( syncops *so ) |
{ |
{ |
int wake=0; |
so->s_flags |= PS_TASK_QUEUED; |
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); |
so->s_inuse++; |
if ( !so->s_qtask ) { |
ldap_pvt_thread_pool_submit( &connection_pool, |
so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, |
syncprov_qtask, so ); |
syncprov_qtask, so, "syncprov_qtask", |
|
so->s_op->o_conn->c_peer_name.bv_val ); |
|
++so->s_inuse; |
|
wake = 1; |
|
} else { |
|
if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && |
|
!so->s_qtask->next_sched.tv_sec ) { |
|
so->s_qtask->interval.tv_sec = 0; |
|
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); |
|
so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; |
|
++so->s_inuse; |
|
wake = 1; |
|
} |
|
} |
|
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); |
|
if ( wake ) |
|
slap_wake_listener(); |
|
} |
} |
|
|
/* Queue a persistent search response */ |
/* Queue a persistent search response */ |
Line 1076 syncprov_qresp( opcookie *opc, syncops *
|
Line 1036 syncprov_qresp( opcookie *opc, syncops *
|
so->s_flags ^= PS_WROTE_BASE; |
so->s_flags ^= PS_WROTE_BASE; |
so->s_flags |= PS_FIND_BASE; |
so->s_flags |= PS_FIND_BASE; |
} |
} |
if ( so->s_flags & PS_IS_DETACHED ) { |
if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { |
syncprov_qstart( so ); |
syncprov_qstart( so ); |
} |
} |
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); |