diff --git a/dist/api_data.py b/dist/api_data.py index 7991a8416..d624edaed 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -1930,6 +1930,10 @@ methods = { Config('dryrun', 'false', r''' perform the checks associated with RTS, but don't modify any data.''', type='boolean'), + Config('threads', '2', r''' + maximum number of threads WiredTiger will start to help rollback to stable. Each + rts worker thread uses a session from the configured session_max''', + min=0, max=10), ]), 'WT_SESSION.reconfigure' : Method(session_config), diff --git a/src/config/config_def.c b/src/config/config_def.c index 76c738fa4..e0f6f1995 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -203,7 +203,8 @@ static const WT_CONFIG_CHECK confchk_WT_CONNECTION_reconfigure[] = { {NULL, NULL, NULL, NULL, NULL, 0}}; static const WT_CONFIG_CHECK confchk_WT_CONNECTION_rollback_to_stable[] = { - {"dryrun", "boolean", NULL, NULL, NULL, 0}, {NULL, NULL, NULL, NULL, NULL, 0}}; + {"dryrun", "boolean", NULL, NULL, NULL, 0}, {"threads", "int", NULL, "min=0,max=10", NULL, 0}, + {NULL, NULL, NULL, NULL, NULL, 0}}; static const WT_CONFIG_CHECK confchk_WT_CONNECTION_set_timestamp[] = { {"durable_timestamp", "string", NULL, NULL, NULL, 0}, {"force", "boolean", NULL, NULL, NULL, 0}, @@ -1307,7 +1308,8 @@ static const WT_CONFIG_ENTRY config_entries[] = {{"WT_CONNECTION.add_collator", "tiered_storage=(local_retention=300),timing_stress_for_test=," "verbose=[]", confchk_WT_CONNECTION_reconfigure, 35}, - {"WT_CONNECTION.rollback_to_stable", "dryrun=false", confchk_WT_CONNECTION_rollback_to_stable, 1}, + {"WT_CONNECTION.rollback_to_stable", "dryrun=false,threads=2", + confchk_WT_CONNECTION_rollback_to_stable, 2}, {"WT_CONNECTION.set_file_system", "", NULL, 0}, {"WT_CONNECTION.set_timestamp", "durable_timestamp=,force=false,oldest_timestamp=," diff --git a/src/include/connection.h b/src/include/connection.h index d2bffc953..62f0a5992 100644 --- a/src/include/connection.h +++ b/src/include/connection.h @@ -705,9 +705,10 @@ struct __wt_connection_impl { #define WT_CONN_RECONFIGURING 0x00800000u #define WT_CONN_RECOVERING 0x01000000u #define WT_CONN_RECOVERY_COMPLETE 0x02000000u -#define WT_CONN_SALVAGE 0x04000000u -#define WT_CONN_TIERED_FIRST_FLUSH 0x08000000u -#define WT_CONN_WAS_BACKUP 0x10000000u +#define WT_CONN_RTS_THREAD_RUN 0x04000000u +#define WT_CONN_SALVAGE 0x08000000u +#define WT_CONN_TIERED_FIRST_FLUSH 0x10000000u +#define WT_CONN_WAS_BACKUP 0x20000000u /* AUTOMATIC FLAG VALUE GENERATION STOP 32 */ uint32_t flags; }; diff --git a/src/include/extern.h b/src/include/extern.h index 5a0a3cf32..c4c34ad94 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -33,6 +33,8 @@ extern bool __wt_page_evict_urgent(WT_SESSION_IMPL *session, WT_REF *ref) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern bool __wt_read_cell_time_window(WT_CURSOR_BTREE *cbt, WT_TIME_WINDOW *tw) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern bool __wt_rts_thread_chk(WT_SESSION_IMPL *session) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern bool __wt_rts_visibility_has_stable_update(WT_UPDATE *upd) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern bool __wt_rts_visibility_page_needs_abort(WT_SESSION_IMPL *session, WT_REF *ref, @@ -1357,6 +1359,8 @@ extern int __wt_rts_btree_walk_btree(WT_SESSION_IMPL *session, wt_timestamp_t ro extern int __wt_rts_btree_walk_btree_apply( WT_SESSION_IMPL *session, const char *uri, const char *config, wt_timestamp_t rollback_timestamp) 
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_rts_btree_work_unit(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_rts_check(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_rts_history_btree_hs_truncate(WT_SESSION_IMPL *session, uint32_t btree_id) @@ -1365,6 +1369,12 @@ extern int __wt_rts_history_delete_hs(WT_SESSION_IMPL *session, WT_ITEM *key, wt WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_rts_history_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_rts_push_work(WT_SESSION_IMPL *session, const char *uri, + wt_timestamp_t rollback_timestamp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_rts_thread_run(WT_SESSION_IMPL *session, WT_THREAD *thread) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_rts_thread_stop(WT_SESSION_IMPL *session, WT_THREAD *thread) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_rwlock_init(WT_SESSION_IMPL *session, WT_RWLOCK *l) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_salvage(WT_SESSION_IMPL *session, const char *cfg[]) @@ -1931,8 +1941,10 @@ extern void __wt_ref_out(WT_SESSION_IMPL *session, WT_REF *ref); extern void __wt_rollback_to_stable_init(WT_CONNECTION_IMPL *conn); extern void __wt_root_ref_init( WT_SESSION_IMPL *session, WT_REF *root_ref, WT_PAGE *root, bool is_recno); +extern void __wt_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp); extern void __wt_rts_progress_msg(WT_SESSION_IMPL *session, struct timespec rollback_start, uint64_t rollback_count, uint64_t *rollback_msg_count, bool walk); +extern void __wt_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry); extern void __wt_rwlock_destroy(WT_SESSION_IMPL *session, WT_RWLOCK *l); extern void __wt_schema_destroy_colgroup(WT_SESSION_IMPL *session, WT_COLGROUP **colgroupp); extern void __wt_scr_discard(WT_SESSION_IMPL *session); diff --git a/src/include/rollback_to_stable.h b/src/include/rollback_to_stable.h index 757d6a881..e94c6125c 100644 --- a/src/include/rollback_to_stable.h +++ b/src/include/rollback_to_stable.h @@ -52,6 +52,7 @@ #define WT_RTS_VERB_TAG_TREE_SKIP "[TREE_SKIP] " #define WT_RTS_VERB_TAG_UPDATE_ABORT "[UPDATE_ABORT] " #define WT_RTS_VERB_TAG_UPDATE_CHAIN_VERIFY "[UPDATE_CHAIN_VERIFY] " +#define WT_RTS_VERB_TAG_WAIT_THREADS "[WAIT_THREADS] " #define WT_CHECK_RECOVERY_FLAG_TXNID(session, txnid) \ (F_ISSET(S2C(session), WT_CONN_RECOVERING) && S2C(session)->recovery_ckpt_snap_min != 0 && \ @@ -81,6 +82,16 @@ WT_STAT_CONN_DATA_INCR(session, stat##_dryrun); \ } while (0) +/* + * WT_RTS_WORK_UNIT -- + * A definition of maintenance that a RTS tree needs done. + */ +struct __wt_rts_work_unit { + TAILQ_ENTRY(__wt_rts_work_unit) q; /* Worker unit queue */ + char *uri; + wt_timestamp_t rollback_timestamp; +}; + /* * WT_ROLLBACK_TO_STABLE -- * Rollback to stable singleton, contains the interface to rollback to stable along @@ -91,6 +102,15 @@ struct __wt_rollback_to_stable { int (*rollback_to_stable_one)(WT_SESSION_IMPL *, const char *, bool *); int (*rollback_to_stable)(WT_SESSION_IMPL *, const char *[], bool); + /* RTS thread information. */ + WT_CONDVAR *thread_cond; /* RTS thread condition */ + WT_THREAD_GROUP thread_group; + uint32_t threads; /* RTS threads */ + + /* Locked: rts system work queue. 
*/
+    TAILQ_HEAD(__wt_rts_qh, __wt_rts_work_unit) rtsqh;
+    WT_SPINLOCK rts_lock; /* RTS work queue spinlock */
+
     /* Configuration. */
     bool dryrun;
 };
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index d770a11e6..a8616a31d 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -2639,6 +2639,9 @@ struct __wt_connection {
      * @configstart{WT_CONNECTION.rollback_to_stable, see dist/api_data.py}
      * @config{dryrun, perform the checks associated with RTS\, but don't modify any data., a
      * boolean flag; default \c false.}
+     * @config{threads, maximum number of threads WiredTiger will start to help rollback to stable.
+     * Each rts worker thread uses a session from the configured session_max., an integer between \c
+     * 0 and \c 10; default \c 2.}
      * @configend
      * @errors
      * An error should occur only in the case of a system problem, and an application typically
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index e3df0754b..803f39a62 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -327,6 +327,8 @@ struct __wt_rollback_to_stable;
 typedef struct __wt_rollback_to_stable WT_ROLLBACK_TO_STABLE;
 struct __wt_row;
 typedef struct __wt_row WT_ROW;
+struct __wt_rts_work_unit;
+typedef struct __wt_rts_work_unit WT_RTS_WORK_UNIT;
 struct __wt_rwlock;
 typedef struct __wt_rwlock WT_RWLOCK;
 struct __wt_salvage_cookie;
@@ -437,9 +439,10 @@ typedef uint64_t wt_timestamp_t;
 #include "misc.h"
 #include "mutex.h"
 
-#include "stat.h"      /* required by dhandle.h */
-#include "dhandle.h"   /* required by btree.h */
-#include "timestamp.h" /* required by reconcile.h */
+#include "stat.h"         /* required by dhandle.h */
+#include "dhandle.h"      /* required by btree.h */
+#include "timestamp.h"    /* required by reconcile.h */
+#include "thread_group.h" /* required by rollback_to_stable.h */
 
 #include "api.h"
 #include "block.h"
@@ -464,7 +467,6 @@ typedef uint64_t wt_timestamp_t;
 #include "reconcile.h"
 #include "rollback_to_stable.h"
 #include "schema.h"
-#include "thread_group.h"
 #include "tiered.h"
 #include "truncate.h"
 #include "txn.h"
diff --git a/src/rollback_to_stable/rts.c b/src/rollback_to_stable/rts.c
index 08e7b9ba8..a88ca2ebd 100644
--- a/src/rollback_to_stable/rts.c
+++ b/src/rollback_to_stable/rts.c
@@ -103,6 +103,133 @@ __wt_rts_progress_msg(WT_SESSION_IMPL *session, struct timespec rollback_start,
     }
 }
 
+/*
+ * __wt_rts_thread_chk --
+ *     Check to decide if the rts thread should continue running.
+ */
+bool
+__wt_rts_thread_chk(WT_SESSION_IMPL *session)
+{
+    return (F_ISSET(S2C(session), WT_CONN_RTS_THREAD_RUN));
+}
+
+/*
+ * __wt_rts_thread_run --
+ *     Entry function for an rts thread. This is called repeatedly from the thread group code so it
+ *     does not need to loop itself.
+ */
+int
+__wt_rts_thread_run(WT_SESSION_IMPL *session, WT_THREAD *thread)
+{
+    WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
+
+    WT_UNUSED(thread);
+
+    /* Mark the session as a rollback to stable session. */
+    F_SET(session, WT_SESSION_ROLLBACK_TO_STABLE);
+
+    __wt_rts_pop_work(session, &entry);
+    if (entry == NULL)
+        return (0);
+
+    WT_ERR(__wt_rts_btree_work_unit(session, entry));
+
+    if (0) {
+err:
+        WT_RET_PANIC(session, ret, "rts thread error");
+    }
+    return (ret);
+}
+
+/*
+ * __wt_rts_thread_stop --
+ *     Shutdown function for an rts thread.
+ */
+int
+__wt_rts_thread_stop(WT_SESSION_IMPL *session, WT_THREAD *thread)
+{
+    if (thread->id != 0)
+        return (0);
+
+    /* Clear the rollback to stable session flag. */
+    F_CLR(session, WT_SESSION_ROLLBACK_TO_STABLE);
+
+    __wt_verbose(session, WT_VERB_RTS, "%s", "rts thread exiting");
+    return (0);
+}
+
+/*
+ * __wt_rts_thread_create --
+ *     Start rts threads.
+ */
+static int
+__wt_rts_thread_create(WT_SESSION_IMPL *session)
+{
+    WT_CONNECTION_IMPL *conn;
+    uint32_t session_flags;
+
+    conn = S2C(session);
+
+    if (conn->rts->threads == 0)
+        return (0);
+
+    /* Set first, the thread might run before we finish up. */
+    F_SET(conn, WT_CONN_RTS_THREAD_RUN);
+
+    TAILQ_INIT(&conn->rts->rtsqh); /* RTS work unit list */
+    WT_RET(__wt_spin_init(session, &conn->rts->rts_lock, "rts work unit list"));
+    WT_RET(__wt_cond_auto_alloc(
+      session, "rts threads", 10 * WT_THOUSAND, WT_MILLION, &conn->rts->thread_cond));
+
+    /*
+     * Create the rts thread group, sized to the configured number of rts worker threads.
+     */
+    session_flags = WT_THREAD_CAN_WAIT | WT_THREAD_PANIC_FAIL;
+    WT_RET(__wt_thread_group_create(session, &conn->rts->thread_group, "rts-threads",
+      conn->rts->threads, conn->rts->threads, session_flags, __wt_rts_thread_chk,
+      __wt_rts_thread_run, __wt_rts_thread_stop));
+
+    return (0);
+}
+
+/*
+ * __wt_rts_thread_destroy --
+ *     Destroy the rts threads.
+ */
+static int
+__wt_rts_thread_destroy(WT_SESSION_IMPL *session)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_DECL_RET;
+
+    conn = S2C(session);
+
+    if (conn->rts->threads == 0)
+        return (0);
+
+    /* Wait for any rts thread group changes to stabilize. */
+    __wt_writelock(session, &conn->rts->thread_group.lock);
+
+    /*
+     * Clear the run flag and signal the threads to finish.
+     */
+    F_CLR(conn, WT_CONN_RTS_THREAD_RUN);
+    __wt_cond_signal(session, conn->rts->thread_cond);
+
+    __wt_verbose(
+      session, WT_VERB_RTS, WT_RTS_VERB_TAG_WAIT_THREADS "%s", "waiting for helper threads");
+
+    /*
+     * We call the destroy function while holding the write lock; it assumes it is called locked.
+     */
+    WT_TRET(__wt_thread_group_destroy(session, &conn->rts->thread_group));
+    __wt_spin_destroy(session, &conn->rts->rts_lock);
+    __wt_cond_destroy(session, &conn->rts->thread_cond);
+
+    return (ret);
+}
+
 /*
  * __wt_rts_btree_apply_all --
  *     Perform rollback to stable to all files listed in the metadata, apart from the metadata and
@@ -114,16 +241,21 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times
     struct timespec rollback_timer;
     WT_CURSOR *cursor;
     WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
     uint64_t rollback_count, rollback_msg_count;
     char ts_string[WT_TS_INT_STRING_SIZE];
     const char *config, *uri;
+    bool rts_threads_started;
 
     /* Initialize the verbose tracking timer. */
     __wt_epoch(session, &rollback_timer);
     rollback_count = 0;
     rollback_msg_count = 0;
 
-    WT_RET(__wt_metadata_cursor(session, &cursor));
+    WT_RET(__wt_rts_thread_create(session));
+    rts_threads_started = true;
+
+    WT_ERR(__wt_metadata_cursor(session, &cursor));
     while ((ret = cursor->next(cursor)) == 0) {
         /* Log a progress message. */
         __wt_rts_progress_msg(session, rollback_timer, rollback_count, &rollback_msg_count, false);
@@ -151,6 +283,23 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times
     }
     WT_ERR_NOTFOUND_OK(ret, false);
 
+    /*
+     * Wait until the entire rts queue is finished processing before performing the history store
+     * final pass. 
+ */ + if (S2C(session)->rts->threads != 0) { + while (!TAILQ_EMPTY(&S2C(session)->rts->rtsqh)) { + __wt_rts_pop_work(session, &entry); + if (entry == NULL) + break; + WT_ERR(__wt_rts_btree_work_unit(session, entry)); + __wt_rts_work_free(session, entry); + } + } + + WT_ERR(__wt_rts_thread_destroy(session)); + rts_threads_started = false; + /* * Performing eviction in parallel to a checkpoint can lead to a situation where the history * store has more updates than its corresponding data store. Performing history store cleanup at @@ -170,5 +319,7 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times } err: WT_TRET(__wt_metadata_cursor_release(session, &cursor)); + if (rts_threads_started) + WT_TRET(__wt_rts_thread_destroy(session)); return (ret); } diff --git a/src/rollback_to_stable/rts_api.c b/src/rollback_to_stable/rts_api.c index 60d9556ab..b7ae4c1b4 100644 --- a/src/rollback_to_stable/rts_api.c +++ b/src/rollback_to_stable/rts_api.c @@ -37,12 +37,14 @@ __rollback_to_stable_int(WT_SESSION_IMPL *session, bool no_ckpt) WT_DECL_RET; WT_TXN_GLOBAL *txn_global; wt_timestamp_t pinned_timestamp, rollback_timestamp; + uint32_t threads; char ts_string[2][WT_TS_INT_STRING_SIZE]; bool dryrun; conn = S2C(session); txn_global = &conn->txn_global; dryrun = conn->rts->dryrun; + threads = conn->rts->threads; WT_ASSERT_SPINLOCK_OWNED(session, &conn->checkpoint_lock); WT_ASSERT_SPINLOCK_OWNED(session, &conn->schema_lock); @@ -78,9 +80,10 @@ __rollback_to_stable_int(WT_SESSION_IMPL *session, bool no_ckpt) WT_ORDERED_READ(pinned_timestamp, txn_global->pinned_timestamp); __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), WT_RTS_VERB_TAG_INIT - "start rollback to stable with stable_timestamp=%s and oldest_timestamp=%s", + "start rollback to stable with stable_timestamp=%s and oldest_timestamp=%s using %d worker " + "threads", __wt_timestamp_to_string(rollback_timestamp, ts_string[0]), - __wt_timestamp_to_string(txn_global->oldest_timestamp, ts_string[1])); + __wt_timestamp_to_string(txn_global->oldest_timestamp, ts_string[1]), threads); if (F_ISSET(conn, WT_CONN_RECOVERING)) __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), @@ -170,8 +173,11 @@ __rollback_to_stable_finalize(WT_ROLLBACK_TO_STABLE *rts) static int __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[], bool no_ckpt) { + struct timespec cur_time, rts_timer_start; WT_CONFIG_ITEM cval; WT_DECL_RET; + uint64_t time_diff; + uint32_t threads; bool dryrun; /* @@ -179,11 +185,16 @@ __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[], bool no_ckpt) * don't get default values installed in the config string. 
     */
     dryrun = false;
+    threads = 0;
     if (cfg != NULL) {
         ret = __wt_config_gets(session, cfg, "dryrun", &cval);
         if (ret == 0)
             dryrun = cval.val != 0;
         WT_RET_NOTFOUND_OK(ret);
+        ret = __wt_config_gets(session, cfg, "threads", &cval);
+        if (ret == 0)
+            threads = (uint32_t)cval.val;
+        WT_RET_NOTFOUND_OK(ret);
     }
 
     /*
@@ -196,13 +207,23 @@ __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[], bool no_ckpt)
       __wt_open_internal_session(S2C(session), "txn rollback_to_stable", true, 0, 0, &session));
 
     S2C(session)->rts->dryrun = dryrun;
+    S2C(session)->rts->threads = threads;
+
+    /* Initialize the tracking timer. */
+    __wt_epoch(session, &rts_timer_start);
 
     WT_STAT_CONN_SET(session, txn_rollback_to_stable_running, 1);
     WT_WITH_CHECKPOINT_LOCK(
       session, WT_WITH_SCHEMA_LOCK(session, ret = __rollback_to_stable_int(session, no_ckpt)));
 
+    __wt_epoch(session, &cur_time);
+
+    /* Time since the RTS started. */
+    time_diff = WT_TIMEDIFF_SEC(cur_time, rts_timer_start);
+
     __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
-      WT_RTS_VERB_TAG_END "finished rollback to stable%s", dryrun ? " dryrun" : "");
+      WT_RTS_VERB_TAG_END "finished rollback to stable%s, which ran for %" PRIu64 " seconds",
+      dryrun ? " dryrun" : "", time_diff);
 
     WT_STAT_CONN_SET(session, txn_rollback_to_stable_running, 0);
     __rollback_to_stable_finalize(S2C(session)->rts);
diff --git a/src/rollback_to_stable/rts_btree_walk.c b/src/rollback_to_stable/rts_btree_walk.c
index d4062a096..53e250dbb 100644
--- a/src/rollback_to_stable/rts_btree_walk.c
+++ b/src/rollback_to_stable/rts_btree_walk.c
@@ -144,6 +144,106 @@ __rts_btree_walk(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp)
     return (ret);
 }
 
+/*
+ * __wt_rts_work_free --
+ *     Free a work unit.
+ */
+void
+__wt_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry)
+{
+    __wt_free(session, entry->uri);
+    __wt_free(session, entry);
+}
+
+/*
+ * __wt_rts_pop_work --
+ *     Pop a work unit from the queue.
+ */
+void
+__wt_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_RTS_WORK_UNIT *entry;
+
+    *entryp = entry = NULL;
+
+    conn = S2C(session);
+    if (TAILQ_EMPTY(&conn->rts->rtsqh))
+        return;
+
+    __wt_spin_lock(session, &conn->rts->rts_lock);
+
+    /* Check again with the lock held, another thread may have emptied the queue. */
+    if (TAILQ_EMPTY(&conn->rts->rtsqh)) {
+        __wt_spin_unlock(session, &conn->rts->rts_lock);
+        return;
+    }
+
+    entry = TAILQ_FIRST(&conn->rts->rtsqh);
+    TAILQ_REMOVE(&conn->rts->rtsqh, entry, q);
+    *entryp = entry;
+
+    __wt_spin_unlock(session, &conn->rts->rts_lock);
+    return;
+}
+
+/*
+ * __wt_rts_push_work --
+ *     Push a work unit to the queue.
+ */
+int
+__wt_rts_push_work(WT_SESSION_IMPL *session, const char *uri, wt_timestamp_t rollback_timestamp)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
+
+    conn = S2C(session);
+
+    WT_RET(__wt_calloc_one(session, &entry));
+    WT_ERR(__wt_strdup(session, uri, &entry->uri));
+    entry->rollback_timestamp = rollback_timestamp;
+
+    __wt_spin_lock(session, &conn->rts->rts_lock);
+    TAILQ_INSERT_TAIL(&conn->rts->rtsqh, entry, q);
+    __wt_spin_unlock(session, &conn->rts->rts_lock);
+    __wt_cond_signal(session, conn->rts->thread_cond);
+
+    return (0);
+err:
+    __wt_free(session, entry);
+    return (ret);
+}
+
+/*
+ * __rts_btree --
+ *     Perform rollback to stable on a single uri.
+ */
+static int
+__rts_btree(WT_SESSION_IMPL *session, const char *uri, wt_timestamp_t rollback_timestamp)
+{
+    WT_DECL_RET;
+
+    /* Open a handle for processing. 
*/ + ret = __wt_session_get_dhandle(session, uri, NULL, NULL, 0); + if (ret != 0) + WT_RET_MSG(session, ret, "%s: unable to open handle%s", uri, + ret == EBUSY ? ", error indicates handle is unavailable due to concurrent use" : ""); + ret = __wt_rts_btree_walk_btree(session, rollback_timestamp); + WT_TRET(__wt_session_release_dhandle(session)); + return (ret); +} + +/* + * __wt_rts_btree_work_unit -- + * Perform rollback to stable on a single work unit. + */ +int +__wt_rts_btree_work_unit(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry) +{ + return (__rts_btree(session, entry->uri, entry->rollback_timestamp)); +} + /* * __wt_rts_btree_walk_btree_apply -- * Perform rollback to stable on a single file. @@ -160,8 +260,7 @@ __wt_rts_btree_walk_btree_apply( uint64_t rollback_txnid, write_gen; uint32_t btree_id; char ts_string[2][WT_TS_INT_STRING_SIZE]; - bool dhandle_allocated, has_txn_updates_gt_than_ckpt_snap, modified; - bool prepared_updates; + bool has_txn_updates_gt_than_ckpt_snap, modified, prepared_updates, table_skipped; /* Ignore non-btree objects as well as the metadata and history store files. */ if (!WT_BTREE_PREFIX(uri) || strcmp(uri, WT_HS_URI) == 0 || strcmp(uri, WT_METAFILE_URI) == 0) @@ -170,7 +269,7 @@ __wt_rts_btree_walk_btree_apply( addr_size = 0; rollback_txnid = 0; write_gen = 0; - dhandle_allocated = false; + table_skipped = true; /* Find out the max durable timestamp of the object from checkpoint. */ newest_start_durable_ts = newest_stop_durable_ts = WT_TS_NONE; @@ -260,26 +359,22 @@ __wt_rts_btree_walk_btree_apply( if (modified || max_durable_ts > rollback_timestamp || prepared_updates || has_txn_updates_gt_than_ckpt_snap) { - /* Open a handle for processing. */ - ret = __wt_session_get_dhandle(session, uri, NULL, NULL, 0); - if (ret != 0) - WT_ERR_MSG(session, ret, "%s: unable to open handle%s", uri, - ret == EBUSY ? ", error indicates handle is unavailable due to concurrent use" : ""); - dhandle_allocated = true; - __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), WT_RTS_VERB_TAG_TREE - "rolling back tree. modified=%s, durable_timestamp=%s > stable_timestamp=%s: %s, " + "%s: rolling back tree. modified=%s, durable_timestamp=%s > stable_timestamp=%s: %s, " "has_prepared_updates=%s, txnid=%" PRIu64 " > recovery_checkpoint_snap_min=%" PRIu64 ": %s", - S2BT(session)->modified ? "true" : "false", - __wt_timestamp_to_string(max_durable_ts, ts_string[0]), + uri, modified ? "true" : "false", __wt_timestamp_to_string(max_durable_ts, ts_string[0]), __wt_timestamp_to_string(rollback_timestamp, ts_string[1]), max_durable_ts > rollback_timestamp ? "true" : "false", prepared_updates ? "true" : "false", rollback_txnid, S2C(session)->recovery_ckpt_snap_min, has_txn_updates_gt_than_ckpt_snap ? "true" : "false"); - WT_ERR(__wt_rts_btree_walk_btree(session, rollback_timestamp)); + if (S2C(session)->rts->threads == 0) + WT_ERR(__rts_btree(session, uri, rollback_timestamp)); + else + WT_ERR(__wt_rts_push_work(session, uri, rollback_timestamp)); + table_skipped = false; } else __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), WT_RTS_VERB_TAG_TREE_SKIP @@ -300,7 +395,7 @@ __wt_rts_btree_walk_btree_apply( * timestamp to WT_TS_NONE, we need this exception. * 2. In-memory database - In this scenario, there is no history store to truncate. 
*/ - if ((!dhandle_allocated || !S2BT(session)->modified) && max_durable_ts == WT_TS_NONE && + if ((table_skipped || !modified) && max_durable_ts == WT_TS_NONE && !F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) { WT_ERR(__wt_config_getones(session, config, "id", &cval)); btree_id = (uint32_t)cval.val; @@ -311,8 +406,6 @@ __wt_rts_btree_walk_btree_apply( } err: - if (dhandle_allocated) - WT_TRET(__wt_session_release_dhandle(session)); return (ret); } diff --git a/src/support/thread_group.c b/src/support/thread_group.c index 7a9e00af7..8ad0caf03 100644 --- a/src/support/thread_group.c +++ b/src/support/thread_group.c @@ -181,8 +181,8 @@ __thread_group_resize(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t WT_ERR(__wt_calloc_one(session, &thread)); /* Threads get their own session. */ session_flags = LF_ISSET(WT_THREAD_CAN_WAIT) ? WT_SESSION_CAN_WAIT : 0; - WT_ERR( - __wt_open_internal_session(conn, group->name, false, session_flags, 0, &thread->session)); + WT_ERR(__wt_open_internal_session( + conn, group->name, false, session_flags, session->lock_flags, &thread->session)); if (LF_ISSET(WT_THREAD_PANIC_FAIL)) F_SET(thread, WT_THREAD_PANIC_FAIL); thread->id = i; diff --git a/test/suite/test_rollback_to_stable43.py b/test/suite/test_rollback_to_stable43.py new file mode 100644 index 000000000..912e46a3b --- /dev/null +++ b/test/suite/test_rollback_to_stable43.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import wttest +from wtdataset import SimpleDataSet +from wiredtiger import stat +from wtscenario import make_scenarios +from rollback_to_stable_util import test_rollback_to_stable_base + +# test_rollback_to_stable43.py +# Test that rollback to stable remove all the unstable updates with multiple worker threads. +class test_rollback_to_stable43(test_rollback_to_stable_base): + + # For FLCS, set the page size down. Otherwise for the in-memory scenarios we get enough + # updates on the page that the in-memory page footprint exceeds the default maximum + # in-memory size, and that in turn leads to pathological behavior where the page gets + # force-evicted over and over again trying to resolve/condense the updates. 
But they + # don't (for in-memory, they can't be moved to the history store) so this leads to a + # semi-livelock state that makes the test some 20x slower than it needs to be. + # + # FUTURE: it would be better if the system adjusted on its own, but it's not critical + # and this workload (with every entry on the page modified repeatedly) isn't much like + # anything that happens in production. + format_values = [ + ('column', dict(key_format='r', value_format='S', extraconfig='')), + ('column_fix', dict(key_format='r', value_format='8t', extraconfig=',leaf_page_max=4096')), + ('row_integer', dict(key_format='i', value_format='S', extraconfig='')), + ] + + in_memory_values = [ + ('no_inmem', dict(in_memory=False)), + ('inmem', dict(in_memory=True)) + ] + + dryrun_values = [ + ('no_dryrun', dict(dryrun=False)), + ('dryrun', dict(dryrun=True)) + ] + + worker_thread_values = [ + ('0', dict(threads=0)), + ('1', dict(threads=1)), + ('2', dict(threads=2)), + ('3', dict(threads=3)), + ('4', dict(threads=4)) + ] + + scenarios = make_scenarios(format_values, in_memory_values, dryrun_values, worker_thread_values) + + def conn_config(self): + config = 'cache_size=100MB,statistics=(all)' + if self.in_memory: + config += ',in_memory=true' + return config + + def test_rollback_to_stable(self): + nrows = 1000 + ntables = 10 + + # Create a tables. + for i in range(1, ntables + 1): + uri = "table:rollback_to_stable43" + str(i) + ds_config = self.extraconfig + ds_config += ',log=(enabled=false)' if self.in_memory else '' + ds = SimpleDataSet(self, uri, 0, + key_format=self.key_format, value_format=self.value_format, config=ds_config) + ds.populate() + + if self.value_format == '8t': + valuea = 97 + valueb = 98 + valuec = 99 + valued = 100 + else: + valuea = "aaaaa" * 100 + valueb = "bbbbb" * 100 + valuec = "ccccc" * 100 + valued = "ddddd" * 100 + + # Pin oldest and stable to timestamp 1. + self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1) + + ',stable_timestamp=' + self.timestamp_str(1)) + + for i in range(1, ntables + 1): + uri = "table:rollback_to_stable43" + str(i) + self.large_updates(uri, valuea, ds, nrows, None, 10) + # Check that all updates are seen. + self.check(valuea, uri, nrows, None, 10) + + self.large_updates(uri, valueb, ds, nrows, None, 20) + # Check that the new updates are only seen after the update timestamp. + self.check(valueb, uri, nrows, None, 20) + + self.large_updates(uri, valuec, ds, nrows, None, 30) + # Check that the new updates are only seen after the update timestamp. + self.check(valuec, uri, nrows, None, 30) + + self.large_updates(uri, valued, ds, nrows, None, 40) + # Check that the new updates are only seen after the update timestamp. + self.check(valued, uri, nrows, None, 40) + + # Pin stable to timestamp 20. + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(20)) + + # Checkpoint to ensure that all the data is flushed. + self.session.breakpoint() + if not self.in_memory: + self.session.checkpoint() + + self.conn.rollback_to_stable('dryrun={}'.format('true' if self.dryrun else 'false') + ',threads=' + str(self.threads)) + # Check that the new updates are only seen after the update timestamp. 
+ self.session.breakpoint() + + if self.dryrun: + self.check(valued, uri, nrows, None, 40) + else: + self.check(valueb, uri, nrows, None, 40) + + self.check(valueb, uri, nrows, None, 20) + self.check(valuea, uri, nrows, None, 10) + + stat_cursor = self.session.open_cursor('statistics:', None, None) + calls = stat_cursor[stat.conn.txn_rts][2] + upd_aborted = (stat_cursor[stat.conn.txn_rts_upd_aborted][2] + + stat_cursor[stat.conn.txn_rts_hs_removed][2]) + keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2] + keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2] + pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2] + stat_cursor.close() + + self.assertEqual(calls, 1) + self.assertEqual(keys_removed, 0) + self.assertEqual(keys_restored, 0) + self.assertGreater(pages_visited, 0) + + if self.dryrun: + self.assertEqual(upd_aborted, 0) + else: + self.assertGreaterEqual(upd_aborted, nrows * 2 * ntables) + +if __name__ == '__main__': + wttest.run() diff --git a/tools/rts_verifier/checker.py b/tools/rts_verifier/checker.py index 9cfc9a715..f6439724f 100644 --- a/tools/rts_verifier/checker.py +++ b/tools/rts_verifier/checker.py @@ -287,6 +287,10 @@ class Checker: # TODO expand this out pass + def __apply_check_wait_threads(self, operation): + # TODO expand this out + pass + def __apply_check_end(self, operation): # TODO expand this out pass diff --git a/tools/rts_verifier/operation.py b/tools/rts_verifier/operation.py index 050b65c37..f7a07be45 100644 --- a/tools/rts_verifier/operation.py +++ b/tools/rts_verifier/operation.py @@ -50,7 +50,8 @@ class OpType(Enum): STABLE_UPDATE_FOUND = 41 TREE_OBJECT_LOG = 42 UPDATE_CHAIN_VERIFY = 43 - SKIP_DEL = 44 + WAIT_THREADS = 44 + SKIP_DEL = 45 class Operation: def __init__(self, line): @@ -578,5 +579,8 @@ class Operation: matches = re.search('btree=(\d+)', line) self.btree_id = int(matches.group(1)) + def __init_wait_threads(self, line): + self.type = OpType.WAIT_THREADS + def __init_end(self, line): self.type = OpType.END
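Usage sketch (not part of the patch): how an application passes the new WT_CONNECTION.rollback_to_stable threads option through the Python API, mirroring the call made in test_rollback_to_stable43.py. The home directory, table name, values and timestamps below are illustrative only.

import wiredtiger

# Open a connection and create a small amount of illustrative data.
conn = wiredtiger.wiredtiger_open('WT_HOME', 'create,cache_size=100MB')
session = conn.open_session()
session.create('table:example', 'key_format=i,value_format=S')

# Pin oldest/stable at 1, then commit an update at timestamp 10 (beyond stable).
conn.set_timestamp('oldest_timestamp=1,stable_timestamp=1')
cursor = session.open_cursor('table:example')
session.begin_transaction()
cursor[1] = 'unstable value'
session.commit_transaction('commit_timestamp=10')
cursor.close()
session.checkpoint()

# threads=0 keeps the previous single-threaded behavior; 1-10 starts that many RTS
# worker threads (each takes a session from session_max); the default is 2.
conn.rollback_to_stable('dryrun=false,threads=4')
conn.close()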