Allow txg_wait_synced_flags() and dmu_tx_assign() to return when the pool suspends #17355

Open · wants to merge 6 commits into master

29 changes: 25 additions & 4 deletions include/sys/dmu.h
@@ -281,9 +281,30 @@ typedef enum dmu_object_type {
* the transaction is full. See the comment above dmu_tx_assign() for more
* details on the meaning of these flags.
*/
#define DMU_TX_NOWAIT (0ULL)
#define DMU_TX_WAIT (1ULL<<0)
#define DMU_TX_NOTHROTTLE (1ULL<<1)
typedef enum {
/*
* If the tx cannot be assigned to a transaction for any reason, do
* not block but return immediately.
*/
DMU_TX_NOWAIT = 0,

/*
* Assign the tx to the open transaction. If the open transaction is
* full, or the write throttle is active, block until the next
* transaction and try again. If the pool suspends while waiting
* and failmode=continue, return an error.
*/
DMU_TX_WAIT = (1 << 0),

/* If the write throttle would prevent the assignment, ignore it. */
DMU_TX_NOTHROTTLE = (1 << 1),

/*
* With DMU_TX_WAIT, always block if the pool suspends during
* assignment, regardless of the value of the failmode= property.
*/
DMU_TX_SUSPEND = (1 << 2),
} dmu_tx_flag_t;

void byteswap_uint64_array(void *buf, size_t size);
void byteswap_uint32_array(void *buf, size_t size);
@@ -849,7 +870,7 @@ void dmu_tx_hold_spill(dmu_tx_t *tx, uint64_t object);
void dmu_tx_hold_sa(dmu_tx_t *tx, struct sa_handle *hdl, boolean_t may_grow);
void dmu_tx_hold_sa_create(dmu_tx_t *tx, int total_size);
void dmu_tx_abort(dmu_tx_t *tx);
int dmu_tx_assign(dmu_tx_t *tx, uint64_t flags);
int dmu_tx_assign(dmu_tx_t *tx, dmu_tx_flag_t flags);
void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_commit(dmu_tx_t *tx);
void dmu_tx_mark_netfree(dmu_tx_t *tx);
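For illustration, a hedged caller-side sketch of how these flags combine (hypothetical code, not part of this diff; os, object, off, and len are assumed to be in scope):

	/*
	 * Assign a tx, blocking until there is room in the open txg.
	 * Because DMU_TX_SUSPEND is not passed, a pool suspension with
	 * failmode=continue surfaces as EIO instead of blocking forever.
	 */
	dmu_tx_t *tx = dmu_tx_create(os);
	dmu_tx_hold_write(tx, object, off, len);
	int err = dmu_tx_assign(tx, DMU_TX_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	/* ... dirty buffers under this tx ... */
	dmu_tx_commit(tx);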
6 changes: 5 additions & 1 deletion include/sys/dmu_tx.h
@@ -25,6 +25,7 @@
*/
/*
* Copyright (c) 2012, 2016 by Delphix. All rights reserved.
* Copyright (c) 2025, Klara, Inc.
*/

#ifndef _SYS_DMU_TX_H
@@ -80,6 +81,9 @@ struct dmu_tx {
/* has this transaction already been delayed? */
boolean_t tx_dirty_delayed;

/* whether dmu_tx_wait() should return on suspend */
boolean_t tx_break_on_suspend;

int tx_err;
};

@@ -143,7 +147,7 @@ extern dmu_tx_stats_t dmu_tx_stats;
* These routines are defined in dmu.h, and are called by the user.
*/
dmu_tx_t *dmu_tx_create(objset_t *dd);
int dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
int dmu_tx_assign(dmu_tx_t *tx, dmu_tx_flag_t flags);
void dmu_tx_commit(dmu_tx_t *tx);
void dmu_tx_abort(dmu_tx_t *tx);
uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
9 changes: 9 additions & 0 deletions include/sys/txg.h
@@ -25,6 +25,7 @@
*/
/*
* Copyright (c) 2012, 2017 by Delphix. All rights reserved.
* Copyright (c) 2025, Klara, Inc.
*/

#ifndef _SYS_TXG_H
@@ -78,6 +79,9 @@ typedef enum {

/* If a signal arrives while waiting, abort and return EINTR */
TXG_WAIT_SIGNAL = (1 << 0),

/* If the pool suspends while waiting, abort and return ESHUTDOWN. */
TXG_WAIT_SUSPEND = (1 << 1),
} txg_wait_flag_t;

struct dsl_pool;
@@ -111,6 +115,11 @@ extern int txg_wait_synced_flags(struct dsl_pool *dp, uint64_t txg,
*/
extern void txg_wait_synced(struct dsl_pool *dp, uint64_t txg);

/*
* Wake all threads waiting in txg_wait_synced_flags() so they can reevaluate their wait conditions.
*/
extern void txg_wait_kick(struct dsl_pool *dp);

/*
* Wait until the given transaction group, or one after it, is
* the open transaction group. Try to make this happen as soon
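A hedged sketch of a caller combining the wait flags (hypothetical, not part of this diff):

	/*
	 * Wait for txg to sync, but give up early on a signal or on
	 * pool suspension rather than blocking indefinitely.
	 */
	int err = txg_wait_synced_flags(dp, txg,
	    TXG_WAIT_SIGNAL | TXG_WAIT_SUSPEND);
	if (err == EINTR) {
		/* a signal arrived; the caller can retry or unwind */
	} else if (err == ESHUTDOWN) {
		/* the pool suspended; propagate the error upward */
	}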
11 changes: 10 additions & 1 deletion module/os/freebsd/zfs/vdev_geom.c
@@ -1241,7 +1241,16 @@ vdev_geom_io_done(zio_t *zio)
}

if (bp == NULL) {
ASSERT3S(zio->io_error, ==, ENXIO);
if (zio_injection_enabled && zio->io_error == EIO)
/*
* Convert an injected EIO to ENXIO. This is needed to
* work around zio_handle_device_injection_impl() not
* currently being able to inject ENXIO directly, while
* the assertion below only allows ENXIO here.
*/
zio->io_error = SET_ERROR(ENXIO);
else
ASSERT3S(zio->io_error, ==, ENXIO);
return;
}

2 changes: 1 addition & 1 deletion module/zfs/dmu_redact.c
@@ -568,7 +568,7 @@ commit_rl_updates(objset_t *os, struct merge_data *md, uint64_t object,
{
dmu_tx_t *tx = dmu_tx_create_dd(spa_get_dsl(os->os_spa)->dp_mos_dir);
dmu_tx_hold_space(tx, sizeof (struct redact_block_list_node));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT | DMU_TX_SUSPEND));
uint64_t txg = dmu_tx_get_txg(tx);
if (!md->md_synctask_txg[txg & TXG_MASK]) {
dsl_sync_task_nowait(dmu_tx_pool(tx),
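This call site, like the similar ones in dsl_scan.c, dsl_synctask.c, spa.c, and vdev_indirect.c below, checks dmu_tx_assign() with VERIFY0(). Adding DMU_TX_SUSPEND preserves the old behaviour of blocking through a suspension: these internal paths cannot meaningfully handle failure, and without the flag a suspension under failmode=continue would turn the VERIFY0() into a panic.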
107 changes: 88 additions & 19 deletions module/zfs/dmu_tx.c
@@ -23,7 +23,7 @@
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
* Copyright (c) 2012, 2017 by Delphix. All rights reserved.
* Copyright (c) 2024, Klara, Inc.
* Copyright (c) 2024, 2025, Klara, Inc.
*/

#include <sys/dmu.h>
@@ -1017,7 +1017,7 @@ dmu_tx_delay(dmu_tx_t *tx, uint64_t dirty)
* decreasing performance.
*/
static int
dmu_tx_try_assign(dmu_tx_t *tx, uint64_t flags)
dmu_tx_try_assign(dmu_tx_t *tx)
{
spa_t *spa = tx->tx_pool->dp_spa;

@@ -1032,19 +1032,10 @@ dmu_tx_try_assign(dmu_tx_t *tx, uint64_t flags)
DMU_TX_STAT_BUMP(dmu_tx_suspended);

/*
* If the user has indicated a blocking failure mode
* then return ERESTART which will block in dmu_tx_wait().
* Otherwise, return EIO so that an error can get
* propagated back to the VOP calls.
*
* Note that we always honor the `flags` flag regardless
* of the failuremode setting.
* Let dmu_tx_assign() know specifically what happened, so
* it can make the right choice based on the caller flags.
*/
if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_CONTINUE &&
!(flags & DMU_TX_WAIT))
return (SET_ERROR(EIO));

return (SET_ERROR(ERESTART));
return (SET_ERROR(ESHUTDOWN));
}

if (!tx->tx_dirty_delayed &&
@@ -1184,6 +1175,12 @@ dmu_tx_unassign(dmu_tx_t *tx)
* they have already called dmu_tx_wait() (though most likely on a
* different tx).
*
* If DMU_TX_SUSPEND is set, this indicates that this tx should ignore
* the pool being or becoming suspended while it is in progress. This will
* cause dmu_tx_assign() (and dmu_tx_wait()) to block until the pool resumes.
* If this flag is not set and the pool suspends, the return will be either
* ERESTART or EIO, depending on the value of the pool's failmode= property.
*
* It is guaranteed that subsequent successful calls to dmu_tx_assign()
* will assign the tx to monotonically increasing txgs. Of course this is
* not strong monotonicity, because the same txg can be returned multiple
@@ -1201,12 +1198,13 @@
* 1 <- dmu_tx_get_txg(T3)
*/
int
dmu_tx_assign(dmu_tx_t *tx, uint64_t flags)
dmu_tx_assign(dmu_tx_t *tx, dmu_tx_flag_t flags)
{
int err;

ASSERT(tx->tx_txg == 0);
ASSERT0(flags & ~(DMU_TX_WAIT | DMU_TX_NOTHROTTLE));
ASSERT0(flags & ~(DMU_TX_WAIT | DMU_TX_NOTHROTTLE | DMU_TX_SUSPEND));
IMPLY(flags & DMU_TX_SUSPEND, flags & DMU_TX_WAIT);
ASSERT(!dsl_pool_sync_context(tx->tx_pool));

/* If we might wait, we must not hold the config lock. */
@@ -1215,13 +1213,74 @@
if ((flags & DMU_TX_NOTHROTTLE))
tx->tx_dirty_delayed = B_TRUE;

while ((err = dmu_tx_try_assign(tx, flags)) != 0) {
if (!(flags & DMU_TX_SUSPEND))
tx->tx_break_on_suspend = B_TRUE;

while ((err = dmu_tx_try_assign(tx)) != 0) {
dmu_tx_unassign(tx);

boolean_t suspended = (err == ESHUTDOWN);
if (suspended) {
/*
* Pool suspended. We need to decide whether to block
* and retry, or return error, depending on the
* caller's flags and the pool config.
*/
if (flags & DMU_TX_SUSPEND)
/*
* The caller expressly does not care about
* suspend, so treat it as a normal retry.
*/
err = SET_ERROR(ERESTART);
else if ((flags & DMU_TX_WAIT) &&
spa_get_failmode(tx->tx_pool->dp_spa) ==
ZIO_FAILURE_MODE_CONTINUE)
/*
* Caller wants to wait, but pool config is
* overriding that, so return EIO to be
* propagated back to userspace.
*/
err = SET_ERROR(EIO);
else
/* Anything else, we should just block. */
err = SET_ERROR(ERESTART);
}

/*
* Return unless we decided to retry, or the caller does not
* want to block.
*/
if (err != ERESTART || !(flags & DMU_TX_WAIT))
return (err);

/*
* Wait until there's room in this txg, or until it's been
* synced out and a new one is available.
*
* If we're here because the pool suspended above, then we
* unset tx_break_on_suspend to make sure that if dmu_tx_wait()
* has to fall back to txg_wait_synced_flags(), it doesn't
* immediately return because the pool is suspended. That would
* then immediately return here, and we'd end up in a busy loop
* until the pool resumes.
*
* On the other hand, if the pool hasn't suspended yet, then it
* should be allowed to break a txg wait if the pool does
* suspend, so we can loop and reassess it in
* dmu_tx_try_assign().
*/
if (suspended)
tx->tx_break_on_suspend = B_FALSE;

dmu_tx_wait(tx);

/*
* Restore tx_break_on_suspend, which may have been cleared
* above when the pool suspended. We do this here so the right
* suspend behaviour is in place if we return for some other
* reason and the caller then calls dmu_tx_wait().
*/
if (!(flags & DMU_TX_SUSPEND))
tx->tx_break_on_suspend = B_TRUE;
}

txg_rele_to_quiesce(&tx->tx_txgh);
Expand All @@ -1239,6 +1298,16 @@ dmu_tx_wait(dmu_tx_t *tx)
ASSERT(tx->tx_txg == 0);
ASSERT(!dsl_pool_config_held(tx->tx_pool));

/*
* Break on suspend according to whether or not DMU_TX_SUSPEND was
* supplied to the previous dmu_tx_assign() call. For clients, this
* ensures that after dmu_tx_assign() fails, the followup dmu_tx_wait()
* gets the same behaviour wrt suspend. See also the comments in
* dmu_tx_assign().
*/
txg_wait_flag_t flags =
(tx->tx_break_on_suspend ? TXG_WAIT_SUSPEND : TXG_WAIT_NONE);

before = gethrtime();

if (tx->tx_wait_dirty) {
@@ -1276,7 +1345,7 @@
* obtain a tx. If that's the case then tx_lasttried_txg
* would not have been set.
*/
txg_wait_synced(dp, spa_last_synced_txg(spa) + 1);
txg_wait_synced_flags(dp, spa_last_synced_txg(spa) + 1, flags);
} else if (tx->tx_needassign_txh) {
dnode_t *dn = tx->tx_needassign_txh->txh_dnode;

@@ -1291,7 +1360,7 @@
* out a TXG at which point we'll hopefully have synced
* a portion of the changes.
*/
txg_wait_synced(dp, spa_last_synced_txg(spa) + 1);
txg_wait_synced_flags(dp, spa_last_synced_txg(spa) + 1, flags);
}

spa_tx_assign_add_nsecs(spa, gethrtime() - before);
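For context, a hedged sketch of the classic two-step retry pattern that this machinery serves (hypothetical code modeled loosely on existing callers; not part of this diff). The point to note is that the follow-up dmu_tx_wait() inherits the suspend behaviour selected by the flags passed to the failed dmu_tx_assign(), via tx_break_on_suspend:

	for (;;) {
		dmu_tx_t *tx = dmu_tx_create(os);
		dmu_tx_hold_write(tx, object, off, len);
		int err = dmu_tx_assign(tx, DMU_TX_NOWAIT);
		if (err == 0) {
			/* assigned; dirty buffers, then commit */
			dmu_tx_commit(tx);
			break;
		}
		if (err == ERESTART) {
			/*
			 * The txg was full, the write was throttled, or
			 * the pool suspended: wait using the suspend
			 * behaviour recorded by the failed assign, then
			 * retry with a fresh tx.
			 */
			dmu_tx_wait(tx);
			dmu_tx_abort(tx);
			continue;
		}
		dmu_tx_abort(tx);
		return (err);	/* e.g. ENOSPC from the holds */
	}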
2 changes: 1 addition & 1 deletion module/zfs/dsl_scan.c
@@ -1437,7 +1437,7 @@ dsl_scan_restart_resilver(dsl_pool_t *dp, uint64_t txg)
if (txg == 0) {
dmu_tx_t *tx;
tx = dmu_tx_create_dd(dp->dp_mos_dir);
VERIFY(0 == dmu_tx_assign(tx, DMU_TX_WAIT));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT | DMU_TX_SUSPEND));

txg = dmu_tx_get_txg(tx);
dp->dp_scan->scn_restart_txg = txg;
2 changes: 1 addition & 1 deletion module/zfs/dsl_synctask.c
@@ -58,7 +58,7 @@ dsl_sync_task_common(const char *pool, dsl_checkfunc_t *checkfunc,

top:
tx = dmu_tx_create_dd(dp->dp_mos_dir);
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT | DMU_TX_SUSPEND));

dst.dst_pool = dp;
dst.dst_txg = dmu_tx_get_txg(tx);
2 changes: 1 addition & 1 deletion module/zfs/spa.c
@@ -1984,7 +1984,7 @@ static void
spa_unload_log_sm_flush_all(spa_t *spa)
{
dmu_tx_t *tx = dmu_tx_create_dd(spa_get_dsl(spa)->dp_mos_dir);
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT | DMU_TX_SUSPEND));

ASSERT3U(spa->spa_log_flushall_txg, ==, 0);
spa->spa_log_flushall_txg = dmu_tx_get_txg(tx);
21 changes: 20 additions & 1 deletion module/zfs/txg.c
@@ -23,6 +23,7 @@
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
* Portions Copyright 2011 Martin Matuska
* Copyright (c) 2012, 2019 by Delphix. All rights reserved.
* Copyright (c) 2025, Klara, Inc.
*/

#include <sys/zfs_context.h>
@@ -705,7 +706,7 @@ txg_wait_synced_flags(dsl_pool_t *dp, uint64_t txg, txg_wait_flag_t flags)
int error = 0;
tx_state_t *tx = &dp->dp_tx;

ASSERT0(flags & ~TXG_WAIT_SIGNAL);
ASSERT0(flags & ~(TXG_WAIT_SIGNAL | TXG_WAIT_SUSPEND));
ASSERT(!dsl_pool_config_held(dp));

mutex_enter(&tx->tx_sync_lock);
@@ -723,6 +724,15 @@ txg_wait_synced_flags(dsl_pool_t *dp, uint64_t txg, txg_wait_flag_t flags)
* else interesting happens, we'll set an error and break out.
*/
while (tx->tx_synced_txg < txg) {
if ((flags & TXG_WAIT_SUSPEND) && spa_suspended(dp->dp_spa)) {
/*
* Pool suspended and the caller does not want to
* block through the suspension; inform them immediately.
*/
error = SET_ERROR(ESHUTDOWN);
break;
}

dprintf("broadcasting sync more "
"tx_synced=%llu waiting=%llu dp=%px\n",
(u_longlong_t)tx->tx_synced_txg,
@@ -756,6 +766,15 @@ txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
VERIFY0(txg_wait_synced_flags(dp, txg, TXG_WAIT_NONE));
}

void
txg_wait_kick(dsl_pool_t *dp)
{
tx_state_t *tx = &dp->dp_tx;
mutex_enter(&tx->tx_sync_lock);
cv_broadcast(&tx->tx_sync_done_cv);
mutex_exit(&tx->tx_sync_lock);
}

/*
* Wait for the specified open transaction group. Set should_quiesce
* when the current open txg should be quiesced immediately.
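Finally, a hedged sketch of how txg_wait_kick() is meant to be used (hypothetical; the actual call site, presumably somewhere in the suspend/resume path, is outside this diff):

	/*
	 * On a pool suspend-state change, broadcast tx_sync_done_cv so
	 * that threads blocked in txg_wait_synced_flags() re-run their
	 * loop, observe spa_suspended(), and return ESHUTDOWN if they
	 * passed TXG_WAIT_SUSPEND.
	 */
	static void
	example_notify_waiters(spa_t *spa)
	{
		txg_wait_kick(spa_get_dsl(spa));
	}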
2 changes: 1 addition & 1 deletion module/zfs/vdev_indirect.c
@@ -569,7 +569,7 @@ spa_condense_indirect_commit_entry(spa_t *spa,

dmu_tx_t *tx = dmu_tx_create_dd(spa_get_dsl(spa)->dp_mos_dir);
dmu_tx_hold_space(tx, sizeof (*vimep) + sizeof (count));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT));
VERIFY0(dmu_tx_assign(tx, DMU_TX_WAIT | DMU_TX_SUSPEND));
int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;

/*
Expand Down