# HG changeset patch # User Nathan Phillip Brink # Date 2010-07-02 01:17:27 # Node ID 3efc48659d0ba52520aa47bf79c265c70709dd7c # Parent 22ee38bf849085262e16f5b6ea803b74278d864f Use poll() instead of select() (poll() is the thing to use nowadays ;-) ). diff --git a/src/server/distrend.c b/src/server/distrend.c --- a/src/server/distrend.c +++ b/src/server/distrend.c @@ -83,7 +83,7 @@ struct general_info int distrend_do(); int distrend_do_config(int argc, char *argv[], struct distrend_config **config); int distrend_config_free(struct distrend_config *config); -int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo); +int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo); /* **************XML Functions**************** */ @@ -100,8 +100,8 @@ int main(int argc, char *argv[]) /* Parse arguments */ int counter; int test = 0; /*< Interactive mode if 1 */ + int tmp; struct general_info general_info; - struct distrend_clientset *clients; enum clientstatus { @@ -154,19 +154,30 @@ int main(int argc, char *argv[]) /** Execute test function */ interactiveTest(test, &general_info); - if(distrend_listen(general_info.config, &clients)) + general_info.config->listens = distrend_listens_new(&general_info); + if(!general_info.config->listens) { - fprintf(stderr, "error listening\n"); + fprintf(stderr, "error initializing listens\n"); return 1; } + for(counter = 0; general_info.config->listen_ports[counter]; counter ++) + { + tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]); + if(tmp) + { + fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]); + return 1; + } + } int slaveKey = 0; // Remotio should set me on a per-slave basis /* Main Loop */ + general_info.config->die = 0; while(!general_info.config->die) { 
int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */ - distrend_accept(general_info.config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)&general_info); + distrend_accept(general_info.config->listens); /* Run the watchdog, @TODO: like every 10 mins or something */ frame_watchdog(general_info.conn); @@ -194,7 +205,7 @@ int main(int argc, char *argv[]) distrenjob_free(&job); } - distrend_unlisten(general_info.config->listens, clients); + distrend_listen_free(general_info.config->listens); distrend_config_free(general_info.config); xmlcleanup(); @@ -208,7 +219,7 @@ int main(int argc, char *argv[]) /* ********************** Functions ************************* */ -int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo) +int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo) { size_t counter; char *tmp_str; @@ -246,11 +257,11 @@ int distrend_handle_request(struct distr fixedbuf[req->len] = '\0'; _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. 
Bye ;-)\n", PACKAGE_STRING, fixedbuf); - distrend_send_disconnect(client, tmp_str); + distrend_send_disconnect(listens, client, tmp_str); } distren_request_new(&newreq, strlen(VERSION), DISTREN_REQUEST_VERSION); - distrend_client_write_request(client, newreq, VERSION); + distrend_client_write_request(listens, client, newreq, VERSION); distren_request_free(newreq); break; } @@ -308,26 +319,29 @@ int distrend_do_config(int argc, char *a /** grab listen blocks: */ - (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1)); + (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1)); for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++) { cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter); - (*config)->listens[counter].port = cfg_getint(cfg_listen, "port"); - (*config)->listens[counter].sock = -1; + (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port"); } - memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen)); + (*config)->listen_ports[counter] = 0; fprintf(stderr, "using %s as datadir\n", (*config)->datadir); return 0; } + int distrend_config_free(struct distrend_config *config) { + distrend_listen_free(config->listens); options_free(config->options); + free(config->listen_ports); free(config); return 0; } + /* ************************** XML Functions ************************* */ // writes the general_info.xml file which is a copy of the general_info structure @@ -482,7 +496,6 @@ int interactiveTest(int test, struct gen int32_t frameNum = 0; int32_t newPriority = 0; int tmp = 0; - struct distrend_clientset *clients; fprintf(stderr,"Hello!\n"); @@ -557,14 +570,9 @@ int interactiveTest(int test, struct gen break; case 5: - if(distrend_listen(geninfo->config, &clients)) - { - fprintf(stderr, "error listening\n"); - return 1; - } while(1) { - distrend_accept(geninfo->config, clients, 
(distrend_handle_request_t)&distrend_handle_request, (void *)geninfo); + distrend_accept(geninfo->config->listens); /* code for reading data from clients either goes here or in distrend_accept(). it might make sense for us to just pass the distrend_accept() function a diff --git a/src/server/distrend.h b/src/server/distrend.h --- a/src/server/distrend.h +++ b/src/server/distrend.h @@ -31,7 +31,8 @@ struct distrend_config { cfg_t *mycfg; struct options_common *options; - struct distrend_listen *listens; /*< Null terminated array of structs */ + struct distrend_listens *listens; + int *listen_ports; char *datadir; int die; diff --git a/src/server/listen.c b/src/server/listen.c --- a/src/server/listen.c +++ b/src/server/listen.c @@ -18,6 +18,7 @@ */ #include "listen.h" +#include "common/protocol.h" #include #include @@ -27,55 +28,91 @@ #include #include #include -#include +#include #include #include /* local */ -struct distrend_clientset -{ - LIST *clients; - - int nfds; -}; - struct distrend_packet { size_t len; char *data; }; -int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state); +struct distrend_request_handler_info +{ + enum distren_request_type request_type; + distrend_handle_request_func_t handler; +}; + +int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state); int distrend_client_free(struct distrend_client *client); void distrend_packet_free(struct distrend_packet *packet); int distrend_makesocknonblock(int sock); int distrend_packets_collapse(QUEUE *queue, size_t len); +int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata); +struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset); + +int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events); +int 
distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock); + +struct distrend_listens *distrend_listens_new(struct general_info *geninfo) +{ + struct distrend_listens *listens; + + listens = malloc(sizeof(struct distrend_listens)); + if(!listens) + return NULL; + + listens->listen_socks = list_init(); + listens->request_handlers = list_init(); + listens->clients = list_init(); + listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */ + listens->pollfds_len = 2; + listens->nfds = 0; + listens->geninfo = geninfo; + + if(!listens->listen_socks + || !listens->request_handlers + || !listens->clients + || !listens->pollfds) + { + if(listens->listen_socks) + list_free(listens->listen_socks, LIST_NODEALLOC); + if(listens->request_handlers) + list_free(listens->request_handlers, LIST_NODEALLOC); + if(listens->clients) + list_free(listens->clients, LIST_NODEALLOC); + free(listens->pollfds); + free(listens); + return NULL; + } + + return listens; +} /** clean up after a partially completed distrend_listen() call which ends in error. 
*/ -int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter) +int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo) { int sock; - for(counter ++; counter > 0; counter --) - { - sock = config->listens[counter - 1].sock; - if(sock >= 0) - close(sock); - } - list_free(clients->clients, NULL); - free(clients); + sock = sockinfo->sock.sock; + if(sock >= 0) + close(sock); + + free(sockinfo); return 0; } -int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients) +int distrend_listen_add(struct distrend_listens *listens, int port) { int tmp; - size_t counter; + struct distrend_listen_sock *newsock; struct sockaddr_in6 sockaddr = { @@ -86,50 +123,48 @@ int distrend_listen(struct distrend_conf .sin6_scope_id = 0 }; - *clients = malloc(sizeof(struct distrend_clientset)); + sockaddr.sin6_port = htons(port); - (*clients)->clients = list_init(); - (*clients)->nfds = 1; + newsock = malloc(sizeof(struct distrend_listen_sock)); + if(!newsock) + return 1; - for(counter = 0; config->listens[counter].port; counter ++) + newsock->port = port; + newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0); + tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); + if(tmp == -1) { - sockaddr.sin6_port = htons(config->listens[counter].port); + perror("bind"); + distrend_listen_add_unwind(newsock); - config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0); - tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); - if(tmp == -1) - { - perror("bind"); - distrend_listen_unwind(config, *clients, counter); + return 1; + } - return 1; - } - - tmp = listen(config->listens->sock, 1); - if(tmp == -1) - { - perror("listen"); - distrend_listen_unwind(config, *clients, counter); + tmp = listen(newsock->sock.sock, 1); + if(tmp == -1) + { + perror("listen"); + distrend_listen_add_unwind(newsock); - return 1; - } + return 1; + } - tmp = 
distrend_makesocknonblock(config->listens->sock); - if(tmp) - { - /* error is already printed to user by distrend_makesocknonblock() */ - distrend_listen_unwind(config, *clients, counter); - return 1; - } + tmp = distrend_makesocknonblock(newsock->sock.sock); + if(tmp) + { + /* error is already printed to user by distrend_makesocknonblock() */ + distrend_listen_add_unwind(newsock); + return 1; } + distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM); + list_insert_after(listens->listen_socks, newsock, 0); + return 0; } -int distrend_handleread(struct distrend_config *config, - struct distrend_client *client, - distrend_handle_request_t handlereq, - void *handlereqdata) +int distrend_handleread(struct distrend_listens *listens, + struct distrend_client *client) { struct distrend_packet *packet; ssize_t readlen; @@ -138,13 +173,6 @@ int distrend_handleread(struct distrend_ struct distren_request *req; void *reqdata; - /** - we have to check that there's new data so that - we may call ourself recursively - */ - fd_set readfd; - struct timeval selecttimeout; - packet = malloc(sizeof(struct distrend_packet)); if(!packet) { @@ -154,13 +182,9 @@ int distrend_handleread(struct distrend_ packet->len = 0; packet->data = NULL; - FD_ZERO(&readfd); - FD_SET(client->sock, &readfd); - memset(&selecttimeout, '\0', sizeof(selecttimeout)); - select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout); - if(FD_ISSET(client->sock, &readfd)) - { - readlen = read(client->sock, buf, sizeof(buf)); + // if(revents & POLLRDNORM) + //{ + readlen = read(client->sock.sock, buf, sizeof(buf)); if(readlen == -1) { perror("read"); @@ -199,7 +223,7 @@ int distrend_handleread(struct distrend_ q_enqueue(client->inmsgs, packet, 0); client->inlen += packet->len; packet = NULL; - } /* if(FD_ISSET(client->sock, &readfd)) */ + // } /* */ /** Manage input, etc. 
@@ -262,48 +286,15 @@ int distrend_handleread(struct distrend_ client->inlen -= client->expectlen; client->expectlen = 0; - (*handlereq)(client, req, reqdata, handlereqdata); + distrend_dispatch_request(listens, client, req, reqdata); distren_request_free(req); } return 0; } -struct distrend_accept_fdset_prepare_data -{ - fd_set readfds; - fd_set writefds; - /** - stores the highest FD number which is one less than the first - arfgument to select(). So do select(maxfd + 1, ...) - */ - int maxfd; -}; -int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data, - struct distrend_client *client) -{ - if(client->state == DISTREND_CLIENT_DEAD) - return TRUE; - - if(client->sock > data->maxfd) - data->maxfd = client->sock; - FD_SET(client->sock, &data->readfds); - /** - Only select on an outgoing socket if we have something to - say. - */ - if(!q_empty(client->outmsgs)) - FD_SET(client->sock, &data->writefds); - - return TRUE; -} - struct distrend_accept_client_proc_data { - fd_set *fdset; - struct distrend_config *config; - distrend_handle_request_t handlereq; - void *handlereqdata; - enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode; + struct distrend_listens *listens; time_t current_time; }; int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data, @@ -311,19 +302,23 @@ int distrend_accept_client_proc(struct d { struct distrend_packet *packet; ssize_t written_amount; + short revents; if(client->state == DISTREND_CLIENT_DEAD) return TRUE; + /** + Manage timing-out clients. + */ if(data->current_time > client->cleanup_time) switch(client->state) { case DISTREND_CLIENT_PREAUTH: - distrend_send_disconnect(client, "You have failed to present authentication information in a timely manner. Cya ;-)"); + distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. 
Cya ;-)"); break; case DISTREND_CLIENT_GOOD: - distrend_send_disconnect(client, "Ping timeout :-p"); + distrend_send_disconnect(data->listens, client, "Ping timeout :-p"); break; case DISTREND_CLIENT_BAD: @@ -334,22 +329,27 @@ int distrend_accept_client_proc(struct d break; } - if(!FD_ISSET(client->sock, data->fdset)) - /** continue iteration through the list */ - return TRUE; + revents = data->listens->pollfds[client->sock.pollfd_offset].revents; + if(revents & POLLRDNORM) + /* handle reading */ + { + fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n", + __FILE__, __LINE__, client->sock.sock); + distrend_handleread(data->listens, client); + } + if(revents & POLLWRNORM) + { + fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n", + __FILE__, __LINE__, client->sock.sock); - fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n", - __FILE__, __LINE__, client->sock, - (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ"); - - switch(data->mode) - { - case DISTREND_ACCEPT_CLIENT_WRITE: if(q_empty(client->outmsgs)) - return TRUE; + { + data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM; + return TRUE; + } packet = q_front(client->outmsgs); - written_amount = write(client->sock, packet->data, packet->len); + written_amount = write(client->sock.sock, packet->data, packet->len); /** Disconnect in case of write error. */ @@ -369,6 +369,9 @@ int distrend_accept_client_proc(struct d { q_dequeue(client->outmsgs); distrend_packet_free(packet); + + if(q_empty(client->outmsgs)) + data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM; } else { @@ -376,36 +379,47 @@ int distrend_accept_client_proc(struct d shifting seems the simplest solution. 
*/ packet->len -= written_amount; + memmove(packet->data, packet->data + written_amount, packet->len); } - break; - - case DISTREND_ACCEPT_CLIENT_READ: - distrend_handleread(data->config, client, data->handlereq, data->handlereqdata); - break; } /** continue iteration through the list */ return TRUE; } -int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata) +int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock) +{ + short revents; + int newclientsock; + + revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents; + fprintf(stderr, "blah %d\n", (int)revents); + if(revents & POLLRDNORM) + { + newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL); + if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREAUTH)) + { + fprintf(stderr, "error allocating/adding client struct\n"); + return 1; + } + fprintf(stderr, "accepted new connection\n"); + } + + /** + Check other listen blocks too. 
+ */ + return TRUE; +} + +int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata) { int tmp; - int newclientsock; - struct distrend_accept_fdset_prepare_data fdsets; struct distrend_accept_client_proc_data travinfo; struct distrend_client *newclient; - FD_ZERO(&fdsets.readfds); - FD_ZERO(&fdsets.writefds); - FD_SET(config->listens->sock, &fdsets.readfds); - fdsets.maxfd = config->listens->sock; - - list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR); - - fprintf(stderr, "select()..."); - tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL); + fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds); + poll(listens->pollfds, listens->nfds, -1); if(tmp == -1) { perror("select"); @@ -417,62 +431,42 @@ int distrend_accept(struct distrend_conf Deal first with data waiting to be sent and then with input data. */ - travinfo.config = config; + travinfo.listens = listens; + travinfo.current_time = time(NULL); /*< cache the time */ - travinfo.fdset = &fdsets.writefds; - travinfo.handlereq = handlereq; - travinfo.handlereqdata = handlereqdata; - travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE; - travinfo.current_time = time(NULL); /*< cache the time */ - list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); - - travinfo.fdset = &fdsets.readfds; - travinfo.mode = DISTREND_ACCEPT_CLIENT_READ; - list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); + list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE); /** Handle new connections. 
*/ - if(FD_ISSET(config->listens->sock, &fdsets.readfds)) - { - newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL); - if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH)) - { - fprintf(stderr, "error allocating client struct\n"); - return 1; - } - clients->nfds ++; - list_insert_after(clients->clients, newclient, 0); - fprintf(stderr, "accepted new connection\n"); - distrend_client_write(newclient, "hello\n", 6); - } - + list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE); /** Until liblist supports deleting nodes during a traversal, we clean dead clients here: */ - list_mvfront(clients->clients); - newclient = list_curr(clients->clients); + list_mvfront(listens->clients); + newclient = list_curr(listens->clients); while(newclient) { if(newclient->state == DISTREND_CLIENT_DEAD) { + distrend_listen_poll_deletefd(listens, &newclient->sock); distrend_client_free(newclient); - list_remove_curr(clients->clients); + list_remove_curr(listens->clients); fprintf(stderr, "removed dead connection\n"); } - list_mvnext(clients->clients); + list_mvnext(listens->clients); /* provide for termination of this loop */ - if(newclient == list_curr(clients->clients)) + if(newclient == list_curr(listens->clients)) newclient = NULL; else - newclient = list_curr(clients->clients); + newclient = list_curr(listens->clients); } return 0; } -int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients) +int distrend_listen_free(struct distrend_listens *listens) { fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__); @@ -486,40 +480,59 @@ void remotio_send_to_client(struct distr fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__); } -int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state) +/** + Adds 
a newly connected distrend_client to the listens->clients list + */ +int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state) { int tmp; + struct distrend_client *client; tmp = distrend_makesocknonblock(sock); if(tmp) { fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n"); + close(sock); return 1; } - *client = malloc(sizeof(struct distrend_client)); - if(!*client) + client = malloc(sizeof(struct distrend_client)); + if(!client) { fprintf(stderr, "OOM\n"); + close(sock); return 1; } - (*client)->sock = sock; - (*client)->state = state; - (*client)->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME; - (*client)->inlen = 0; - (*client)->expectlen = 0; - (*client)->inmsgs = q_init(); - (*client)->outmsgs = q_init(); + client->sock.sock = sock; + distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */ + + client->state = state; + client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME; + client->inlen = 0; + client->expectlen = 0; + client->inmsgs = q_init(); + client->outmsgs = q_init(); + + list_insert_after(listens->clients, client, 0); + + /** + For those using netcat/telnet to debug their internets. 
+ */ + distrend_client_write(listens, client, "distren\n", 8); return 0; } +/** + This function shall only be called after the appropriate + distrend_listen_poll_deletefd() has been called + */ int distrend_client_free(struct distrend_client *client) { q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free); q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free); - close(client->sock); + close(client->sock.sock); return 0; } @@ -548,7 +561,7 @@ int distrend_makesocknonblock(int sock) } -int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen) +int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen) { struct distrend_packet *packet; @@ -572,10 +585,12 @@ int distrend_client_write(struct distren q_enqueue(client->outmsgs, packet, 0); + listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM; + return 0; } -int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data) +int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data) { char *towrite; int ret; @@ -593,7 +608,7 @@ int distrend_client_write_request(struct memcpy(towrite, req, sizeof(struct distren_request)); memcpy(towrite + sizeof(struct distren_request), data, req->len); - ret = distrend_client_write(client, towrite, msglen); + ret = distrend_client_write(listens, client, towrite, msglen); free(towrite); return ret; @@ -670,12 +685,12 @@ int distrend_packets_collapse(QUEUE *que } -int distrend_send_disconnect(struct distrend_client *client, char *quit_msg) +int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg) { struct distren_request *req; distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT); - distrend_client_write_request(client, req, quit_msg); + 
distrend_client_write_request(listens, client, req, quit_msg); distren_request_free(req); client->state = DISTREND_CLIENT_BAD; @@ -683,3 +698,198 @@ int distrend_send_disconnect(struct dist return 0; } + +int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler) +{ + struct distrend_request_handler_info *handler_info; + + handler_info = malloc(sizeof(struct distrend_request_handler_info)); + if(!handler_info) + return 1; + + handler_info->request_type = type; + handler_info->handler = handler; + list_insert_after(listens->request_handlers, handler_info, 0); + + return 0; +} + +struct distrend_dispatch_request_data +{ + struct general_info *geninfo; + struct distrend_client *client; + struct distren_request *req; + void *req_data; +}; + +/** + traversal function for distrend_dispatch_request(). + */ +int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info) +{ + if(handler_info->request_type == data->req->type) + (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data); + + return TRUE; +} + +/** + helper for distrend_handleread() which looks up the correct + request handler and handles handing the the request to the + handler. :-p + */ +int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata) +{ + struct distrend_dispatch_request_data data; + + data.geninfo = listens->geninfo; + data.client = client; + data.req = req; + data.req_data = reqdata; + + list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE); + + return 0; +} + +/** + Helper for the distrend_listen_add() and functions that + call accept() to maintain the struct pollfd list in listens. + + @param listens The info related to listening to sockets. 
+ @param fd the FD to add to the list of sockets. + @param poll_events The poll() events to register in the list of sockets. + @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances. + */ +int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events) +{ + size_t new_len; + struct pollfd *new_pollfds; + + if(listens->nfds + 1 > listens->pollfds_len) + { + new_len = listens->pollfds_len * 2; + new_pollfds = malloc(sizeof(struct pollfd) * new_len); + if(!new_pollfds) + return 1; + + memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len); + + free(listens->pollfds); + listens->pollfds_len = new_len; + listens->pollfds = new_pollfds; + } + + sock->pollfd_offset = listens->nfds; + listens->pollfds[sock->pollfd_offset].fd = sock->sock; + listens->pollfds[sock->pollfd_offset].events = poll_events; + listens->nfds ++; + + fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events); + + return 0; +} + + +/** + Removes a particular struct pollfd from the listens->pollfds list. This + will likely also require remapping at least one other existing pollfd + entry, so this will also traverse the list of clients and update the + entry_offset of the client which has to be remapped. + + @param listens the struct distrend_listens + @param sock the relevent struct containing the offset of the pollfd that must be removed. + @return 0 on success, 1 on error. 
+ */ +int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock) +{ + /* + the socket which is being moved from the end of the list + to the middle to fill in the hole left by removing sock + */ + struct distrend_polled_sock *displaced_sock; + + /* + special case: we're deleting the last element and thus + don't need to remap an existing entry. + */ + if(sock->pollfd_offset == listens->nfds - 1) + { + listens->nfds --; + return 0; + } + + /* + Shift whatever's at the end of the pollfds array into the position + of the pollfd being removed. Then update that client's struct distrend_client + entry to point to the new offset. + */ + memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd)); + displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1); + if(!displaced_sock) + { + fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n", + (int)listens->nfds - 1, (int)listens->nfds); + listens->nfds --; + return 1; + } + + displaced_sock->pollfd_offset = sock->pollfd_offset; + + listens->nfds --; + return 0; +} + +/** + For _distrend_polled_sock_get_by_offset_traverse() + */ +struct distrend_polled_sock_get_by_offset_data +{ + struct distrend_polled_sock *sock; + size_t pollfd_offset; +}; + +/** + List traversal helper function for distrend_polled_sock_get_by_offset(). + + @param sock The second argument is casting either sturct distrend_client or + struct distrend_listen_sock to a struct distrend_polled_sock which works + because each of the two former structs as a sturct distrend_polled_sock + as its first member. 
+ */ +int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock) +{ + if(data->pollfd_offset == sock->pollfd_offset) + { + data->sock = sock; + return FALSE; + } + return TRUE; +} + +/** + Returns a struct distrend_client based on its offset in the listens->pollfds + array. + + @param listens the listens. + @param pollfds_offset the index in the listens->pollfds array of the client we should find + @return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list. + */ +struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset) +{ + struct distrend_polled_sock_get_by_offset_data data; + + data.sock = NULL; + data.pollfd_offset = pollfds_offset; + + /** + These traversals both depend on struct distrend_polled_sock being + the first entry of struct distrend_listens and struct distrend_client. + */ + list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE); + if(data.sock == NULL) + list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE); + + return data.sock; +} + diff --git a/src/server/listen.h b/src/server/listen.h --- a/src/server/listen.h +++ b/src/server/listen.h @@ -17,8 +17,8 @@ along with DistRen. If not, see . */ -struct distrend_clientset; -struct distrend_listen; +struct general_info; +struct distrend_listens; struct distrend_client; #ifndef _DISTREN_LISTEN_H @@ -27,6 +27,7 @@ struct distrend_client; #include "distrend.h" #include "common/protocol.h" +#include #include #include @@ -69,15 +70,57 @@ enum distrend_client_state DISTREND_CLIENT_DEAD }; -struct distrend_listen +/** + inheritence in C !?!? 
+ */ +struct distrend_polled_sock { + int sock; + /* this socket's offset in the listens->pollfds array. */ + size_t pollfd_offset; +}; + +struct distrend_listen_sock +{ + /** + sock must be first, see distrend_polled_socket_get_by_offset() + */ + struct distrend_polled_sock sock; int port; - int sock; }; +struct distrend_listens +{ + /* of type (struct distrend_listen_sock *) */ + list_t listen_socks; + /* of type (struct distrend_request_handler_info) */ + list_t request_handlers; + /* the data to pass onto all request handlers */ + struct general_info *geninfo; + + /* of type (struct distrend_client) */ + list_t clients; + + /* the array passed to poll() */ + struct pollfd *pollfds; + /* the number of entries pollfds could hold */ + nfds_t pollfds_len; + /* the number of entries that pollfds does hold */ + nfds_t nfds; +}; + + +/** + The information necessary to recieve data from and send data + to a client. + */ struct distrend_client { - int sock; + /** + sock must be first, see distrend_polled_socket_get_by_offset() + */ + struct distrend_polled_sock sock; + enum distrend_client_state state; /** @@ -92,19 +135,45 @@ struct distrend_client size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */ QUEUE *inmsgs; QUEUE *outmsgs; + }; -typedef int(*distrend_handle_request_t)(struct distrend_client *client, struct distren_request *req, void *reqdata, void *data); +/** + A function signature that may be registered as a client + request handler. 
+ @param client the client that sent the request + @param len the length of the message in bytes + @param data the message received from the client + */ +typedef int(*distrend_handle_request_func_t)(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data); /** - initializes the listens and clientset - @param config the configuration from distrend - @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset + Initializes the listens member of struct distrend_config. + + @param geninfo general info to pass to the request handler. + @return Must be free()d with distrend_listen_free(); +*/ +struct distrend_listens *distrend_listens_new(struct general_info *geninfo); + +/** + Adds a socket and configures it to listen(). + + @param listens The handle for this set of listens, obtained via distrend_listens_new(). + @param port The port the new socket should listen on. */ -int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients); +int distrend_listen_add(struct distrend_listens *listens, int port); + +/** + Register a request handler with the listener. + + @param listens the listens with which the handler is registered + @param type the request type for which this handler should be called + @param handler the handler to call when a request of type type is received. + */ +int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler); /** checks states of the sockets I'm managing. If there are any new connections, @@ -112,20 +181,22 @@ int distrend_listen(struct distrend_conf I will block until there is some sort of activity, including signals. 
If you want to cleanly shut down, it's best to register signal handlers somewhere + + @param listens the config->listens after being initialized with distrend_listens_new() */ -int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata); +int distrend_accept(struct distrend_listens *listens); /** - cleans listening socket. Unnecessary for a working server, currently a stub. + cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub. */ -int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients); +int distrend_listen_free(struct distrend_listens *listens); /** writes message to client. @param towrite the caller is expected to free this string. This function will strdup() it, in essence. */ -int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen); +int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen); /** writes request to client. @@ -133,7 +204,7 @@ int distrend_client_write(struct distren @param req the request struct. caller must free this. @param data the data of the request which is req->len bytes long. caller must free this. */ -int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data); +int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data); /** This is probably just NOT a placeholder for remotio @@ -144,7 +215,7 @@ void remotio_send_to_client(); Queue a DISTREN_REQUEST_DISCONNECT and prepare a client to be disconnected. */ -int distrend_send_disconnect(struct distrend_client *client, char *quit_msg); +int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg); #endif