/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001, 2011 Oracle and/or its affiliates.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"

#ifdef REP_DIAGNOSTIC
#include "dbinc/db_page.h"
#include "dbinc/fop.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/qam.h"
#endif

/*
 * rep_util.c:
 *	Miscellaneous replication-related utility functions, including
 *	those called by other subsystems.
 */
/*
 * TIMESTAMP_CHECK --
 *	Expire a stale DB_REGENV_REPLOCKED lockout.  If renv->op_timestamp is
 * non-zero and more than DB_REGENV_TIMEOUT older than ts, clear the
 * DB_REGENV_REPLOCKED flag and reset op_timestamp to 0 while holding the
 * replication system lock.  The age test itself is made before the lock
 * is taken.
 *
 *	env	environment handle, passed to REP_SYSTEM_LOCK/UNLOCK
 *	ts	current time to compare against
 *	renv	pointer to the region environment structure
 *
 * Macro parameters are parenthesized so that arbitrary expressions (for
 * example "&regenv") may safely be passed.
 */
#define	TIMESTAMP_CHECK(env, ts, renv) do {				\
	if ((renv)->op_timestamp != 0 &&				\
	    (renv)->op_timestamp + DB_REGENV_TIMEOUT < (ts)) {		\
		REP_SYSTEM_LOCK(env);					\
		F_CLR(renv, DB_REGENV_REPLOCKED);			\
		(renv)->op_timestamp = 0;				\
		REP_SYSTEM_UNLOCK(env);					\
	}								\
} while (0)

/*
 * Forward declarations for the static (file-local) helpers used in this
 * file.  __rep_print_logmsg is only compiled when REP_DIAGNOSTIC is defined.
 */
static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t,
    const char *, u_int32_t));
static int __rep_newmaster_empty __P((ENV *, int));
static int __rep_print_int __P((ENV *, u_int32_t, const char *, va_list));
#ifdef REP_DIAGNOSTIC
static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *));
#endif
static int __rep_show_progress __P((ENV *, const char *, int mins));

/*
 * __rep_bulk_message --
 *	This is a wrapper for putting a record into a bulk buffer.  Since
 * we have different bulk buffers, the caller must hand us the information
 * we need to put the record into the correct buffer.  All bulk buffers
 * are protected by the REP->mtx_clientdb.
 *
 *	env	environment handle
 *	bulk	descriptor of the bulk buffer to append to
 *	repth	throttling state, or NULL when throttling is not in use
 *	lsn	LSN of the record; stored with the record in the buffer
 *	dbt	the record itself
 *	flags	control flags; REPCTL_PERM forces the buffer to be sent
 *		once the record has been copied in
 *
 * Returns 0 when the record was buffered (or the throttle limit was hit),
 * DB_REP_BULKOVF when the caller must send the record as a singleton
 * (buffer is being transmitted, or the record is larger than the whole
 * buffer), or the error from sending, throttling or marshalling.
 *
 * PUBLIC: int __rep_bulk_message __P((ENV *, REP_BULK *, REP_THROTTLE *,
 * PUBLIC:     DB_LSN *, const DBT *, u_int32_t));
 */
int
__rep_bulk_message(env, bulk, repth, lsn, dbt, flags)
	ENV *env;
	REP_BULK *bulk;
	REP_THROTTLE *repth;
	DB_LSN *lsn;
	const DBT *dbt;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	__rep_bulk_args b_args;
	size_t len;
	int ret;
	u_int32_t recsize, typemore;
	u_int8_t *p;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	/*
	 * Figure out the total number of bytes needed for this record.
	 * !!! The marshalling code includes the given len, but also
	 * puts its own copy of the dbt->size with the DBT portion of
	 * the record.  Account for that here.
	 */
	recsize = sizeof(len) + dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);

	/*
	 * If *this* buffer is actively being transmitted, don't wait,
	 * just return so that it can be sent as a singleton.
	 */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	if (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
		ret = DB_REP_BULKOVF;
		goto out;
	}

	/*
	 * If the record is bigger than the buffer entirely, send the
	 * current buffer and then return DB_REP_BULKOVF so that this
	 * record is sent as a singleton.  Do we have enough info to
	 * do that here?  XXX
	 */
	if (recsize > bulk->len) {
		/*
		 * The sizes are unsigned 32-bit values: print them through
		 * u_long, as the other messages in this function do.
		 */
		RPRINT(env, (env, DB_VERB_REP_MSGS,
	    "bulk_msg: Record %lu (0x%lx) larger than entire buffer 0x%lx",
		    (u_long)recsize, (u_long)recsize, (u_long)bulk->len));
		STAT(rep->stat.st_bulk_overflows++);
		/*
		 * A failure to flush is deliberately ignored: the caller is
		 * told to send this record on its own either way.
		 */
		(void)__rep_send_bulk(env, bulk, flags);
		/*
		 * XXX __rep_send_message...
		 */
		ret = DB_REP_BULKOVF;
		goto out;
	}
	/*
	 * If this record doesn't fit, send the current buffer.
	 * Sending the buffer will reset the offset, but we will
	 * drop the mutex while sending so we need to keep checking
	 * if we're racing.
	 */
	while (recsize + *(bulk->offp) > bulk->len) {
		RPRINT(env, (env, DB_VERB_REP_MSGS,
	    "bulk_msg: Record %lu (%#lx) doesn't fit.  Send %lu (%#lx) now.",
		    (u_long)recsize, (u_long)recsize,
		    (u_long)bulk->len, (u_long)bulk->len));
		STAT(rep->stat.st_bulk_fills++);
		if ((ret = __rep_send_bulk(env, bulk, flags)) != 0)
			goto out;
	}

	/*
	 * If we're using throttling, see if we are at the throttling
	 * limit before we do any more work here, by checking if the
	 * call to rep_send_throttle changed the repth->type to the
	 * *_MORE message type.  If the throttling code hits the limit
	 * then we're done here.
	 */
	if (bulk->type == REP_BULK_LOG)
		typemore = REP_LOG_MORE;
	else
		typemore = REP_PAGE_MORE;
	if (repth != NULL) {
		if ((ret = __rep_send_throttle(env,
		    bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0)
			goto out;
		if (repth->type == typemore) {
			VPRINT(env, (env, DB_VERB_REP_MSGS,
			    "bulk_msg: Record %lu (0x%lx) hit throttle limit.",
			    (u_long)recsize, (u_long)recsize));
			goto out;
		}
	}

	/*
	 * Now we own the buffer, and we know our record fits into it.
	 * The buffer is structured with the len, LSN and then the record.
	 * Copy the record into the buffer.  Then if we need to,
	 * send the buffer.
	 */
	p = bulk->addr + *(bulk->offp);
	b_args.len = dbt->size;
	b_args.lsn = *lsn;
	b_args.bulkdata = *dbt;
	/*
	 * If we're the first record, we need to save the first
	 * LSN in the bulk structure.
	 */
	if (*(bulk->offp) == 0)
		bulk->lsn = *lsn;
	if (rep->version < DB_REPVERSION_47) {
		/*
		 * Older peers get the raw layout.  Here p itself is advanced
		 * past the record, so len contributes nothing below.
		 */
		len = 0;
		memcpy(p, &dbt->size, sizeof(dbt->size));
		p += sizeof(dbt->size);
		memcpy(p, lsn, sizeof(DB_LSN));
		p += sizeof(DB_LSN);
		memcpy(p, dbt->data, dbt->size);
		p += dbt->size;
	} else if ((ret = __rep_bulk_marshal(env, &b_args, p,
	    bulk->len, &len)) != 0)
		goto out;
	/* New offset: p is left in place by marshalling, len is what it wrote. */
	*(bulk->offp) = (uintptr_t)p + (uintptr_t)len - (uintptr_t)bulk->addr;
	STAT(rep->stat.st_bulk_records++);
	/*
	 * Send the buffer if it is a perm record or a force.
	 */
	if (LF_ISSET(REPCTL_PERM)) {
		VPRINT(env, (env, DB_VERB_REP_MSGS,
		    "bulk_msg: Send buffer after copy due to PERM"));
		ret = __rep_send_bulk(env, bulk, flags);
	}

out:	/* Single exit point: the clientdb mutex is held on every path here. */
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	return (ret);
}

/*
 * __rep_send_bulk --
 *	This function transmits the bulk buffer given.  It assumes the
 * caller holds the REP->mtx_clientdb.  We may release it and reacquire
 * it during this call.  We will return with it held.
 *
 * PUBLIC: int __rep_send_bulk __P((ENV *, REP_BULK *, u_int32_t));
 */
int
__rep_send_bulk(env, bulkp, ctlflags)
	ENV *env;
	REP_BULK *bulkp;
	u_int32_t ctlflags;
{
	DBT dbt;
	DB_REP *db_rep;
	REP *rep;
	int ret;

	/*
	 * If the offset is 0, we're done.  There is nothing to send.
	 */
	if (*(bulkp->offp) == 0)
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;

	/*
	 * Set that this buffer is being actively transmitted.
	 */
	FLD_SET(*(bulkp->flagsp), BULK_XMIT);
	DB_INIT_DBT(dbt, bulkp->addr, *(bulkp->offp));
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	VPRINT(env, (env, DB_VERB_REP_MSGS,
	    "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size));

	/*
	 * Unlocked the mutex and now send the message.
	 */
	STAT(rep->stat.st_bulk_transfers++);
	if ((ret = __rep_send_message(env,
	    bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, ctlflags, 0)) != 0)
		ret = DB_REP_UNAVAIL;

	MUTEX_LOCK(env, rep->mtx_clientdb);
	/*
	 * Ready the buffer for further records.
	 */
	*(bulkp->offp) = 0;
	FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
	return (ret);
}

/*
 * __rep_bulk_alloc --
 *	Allocate and initialize an internal bulk buffer.  The master uses
 * this when answering a request for a run of log records or a set of
 * pages.
 *
 *	env	environment handle
 *	bulkp	descriptor to initialize; it is zeroed first
 *	eid	environment id recorded in the descriptor
 *	offp	caller-owned buffer offset; reset to 0 and referenced
 *	flagsp	caller-owned buffer flags; reset to 0 and referenced
 *	type	bulk message type recorded in the descriptor
 *
 * Returns 0 on success or the __os_malloc error, in which case only the
 * length field of the descriptor has been filled in.
 *
 * PUBLIC: int __rep_bulk_alloc __P((ENV *, REP_BULK *, int, uintptr_t *,
 * PUBLIC:    u_int32_t *, u_int32_t));
 */
int
__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type)
	ENV *env;
	REP_BULK *bulkp;
	int eid;
	uintptr_t *offp;
	u_int32_t *flagsp, type;
{
	int ret;

	/* Start from a clean descriptor and clean caller-side state. */
	memset(bulkp, 0, sizeof(*bulkp));
	*flagsp = 0;
	*offp = 0;

	/* The buffer is a fixed one megabyte. */
	bulkp->len = MEGABYTE;
	ret = __os_malloc(env, bulkp->len, &bulkp->addr);
	if (ret == 0) {
		/* Only wire up the descriptor once the buffer exists. */
		bulkp->offp = offp;
		bulkp->type = type;
		bulkp->eid = eid;
		bulkp->flagsp = flagsp;
	}
	return (ret);
}

/*
 * __rep_bulk_free --
 *	This function sends the remainder of the bulk buffer and frees it.
 *
 *	env	environment handle
 *	bulkp	descriptor of the bulk buffer to flush and release
 *	flags	control flags handed through to __rep_send_bulk
 *
 * Returns the result of the final __rep_send_bulk call.  The buffer is
 * freed whether or not that send succeeded, and bulkp->addr is reset to
 * NULL so the caller-owned descriptor does not keep a dangling pointer.
 *
 * PUBLIC: int __rep_bulk_free __P((ENV *, REP_BULK *, u_int32_t));
 */
int
__rep_bulk_free(env, bulkp, flags)
	ENV *env;
	REP_BULK *bulkp;
	u_int32_t flags;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	/* __rep_send_bulk expects to be called with mtx_clientdb held. */
	MUTEX_LOCK(env, db_rep->region->mtx_clientdb);
	ret = __rep_send_bulk(env, bulkp, flags);
	MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb);
	__os_free(env, bulkp->addr);
	bulkp->addr = NULL;
	return (ret);
}

