Rework and simplify RCU code
Use __ATOMIC_RELAXED where possible. Don't store additional values in the users field.

Reviewed-by: Neil Horman <nhorman@openssl.org>
Reviewed-by: Tomas Mraz <tomas@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/26690)
parent 65787e2dc2
commit 5949918f9a

2 changed files with 39 additions and 119 deletions
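In short: the qp's 64-bit users word previously packed the active reader count into its low bits (16 bits in threads_pthread.c, 32 in threads_win.c) and a grace-period ID into bits 32-63, decoded via the READER_*/ID_* macros removed below. After this change users is only a reader counter, and update_qp() returns the grace-period ID to ossl_synchronize_rcu() through a uint32_t out-parameter instead. A minimal sketch of the two layouts (illustrative only, simplified from the patch, not OpenSSL source):

    /* Illustrative sketch, not OpenSSL source. */
    #include <stdint.h>

    /* Before: one 64-bit word carried both values. */
    #define ID_SHIFT        32
    #define READER_MASK     (((uint64_t)1 << 16) - 1)    /* 16 bits pthread, 32 win */
    #define READER_COUNT(x) ((uint32_t)((x) & READER_MASK))
    #define ID_VAL(x)       ((uint32_t)((x) >> ID_SHIFT))

    /* After: users only counts readers; the ID is handed around explicitly,
     * e.g. update_qp(lock, &curr_id) as in the hunks below. */
    struct rcu_qp_after {
        uint64_t users;     /* active readers on this qp, nothing else */
    };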
crypto/threads_pthread.c

@@ -270,25 +270,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
 # define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 32 in threads_win.c */
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This is the core of an rcu lock. It tracks the readers and writers for the
  * current quiescence point for a given lock. Users is the 64 bit value that
@@ -388,29 +369,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
          */
         qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
 
-        /*
-         * Notes of use of __ATOMIC_RELEASE
-         * This counter is only read by the write side of the lock, and so we
-         * specify __ATOMIC_RELEASE here to ensure that the write side of the
-         * lock see this during the spin loop read of users, as it waits for the
-         * reader count to approach zero
-         */
-        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_ACQUIRE);
 
         /* if the idx hasn't changed, we're good, else try again */
         if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
-                                    __ATOMIC_ACQUIRE))
+                                    __ATOMIC_RELAXED))
             break;
 
-        /*
-         * Notes on use of __ATOMIC_RELEASE
-         * As with the add above, we want to ensure that this decrement is
-         * seen by the write side of the lock as soon as it happens to prevent
-         * undue spinning waiting for write side completion
-         */
-        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_RELAXED);
     }
 
     return &lock->qp_group[qp_idx];
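Read in isolation, the reader acquisition loop after this hunk amounts to the following standalone model (a sketch using the GCC/Clang __atomic builtins that the ATOMIC_* wrappers map to on this path; the function and variable names are hypothetical):

    #include <stdint.h>

    struct qp { uint64_t users; };

    /* Simplified model of get_hold_current_qp() after the patch. */
    static struct qp *hold_current_qp(struct qp *qp_group, uint32_t *reader_idx)
    {
        uint32_t idx;

        for (;;) {
            /* which qp are readers currently directed at? */
            idx = __atomic_load_n(reader_idx, __ATOMIC_ACQUIRE);

            /* count ourselves as a reader on that qp */
            __atomic_add_fetch(&qp_group[idx].users, 1, __ATOMIC_ACQUIRE);

            /* if the writer swapped qps in the meantime, back out and retry */
            if (idx == __atomic_load_n(reader_idx, __ATOMIC_RELAXED))
                break;

            __atomic_sub_fetch(&qp_group[idx].users, 1, __ATOMIC_RELAXED);
        }
        return &qp_group[idx];
    }

With users holding nothing but the reader count, the back-out decrement no longer has to publish an ID to the writer, which is presumably why the patch can relax it to __ATOMIC_RELAXED.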
@@ -477,14 +445,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     for (i = 0; i < MAX_QPS; i++) {
         if (data->thread_qps[i].lock == lock) {
             /*
-             * As with read side acquisition, we use __ATOMIC_RELEASE here
-             * to ensure that the decrement is published immediately
-             * to any write side waiters
+             * we have to use __ATOMIC_RELEASE here
+             * to ensure that all preceding read instructions complete
+             * before the decrement is visible to ossl_synchronize_rcu
              */
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
-                                       VAL_READER, __ATOMIC_RELEASE);
+                                       (uint64_t)1, __ATOMIC_RELEASE);
                 OPENSSL_assert(ret != UINT64_MAX);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
@@ -503,9 +471,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
 
     pthread_mutex_lock(&lock->alloc_lock);
@@ -528,29 +495,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
     lock->current_alloc_idx =
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
-                     __ATOMIC_RELEASE);
-    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
-                    __ATOMIC_RELEASE);
-
-    /*
-     * Update the reader index to be the prior qp.
-     * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
-     * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
-     * of this value to be seen on the read side immediately after it happens
-     */
     ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
-                   __ATOMIC_RELEASE);
+                   __ATOMIC_RELAXED);
 
     /* wake up any waiters */
     pthread_cond_signal(&lock->alloc_signal);
@@ -594,6 +543,7 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     pthread_mutex_lock(&lock->write_lock);
@@ -601,26 +551,27 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
     lock->cb_items = NULL;
     pthread_mutex_unlock(&lock->write_lock);
 
-    qp = update_qp(lock);
-
-    /*
-     * wait for the reader count to reach zero
-     * Note the use of __ATOMIC_ACQUIRE here to ensure that any
-     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
-     * is visible prior to our read
-     */
-    do {
-        count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     pthread_mutex_lock(&lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
     lock->next_to_retire++;
     pthread_cond_broadcast(&lock->prior_signal);
     pthread_mutex_unlock(&lock->prior_lock);
 
+    /*
+     * wait for the reader count to reach zero
+     * Note the use of __ATOMIC_ACQUIRE here to ensure that any
+     * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
+     * is visible prior to our read
+     * however this is likely just necessary to silence a tsan warning
+     */
+    do {
+        count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
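On the write side the flow is now: swap in a fresh qp and learn the grace-period ID it was assigned, retire IDs strictly in order, and only then spin until the old qp's reader count drains to zero. A compressed model of that tail of ossl_synchronize_rcu() (a sketch with hypothetical names, not the OpenSSL code):

    #include <pthread.h>
    #include <stdint.h>

    struct writer_state {
        pthread_mutex_t prior_lock;
        pthread_cond_t prior_signal;
        uint32_t next_to_retire;
    };

    /* curr_id comes from update_qp(); users points at the old qp's counter. */
    static void synchronize_model(struct writer_state *ws, uint64_t *users,
                                  uint32_t curr_id)
    {
        uint64_t count;

        /* retire grace periods strictly in the order their IDs were handed out */
        pthread_mutex_lock(&ws->prior_lock);
        while (ws->next_to_retire != curr_id)
            pthread_cond_wait(&ws->prior_signal, &ws->prior_lock);
        ws->next_to_retire++;
        pthread_cond_broadcast(&ws->prior_signal);
        pthread_mutex_unlock(&ws->prior_lock);

        /* acquire pairs with the release decrement in ossl_rcu_read_unlock */
        do {
            count = __atomic_load_n(users, __ATOMIC_ACQUIRE);
        } while (count != 0);
    }

Because curr_id now comes straight out of update_qp() rather than being decoded from the users word with ID_VAL(), the spin loop can simply wait for the whole counter to reach zero.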
crypto/threads_win.c

@@ -43,25 +43,6 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-31 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 16 in threads_pthread.c */
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((int64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This defines a quescent point (qp)
  * This is the barrier beyond which a writer
@@ -229,13 +210,13 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
     for (;;) {
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
                                lock->rw_lock);
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
                             lock->rw_lock);
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
                                lock->rw_lock);
         if (qp_idx == tmp)
             break;
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
                             lock->rw_lock);
     }
 
@@ -313,7 +294,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
-                                    -VAL_READER, (uint64_t *)&ret,
+                                    (uint64_t)-1, (uint64_t *)&ret,
                                     lock->rw_lock);
                 OPENSSL_assert(ret >= 0);
                 data->thread_qps[i].qp = NULL;
@@ -328,12 +309,10 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
     uint32_t tmp;
-    uint64_t tmp64;
 
     ossl_crypto_mutex_lock(lock->alloc_lock);
     /*
@@ -355,20 +334,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
-                      lock->rw_lock);
-    CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
-                        lock->rw_lock);
-
     /* update the reader index to be the prior qp */
     tmp = lock->current_alloc_idx;
     InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
@@ -393,28 +361,29 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /* before we do anything else, lets grab the cb list */
     cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
                                           NULL);
 
-    qp = update_qp(lock);
-
-    /* wait for the reader count to reach zero */
-    do {
-        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     ossl_crypto_mutex_lock(lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
 
     lock->next_to_retire++;
     ossl_crypto_condvar_broadcast(lock->prior_signal);
     ossl_crypto_mutex_unlock(lock->prior_lock);
 
+    /* wait for the reader count to reach zero */
+    do {
+        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
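None of this changes how callers drive the internal RCU API; a reader/writer pair still looks roughly like the sketch below. The function and macro names come from OpenSSL's internal rcu.h, but the surrounding variables are hypothetical and the header path and exact prototypes should be checked against the tree rather than taken from here.

    /* Illustrative usage sketch; header names and prototypes assumed. */
    #include <openssl/crypto.h>
    #include "internal/rcu.h"

    struct conf { int value; };

    static struct conf *global_cfg;          /* protected by the RCU lock */

    static int read_value(CRYPTO_RCU_LOCK *lock)
    {
        struct conf *cfg;
        int v;

        ossl_rcu_read_lock(lock);
        cfg = ossl_rcu_deref(&global_cfg);   /* stable snapshot of the pointer */
        v = cfg->value;
        ossl_rcu_read_unlock(lock);
        return v;
    }

    static void replace_conf(CRYPTO_RCU_LOCK *lock, struct conf *new_cfg)
    {
        struct conf *old;

        ossl_rcu_write_lock(lock);
        old = global_cfg;
        ossl_rcu_assign_ptr(&global_cfg, &new_cfg);
        ossl_rcu_write_unlock(lock);

        ossl_synchronize_rcu(lock);          /* wait for readers still using old */
        OPENSSL_free(old);
    }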