Rework and simplify RCU code
Use __ATOMIC_RELAXED where possible. Don't store additional values in the users field.

Reviewed-by: Neil Horman <nhorman@openssl.org>
Reviewed-by: Tomas Mraz <tomas@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/26690)
This commit is contained in:
parent
65787e2dc2
commit
5949918f9a
2 changed files with 39 additions and 119 deletions
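Context for the second point of the commit message: before this change each quiescence point's 64-bit users word packed two values, a reader count in the low bits and a grace-period ID in bits 32-63, decoded with the READER_COUNT()/ID_VAL() macros that this commit deletes. Afterwards, users is a plain reader counter and the ID is handed to ossl_synchronize_rcu through the new curr_id out-parameter of update_qp. The following standalone sketch is not part of the commit; the macros are copied from the removed pthread-side definitions and the main() is just a worked example of the old encoding.

#include <assert.h>
#include <stdint.h>

/* Old scheme (deleted by this commit): readers in the low bits of one
 * 64-bit word, grace-period ID in bits 32-63.  READER_SIZE differed
 * between the two backends (16 vs 32), hence the TODO comments also
 * being removed. */
#define ID_SHIFT        32
#define ID_MASK         (((uint64_t)1 << 32) - 1)
#define VAL_ID(x)       ((uint64_t)(x) << ID_SHIFT)
#define ID_VAL(x)       ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
#define READER_MASK     (((uint64_t)1 << 16) - 1)
#define READER_COUNT(x) ((uint32_t)((uint64_t)(x) & READER_MASK))

int main(void)
{
    /* a qp with grace-period id 5 and 3 active readers, old encoding */
    uint64_t users = VAL_ID(5) | 3;

    assert(ID_VAL(users) == 5);
    assert(READER_COUNT(users) == 3);

    /* New scheme: users only ever holds the reader count, and the id is
     * returned separately through update_qp's curr_id out-parameter. */
    return 0;
}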
threads_pthread.c
@@ -270,25 +270,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
 # define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 32 in threads_win.c */
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This is the core of an rcu lock. It tracks the readers and writers for the
  * current quiescence point for a given lock. Users is the 64 bit value that
@@ -388,29 +369,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
          */
         qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
 
-        /*
-         * Notes of use of __ATOMIC_RELEASE
-         * This counter is only read by the write side of the lock, and so we
-         * specify __ATOMIC_RELEASE here to ensure that the write side of the
-         * lock see this during the spin loop read of users, as it waits for the
-         * reader count to approach zero
-         */
-        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_ACQUIRE);
 
         /* if the idx hasn't changed, we're good, else try again */
         if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
-                                    __ATOMIC_ACQUIRE))
+                                    __ATOMIC_RELAXED))
             break;
 
-        /*
-         * Notes on use of __ATOMIC_RELEASE
-         * As with the add above, we want to ensure that this decrement is
-         * seen by the write side of the lock as soon as it happens to prevent
-         * undue spinning waiting for write side completion
-         */
-        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_RELAXED);
     }
 
     return &lock->qp_group[qp_idx];
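The hunk above changes the reader registration loop: the increment of users is now a plain +1 with acquire ordering, and the re-check of reader_idx and the undo path are relaxed. A minimal sketch of that retry pattern, using C11 <stdatomic.h> instead of OpenSSL's ATOMIC_* wrappers; the toy structs and the two-element qp_group are assumptions for illustration only.

#include <stdatomic.h>
#include <stdint.h>

struct toy_qp { _Atomic uint64_t users; };
struct toy_rcu_lock {
    _Atomic uint32_t reader_idx;
    struct toy_qp qp_group[2];
};

/* Register this thread as a reader on the currently active qp, retrying
 * if the writer swaps reader_idx underneath us. */
static struct toy_qp *toy_get_hold_current_qp(struct toy_rcu_lock *lock)
{
    uint32_t qp_idx;

    for (;;) {
        qp_idx = atomic_load_explicit(&lock->reader_idx, memory_order_acquire);

        /* count ourselves in; the diff uses acquire here rather than release */
        atomic_fetch_add_explicit(&lock->qp_group[qp_idx].users, 1,
                                  memory_order_acquire);

        /* if the index is unchanged we are on the right qp */
        if (qp_idx == atomic_load_explicit(&lock->reader_idx,
                                           memory_order_relaxed))
            break;

        /* the writer moved on: undo and retry against the new qp */
        atomic_fetch_sub_explicit(&lock->qp_group[qp_idx].users, 1,
                                  memory_order_relaxed);
    }
    return &lock->qp_group[qp_idx];
}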
@@ -477,14 +445,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     for (i = 0; i < MAX_QPS; i++) {
         if (data->thread_qps[i].lock == lock) {
             /*
-             * As with read side acquisition, we use __ATOMIC_RELEASE here
-             * to ensure that the decrement is published immediately
-             * to any write side waiters
+             * we have to use __ATOMIC_RELEASE here
+             * to ensure that all preceding read instructions complete
+             * before the decrement is visible to ossl_synchronize_rcu
              */
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
-                                       VAL_READER, __ATOMIC_RELEASE);
+                                       (uint64_t)1, __ATOMIC_RELEASE);
                 OPENSSL_assert(ret != UINT64_MAX);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
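The rewritten comment states the pairing more precisely: the release on the final decrement orders all of the reader's preceding loads before the point at which ossl_synchronize_rcu can observe the count drop. A minimal illustration of that release/acquire pairing with C11 atomics follows; the names toy_reader, toy_writer_wait and shared_data are invented for this sketch and are not part of the OpenSSL API.

#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint64_t users = 1;  /* stand-in for qp->users, one reader */
static int shared_data;             /* data protected by the RCU scheme */

static void toy_reader(void)
{
    int v = shared_data;            /* loads done inside the read section */

    (void)v;
    /* release: the load above cannot be reordered after this decrement */
    atomic_fetch_sub_explicit(&users, 1, memory_order_release);
}

static void toy_writer_wait(void)
{
    /* acquire pairs with the release in toy_reader, so once the count is
     * observed as zero the readers' accesses have completed */
    while (atomic_load_explicit(&users, memory_order_acquire) != 0)
        ;
}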
@@ -503,9 +471,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
 
     pthread_mutex_lock(&lock->alloc_lock);
@@ -528,29 +495,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
     lock->current_alloc_idx =
         (lock->current_alloc_idx + 1) % lock->group_count;
 
-    /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
-                     __ATOMIC_RELEASE);
-    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
-                     __ATOMIC_RELEASE);
-
-    /*
-     * Update the reader index to be the prior qp.
-     * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
-     * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
-     * of this value to be seen on the read side immediately after it happens
-     */
     ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
-                   __ATOMIC_RELEASE);
+                   __ATOMIC_RELAXED);
 
     /* wake up any waiters */
     pthread_cond_signal(&lock->alloc_signal);
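With the ID no longer packed into users, the writer no longer needs the atomic AND/OR dance while holding alloc_lock: it records the grace-period id into the caller-supplied curr_id and publishes the new reader index, now with a relaxed store. A rough sketch of the reworked shape, assuming the caller already holds the alloc mutex; the toy_lock struct, C11 atomics and the index return value are simplifications, not the OpenSSL types.

#include <stdatomic.h>
#include <stdint.h>

struct toy_lock {
    uint32_t current_alloc_idx;   /* protected by the alloc mutex */
    uint32_t group_count;
    uint32_t id_ctr;              /* protected by the alloc mutex */
    _Atomic uint32_t reader_idx;  /* read by readers without the mutex */
};

/* Advance to the next qp slot and hand the grace-period id back to the
 * caller instead of encoding it into the qp's users word. */
static uint32_t toy_update_qp(struct toy_lock *lock, uint32_t *curr_id)
{
    uint32_t current_idx = lock->current_alloc_idx;

    lock->current_alloc_idx =
        (lock->current_alloc_idx + 1) % lock->group_count;

    *curr_id = lock->id_ctr;      /* replaces VAL_ID()/ATOMIC_OR_FETCH */
    lock->id_ctr++;

    /* publish the new reader index; the diff relaxes this store */
    atomic_store_explicit(&lock->reader_idx, lock->current_alloc_idx,
                          memory_order_relaxed);
    return current_idx;           /* real code returns &qp_group[current_idx] */
}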
@@ -594,6 +543,7 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     pthread_mutex_lock(&lock->write_lock);
@@ -601,26 +551,27 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
     lock->cb_items = NULL;
     pthread_mutex_unlock(&lock->write_lock);
 
-    qp = update_qp(lock);
-
-    /*
-     * wait for the reader count to reach zero
-     * Note the use of __ATOMIC_ACQUIRE here to ensure that any
-     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
-     * is visible prior to our read
-     */
-    do {
-        count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     pthread_mutex_lock(&lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
     lock->next_to_retire++;
     pthread_cond_broadcast(&lock->prior_signal);
     pthread_mutex_unlock(&lock->prior_lock);
 
+    /*
+     * wait for the reader count to reach zero
+     * Note the use of __ATOMIC_ACQUIRE here to ensure that any
+     * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
+     * is visible prior to our read
+     * however this is likely just necessary to silence a tsan warning
+     */
+    do {
+        count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
+    } while (count != (uint64_t)0);
+
    retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
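The write-side sequencing also changes in this hunk: the writer first takes its retirement turn, keyed on the id that update_qp handed back rather than on the ID bits previously decoded from users, and only then spins until the old qp's reader count hits zero; since users is now a plain counter, the loop waits for the whole word to reach zero. A condensed sketch of that new ordering, with a toy struct and C11 atomics standing in for the OpenSSL types and the ATOMIC_* macros.

#include <stdatomic.h>
#include <stdint.h>
#include <pthread.h>

/* Toy holder for the fields used below; names follow the diff but the
 * struct itself is illustrative only. */
struct toy_sync_lock {
    pthread_mutex_t prior_lock;
    pthread_cond_t prior_signal;
    uint32_t next_to_retire;
};

/* Take our in-order retirement turn first, then wait out the readers. */
static void toy_synchronize(struct toy_sync_lock *lock,
                            _Atomic uint64_t *users, uint32_t curr_id)
{
    pthread_mutex_lock(&lock->prior_lock);
    while (lock->next_to_retire != curr_id)
        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
    lock->next_to_retire++;
    pthread_cond_broadcast(&lock->prior_signal);
    pthread_mutex_unlock(&lock->prior_lock);

    /* users is now only the reader count, so wait for the whole word to
     * reach zero; acquire pairs with the release in ossl_rcu_read_unlock */
    while (atomic_load_explicit(users, memory_order_acquire) != 0)
        ;
}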
threads_win.c
@@ -43,25 +43,6 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-31 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 16 in threads_pthread.c */
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((int64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This defines a quescent point (qp)
  * This is the barrier beyond which a writer
@@ -229,13 +210,13 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
     for (;;) {
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
                                lock->rw_lock);
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
                             lock->rw_lock);
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
                                lock->rw_lock);
         if (qp_idx == tmp)
             break;
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
                             lock->rw_lock);
     }
 
@@ -313,7 +294,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
-                                    -VAL_READER, (uint64_t *)&ret,
+                                    (uint64_t)-1, (uint64_t *)&ret,
                                     lock->rw_lock);
                 OPENSSL_assert(ret >= 0);
                 data->thread_qps[i].qp = NULL;
@@ -328,12 +309,10 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
     uint32_t tmp;
-    uint64_t tmp64;
 
     ossl_crypto_mutex_lock(lock->alloc_lock);
     /*
@@ -355,20 +334,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
-                      lock->rw_lock);
-    CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
-                        lock->rw_lock);
-
     /* update the reader index to be the prior qp */
     tmp = lock->current_alloc_idx;
     InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
@@ -393,28 +361,29 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /* before we do anything else, lets grab the cb list */
     cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
                                           NULL);
 
-    qp = update_qp(lock);
-
-    /* wait for the reader count to reach zero */
-    do {
-        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     ossl_crypto_mutex_lock(lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
 
     lock->next_to_retire++;
     ossl_crypto_condvar_broadcast(lock->prior_signal);
     ossl_crypto_mutex_unlock(lock->prior_lock);
 
+    /* wait for the reader count to reach zero */
+    do {
+        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
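One detail shared by both files: the decrement is written as adding (uint64_t)-1, i.e. all bits set, which wraps around to subtracting one from an unsigned 64-bit counter; the OPENSSL_assert(ret != UINT64_MAX) on the pthread side is the matching underflow check. A two-assert demonstration of that arithmetic, using hypothetical values and nothing OpenSSL-specific:

#include <assert.h>
#include <stdint.h>

int main(void)
{
    uint64_t users = 3;
    uint64_t underflow = 0;

    /* adding (uint64_t)-1 == 0xFFFFFFFFFFFFFFFF wraps to "minus one" */
    users += (uint64_t)-1;
    assert(users == 2);

    /* decrementing from 0 wraps to UINT64_MAX, which is what the
     * OPENSSL_assert(ret != UINT64_MAX) check in the diff guards against */
    underflow += (uint64_t)-1;
    assert(underflow == UINT64_MAX);
    return 0;
}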