/*
 * __rep_send_message --
 *	This is a wrapper for sending a message.  It takes care of constructing
 * the control structure and calling the user's specified send function.
 *
 *	env		environment handle
 *	eid		environment id of the destination
 *	rtype		replication message type (current-version numbering)
 *	lsnp		LSN for the control structure; NULL means a zero LSN
 *	dbt		record to send; NULL is replaced by an empty DBT
 *	ctlflags	REPCTL_* flags stored in the control structure
 *	repflags	initial flags for the transport send function
 *
 * Returns 0 without sending if the message type has no equivalent in the
 * older version being spoken, the result of __env_panic if the region's
 * replication version is newer than ours, the marshalling error if the
 * control structure cannot be marshalled, and otherwise whatever the
 * application's send function returns.
 *
 * PUBLIC: int __rep_send_message __P((ENV *, int,
 * PUBLIC:     u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t));
 */
int
__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags)
	ENV *env;
	int eid;
	u_int32_t rtype;
	DB_LSN *lsnp;
	const DBT *dbt;
	u_int32_t ctlflags, repflags;
{
	DBT cdbt, scrap_dbt;
	DB_ENV *dbenv;
	DB_LOG *dblp;
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	REP_46_CONTROL cntrl46;
	REP_OLD_CONTROL ocntrl;
	__rep_control_args cntrl;
	db_timespec msg_time;
	int ret;
	u_int32_t myflags;
	u_int8_t buf[__REP_CONTROL_SIZE];
	size_t len;

	dbenv = env->dbenv;
	db_rep = env->rep_handle;
	rep = db_rep->region;
	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	ret = 0;

#if defined(DEBUG_ROP) || defined(DEBUG_WOP)
	if (db_rep->send == NULL)
		return (0);
#endif

	/* Set up control structure. */
	memset(&cntrl, 0, sizeof(cntrl));
	memset(&ocntrl, 0, sizeof(ocntrl));
	memset(&cntrl46, 0, sizeof(cntrl46));
	if (lsnp == NULL)
		ZERO_LSN(cntrl.lsn);
	else
		cntrl.lsn = *lsnp;
	/*
	 * Set the rectype based on the version we need to speak.
	 */
	if (rep->version == DB_REPVERSION)
		cntrl.rectype = rtype;
	else if (rep->version < DB_REPVERSION) {
		cntrl.rectype = __rep_msg_to_old(rep->version, rtype);
		VPRINT(env, (env, DB_VERB_REP_MSGS,
		    "rep_send_msg: rtype %lu to version %lu record %lu.",
		    (u_long)rtype, (u_long)rep->version,
		    (u_long)cntrl.rectype));
		/* No equivalent in the old protocol: quietly send nothing. */
		if (cntrl.rectype == REP_INVALID)
			return (ret);
	} else {
		__db_errx(env,
    "rep_send_message: Unknown rep version %lu, my version %lu",
		    (u_long)rep->version, (u_long)DB_REPVERSION);
		return (__env_panic(env, EINVAL));
	}
	cntrl.flags = ctlflags;
	cntrl.rep_version = rep->version;
	cntrl.log_version = lp->persist.version;
	cntrl.gen = rep->gen;

	/* Don't assume the send function will be tolerant of NULL records. */
	if (dbt == NULL) {
		memset(&scrap_dbt, 0, sizeof(DBT));
		dbt = &scrap_dbt;
	}

	/*
	 * There are several types of records: commit and checkpoint records
	 * that affect database durability, regular log records that might
	 * be buffered on the master before being transmitted, and control
	 * messages which don't require the guarantees of permanency, but
	 * should not be buffered.
	 *
	 * There are request records that can be sent anywhere, and there
	 * are rerequest records that the app might want to send to the master.
	 */
	myflags = repflags;
	if (FLD_ISSET(ctlflags, REPCTL_PERM)) {
		/*
		 * If we have the API locked out, this must be one of our own
		 * system transactions.  Don't set the PERM flag in that case:
		 * we don't care, plus we don't want to delay waiting for ack.
		 */
		if (!FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_API))
			myflags |= DB_REP_PERMANENT;
	} else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND))
		myflags |= DB_REP_NOBUFFER;

	/*
	 * Let everyone know if we've been in an established group.
	 */
	if (F_ISSET(rep, REP_F_GROUP_ESTD))
		F_SET(&cntrl, REPCTL_GROUP_ESTD);

	/*
	 * If we are a master sending a perm record, then set the
	 * REPCTL_LEASE flag to have the client reply.  Also set
	 * the start time that the client will echo back to us.
	 *
	 * !!! If we are a master, using leases, we had better not be
	 * sending to an older version.
	 */
	if (IS_REP_MASTER(env) && IS_USING_LEASES(env) &&
	    FLD_ISSET(ctlflags, REPCTL_PERM)) {
		F_SET(&cntrl, REPCTL_LEASE);
		DB_ASSERT(env, rep->version == DB_REPVERSION);
		__os_gettime(env, &msg_time, 1);
		cntrl.msg_sec = (u_int32_t)msg_time.tv_sec;
		cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec;
	}

	REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags);
#ifdef REP_DIAGNOSTIC
	if (FLD_ISSET(
	    env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG)
		__rep_print_logmsg(env, dbt, lsnp);
#endif

	/*
	 * If DB_REP_PERMANENT is set, the LSN better be non-zero.
	 */
	DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) ||
	    !IS_ZERO_LSN(cntrl.lsn));

	/*
	 * If we're talking to an old version, send an old control structure.
	 */
	memset(&cdbt, 0, sizeof(cdbt));
	if (rep->version <= DB_REPVERSION_45) {
		if (rep->version == DB_REPVERSION_45 &&
		    F_ISSET(&cntrl, REPCTL_INIT)) {
			F_CLR(&cntrl, REPCTL_INIT);
			F_SET(&cntrl, REPCTL_INIT_45);
		}
		ocntrl.rep_version = cntrl.rep_version;
		ocntrl.log_version = cntrl.log_version;
		ocntrl.lsn = cntrl.lsn;
		ocntrl.rectype = cntrl.rectype;
		ocntrl.gen = cntrl.gen;
		ocntrl.flags = cntrl.flags;
		cdbt.data = &ocntrl;
		cdbt.size = sizeof(ocntrl);
	} else if (rep->version == DB_REPVERSION_46) {
		cntrl46.rep_version = cntrl.rep_version;
		cntrl46.log_version = cntrl.log_version;
		cntrl46.lsn = cntrl.lsn;
		cntrl46.rectype = cntrl.rectype;
		cntrl46.gen = cntrl.gen;
		cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec;
		cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec;
		cntrl46.flags = cntrl.flags;
		cdbt.data = &cntrl46;
		cdbt.size = sizeof(cntrl46);
	} else {
		/*
		 * Don't build the control DBT from len unless marshalling
		 * actually succeeded and set it.
		 */
		if ((ret = __rep_control_marshal(env, &cntrl, buf,
		    __REP_CONTROL_SIZE, &len)) != 0)
			return (ret);
		DB_INIT_DBT(cdbt, buf, len);
	}

	/*
	 * We set the LSN above to something valid.  Give the master the
	 * actual LSN so that they can coordinate with permanent records from
	 * the client if they want to.
	 *
	 * !!! Even though we marshalled the control message for transmission,
	 * give the transport function the real LSN.
	 */
	ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);

	/*
	 * We don't hold the rep lock, so this could miscount if we race.
	 * I don't think it's worth grabbing the mutex for that bit of
	 * extra accuracy.
	 */
	if (ret != 0) {
		RPRINT(env, (env, DB_VERB_REP_MSGS,
		    "rep_send_function returned: %d", ret));
		STAT(rep->stat.st_msgs_send_failures++);
	} else
		STAT(rep->stat.st_msgs_sent++);
	return (ret);
}

#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Debugging aid: print a log record that is about to be transmitted
 * to a client.  The print dispatch table is static and is filled in by
 * the first call only.
 *
 *	env	environment handle
 *	logdbt	the log record to print
 *	lsnp	LSN of the record
 */
