Added example and documentation on proper thread-safe design with evhtp.

This commit is contained in:
Mark Ellzey 2012-04-05 17:07:34 -04:00
parent ffd1858684
commit 154017ef22
2 changed files with 297 additions and 0 deletions

View file

@ -74,3 +74,20 @@ Libevhtp attempts to address these problems along with a wide variety of cool me
}
## Is evhtp thread-safe?
For simple usage with evhtp_use_threads(), yes. But for more extreme cases:
sorta, you are bound to the thread mechanisms of libevent itself.
But with proper design around libevhtp, thread issues can be out-of-sight,
out-of-mind.
What do you mean by this "proper design" statement?
Refer to the code in ./examples/thread_design.c. The comments go into great detail
of the hows and whys for proper design using libevhtp's threading model.
This example uses redis, mainly because most people who have asked me "is evhtp
thread-safe" were attempting to *other things* before sending a response to a
request. And on more than one occasion, those *other things* were communicating
with redis.

280
examples/thread_design.c Normal file
View file

@ -0,0 +1,280 @@
/*
* How to exploit the wonders of libevhtp's threading model to avoid using
* libevent's locking API.
*
* In this example we use Redis's Async API (Libhiredis) store and retr the following
* information for a request:
*
* Total requests seen.
* Total requests seen by the requestors IP address.
* All of the source ports seen used by the requestors IP address.
*
* We do this all using libevhtp's builtin thread-pool model, without the use of
* mutexes or evthread_use_pthreads() type stuff.
*
* The technique is simple:
* 1. Create your evhtp_t structure, assign callbacks like usual.
* 2. Call evhtp_use_threads() with a thread init callback.
* 3. Each time a thread starts, the thread init callback you defined will be
* called with information about that thread.
*
* First a bit of information about how evhtp does threading:
* libevhtp uses the evthr library, which works more like a threaded
* co-routine than a threadpool. Each evthr in a pool has its own unique
* event_base (and each evthr runs its own event_base_loop()). Under the
* hood when libevhtp sends a request to a thread, it calls
* "evthr_pool_defer(pool, _run_connection_in_thread, ...).
*
* The evthr library then finds a thread inside the pool with the lowest backlog,
* sends a packet over that threads socketpair containing information about what
* function to execute. It uses socketpairs because they can be treated as
* an event, thus able to be processed in a threads own unique
* event_base_loop().
*
* Knowing that, a connection in evhtp is never associated with the initial
* event_base that was passed to evhtp_new(), but instead the connection
* uses the evthr's unique event_base. This is what makes libevhtp's
* safe from thread-related race conditions.
*
* 4. Use the thread init callback as a place to put event type things on the
* threads event_base() instead of using the global one.
*
* In this code, that function is app_init_thread(). When this function is
* called, the first argument is the evthr_t of the thread that just
* started. This function uses "evthr_get_base(thread)" to get the
* event_base associated with this specific thread.
*
* Using that event_base, the function will start up an async redis
* connection. This redis connection is now tied to that thread, and can be
* used on a threaded request without locking (remember that your request
* has the same event_base as the thread it was executed in).
*
* We allocate a dummy structure "struct app" and then call
* "evthr_set_aux(thread, app)". This function sets some aux data which can
* be fetched at any point using evthr_get_aux(thread). We use this later on
* inside process_request()
*
* This part is the secret to evhtp threading success.
*
* 5. When a request has been fully processed, it will call the function
* "app_process_request()". Note here that the "arg" argument is NULL since no
* arguments were passed to evhtp_set_gencb().
*
* Since we want to do a bunch of redis stuff before sending a reply to the
* client, we must fetch the "struct app" data we allocated and set for the
* thread associated with this request (struct app * app =
* evthr_get_aux(thread);).
*
* struct app has our thread-specific redis connection ctx, so using that
* redisAsyncCommand() is called a bunch of times to queue up the commands
* which will be run.
*
* The last part of this technique is to call the function
* "evhtp_request_pause()". This essentially tells evhtp to flip the
* read-side of the connections file-descriptor OFF (This avoids potential
* situations where a client disconnected before all of the redis commands
* executed).
*
* 6. Each redis command is executed in order, and each callback will write to
* the requests output_buffer with relevant information from the result.
*
* 7. The last redis callback executed here is "redis_get_srcport_cb". It is
* the job os this function to call evhtp_send_reply() and then
* evhtp_request_resume().
*
* Using this design in conjunction with libevhtp makes the world an easier
* place to code.
*
* Compile: gcc thread_design.c -o thread_design -levhtp -levent -lhiredis
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <evhtp.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
struct app_parent {
evhtp_t * evhtp;
evbase_t * evbase;
char * redis_host;
uint16_t redis_port;
};
struct app {
struct app_parent * parent;
evbase_t * evbase;
redisAsyncContext * redis;
};
static evthr_t *
get_request_thr(evhtp_request_t * request) {
evhtp_connection_t * htpconn;
evthr_t * thread;
htpconn = evhtp_request_get_connection(request);
thread = htpconn->thread;
return thread;
}
void
redis_global_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_global_incr_cb() failed\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"Total requests = %lld\n", reply->integer);
}
void
redis_srcaddr_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_srcaddr_incr_cb() failed\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"Requests from this source IP = %lld\n", reply->integer);
}
void
redis_set_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_set_srcport_cb() failed\n");
return;
}
if (!reply->integer) {
evbuffer_add_printf(request->buffer_out,
"This source port has been seen already.\n");
} else {
evbuffer_add_printf(request->buffer_out,
"This source port has never been seen.\n");
}
}
void
redis_get_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
int i;
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) {
evbuffer_add_printf(request->buffer_out,
"redis_get_srcport_cb() failed.\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"source ports which have been seen for your ip:\n");
for (i = 0; i < reply->elements; i++) {
redisReply * elem = reply->element[i];
evbuffer_add_printf(request->buffer_out, "%s ", elem->str);
}
evbuffer_add(request->buffer_out, "\n", 1);
/* final callback for redis, so send the response */
evhtp_send_reply(request, EVHTP_RES_OK);
evhtp_request_resume(request);
}
void
app_process_request(evhtp_request_t * request, void * arg) {
struct sockaddr_in * sin;
struct app_parent * app_parent;
struct app * app;
evthr_t * thread;
evhtp_connection_t * conn;
char tmp[1024];
thread = get_request_thr(request);
conn = evhtp_request_get_connection(request);
app = (struct app *)evthr_get_aux(thread);
sin = (struct sockaddr_in *)conn->saddr;
evutil_inet_ntop(AF_INET, &sin->sin_addr, tmp, sizeof(tmp));
/* increment a global counter of hits on redis */
redisAsyncCommand(app->redis, redis_global_incr_cb,
(void *)request, "INCR requests:total");
/* increment a counter for hits from this source address on redis */
redisAsyncCommand(app->redis, redis_srcaddr_incr_cb,
(void *)request, "INCR requests:ip:%s", tmp);
/* add the source port of this request to a source-specific set */
redisAsyncCommand(app->redis, redis_set_srcport_cb, (void *)request,
"SADD requests:ip:%s:ports %d", tmp, ntohs(sin->sin_port));
/* get all of the ports this source address has used */
redisAsyncCommand(app->redis, redis_get_srcport_cb, (void *)request,
"SMEMBERS requests:ip:%s:ports", tmp);
/* pause the request processing */
evhtp_request_pause(request);
}
void
app_init_thread(evhtp_t * htp, evthr_t * thread, void * arg) {
struct app_parent * app_parent;
struct app * app;
app_parent = (struct app_parent *)arg;
app = calloc(sizeof(struct app), 1);
app->parent = app_parent;
app->evbase = evthr_get_base(thread);
app->redis = redisAsyncConnect(app_parent->redis_host, app_parent->redis_port);
redisLibeventAttach(app->redis, app->evbase);
evthr_set_aux(thread, app);
}
int
main(int argc, char ** argv) {
evbase_t * evbase;
evhtp_t * evhtp;
struct app_parent * app_p;
evbase = event_base_new();
evhtp = evhtp_new(evbase, NULL);
app_p = calloc(sizeof(struct app_parent), 1);
app_p->evhtp = evhtp;
app_p->evbase = evbase;
app_p->redis_host = "127.0.0.1";
app_p->redis_port = 6379;
evhtp_set_gencb(evhtp, app_process_request, NULL);
evhtp_use_threads(evhtp, app_init_thread, 4, app_p);
evhtp_bind_socket(evhtp, "127.0.0.1", 9090, 1024);
event_base_loop(evbase, 0);
return 0;
}