libevhtp/evthr/evthr.c
2013-03-16 13:34:01 +08:00

487 lines
9.7 KiB
C

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <limits.h>
#include <errno.h>
#include <fcntl.h>
#ifndef WIN32
#include <sys/syscall.h>
#include <sys/ioctl.h>
#include <sys/queue.h>
#endif
#include <unistd.h>
#include <pthread.h>
#include <event2/event.h>
#include <event2/thread.h>
#include "evthr.h"
#if (__GNUC__ > 2 || ( __GNUC__ == 2 && __GNUC__MINOR__ > 4)) && (!defined(__STRICT_ANSI__) || __STRICT_ANSI__ == 0)
#define __unused__ __attribute__((unused))
#else
#define __unused__
#endif
#define _EVTHR_MAGIC 0x03fb
typedef struct evthr_cmd evthr_cmd_t;
typedef struct evthr_pool_slist evthr_pool_slist_t;
struct evthr_cmd {
uint8_t stop : 1;
void * args;
evthr_cb cb;
} __attribute__ ((packed));
TAILQ_HEAD(evthr_pool_slist, evthr);
struct evthr_pool {
int nthreads;
evthr_pool_slist_t threads;
};
struct evthr {
int cur_backlog;
int max_backlog;
int rdr;
int wdr;
char err;
ev_t * event;
evbase_t * evbase;
pthread_mutex_t lock;
pthread_mutex_t stat_lock;
pthread_mutex_t rlock;
pthread_t * thr;
evthr_init_cb init_cb;
void * arg;
void * aux;
TAILQ_ENTRY(evthr) next;
};
#ifndef TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = TAILQ_FIRST((head)); \
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
(var) = (tvar))
#endif
inline void
evthr_inc_backlog(evthr_t * evthr) {
__sync_fetch_and_add(&evthr->cur_backlog, 1);
}
inline void
evthr_dec_backlog(evthr_t * evthr) {
__sync_fetch_and_sub(&evthr->cur_backlog, 1);
}
inline int
evthr_get_backlog(evthr_t * evthr) {
return __sync_add_and_fetch(&evthr->cur_backlog, 0);
}
inline void
evthr_set_max_backlog(evthr_t * evthr, int max) {
evthr->max_backlog = max;
}
static void
_evthr_read_cmd(evutil_socket_t sock, short __unused__ which, void * args) {
evthr_t * thread;
evthr_cmd_t cmd;
ssize_t recvd;
if (!(thread = (evthr_t *)args)) {
return;
}
if (pthread_mutex_trylock(&thread->lock) != 0) {
return;
}
pthread_mutex_lock(&thread->rlock);
if ((recvd = recv(sock, &cmd, sizeof(evthr_cmd_t), 0)) <= 0) {
pthread_mutex_unlock(&thread->rlock);
if (errno == EAGAIN) {
goto end;
} else {
goto error;
}
}
if (recvd < sizeof(evthr_cmd_t)) {
pthread_mutex_unlock(&thread->rlock);
goto error;
}
pthread_mutex_unlock(&thread->rlock);
if (recvd != sizeof(evthr_cmd_t)) {
goto error;
}
if (cmd.stop == 1) {
goto stop;
}
if (cmd.cb != NULL) {
cmd.cb(thread, cmd.args, thread->arg);
goto done;
} else {
goto done;
}
stop:
event_base_loopbreak(thread->evbase);
done:
evthr_dec_backlog(thread);
end:
pthread_mutex_unlock(&thread->lock);
return;
error:
pthread_mutex_lock(&thread->stat_lock);
thread->cur_backlog = -1;
thread->err = 1;
pthread_mutex_unlock(&thread->stat_lock);
pthread_mutex_unlock(&thread->lock);
event_base_loopbreak(thread->evbase);
return;
} /* _evthr_read_cmd */
static void *
_evthr_loop(void * args) {
evthr_t * thread;
if (!(thread = (evthr_t *)args)) {
return NULL;
}
if (thread == NULL || thread->thr == NULL) {
pthread_exit(NULL);
}
thread->evbase = event_base_new();
thread->event = event_new(thread->evbase, thread->rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->event, NULL);
pthread_mutex_lock(&thread->lock);
if (thread->init_cb != NULL) {
thread->init_cb(thread, thread->arg);
}
pthread_mutex_unlock(&thread->lock);
event_base_loop(thread->evbase, 0);
if (thread->err == 1) {
fprintf(stderr, "FATAL ERROR!\n");
}
evthr_free(thread);
pthread_exit(NULL);
}
evthr_res
evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) {
int cur_backlog;
evthr_cmd_t cmd;
cur_backlog = evthr_get_backlog(thread);
if (thread->max_backlog) {
if (cur_backlog + 1 > thread->max_backlog) {
return EVTHR_RES_BACKLOG;
}
}
if (cur_backlog == -1) {
return EVTHR_RES_FATAL;
}
/* cmd.magic = _EVTHR_MAGIC; */
cmd.cb = cb;
cmd.args = arg;
cmd.stop = 0;
pthread_mutex_lock(&thread->rlock);
evthr_inc_backlog(thread);
if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
evthr_dec_backlog(thread);
pthread_mutex_unlock(&thread->rlock);
return EVTHR_RES_RETRY;
}
pthread_mutex_unlock(&thread->rlock);
return EVTHR_RES_OK;
}
evthr_res
evthr_stop(evthr_t * thread) {
evthr_cmd_t cmd;
/* cmd.magic = _EVTHR_MAGIC; */
cmd.cb = NULL;
cmd.args = NULL;
cmd.stop = 1;
pthread_mutex_lock(&thread->rlock);
if (write(thread->wdr, &cmd, sizeof(evthr_cmd_t)) < 0) {
pthread_mutex_unlock(&thread->rlock);
return EVTHR_RES_RETRY;
}
pthread_mutex_unlock(&thread->rlock);
return EVTHR_RES_OK;
}
evbase_t *
evthr_get_base(evthr_t * thr) {
return thr->evbase;
}
void
evthr_set_aux(evthr_t * thr, void * aux) {
thr->aux = aux;
}
void *
evthr_get_aux(evthr_t * thr) {
return thr->aux;
}
evthr_t *
evthr_new(evthr_init_cb init_cb, void * args) {
evthr_t * thread;
int fds[2];
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
return NULL;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
if (!(thread = calloc(sizeof(evthr_t), sizeof(char)))) {
return NULL;
}
thread->thr = malloc(sizeof(pthread_t));
thread->init_cb = init_cb;
thread->arg = args;
thread->rdr = fds[0];
thread->wdr = fds[1];
if (pthread_mutex_init(&thread->lock, NULL)) {
evthr_free(thread);
return NULL;
}
if (pthread_mutex_init(&thread->stat_lock, NULL)) {
evthr_free(thread);
return NULL;
}
if (pthread_mutex_init(&thread->rlock, NULL)) {
evthr_free(thread);
return NULL;
}
return thread;
} /* evthr_new */
int
evthr_start(evthr_t * thread) {
int res;
if (thread == NULL || thread->thr == NULL) {
return -1;
}
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
return -1;
}
res = pthread_detach(*thread->thr);
return res;
}
void
evthr_free(evthr_t * thread) {
if (thread == NULL) {
return;
}
if (thread->rdr > 0) {
close(thread->rdr);
}
if (thread->wdr > 0) {
close(thread->wdr);
}
if (thread->thr) {
free(thread->thr);
}
if (thread->event) {
event_free(thread->event);
}
if (thread->evbase) {
event_base_free(thread->evbase);
}
free(thread);
} /* evthr_free */
void
evthr_pool_free(evthr_pool_t * pool) {
evthr_t * thread;
evthr_t * save;
if (pool == NULL) {
return;
}
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
TAILQ_REMOVE(&pool->threads, thread, next);
evthr_free(thread);
}
free(pool);
}
evthr_res
evthr_pool_stop(evthr_pool_t * pool) {
evthr_t * thr;
evthr_t * save;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
evthr_stop(thr);
}
return EVTHR_RES_OK;
}
evthr_res
evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {
evthr_t * min_thr = NULL;
evthr_t * thr = NULL;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
if (cb == NULL) {
return EVTHR_RES_NOCB;
}
/* find the thread with the smallest backlog */
TAILQ_FOREACH(thr, &pool->threads, next) {
evthr_t * m_save;
evthr_t * t_save;
int thr_backlog = 0;
int min_backlog = 0;
thr_backlog = evthr_get_backlog(thr);
if (min_thr) {
min_backlog = evthr_get_backlog(min_thr);
}
m_save = min_thr;
t_save = thr;
if (min_thr == NULL) {
min_thr = thr;
} else if (thr_backlog == 0) {
min_thr = thr;
} else if (thr_backlog < min_backlog) {
min_thr = thr;
}
if (evthr_get_backlog(min_thr) == 0) {
break;
}
}
return evthr_defer(min_thr, cb, arg);
} /* evthr_pool_defer */
evthr_pool_t *
evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
evthr_pool_t * pool;
int i;
if (nthreads == 0) {
return NULL;
}
if (!(pool = calloc(sizeof(evthr_pool_t), sizeof(char)))) {
return NULL;
}
pool->nthreads = nthreads;
TAILQ_INIT(&pool->threads);
for (i = 0; i < nthreads; i++) {
evthr_t * thread;
if (!(thread = evthr_new(init_cb, shared))) {
evthr_pool_free(pool);
return NULL;
}
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
}
return pool;
}
void
evthr_pool_set_max_backlog(evthr_pool_t * pool, int max) {
evthr_t * thr;
TAILQ_FOREACH(thr, &pool->threads, next) {
evthr_set_max_backlog(thr, max);
}
}
int
evthr_pool_start(evthr_pool_t * pool) {
evthr_t * evthr = NULL;
if (pool == NULL) {
return -1;
}
TAILQ_FOREACH(evthr, &pool->threads, next) {
if (evthr_start(evthr) < 0) {
return -1;
}
usleep(5000);
}
return 0;
}