static void
__rep_print_logmsg(env, logdbt, lsnp)
	ENV *env;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	static DB_DISTAB print_tab;
	static int tab_loaded = 0;

	/* Register the per-access-method print routines exactly once. */
	if (!tab_loaded) {
		tab_loaded = 1;

		(void)__bam_init_print(env, &print_tab);
		(void)__crdel_init_print(env, &print_tab);
		(void)__db_init_print(env, &print_tab);
		(void)__dbreg_init_print(env, &print_tab);
		(void)__fop_init_print(env, &print_tab);
		(void)__ham_init_print(env, &print_tab);
		(void)__qam_init_print(env, &print_tab);
		(void)__txn_init_print(env, &print_tab);
	}

	/* The dispatch interface takes a non-const DBT. */
	(void)__db_dispatch(env,
	    &print_tab, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif

/*
 * __rep_new_master --
 *	Called after a master election to sync back up with a new master.
 * It's possible that we already know of this new master in which case
 * we don't need to do anything.
 *
 * This is written assuming that this message came from the master; we
 * need to enforce that in __rep_process_record, but right now, we have
 * no way to identify the master.
 *
 *	env	environment handle
 *	cntrl	control structure of the message received; its gen, lsn,
 *		log_version and rep_version fields are consulted
 *	eid	environment id of the site the message came from
 *
 * Returns:
 *	DB_REP_NEWMASTER when the master/generation changed and sync-up with
 *	    it has been started;
 *	0 (or the result of __rep_resend_req) when nothing changed;
 *	0 when a change was seen but message lockout is already in effect;
 *	otherwise the error from the step that failed (this includes the
 *	    value returned by __rep_newmaster_empty).
 *
 * PUBLIC: int __rep_new_master __P((ENV *, __rep_control_args *, int));
 */
int
__rep_new_master(env, cntrl, eid)
	ENV *env;
	__rep_control_args *cntrl;
	int eid;
{
	DBT dbt;
	DB_LOG *dblp;
	DB_LOGC *logc;
	DB_LSN first_lsn, lsn;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	LOG *lp;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	db_timeout_t lease_to;
	u_int32_t unused, vers;
	int change, do_req, lockout_msg, ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	ret = 0;
	logc = NULL;
	/* Non-zero only while this call owns the message lockout. */
	lockout_msg = 0;
	REP_SYSTEM_LOCK(env);
	/* A "change" is a different generation or a different master id. */
	change = rep->gen != cntrl->gen || rep->master_id != eid;
	/*
	 * If we're hearing from a current or new master, then we
	 * want to clear EPHASE0 in case this site is waiting to
	 * hear from the master.
	 */
	FLD_CLR(rep->elect_flags, REP_E_PHASE0);
	if (change) {
		/*
		 * If we are already locking out others, we're either
		 * in the middle of sync-up recovery or internal init
		 * when this newmaster comes in (we also lockout in
		 * rep_start, but we cannot be racing that because we
		 * don't allow rep_proc_msg when rep_start is going on).
		 *
		 * We're about to become the client of a new master.  Since we
		 * want to be able to sync with the new master as quickly as
		 * possible, interrupt any STARTSYNC from the old master.  The
		 * new master may need to rely on acks from us and the old
		 * STARTSYNC is now irrelevant.
		 *
		 * Note that, conveniently, the "lockout_msg" flag defines the
		 * section of this code path during which both "message lockout"
		 * and "memp sync interrupt" are in effect.
		 */
		if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG))
			goto lckout;

		if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
			goto errlck;

		(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
		lockout_msg = 1;
		/*
		 * We must wait any remaining lease time before accepting
		 * this new master.  This must be after the lockout above
		 * so that no new message can be processed and re-grant
		 * the lease out from under us.
		 */
		if (IS_USING_LEASES(env) &&
		    ((lease_to = __rep_lease_waittime(env)) != 0)) {
			/* Don't hold the region mutex across the wait. */
			REP_SYSTEM_UNLOCK(env);
			__os_yield(env, 0, (u_long)lease_to);
			REP_SYSTEM_LOCK(env);
			F_SET(rep, REP_F_LEASE_EXPIRED);
		}

		vers = lp->persist.version;
		if (cntrl->log_version != vers) {
			/*
			 * Set everything up to the lower version.  If we're
			 * going to be upgrading to the latest version that
			 * can happen automatically as we process later log
			 * records.  We likely want to sync to earlier version.
			 */
			DB_ASSERT(env, vers != 0);
			if (cntrl->log_version < vers)
				vers = cntrl->log_version;
			RPRINT(env, (env, DB_VERB_REP_MISC,
			    "newmaster: Setting log version to %d",vers));
			__log_set_version(env, vers);
			if ((ret = __env_init_rec(env, vers)) != 0)
				goto errlck;
		}

		/*
		 * Switch from the region mutex to mtx_clientdb to reset the
		 * client-side log wait/verify state.
		 */
		REP_SYSTEM_UNLOCK(env);

		MUTEX_LOCK(env, rep->mtx_clientdb);
		__os_gettime(env, &lp->rcvd_ts, 1);
		lp->wait_ts = rep->request_gap;
		ZERO_LSN(lp->verify_lsn);
		ZERO_LSN(lp->prev_ckp);
		ZERO_LSN(lp->waiting_lsn);
		ZERO_LSN(lp->max_wait_lsn);
		/*
		 * Open if we need to, in preparation for the truncate
		 * we'll do in a moment.
		 */
		if (db_rep->rep_db == NULL &&
		    (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
			/* "err" reacquires the region mutex for cleanup. */
			MUTEX_UNLOCK(env, rep->mtx_clientdb);
			goto err;
		}

		/*
		 * If we were in the middle of an internal initialization
		 * and we've discovered a new master instead, clean up
		 * our old internal init information.  We need to clean
		 * up any flags and unlock our lockout.
		 */
		REP_SYSTEM_LOCK(env);
		if (ISSET_LOCKOUT_BDB(rep)) {
			ret = __rep_init_cleanup(env, rep, DB_FORCE);
			/*
			 * Note that if an in-progress internal init was indeed
			 * "cleaned up", clearing these flags now will allow the
			 * application to see a completely empty database
			 * environment for a moment (until the master responds
			 * to our ALL_REQ).
			 */
			F_CLR(rep, REP_F_ABBREVIATED);
			CLR_RECOVERY_SETTINGS(rep);
		}
		/* Region mutex stays held from here to the end of this branch. */
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		if (ret != 0) {
			/* TODO: consider add'l error recovery steps. */
			goto errlck;
		}
		ENV_GET_THREAD_INFO(env, ip);
		if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused))
		    != 0)
			goto errlck;
		STAT(rep->stat.st_log_queued = 0);

		/*
		 * This needs to be performed under message lockout
		 * if we're actually changing master.
		 */
		__rep_elect_done(env, rep);
		RPRINT(env, (env, DB_VERB_REP_MISC,
		    "Updating gen from %lu to %lu from master %d",
		    (u_long)rep->gen, (u_long)cntrl->gen, eid));
		SET_GEN(cntrl->gen);
		if ((ret = __rep_notify_threads(env, AWAIT_GEN)) != 0)
			goto errlck;
		(void)__rep_write_gen(env, rep, rep->gen);
		/* Keep the election generation ahead of the generation. */
		if (rep->egen <= rep->gen)
			rep->egen = rep->gen + 1;
		rep->master_id = eid;
		STAT(rep->stat.st_master_changes++);
		rep->stat.st_startup_complete = 0;
		rep->version = cntrl->rep_version;
		RPRINT(env, (env, DB_VERB_REP_MISC,
		    "egen: %lu. rep version %lu",
		    (u_long)rep->egen, (u_long)rep->version));

		/*
		 * If we're delaying client sync-up, we know we have a
		 * new/changed master now, set flag indicating we are
		 * actively delaying.
		 */
		if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
			F_SET(rep, REP_F_DELAY);
		if ((ret = __rep_lockout_archive(env, rep)) != 0)
			goto errlck;
		rep->sync_state = SYNC_VERIFY;
		/* End of the message-lockout / sync-interrupt section. */
		FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
		(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
		lockout_msg = 0;
	} else
		__rep_elect_done(env, rep);
	REP_SYSTEM_UNLOCK(env);

	/* Snapshot the LSN we are ready for under mtx_clientdb. */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	lsn = lp->ready_lsn;

	if (!change) {
		ret = 0;
		do_req = __rep_check_doreq(env, rep);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		/*
		 * If there wasn't a change, we might still have some
		 * catching up or verification to do.
		 */
		if (do_req &&
		    (rep->sync_state != SYNC_OFF ||
		    LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) {
			ret = __rep_resend_req(env, 0);
			if (ret != 0)
				RPRINT(env, (env, DB_VERB_REP_MISC,
				    "resend_req ret is %lu", (u_long)ret));
		}
		/*
		 * If we're not in one of the recovery modes, we need to
		 * clear the ARCHIVE flag.  Elections set ARCHIVE
		 * and if we called an election and found the same
		 * master, we need to clear ARCHIVE here.
		 */
		if (rep->sync_state == SYNC_OFF) {
			REP_SYSTEM_LOCK(env);
			FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE);
			REP_SYSTEM_UNLOCK(env);
		}
		return (ret);
	}
	MUTEX_UNLOCK(env, rep->mtx_clientdb);

	/*
	 * If the master changed, we need to start the process of
	 * figuring out what our last valid log record is.  However,
	 * if both the master and we agree that the max LSN is 0,0,
	 * then there is no recovery to be done.  If we are at 0 and
	 * the master is not, then we just need to request all the log
	 * records from the master.
	 */
	if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
		if ((ret = __rep_newmaster_empty(env, eid)) != 0)
			goto err;
		goto newmaster_complete;
	}

	memset(&dbt, 0, sizeof(dbt));
	/*
	 * If this client is farther ahead on the log file than the master, see
	 * if there is any overlap in the logs.  If not, the client is too
	 * far ahead of the master and the client will start over.
	 */
	if (cntrl->lsn.file < lsn.file) {
		if ((ret = __log_cursor(env, &logc)) != 0)
			goto err;
		ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST);
		/* Always close the cursor; keep the first error seen. */
		if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
			ret = t_ret;
		if (ret == DB_NOTFOUND)
			goto notfound;
		else if (ret != 0)
			goto err;
		if (cntrl->lsn.file < first_lsn.file)
			goto notfound;
	}
	/* Have __rep_log_backup pick the LSN to verify; it updates lsn. */
	if ((ret = __log_cursor(env, &logc)) != 0)
		goto err;
	ret = __rep_log_backup(env, rep, logc, &lsn);
	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;
	if (ret == DB_NOTFOUND)
		goto notfound;
	else if (ret != 0)
		goto err;

	/*
	 * Finally, we have a record to ask for.
	 */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	lp->verify_lsn = lsn;
	__os_gettime(env, &lp->rcvd_ts, 1);
	lp->wait_ts = rep->request_gap;
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	/* When sync-up is being delayed the request is not sent here. */
	if (!F_ISSET(rep, REP_F_DELAY))
		(void)__rep_send_message(env,
		    eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);
	goto newmaster_complete;

err:	/*
	 * If we failed, we need to clear the flags we may have set above
	 * because we're not going to be setting the verify_lsn.
	 */
	REP_SYSTEM_LOCK(env);
	/* errlck: entered with the region mutex already held. */
errlck:	if (lockout_msg) {
		FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
		(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
	}
	F_CLR(rep, REP_F_DELAY);
	CLR_RECOVERY_SETTINGS(rep);
	/* lckout: entered with the region mutex held; no state is undone. */
lckout:	REP_SYSTEM_UNLOCK(env);
	return (ret);

notfound:
	/*
	 * If we don't have an identification record, we still
	 * might have some log records but we're discarding them
	 * to sync up with the master from the start.
	 * Therefore, truncate our log and treat it as if it
	 * were empty.  In-memory logs can't be completely
	 * zeroed using __log_vtruncate, so just zero them out.
	 */
	RPRINT(env, (env, DB_VERB_REP_MISC,
	    "No commit or ckp found.  Truncate log."));
	if (lp->db_log_inmemory) {
		ZERO_LSN(lsn);
		ret = __log_zero(env, &lsn);
	} else {
		INIT_LSN(lsn);
		ret = __log_vtruncate(env, &lsn, &lsn, NULL);
	}
	/*
	 * NOTE(review): this failure path returns directly, without the
	 * REP_F_DELAY / recovery-settings cleanup done at "err" -- confirm
	 * that is intended.
	 */
	if (ret != 0 && ret != DB_NOTFOUND)
		return (ret);
	infop = env->reginfo;
	renv = infop->primary;
	/* Record the time of this log reset in the environment region. */
	REP_SYSTEM_LOCK(env);
	(void)time(&renv->rep_timestamp);
	REP_SYSTEM_UNLOCK(env);
	if ((ret = __rep_newmaster_empty(env, eid)) != 0)
		goto err;
newmaster_complete:
	return (DB_REP_NEWMASTER);
}

