Request pausing + threading fixes.

- Fixed issue where if a request is pipelined, or there is a socket error while
  the request is still marked as paused, the connection and request would be
  free'd. The expected behaviour is to not free until the request is resumed.
This commit is contained in:
Mark Ellzey 2012-12-07 16:16:11 -05:00
parent 9d8e243235
commit 0e774db72d
4 changed files with 44 additions and 2 deletions

View file

@ -204,3 +204,6 @@ target_link_libraries(test_basic libevhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})
add_executable(test_vhost test_vhost.c)
target_link_libraries(test_vhost libevhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})
#add_executable(td examples/thread_design.c)
#target_link_libraries(td libevhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS} hiredis)

31
evhtp.c
View file

@ -832,6 +832,10 @@ static int
_evhtp_request_parser_start(htparser * p) {
evhtp_connection_t * c = htparser_get_userdata(p);
if (c->paused == 1) {
return -1;
}
if (c->request) {
if (c->request->finished == 1) {
_evhtp_request_free(c->request);
@ -1400,10 +1404,19 @@ static void
_evhtp_connection_resumecb(int fd, short events, void * arg) {
evhtp_connection_t * c = arg;
c->paused = 0;
bufferevent_enable(c->bev, EV_READ);
if (c->request) {
c->request->status = EVHTP_RES_OK;
}
if (c->free_connection == 1) {
evhtp_connection_free(c);
return;
}
_evhtp_connection_readcb(c->bev, c);
}
@ -1420,6 +1433,9 @@ _evhtp_connection_readcb(evbev_t * bev, void * arg) {
c->request->status = EVHTP_RES_OK;
}
if (c->paused == 1) {
return;
}
buf = evbuffer_pullup(bufferevent_get_input(bev), avail);
@ -1471,6 +1487,10 @@ _evhtp_connection_writecb(evbev_t * bev, void * arg) {
_evhtp_connection_write_hook(c);
if (c->paused == 1) {
return;
}
if (c->request->finished == 0 || evbuffer_get_length(bufferevent_get_output(bev))) {
return;
}
@ -1542,7 +1562,12 @@ _evhtp_connection_eventcb(evbev_t * bev, short events, void * arg) {
c->request->hooks->on_error_arg);
}
evhtp_connection_free((evhtp_connection_t *)arg);
if (c->paused == 1) {
c->free_connection = 1;
} else {
evhtp_connection_free((evhtp_connection_t *)arg);
}
}
static int
@ -1642,6 +1667,7 @@ _evhtp_connection_new(evhtp_t * htp, int sock) {
connection->error = 0;
connection->owner = 1;
connection->paused = 0;
connection->sock = sock;
connection->htp = htp;
connection->parser = htparser_new();
@ -1874,6 +1900,7 @@ evhtp_request_get_method(evhtp_request_t * r) {
void
evhtp_connection_pause(evhtp_connection_t * c) {
if ((bufferevent_get_enabled(c->bev) & EV_READ)) {
c->paused = 1;
bufferevent_disable(c->bev, EV_READ);
}
}
@ -1886,7 +1913,7 @@ evhtp_connection_pause(evhtp_connection_t * c) {
void
evhtp_connection_resume(evhtp_connection_t * c) {
if (!(bufferevent_get_enabled(c->bev) & EV_READ)) {
bufferevent_enable(c->bev, EV_READ);
/* bufferevent_enable(c->bev, EV_READ); */
event_active(c->resume_ev, EV_WRITE, 1);
}
}

View file

@ -426,6 +426,8 @@ struct evhtp_connection_s {
uint64_t max_body_size;
uint64_t body_bytes_read;
uint64_t num_requests;
char paused;
char free_connection;
};
struct evhtp_hooks_s {

View file

@ -128,6 +128,8 @@ redis_global_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg)
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("global_incr_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_global_incr_cb() failed\n");
@ -143,6 +145,8 @@ redis_srcaddr_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg)
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("incr_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_srcaddr_incr_cb() failed\n");
@ -158,6 +162,8 @@ redis_set_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg)
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("set_srcport_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_set_srcport_cb() failed\n");
@ -179,6 +185,8 @@ redis_get_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg)
evhtp_request_t * request = arg;
int i;
printf("get_srcport_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) {
evbuffer_add_printf(request->buffer_out,
"redis_get_srcport_cb() failed.\n");
@ -210,6 +218,8 @@ app_process_request(evhtp_request_t * request, void * arg) {
evhtp_connection_t * conn;
char tmp[1024];
printf("process_request(%p)\n", request);
thread = get_request_thr(request);
conn = evhtp_request_get_connection(request);
app = (struct app *)evthr_get_aux(thread);