/*
 * __rep_newmaster_empty --
 *	Handle a NEWMASTER message that arrives while our log is empty.
 * Catching up then requires internal init.  When AUTOINIT is turned off
 * we cannot do that and return DB_REP_JOIN_FAILURE.  While F_DELAY is in
 * effect AUTOINIT is not even looked at, since the application could
 * still change it before its rep_sync call.
 *
 *	env	environment handle
 *	eid	environment id the REP_UPDATE_REQ is sent to
 *
 * Returns 0, or DB_REP_JOIN_FAILURE as described above.
 */
static int
__rep_newmaster_empty(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	int ret, send_req;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	lp = env->lg_handle->reginfo.primary;
	ret = 0;
	send_req = 0;

	MUTEX_LOCK(env, rep->mtx_clientdb);
	REP_SYSTEM_LOCK(env);
	lp->wait_ts = rep->request_gap;

	/* Assume we go straight to UPDATE state; revised below if not. */
	rep->sync_state = SYNC_UPDATE;

	/*
	 * When delaying, setting wait_ts above is all there is to do for
	 * now.  Otherwise either ask for an update (AUTOINIT) or give up
	 * on joining.
	 */
	if (!F_ISSET(rep, REP_F_DELAY)) {
		if (FLD_ISSET(rep->config, REP_C_AUTOINIT))
			send_req = 1;
		else {
			FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE);
			CLR_RECOVERY_SETTINGS(rep);
			ret = DB_REP_JOIN_FAILURE;
		}
	}
	REP_SYSTEM_UNLOCK(env);
	MUTEX_UNLOCK(env, rep->mtx_clientdb);

	/* The request goes out only after both mutexes are released. */
	if (send_req)
		(void)__rep_send_message(env,
		    eid, REP_UPDATE_REQ, NULL, NULL, 0, 0);
	return (ret);
}

/*
 * __rep_elect_done
 *	Clear all election information for this site.  Assumes the
 *	caller hold the region mutex.
 *
 * PUBLIC: void __rep_elect_done __P((ENV *, REP *));
 */
void
__rep_elect_done(env, rep)
	ENV *env;
	REP *rep;
{
	int inelect;
	db_timespec endtime;

	inelect = IN_ELECTION(rep);
	FLD_CLR(rep->elect_flags, REP_E_PHASE1 | REP_E_PHASE2 | REP_E_TALLY);

	rep->sites = 0;
	rep->votes = 0;
	if (inelect) {
		if (timespecisset(&rep->etime)) {
			__os_gettime(env, &endtime, 1);
			timespecsub(&endtime, &rep->etime);
#ifdef HAVE_STATISTICS
			rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec;
			rep->stat.st_election_usec = (u_int32_t)
			    (endtime.tv_nsec / NS_PER_US);
#endif
			RPRINT(env, (env, DB_VERB_REP_ELECT,
			    "Election finished in %lu.%09lu sec",
			    (u_long)endtime.tv_sec, (u_long)endtime.tv_nsec));
			timespecclear(&rep->etime);
		}
		rep->egen++;
	}
	RPRINT(env, (env, DB_VERB_REP_ELECT,
	    "Election done; egen %lu", (u_long)rep->egen));
}

/*
 * __env_rep_enter --
 *
 * Check if we are in the middle of replication initialization and/or
 * recovery, and if so, disallow operations.  If operations are allowed,
 * increment handle-counts, so that we do not start recovery while we
 * are operating in the library.
 *
 *	env		environment handle
 *	checklock	if non-zero, fail with EINVAL while the environment
 *			region is flagged DB_REGENV_REPLOCKED
 *
 * Returns 0 when the handle count was incremented (or locking is globally
 * disabled), EINVAL for the REPLOCKED case above, DB_REP_LOCKOUT when the
 * API is locked out and REP_C_NOWAIT is configured, or an error from
 * __rep_show_progress.
 *
 * PUBLIC: int __env_rep_enter __P((ENV *, int));
 */
int
__env_rep_enter(env, checklock)
	ENV *env;
	int checklock;
{
	DB_REP *db_rep;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int ret, spins;
	time_t now;

	/* Check if locks have been globally turned off. */
	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	renv = infop->primary;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Give a stale lockout the chance to expire first. */
		(void)time(&now);
		TIMESTAMP_CHECK(env, now, renv);
		/* Still locked out after the timestamp check? */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	/*
	 * Wait, one second at a time, for the API lockout to be lifted.
	 * The region mutex is held whenever the lockout flag is tested and
	 * released while we sleep.
	 */
	spins = 0;
	REP_SYSTEM_LOCK(env);
	while (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_API)) {
		REP_SYSTEM_UNLOCK(env);
		/*
		 * We're spinning - environment may be hung. Check if
		 * recovery has been initiated.
		 */
		PANIC_CHECK(env);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
			__db_errx(env,
    "Operation locked out.  Waiting for replication lockout to complete");
			return (DB_REP_LOCKOUT);
		}
		__os_yield(env, 1, 0);
		/* Report progress once every 60 iterations. */
		spins++;
		if (spins % 60 == 0) {
			ret = __rep_show_progress(env,
			    "DB_ENV handle", spins / 60);
			if (ret != 0)
				return (ret);
		}
		REP_SYSTEM_LOCK(env);
	}
	rep->handle_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}

/*
 * __rep_show_progress --
 *	Report that a caller has been waiting for a replication lockout to
 * complete, followed by a state-specific progress line: file/page counts
 * in SYNC_PAGE, LSN positions in SYNC_LOG, and just the numeric sync state
 * (verbose output only) otherwise.
 *
 *	env	environment handle
 *	which	description of the waiter, used as message prefix
 *	mins	number of minutes waited so far
 *
 * Always returns 0.
 */
static int
__rep_show_progress(env, which, mins)
	ENV *env;
	const char *which;
	int mins;
{
	DB_LOG *dblp;
	DB_LSN ready_lsn;
	LOG *lp;
	REP *rep;

	/* Message formats and their argument lists, shared by both sinks. */
#define	WAITING_MSG "%s waiting %d minutes for replication lockout to complete"
#define	WAITING_ARGS WAITING_MSG, which, mins
#define	PAGE_MSG "SYNC_PAGE: files %lu/%lu; pages %lu (%lu next)"
#define	PAGE_ARGS (u_long)rep->curfile, (u_long)rep->nfiles, \
		    (u_long)rep->npages, (u_long)rep->ready_pg
#define	LSN_ARG(lsn) (u_long)(lsn).file, (u_long)(lsn).offset
#define	LOG_LSN_ARGS LSN_ARG(ready_lsn),				\
	    LSN_ARG(rep->first_lsn), LSN_ARG(rep->last_lsn)
#ifdef	HAVE_STATISTICS
#define	LOG_MSG "SYNC_LOG: thru [%lu][%lu] from [%lu][%lu]/[%lu][%lu] (%lu queued)"
#define	LOG_ARGS LOG_LSN_ARGS, (u_long)rep->stat.st_log_queued
#else
#define	LOG_MSG "SYNC_LOG: thru [%lu][%lu] from [%lu][%lu]/[%lu][%lu]"
#define	LOG_ARGS LOG_LSN_ARGS
#endif

	rep = env->rep_handle->region;
	dblp = env->lg_handle;
	/* There may be no log handle, in which case there is no log region. */
	lp = NULL;
	if (dblp != NULL)
		lp = dblp->reginfo.primary;

	__db_errx(env, WAITING_ARGS);
	RPRINT(env, (env, DB_VERB_REP_SYNC, WAITING_ARGS));

	/* ready_lsn is read under mtx_clientdb. */
	if (lp != NULL) {
		MUTEX_LOCK(env, rep->mtx_clientdb);
		ready_lsn = lp->ready_lsn;
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
	} else {
		ZERO_LSN(ready_lsn);
	}

	REP_SYSTEM_LOCK(env);
	if (rep->sync_state == SYNC_PAGE) {
		__db_errx(env, PAGE_MSG, PAGE_ARGS);
		RPRINT(env, (env, DB_VERB_REP_SYNC, PAGE_MSG, PAGE_ARGS));
	} else if (rep->sync_state == SYNC_LOG) {
		__db_errx(env, LOG_MSG, LOG_ARGS);
		RPRINT(env, (env, DB_VERB_REP_SYNC, LOG_MSG, LOG_ARGS));
	} else {
		RPRINT(env, (env, DB_VERB_REP_SYNC,
		    "sync state %d", (int)rep->sync_state));
	}
	REP_SYSTEM_UNLOCK(env);
	return (0);
}

/*
 * __env_db_rep_exit --
 *
 *	Drop one reference from the rep region's handle_cnt, under the rep
 *	system lock.  A no-op when locking is disabled for the environment.
 *	Always returns 0 (the lock macros may return on their own failure).
 *
 * PUBLIC: int __env_db_rep_exit __P((ENV *));
 */
int
__env_db_rep_exit(env)
	ENV *env;
{
	REP *rep;

	/* Nothing is counted when locks have been globally turned off. */
	if (!F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) {
		rep = env->rep_handle->region;

		REP_SYSTEM_LOCK(env);
		--rep->handle_cnt;
		REP_SYSTEM_UNLOCK(env);
	}

	return (0);
}

/*
 * __db_rep_enter --
 *	Admission check for DB-handle operations in a replicated
 * environment.  On success the rep region's handle_cnt is incremented so
 * that a lockout can wait for the operation to drain.
 *
 * checklock: when non-zero, refuse with EINVAL if the environment region
 *	is still marked DB_REGENV_REPLOCKED after the timestamp check has
 *	had a chance to expire the mark.
 * checkgen: when non-zero, refuse with DB_REP_HANDLE_DEAD if the handle's
 *	timestamp differs from the region's rep_timestamp.
 * return_now: when an operation lockout is in progress we always return
 *	DB_LOCK_DEADLOCK; if return_now is zero we first pause, so that an
 *	application which retries immediately does not burn through its
 *	retry limit before replication has had time to finish.  Callers
 *	running inside a txn typically pass non-zero so the txn can abort
 *	as soon as possible and let the lockout proceed.
 *
 * PUBLIC: int __db_rep_enter __P((DB *, int, int, int));
 */
int
__db_rep_enter(dbp, checkgen, checklock, return_now)
	DB *dbp;
	int checkgen, checklock, return_now;
{
	ENV *env;
	REGENV *renv;
	REP *rep;
	time_t now;
	int ret;

	env = dbp->env;

	/* With locking globally off there is nothing to coordinate. */
	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	renv = env->reginfo->primary;
	rep = env->rep_handle->region;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		(void)time(&now);
		TIMESTAMP_CHECK(env, now, renv);
		/* The check may have cleared the flag; look again. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	REP_SYSTEM_LOCK(env);

	/*
	 * !!!
	 * We test REP_LOCKOUT_OP even though the counter we maintain is
	 * handle_cnt.  The apparent mismatch is deliberate: returning
	 * DEADLOCK makes the application abort its txn quickly, which in
	 * turn lets the lockout make progress.
	 *
	 * This is only correct because an API lockout always sets
	 * REP_LOCKOUT_OP before anything else.
	 */
	if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP)) {
		REP_SYSTEM_UNLOCK(env);
		if (!return_now)
			__os_yield(env, 5, 0);
		return (DB_LOCK_DEADLOCK);
	}

	/* Either reject a stale handle or count the new operation. */
	if (checkgen && dbp->timestamp != renv->rep_timestamp)
		ret = DB_REP_HANDLE_DEAD;
	else {
		rep->handle_cnt++;
		ret = 0;
	}
	REP_SYSTEM_UNLOCK(env);

	return (ret);
}

/*
 * __op_handle_enter --
 *	Try to take a handle_cnt reference for an operation that runs in the
 * context of a transaction but has no DB handle of its own.  Returns
 * DB_LOCK_DEADLOCK, without counting, while REP_LOCKOUT_OP is set;
 * otherwise increments handle_cnt and returns 0.
 *
 * PUBLIC: int __op_handle_enter __P((ENV *));
 */
int
__op_handle_enter(env)
	ENV *env;
{
	REP *rep;
	int ret;

	rep = env->rep_handle->region;
	ret = 0;

	REP_SYSTEM_LOCK(env);
	if (!FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP))
		rep->handle_cnt++;
	else
		ret = DB_LOCK_DEADLOCK;
	REP_SYSTEM_UNLOCK(env);

	return (ret);
}

/*
 * __op_rep_enter --
 *
 *	Check if we are in the middle of replication initialization and/or
 * recovery, and if so, disallow new multi-step operations, such as
 * transaction and memp gets.  If operations are allowed,
 * increment the op_cnt, so that we do not start recovery while we have
 * active operations.
 *
 * local_nowait: when non-zero, return DB_REP_LOCKOUT at once instead of
 *	waiting for the lockout to clear.
 *
 * Returns 0 after counting the operation, DB_REP_LOCKOUT when the caller
 * or the REP_C_NOWAIT configuration asked not to wait, or whatever
 * PANIC_CHECK / __rep_show_progress return on failure.
 *
 * PUBLIC: int __op_rep_enter __P((ENV *, int));
 */
int
__op_rep_enter(env, local_nowait)
	ENV *env;
	int local_nowait;
{
	DB_REP *db_rep;
	REP *rep;
	int cnt, ret;

	/* Check if locks have been globally turned off. */
	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;

	REP_SYSTEM_LOCK(env);
	for (cnt = 0; FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP);) {
		/* Never wait with the rep system lock held. */
		REP_SYSTEM_UNLOCK(env);
		/*
		 * We're spinning - environment may be hung.  Check if
		 * recovery has been initiated.
		 */
		PANIC_CHECK(env);
		if (local_nowait)
			return (DB_REP_LOCKOUT);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
			__db_errx(env,
    "Operation locked out.  Waiting for replication lockout to complete");
			return (DB_REP_LOCKOUT);
		}
		__os_yield(env, 5, 0);
		/*
		 * cnt is the running total of time waited and is reported
		 * as cnt / 60 minutes.  Each pass waits 5 units, so advance
		 * by exactly 5: any extra increment makes the total drift
		 * ahead of the time actually waited, so the progress report
		 * fires early and overstates the wait.
		 */
		cnt += 5;
		if (cnt % 60 == 0 &&
		    (ret = __rep_show_progress(env,
		    "__op_rep_enter", cnt / 60)) != 0)
			return (ret);
		REP_SYSTEM_LOCK(env);
	}
	/* The lock is held here: count the operation, then release. */
	rep->op_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}

/*
 * __op_rep_exit --
 *
 *	Give back the op_cnt reference taken by a multi-step operation
 *	(transaction commit/abort/discard, or memp_fput).  A no-op when
 *	locking is disabled for the environment.
 *
 * PUBLIC: int __op_rep_exit __P((ENV *));
 */
int
__op_rep_exit(env)
	ENV *env;
{
	REP *rep;

	/* Nothing is counted when locks have been globally turned off. */
	if (!F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) {
		rep = env->rep_handle->region;

		REP_SYSTEM_LOCK(env);
		/* An exit without a matching enter would underflow. */
		DB_ASSERT(env, rep->op_cnt > 0);
		--rep->op_cnt;
		REP_SYSTEM_UNLOCK(env);
	}

	return (0);
}

/*
 * __archive_rep_enter
 *	Ask permission, on behalf of log_archive, to remove log files.
 * Returns DB_REP_LOCKOUT when archiving is currently disallowed and 0
 * otherwise; when replication is on and archiving is allowed, the
 * archiving thread is counted in arch_th.
 *
 * PUBLIC: int __archive_rep_enter __P((ENV *));
 */
int
__archive_rep_enter(env)
	ENV *env;
{
	REGENV *renv;
	REP *rep;
	time_t now;
	int ret;

	renv = env->reginfo->primary;

	/*
	 * The environment-region flag is examined ahead of the REP_ON test
	 * because a request from any replication process to disable
	 * archiving must be honored.  Only the environment region is
	 * needed for this part.
	 */
	if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		(void)time(&now);
		TIMESTAMP_CHECK(env, now, renv);
		/* The check may have cleared the flag; look again. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (DB_REP_LOCKOUT);
	}

	if (!REP_ON(env))
		return (0);

	rep = env->rep_handle->region;

	REP_SYSTEM_LOCK(env);
	if (!FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_ARCHIVE)) {
		rep->arch_th++;
		ret = 0;
	} else
		ret = DB_REP_LOCKOUT;
	REP_SYSTEM_UNLOCK(env);

	return (ret);
}

/*
 * __archive_rep_exit
 *	Undo the arch_th accounting done for a log archive thread.  Does
 *	nothing when replication is not on.
 *
 * PUBLIC: int __archive_rep_exit __P((ENV *));
 */
int
__archive_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (REP_ON(env)) {
		rep = env->rep_handle->region;

		REP_SYSTEM_LOCK(env);
		--rep->arch_th;
		REP_SYSTEM_UNLOCK(env);
	}

	return (0);
}

/*
 * __rep_lockout_archive --
 *	Lock out threads that archive log files, waiting until arch_th has
 *	drained to zero, so the caller can run knowing no log file will be
 *	removed underneath it.
 *	The caller is expected to hold the region mutex.
 *
 * PUBLIC: int __rep_lockout_archive __P((ENV *, REP *));
 */
int
__rep_lockout_archive(env, rep)
	ENV *env;
	REP *rep;
{
	int ret;

	ret = __rep_lockout_int(env,
	    rep, &rep->arch_th, 0, "arch_th", REP_LOCKOUT_ARCHIVE);
	return (ret);
}

/*
 * __rep_lockout_api --
 *	Lock out other threads in the library and active txns so the caller
 *	can run single-threaded, for recovery or internal backup.
 *	The caller is expected to hold the region mutex.
 *
 * PUBLIC: int __rep_lockout_api __P((ENV *, REP *));
 */
int
__rep_lockout_api(env, rep)
	ENV *env;
	REP *rep;
{
	int ret;

	/*
	 * Long-running operations have to drain first.  __db_rep_enter
	 * tests REP_LOCKOUT_OP so that existing txns get aborted quickly,
	 * which means the order here is fixed: REP_LOCKOUT_OP, and only
	 * then REP_LOCKOUT_API.
	 */
	ret = __rep_lockout_int(env,
	    rep, &rep->op_cnt, 0, "op_cnt", REP_LOCKOUT_OP);
	if (ret != 0)
		return (ret);

	ret = __rep_lockout_int(env,
	    rep, &rep->handle_cnt, 0, "handle_cnt", REP_LOCKOUT_API);
	/* If the second stage failed, take back the first stage's flag. */
	if (ret != 0)
		FLD_CLR(rep->lockout_flags, REP_LOCKOUT_OP);

	return (ret);
}

/*
 * __rep_lockout_apply --
 *	Lock out message-processing threads that apply log records, waiting
 *	until the apply_th count is no greater than the caller's apply_th
 *	argument, so that no incoming message can add records to the log.
 *	Intended for short critical sections; currently used to coordinate
 *	with elections.
 *	The caller is expected to hold the region mutex.
 *
 * PUBLIC: int __rep_lockout_apply __P((ENV *, REP *, u_int32_t));
 */
int
__rep_lockout_apply(env, rep, apply_th)
	ENV *env;
	REP *rep;
	u_int32_t apply_th;
{
	int ret;

	ret = __rep_lockout_int(env,
	    rep, &rep->apply_th, apply_th, "apply_th", REP_LOCKOUT_APPLY);
	return (ret);
}

/*
 * __rep_lockout_msg --
 *	Lock out message-processing threads, waiting until the msg_th count
 *	is no greater than the caller's msg_th argument, so that no incoming
 *	message (a NEWMASTER, for instance) can change the world while the
 *	caller works.  Intended for short critical sections where no new
 *	message may arrive part-way through and all message threads must be
 *	out before the work starts.
 *	The caller is expected to hold the region mutex.
 *
 * PUBLIC: int __rep_lockout_msg __P((ENV *, REP *, u_int32_t));
 */
int
__rep_lockout_msg(env, rep, msg_th)
	ENV *env;
	REP *rep;
	u_int32_t msg_th;
{
	int ret;

	ret = __rep_lockout_int(env,
	    rep, &rep->msg_th, msg_th, "msg_th", REP_LOCKOUT_MSG);
	return (ret);
}

/*
 * __rep_lockout_int --
 *	Shared implementation of the lockout routines: set lockout_flag in
 *	the rep region, then wait until the counter at *fieldp is no larger
 *	than field_val.
 *	The caller is expected to hold the region mutex; it is released
 *	around each wait and re-acquired before the counter is re-read.
 *
 * msg: name of the counter, used only in DIAGNOSTIC progress messages.
 */
static int
__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag)
	ENV *env;
	REP *rep;
	u_int32_t *fieldp;
	const char *msg;
	u_int32_t field_val, lockout_flag;
{
	int ret, wait_cnt;

	wait_cnt = 0;
	FLD_SET(rep->lockout_flags, lockout_flag);

	while (*fieldp > field_val) {
		/* Tell registered waiters that a lockout is pending. */
		ret = __rep_notify_threads(env, LOCKOUT);
		if (ret != 0)
			return (ret);

		REP_SYSTEM_UNLOCK(env);
		/*
		 * This is a spin loop, so the environment might be hung.
		 * See whether recovery has been initiated.
		 */
		PANIC_CHECK(env);
		__os_yield(env, 1, 0);
#ifdef DIAGNOSTIC
		/* One early notice, then one notice per 60 waits. */
		if (wait_cnt == 5) {
			RPRINT(env, (env, DB_VERB_REP_MISC,
			    "Waiting for %s (%lu) to complete lockout to %lu",
			    msg, (u_long)*fieldp, (u_long)field_val));
			__db_errx(env,
"Waiting for %s (%lu) to complete replication lockout",
			msg, (u_long)*fieldp);
		}
		++wait_cnt;
		if (wait_cnt % 60 == 0)
			__db_errx(env,
"Waiting for %s (%lu) to complete replication lockout for %d minutes",
			msg, (u_long)*fieldp, wait_cnt / 60);
#endif
		REP_SYSTEM_LOCK(env);
	}

	/* msg is otherwise unreferenced in non-DIAGNOSTIC builds. */
	COMPQUIET(msg, NULL);
	return (0);
}

/*
 * __rep_send_throttle -
 *	Send one record, applying the byte budget in repth.  While budget
 * remains, the record goes out with its normal type (REP_LOG or REP_PAGE).
 * Once the budget is exhausted, repth->type is switched to the matching
 * *_MORE type and that message is sent instead; callers notice the type
 * change and break out of their send loop.
 *
 * With REP_THROTTLE_ONLY in flags, only the budget bookkeeping is done and
 * nothing is sent unless throttling kicks in.
 *
 * Throttling only matters when serving requests, so every message is sent
 * with REPCTL_RESEND; callers may add further control flags via ctlflags.
 *
 * Returns 0, or DB_REP_UNAVAIL if the send failed.
 *
 * PUBLIC: int __rep_send_throttle __P((ENV *, int, REP_THROTTLE *,
 * PUBLIC:    u_int32_t, u_int32_t));
 */
int
__rep_send_throttle(env, eid, repth, flags, ctlflags)
	ENV *env;
	int eid;
	REP_THROTTLE *repth;
	u_int32_t ctlflags, flags;
{
	REP *rep;
	u_int32_t size, typemore;
	int check_limit, throttle_only;

	throttle_only = LF_ISSET(REP_THROTTLE_ONLY) != 0;
	check_limit = repth->gbytes != 0 || repth->bytes != 0;

	/* Bookkeeping-only request with no limit configured: nothing to do. */
	if (throttle_only && !check_limit)
		return (0);

	rep = env->rep_handle->region;

	switch (repth->type) {
	case REP_LOG:
		typemore = REP_LOG_MORE;
		break;
	case REP_PAGE:
		typemore = REP_PAGE_MORE;
		break;
	default:
		typemore = 0;
		break;
	}
	DB_ASSERT(env, typemore != 0);

	/*
	 * data_dbt.size covers the record only.  Add the size of the control
	 * structure as well, otherwise the accounting drifts badly when the
	 * records are small.
	 */
	size = repth->data_dbt->size + sizeof(__rep_control_args);
	if (check_limit) {
		/* Convert whole gigabytes into bytes until we can pay. */
		while (repth->bytes <= size && repth->gbytes > 0) {
			repth->bytes += GIGABYTE;
			--(repth->gbytes);
		}
		if (repth->bytes <= size) {
			/*
			 * Budget exhausted.  The statistic is bumped without
			 * the rep mutex, so it may miscount.
			 */
			STAT(rep->stat.st_nthrottles++);
			repth->type = typemore;
		} else
			repth->bytes -= size;
	}

	/*
	 * A *_MORE message is always sent; a normal message is sent unless
	 * the caller asked for throttle processing only.
	 *
	 * NOTE:  Marshalling data_dbt, if needed, is the caller's job.  We
	 * send exactly what we were handed.
	 */
	if (repth->type != typemore && throttle_only)
		return (0);
	if (__rep_send_message(env, eid, repth->type,
	    &repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0)
		return (DB_REP_UNAVAIL);
	return (0);
}

/*
 * __rep_msg_to_old --
 *	Convert current message numbers to old message numbers.
 *
 * version: replication protocol version of the peer; selects the table
 *	row and must be less than DB_REPVERSION.
 * rectype: current message number, at most REP_MAX_MSG.
 *
 * Returns the message number as the given version knows it, or REP_INVALID
 * if that version has no such message, the version is unsupported, or
 * either argument is outside the table.
 *
 * PUBLIC: u_int32_t __rep_msg_to_old __P((u_int32_t, u_int32_t));
 */
u_int32_t
__rep_msg_to_old(version, rectype)
	u_int32_t version, rectype;
{
	/*
	 * We need to convert from current message numbers to old numbers and
	 * we need to convert from old numbers to current numbers.  Offset by
	 * one for more readable code.
	 */
	/*
	 * Everything for version 0 is invalid, there is no version 0.
	 */
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* There is no DB_REPVERSION 0. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * 4.2/DB_REPVERSION 1 no longer supported.
	 */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * 4.3/DB_REPVERSION 2 no longer supported.
	 */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * From 4.7 message number To 4.4/4.5 message number
	 */
	{   REP_INVALID,	/* NO message 0 */
	    1,			/* REP_ALIVE */
	    2,			/* REP_ALIVE_REQ */
	    3,			/* REP_ALL_REQ */
	    4,			/* REP_BULK_LOG */
	    5,			/* REP_BULK_PAGE */
	    6,			/* REP_DUPMASTER */
	    7,			/* REP_FILE */
	    8,			/* REP_FILE_FAIL */
	    9,			/* REP_FILE_REQ */
	    REP_INVALID,	/* REP_LEASE_GRANT */
	    10,			/* REP_LOG */
	    11,			/* REP_LOG_MORE */
	    12,			/* REP_LOG_REQ */
	    13,			/* REP_MASTER_REQ */
	    14,			/* REP_NEWCLIENT */
	    15,			/* REP_NEWFILE */
	    16,			/* REP_NEWMASTER */
	    17,			/* REP_NEWSITE */
	    18,			/* REP_PAGE */
	    19,			/* REP_PAGE_FAIL */
	    20,			/* REP_PAGE_MORE */
	    21,			/* REP_PAGE_REQ */
	    22,			/* REP_REREQUEST */
	    REP_INVALID,	/* REP_START_SYNC */
	    23,			/* REP_UPDATE */
	    24,			/* REP_UPDATE_REQ */
	    25,			/* REP_VERIFY */
	    26,			/* REP_VERIFY_FAIL */
	    27,			/* REP_VERIFY_REQ */
	    28,			/* REP_VOTE1 */
	    29			/* REP_VOTE2 */
	},
	/*
	 * From 4.7 message number To 4.6 message number.  There are
	 * NO message differences between 4.6 and 4.7.  The
	 * control structure changed.
	 */
	{   REP_INVALID,	/* NO message 0 */
	    1,			/* REP_ALIVE */
	    2,			/* REP_ALIVE_REQ */
	    3,			/* REP_ALL_REQ */
	    4,			/* REP_BULK_LOG */
	    5,			/* REP_BULK_PAGE */
	    6,			/* REP_DUPMASTER */
	    7,			/* REP_FILE */
	    8,			/* REP_FILE_FAIL */
	    9,			/* REP_FILE_REQ */
	    10,			/* REP_LEASE_GRANT */
	    11,			/* REP_LOG */
	    12,			/* REP_LOG_MORE */
	    13,			/* REP_LOG_REQ */
	    14,			/* REP_MASTER_REQ */
	    15,			/* REP_NEWCLIENT */
	    16,			/* REP_NEWFILE */
	    17,			/* REP_NEWMASTER */
	    18,			/* REP_NEWSITE */
	    19,			/* REP_PAGE */
	    20,			/* REP_PAGE_FAIL */
	    21,			/* REP_PAGE_MORE */
	    22,			/* REP_PAGE_REQ */
	    23,			/* REP_REREQUEST */
	    24,			/* REP_START_SYNC */
	    25,			/* REP_UPDATE */
	    26,			/* REP_UPDATE_REQ */
	    27,			/* REP_VERIFY */
	    28,			/* REP_VERIFY_FAIL */
	    29,			/* REP_VERIFY_REQ */
	    30,			/* REP_VOTE1 */
	    31			/* REP_VOTE2 */
	}
	};

	/*
	 * The table has rows 0 .. DB_REPVERSION-1 and columns
	 * 0 .. REP_MAX_MSG.  Anything else has no translation; say so
	 * rather than reading past the end of the table.
	 */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}

/*
 * __rep_msg_from_old --
 *	Convert old message numbers to current message numbers.
 *
 * version: replication protocol version the message was sent with; selects
 *	the table row and must be less than DB_REPVERSION.
 * rectype: message number as that version knows it, at most REP_MAX_MSG.
 *
 * Returns the current message number, or REP_INVALID if the old version
 * has no such message, the version is unsupported, or either argument is
 * outside the table.
 *
 * PUBLIC: u_int32_t __rep_msg_from_old __P((u_int32_t, u_int32_t));
 */
u_int32_t
__rep_msg_from_old(version, rectype)
	u_int32_t version, rectype;
{
	/*
	 * We need to convert from current message numbers to old numbers and
	 * we need to convert from old numbers to current numbers.  Offset by
	 * one for more readable code.
	 */
	/*
	 * Everything for version 0 is invalid, there is no version 0.
	 */
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* There is no DB_REPVERSION 0. */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * 4.2/DB_REPVERSION 1 no longer supported.
	 */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * 4.3/DB_REPVERSION 2 no longer supported.
	 */
	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/*
	 * From 4.4/4.5 message number To 4.7 message number
	 */
	{   REP_INVALID,	/* NO message 0 */
	    1,			/* 1, REP_ALIVE */
	    2,			/* 2, REP_ALIVE_REQ */
	    3,			/* 3, REP_ALL_REQ */
	    4,			/* 4, REP_BULK_LOG */
	    5,			/* 5, REP_BULK_PAGE */
	    6,			/* 6, REP_DUPMASTER */
	    7,			/* 7, REP_FILE */
	    8,			/* 8, REP_FILE_FAIL */
	    9,			/* 9, REP_FILE_REQ */
	    /* 10, REP_LEASE_GRANT doesn't exist */
	    11,			/* 10, REP_LOG */
	    12,			/* 11, REP_LOG_MORE */
	    13,			/* 12, REP_LOG_REQ */
	    14,			/* 13, REP_MASTER_REQ */
	    15,			/* 14, REP_NEWCLIENT */
	    16,			/* 15, REP_NEWFILE */
	    17,			/* 16, REP_NEWMASTER */
	    18,			/* 17, REP_NEWSITE */
	    19,			/* 18, REP_PAGE */
	    20,			/* 19, REP_PAGE_FAIL */
	    21,			/* 20, REP_PAGE_MORE */
	    22,			/* 21, REP_PAGE_REQ */
	    23,			/* 22, REP_REREQUEST */
	    /* 24, REP_START_SYNC doesn't exist */
	    25,			/* 23, REP_UPDATE */
	    26,			/* 24, REP_UPDATE_REQ */
	    27,			/* 25, REP_VERIFY */
	    28,			/* 26, REP_VERIFY_FAIL */
	    29,			/* 27, REP_VERIFY_REQ */
	    30,			/* 28, REP_VOTE1 */
	    31,			/* 29, REP_VOTE2 */
	    REP_INVALID,	/* 30, 4.4/4.5 no message */
	    REP_INVALID		/* 31, 4.4/4.5 no message */
	},
	/*
	 * From 4.6 message number To 4.7 message number.  There are
	 * NO message differences between 4.6 and 4.7.  The
	 * control structure changed.
	 */
	{   REP_INVALID,	/* NO message 0 */
	    1,			/* 1, REP_ALIVE */
	    2,			/* 2, REP_ALIVE_REQ */
	    3,			/* 3, REP_ALL_REQ */
	    4,			/* 4, REP_BULK_LOG */
	    5,			/* 5, REP_BULK_PAGE */
	    6,			/* 6, REP_DUPMASTER */
	    7,			/* 7, REP_FILE */
	    8,			/* 8, REP_FILE_FAIL */
	    9,			/* 9, REP_FILE_REQ */
	    10,			/* 10, REP_LEASE_GRANT */
	    11,			/* 11, REP_LOG */
	    12,			/* 12, REP_LOG_MORE */
	    13,			/* 13, REP_LOG_REQ */
	    14,			/* 14, REP_MASTER_REQ */
	    15,			/* 15, REP_NEWCLIENT */
	    16,			/* 16, REP_NEWFILE */
	    17,			/* 17, REP_NEWMASTER */
	    18,			/* 18, REP_NEWSITE */
	    19,			/* 19, REP_PAGE */
	    20,			/* 20, REP_PAGE_FAIL */
	    21,			/* 21, REP_PAGE_MORE */
	    22,			/* 22, REP_PAGE_REQ */
	    23,			/* 23, REP_REREQUEST */
	    24,			/* 24, REP_START_SYNC */
	    25,			/* 25, REP_UPDATE */
	    26,			/* 26, REP_UPDATE_REQ */
	    27,			/* 27, REP_VERIFY */
	    28,			/* 28, REP_VERIFY_FAIL */
	    29,			/* 29, REP_VERIFY_REQ */
	    30,			/* 30, REP_VOTE1 */
	    31			/* 31, REP_VOTE2 */
	}
	};

	/*
	 * Both arguments can come straight from a received message.  The
	 * table has rows 0 .. DB_REPVERSION-1 and columns 0 .. REP_MAX_MSG;
	 * anything else has no translation, so report it as invalid rather
	 * than reading past the end of the table.
	 */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}

/*
 * __rep_print_system --
 *	Optionally print a verbose message, including to the system file.
 *
 *	Variadic front end for __rep_print_int: the caller's verbose mask
 *	is passed on with DB_VERB_REP_SYSTEM added.  fmt and the trailing
 *	arguments are printf-style.  Returns whatever __rep_print_int
 *	returns.
 *
 * PUBLIC: int __rep_print_system __P((ENV *, u_int32_t, const char *, ...))
 * PUBLIC:    __attribute__ ((__format__ (__printf__, 3, 4)));
 */
int
#ifdef STDC_HEADERS
__rep_print_system(ENV *env, u_int32_t verbose, const char *fmt, ...)
#else
__rep_print_system(env, verbose, fmt, va_alist)
	ENV *env;
	u_int32_t verbose;
	const char *fmt;
	va_dcl
#endif
{
	va_list ap;
	int ret;

	/* The one-argument va_start form is for the non-STDC_HEADERS build. */
#ifdef STDC_HEADERS
	va_start(ap, fmt);
#else
	va_start(ap);
#endif
	/* Add the SYSTEM bit to the caller's mask before handing off. */
	ret = __rep_print_int(env, verbose | DB_VERB_REP_SYSTEM, fmt, ap);
	va_end(ap);
	return (ret);
}

/*
 * __rep_print --
 *	Optionally print a verbose message.
 *
 *	Variadic front end for __rep_print_int: the caller's verbose mask
 *	is passed through unchanged.  fmt and the trailing arguments are
 *	printf-style.  Returns whatever __rep_print_int returns.
 *
 * PUBLIC: int __rep_print __P((ENV *, u_int32_t, const char *, ...))
 * PUBLIC:    __attribute__ ((__format__ (__printf__, 3, 4)));
 */
int
#ifdef STDC_HEADERS
__rep_print(ENV *env, u_int32_t verbose, const char *fmt, ...)
#else
__rep_print(env, verbose, fmt, va_alist)
	ENV *env;
	u_int32_t verbose;
	const char *fmt;
	va_dcl
#endif
{
	va_list ap;
	int ret;

	/* The one-argument va_start form is for the non-STDC_HEADERS build. */
#ifdef STDC_HEADERS
	va_start(ap, fmt);
#else
	va_start(ap);
#endif
	ret = __rep_print_int(env, verbose, fmt, ap);
	va_end(ap);
	return (ret);
}

/*
 * __rep_print_int --
 *	Optionally print a verbose message.
 *
 * NOTE:
 * One anomaly is that the messaging functions expect/use/require
 * void functions.  The use of a mutex in __rep_print_int requires
 * a return value.
 */
static int
__rep_print_int(env, verbose, fmt, ap)
	ENV *env;
	u_int32_t verbose;
	const char *fmt;
	va_list ap;
{
	DB_MSGBUF mb;
	REP *rep;
	db_timespec ts;
	pid_t pid;
	db_threadid_t tid;
	int diag_msg;
	u_int32_t regular_msg, tmp_verbose;
	const char *s;
	char buf[DB_THREADID_STRLEN];

	tmp_verbose = env->dbenv->verbose;
	if (FLD_ISSET(tmp_verbose, verbose | DB_VERB_REPLICATION) == 0)
		return (0);
	DB_MSGBUF_INIT(&mb);

	diag_msg = 0;
	if (REP_ON(env)) {
		rep = env->rep_handle->region;
		/*
		 * If system diag messages are configured and this message's
		 * verbose level includes DB_VERB_REP_SYSTEM, this is a diag
		 * message.  This means it will be written to the diagnostic
		 * message files.
		 */
		diag_msg = FLD_ISSET(tmp_verbose, DB_VERB_REP_SYSTEM) &&
		    FLD_ISSET(verbose, DB_VERB_REP_SYSTEM) &&
		    !FLD_ISSET(rep->config, REP_C_INMEM);
	} else
		rep = NULL;
	/*
	 * We need to know if this message should be printed out
	 * via the regular, user mechanism.
	 */
	FLD_CLR(tmp_verbose, DB_VERB_REP_SYSTEM);
	regular_msg = FLD_ISSET(tmp_verbose,
	    verbose | DB_VERB_REPLICATION);

	/*
	 * It is possible we could be called before the env is finished
	 * getting set up and we want to skip that.
	 */
	if (diag_msg == 0 && regular_msg == 0)
		return (0);
	s = NULL;
	if (env->dbenv->db_errpfx != NULL)
		s = env->dbenv->db_errpfx;
	else if (rep != NULL) {
		if (F_ISSET(rep, REP_F_CLIENT))
			s = "CLIENT";
		else if (F_ISSET(rep, REP_F_MASTER))
			s = "MASTER";
	}
	if (s == NULL)
		s = "REP_UNDEF";
	__os_id(env->dbenv, &pid, &tid);
	if (diag_msg)
		MUTEX_LOCK(env, rep->mtx_diag);
	__os_gettime(env, &ts, 1);
	__db_msgadd(env, &mb, "[%lu:%lu][%s] %s: ",
	    (u_long)ts.tv_sec, (u_long)ts.tv_nsec/NS_PER_US,
	    env->dbenv->thread_id_string(env->dbenv, pid, tid, buf), s);

	__db_msgadd_ap(env, &mb, fmt, ap);

	DB_MSGBUF_REP_FLUSH(env, &mb, diag_msg, regular_msg);
	if (diag_msg)
		MUTEX_UNLOCK(env, rep->mtx_diag);
	return (0);
}

/*
 * __rep_print_message --
 *	Print a one-line verbose description of a replication message.
 *
 * eid: printed as the "eid" field of the line.
 * rp: control structure of the message; supplies the versions, gen,
 *	record type, LSN and control flags that are printed.
 * str: caller-supplied label, printed after the environment home.
 * flags: send flags; DB_REP_ANYWHERE, DB_REP_NOBUFFER and DB_REP_REREQUEST
 *	are printed when set.
 *
 * The record type selects both the type name printed and the verbose
 * categories under which the line is printed.
 *
 * PUBLIC: void __rep_print_message
 * PUBLIC:     __P((ENV *, int, __rep_control_args *, char *, u_int32_t));
 */
void
__rep_print_message(env, eid, rp, str, flags)
	ENV *env;
	int eid;
	__rep_control_args *rp;
	char *str;
	u_int32_t flags;
{
	u_int32_t ctlflags, rectype, verbflag;
	char ftype[64], *type;

	rectype = rp->rectype;
	ctlflags = rp->flags;
	/* Every message type is printed under these two categories. */
	verbflag = DB_VERB_REP_MSGS | DB_VERB_REPLICATION;
	/* The switch below is keyed on current message numbers. */
	if (rp->rep_version != DB_REPVERSION)
		rectype = __rep_msg_from_old(rp->rep_version, rectype);
	switch (rectype) {
	case REP_ALIVE:
		FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_MISC);
		type = "alive";
		break;
	case REP_ALIVE_REQ:
		type = "alive_req";
		break;
	case REP_ALL_REQ:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "all_req";
		break;
	case REP_BULK_LOG:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "bulk_log";
		break;
	case REP_BULK_PAGE:
		FLD_SET(verbflag, DB_VERB_REP_SYNC);
		type = "bulk_page";
		break;
	case REP_DUPMASTER:
		FLD_SET(verbflag, DB_VERB_REP_SYSTEM);
		type = "dupmaster";
		break;
	case REP_FILE:
		type = "file";
		break;
	case REP_FILE_FAIL:
		type = "file_fail";
		break;
	case REP_FILE_REQ:
		type = "file_req";
		break;
	case REP_LEASE_GRANT:
		FLD_SET(verbflag, DB_VERB_REP_LEASE);
		type = "lease_grant";
		break;
	case REP_LOG:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "log";
		break;
	case REP_LOG_MORE:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "log_more";
		break;
	case REP_LOG_REQ:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "log_req";
		break;
	case REP_MASTER_REQ:
		type = "master_req";
		break;
	case REP_NEWCLIENT:
		FLD_SET(verbflag, DB_VERB_REP_MISC | DB_VERB_REP_SYSTEM);
		type = "newclient";
		break;
	case REP_NEWFILE:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "newfile";
		break;
	case REP_NEWMASTER:
		FLD_SET(verbflag, DB_VERB_REP_MISC | DB_VERB_REP_SYSTEM);
		type = "newmaster";
		break;
	case REP_NEWSITE:
		type = "newsite";
		break;
	case REP_PAGE:
		FLD_SET(verbflag, DB_VERB_REP_SYNC);
		type = "page";
		break;
	case REP_PAGE_FAIL:
		FLD_SET(verbflag, DB_VERB_REP_SYNC);
		type = "page_fail";
		break;
	case REP_PAGE_MORE:
		FLD_SET(verbflag, DB_VERB_REP_SYNC);
		type = "page_more";
		break;
	case REP_PAGE_REQ:
		FLD_SET(verbflag, DB_VERB_REP_SYNC);
		type = "page_req";
		break;
	case REP_REREQUEST:
		type = "rerequest";
		break;
	case REP_START_SYNC:
		FLD_SET(verbflag, DB_VERB_REP_MISC);
		type = "start_sync";
		break;
	case REP_UPDATE:
		FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
		type = "update";
		break;
	case REP_UPDATE_REQ:
		FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
		type = "update_req";
		break;
	case REP_VERIFY:
		FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
		type = "verify";
		break;
	case REP_VERIFY_FAIL:
		FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
		type = "verify_fail";
		break;
	case REP_VERIFY_REQ:
		FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
		type = "verify_req";
		break;
	case REP_VOTE1:
		FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_SYSTEM);
		type = "vote1";
		break;
	case REP_VOTE2:
		FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_SYSTEM);
		type = "vote2";
		break;
	default:
		/* Unknown type, or one with no translation from the old number. */
		type = "NOTYPE";
		break;
	}

	/*
	 * !!!
	 * If adding new flags to print out make sure the aggregate
	 * length cannot overflow the buffer.
	 *
	 * The number after each strcat is the running total of characters
	 * appended if every flag so far is set; ftype holds 64 bytes.
	 */
	ftype[0] = '\0';
	if (LF_ISSET(DB_REP_ANYWHERE))
		(void)strcat(ftype, " any");		/* 4 */
	if (FLD_ISSET(ctlflags, REPCTL_FLUSH))
		(void)strcat(ftype, " flush");		/* 10 */
	/*
	 * We expect most of the time the messages will indicate
	 * group membership.  Only print if we're not already
	 * part of a group.
	 */
	if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD))
		(void)strcat(ftype, " nogroup");	/* 18 */
	if (FLD_ISSET(ctlflags, REPCTL_LEASE))
		(void)strcat(ftype, " lease");		/* 24 */
	if (LF_ISSET(DB_REP_NOBUFFER))
		(void)strcat(ftype, " nobuf");		/* 30 */
	if (FLD_ISSET(ctlflags, REPCTL_PERM))
		(void)strcat(ftype, " perm");		/* 35 */
	if (LF_ISSET(DB_REP_REREQUEST))
		(void)strcat(ftype, " rereq");		/* 41 */
	if (FLD_ISSET(ctlflags, REPCTL_RESEND))
		(void)strcat(ftype, " resend");		/* 48 */
	if (FLD_ISSET(ctlflags, REPCTL_LOG_END))
		(void)strcat(ftype, " logend");		/* 55 */

	/*
	 * !!!
	 * We selectively turned on bits using different verbose settings
	 * that relate to each message type.  Therefore, since the
	 * DB_VERB_REP_SYSTEM flag is explicitly set above when wanted,
	 * we *must* use the VPRINT macro here.  It will correctly
	 * handle the messages whether or not the SYSTEM flag is set.
	 */
	VPRINT(env, (env, verbflag,
    "%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s",
	    env->db_home, str,
	    (u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen,
	    eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype));
	/*
	 * Make sure the version is close, and not swapped
	 * here.  Check for current version,  +/- a little bit.
	 */
	DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10);
	DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10);
}

/*
 * __rep_fire_event --
 *	Deliver a replication-related event.  repmgr is offered the event
 *	first; the application sees it only if repmgr reports that it did
 *	not handle the event.
 *
 * PUBLIC: void __rep_fire_event __P((ENV *, u_int32_t, void *));
 */
void
__rep_fire_event(env, event, info)
	ENV *env;
	u_int32_t event;
	void *info;
{
	int ret;

	/*
	 * repmgr gets first crack at every replication-related event.  It
	 * may consume the event entirely, or decline (or choose not) to
	 * handle it fully.
	 */
	ret = __repmgr_handle_event(env, event, info);
	DB_ASSERT(env, ret == 0 || ret == DB_EVENT_NOT_HANDLED);

	/* Fully handled by repmgr: nothing for the application. */
	if (ret != DB_EVENT_NOT_HANDLED)
		return;

	DB_EVENT(env, event, info);
}

/*
 * __rep_msg --
 *      Rep system diagnostic messaging routine.
 * The __db_msg subsystem calls this to append one diagnostic message,
 * followed by a newline, to the current replication-owned diag file.
 * Once the file has reached REP_DIAGSIZE the next diag file becomes
 * current and writing restarts at offset 0.
 *
 * PUBLIC: void __rep_msg __P((const ENV *, const char *));
 */
void
__rep_msg(env, msg)
	const ENV *env;
	const char *msg;
{
	DB_FH *fhp;
	DB_REP *db_rep;
	REP *rep;
	size_t msg_written, nl_written;
	int idx;
	char newline;

	newline = '\n';
	db_rep = env->rep_handle;
	rep = db_rep->region;
	DB_ASSERT((ENV *)env, !FLD_ISSET(rep->config, REP_C_INMEM));

	/*
	 * The only way to get here is with the mutex locked, so all of the
	 * diag-related fields may be read and updated freely.
	 */
	idx = rep->diag_index;
	fhp = db_rep->diagfile[idx];

	/* Our handle's offset is stale; move to the region's offset. */
	if (db_rep->diag_off != rep->diag_off)
		(void)__os_seek((ENV *)env, fhp, 0, 0, rep->diag_off);

	/* Message text first, then the newline; give up on any failure. */
	if (__os_write((ENV *)env,
	    fhp, (void *)msg, strlen(msg), &msg_written) != 0)
		return;
	if (__os_write((ENV *)env, fhp, &newline, 1, &nl_written) != 0)
		return;

	rep->diag_off += (msg_written + nl_written);
	db_rep->diag_off = rep->diag_off;

	/*
	 * If this message took the file to the size threshold or beyond,
	 * switch to the next file.  Landing exactly on the threshold is not
	 * required; going somewhat over is fine.
	 */
	if (rep->diag_off >= REP_DIAGSIZE) {
		rep->diag_index = ((idx + 1) % DBREP_DIAG_FILES);
		rep->diag_off = 0;
	}
}

/*
 * __rep_notify_threads --
 *	Walk the rep region's list of waiters and wake those to which the
 * given wake_reason applies.  Waking a waiter means unlocking its
 * mtx_repwait, removing it from the list and flagging it REP_F_WOKEN.
 *
 * With a wake_reason of LOCKOUT every waiter is flagged
 * REP_F_PENDING_LOCKOUT and woken unconditionally.  Otherwise only waiters
 * whose goal matches wake_reason are considered (an AWAIT_HISTORY goal
 * also matches an AWAIT_LSN wake-up), and each is woken only if
 * __rep_check_goal() reports its goal as satisfied.
 *
 * A __rep_check_goal() result other than 0 or DB_TIMEOUT ends the walk at
 * once and is returned to the caller; when the walk completes, 0 is
 * returned.
 *
 * PUBLIC: int __rep_notify_threads __P((ENV *, rep_waitreason_t));
 *
 * Caller must hold rep region mutex.  In the AWAIT_LSN case, caller must also
 * hold mtx_clientdb.
 */
int
__rep_notify_threads(env, wake_reason)
	ENV *env;
	rep_waitreason_t wake_reason;
{
	REP *rep;
	struct __rep_waiter *waiter;
	struct rep_waitgoal *goal;
	int ret, wake;

	ret = 0;
	rep = env->rep_handle->region;

	SH_TAILQ_FOREACH(waiter, &rep->waiters, links, __rep_waiter) {
		goal = &waiter->goal;
		wake = 0;
		if (wake_reason == LOCKOUT) {
			/* A lockout wakes everybody, whatever their goal. */
			F_SET(waiter, REP_F_PENDING_LOCKOUT);
			wake = 1;
		} else if (wake_reason == goal->why ||
		    (goal->why == AWAIT_HISTORY && wake_reason == AWAIT_LSN)) {
			/*
			 * It's important that we only call __rep_check_goal
			 * with "goals" that match the wake_reason passed to us
			 * (modulo the LSN-to-HISTORY equivalence), because the
			 * caller has ensured that it is holding the appropriate
			 * mutexes depending on the wake_reason.
			 */
			if ((ret = __rep_check_goal(env, goal)) == 0)
				wake = 1;
			else if (ret == DB_TIMEOUT)
				/* Goal not reached yet: not an error. */
				ret = 0;
			else
				goto out;
		}

		if (wake) {
			/*
			 * NOTE(review): the current element is unlinked while
			 * SH_TAILQ_FOREACH is still iterating over it, so the
			 * loop depends on the removed waiter's link staying
			 * usable until the next step -- confirm against the
			 * shared-queue macros.  The order of the three steps
			 * below is presumably significant for the woken
			 * thread; do not reorder without checking the waiting
			 * side.
			 */
			MUTEX_UNLOCK(env, waiter->mtx_repwait);
			SH_TAILQ_REMOVE(&rep->waiters,
			    waiter, links, __rep_waiter);
			F_SET(waiter, REP_F_WOKEN);
		}
	}

out:
	return (ret);
}

/*
 * __rep_check_goal --
 *	Evaluate a "wait goal", that is, the condition a thread may be
 * waiting for.  Returns 0 if the condition has been satisfied, and
 * DB_TIMEOUT if it has not.
 *
 * Caller must hold REP_SYSTEM lock and/or mtx_clientdb as appropriate.
 *
 * PUBLIC: int __rep_check_goal __P((ENV *, struct rep_waitgoal *));
 */
int
__rep_check_goal(env, goal)
	ENV *env;
	struct rep_waitgoal *goal;
{
	REP *rep;
	LOG *lp;
	int satisfied;

	rep = env->rep_handle->region;
	lp = env->lg_handle->reginfo.primary;
	satisfied = 0;		/* Assume the goal is still pending. */

	/*
	 * AWAIT_LSN and AWAIT_HISTORY both compare max_perm_lsn against
	 * u.lsn, but they mean different things:
	 *
	 * AWAIT_LSN: u.lsn is the LSN of the commit of the transaction the
	 * caller is waiting for.  We must have gotten at least that far,
	 * thus ">=".
	 *
	 * AWAIT_HISTORY: u.lsn is merely a copy of max_perm_lsn as it was
	 * when the waiter last checked.  We don't know at which LSN the LSN
	 * history database will be replicated, so any progress *beyond*
	 * that point (thus ">") must wake the waiter so it can look again.
	 * Such a waiter is therefore woken at every commit we receive.
	 * Fortunately the history should arrive as the first transaction
	 * after the gen change, and waiting for it should be extremely rare.
	 */
	switch (goal->why) {
	case AWAIT_LSN:
		satisfied =
		    LOG_COMPARE(&lp->max_perm_lsn, &goal->u.lsn) >= 0;
		break;
	case AWAIT_HISTORY:
		satisfied =
		    LOG_COMPARE(&lp->max_perm_lsn, &goal->u.lsn) > 0;
		break;
	case AWAIT_GEN:
		/* Has the region's generation caught up with the goal? */
		satisfied = rep->gen >= goal->u.gen;
		break;
	case AWAIT_NIMDB:
		/* Satisfied once the region carries REP_F_NIMDBS_LOADED. */
		satisfied = F_ISSET(rep, REP_F_NIMDBS_LOADED) != 0;
		break;
	default:
		/* Unknown goal: asserts; otherwise reported as not reached. */
		DB_ASSERT(env, 0);
		break;
	}

	return (satisfied ? 0 : DB_TIMEOUT);
}
