diff --git a/src/server/distrend.c b/src/server/distrend.c --- a/src/server/distrend.c +++ b/src/server/distrend.c @@ -511,7 +511,16 @@ int interactiveTest(int test, struct gen case 5: distrend_listen(geninfo->config, &clients); while(1) - distrend_accept(geninfo->config, clients); + { + distrend_accept(geninfo->config, clients); + /* + code for reading data from clients either goes here or in distrend_accept(). + it might make sense for us to just pass the distrend_accept() function a + callback which can handle packets or to have a generic packet handling + subsystem which gathers data into groups defined by by packet.h and then + passed onto the callback. + */ + } break; case 0: diff --git a/src/server/listen.c b/src/server/listen.c --- a/src/server/listen.c +++ b/src/server/listen.c @@ -19,6 +19,8 @@ #include "listen.h" +#include +#include #include #include #include @@ -27,6 +29,7 @@ #include #include #include +#include /* local */ @@ -34,16 +37,19 @@ struct distrend_clientset { LIST *clients; - /* - for select() - */ - fd_set readfds; - fd_set writefds; int nfds; }; +struct distrend_packet +{ + size_t len; + char *data; +}; + int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state); int distrend_client_free(struct distrend_client *client); +void distrend_packet_free(struct distrend_packet *packet); +int distrend_makesocknonblock(int sock); int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients) { @@ -61,8 +67,9 @@ int distrend_listen(struct distrend_conf *clients = malloc(sizeof(struct distrend_clientset)); (*clients)->clients = list_init(); + (*clients)->nfds = 1; - sockaddr.sin6_port = htonl(4050); + sockaddr.sin6_port = htons(4050); config->listens->sock = socket(AF_INET6, SOCK_STREAM, 0); tmp = bind(config->listens->sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); @@ -75,12 +82,20 @@ int distrend_listen(struct distrend_conf } tmp = listen(config->listens->sock, 1); + if(tmp == -1) + { + perror("listen"); + free(*clients); + + return 1; + } - FD_ZERO(&(*clients)->readfds); - FD_ZERO(&(*clients)->writefds); - - /** accept()able sockets are considered ``readable'' */ - FD_SET(config->listens->sock, &(*clients)->readfds); + tmp = distrend_makesocknonblock(config->listens->sock); + if(tmp) + { + free(*clients); + return 1; + } return 0; } @@ -88,27 +103,152 @@ int distrend_listen(struct distrend_conf int distrend_handleread(struct distrend_config *config, struct distrend_client *client) { - fprintf(stderr, "%s:%d: STUB: I'm supposed to read data from the client\n", - __FILE__, __LINE__); + struct distrend_packet *packet; + ssize_t readlen; + char buf[8192]; + + packet = malloc(sizeof(struct distrend_packet)); + if(!packet) + { + fprintf(stderr, "OOM\n"); + return 1; + } + packet->len = 0; + packet->data = NULL; + + readlen = read(client->sock, buf, sizeof(buf)); + if(readlen == -1) + { + perror("read"); + switch(errno) + { + case EINTR: + case EAGAIN: + break; + + default: + client->state = DISTREND_CLIENT_DEAD; + + free(packet); + return 1; + } + } + if(readlen == 0) + /* handle EOF */ + { + client->state = DISTREND_CLIENT_DEAD; + + free(packet); + return 1; + } + + packet->len = readlen; + packet->data = malloc(readlen); + if(!packet->data) + { + fprintf(stderr, "OOM!\n"); + + free(packet); + return 1; + } + memcpy(packet->data, buf, readlen); return 0; } +struct distrend_accept_fdset_prepare_data +{ + fd_set readfds; + fd_set writefds; + /** + stores the highest FD number which is one less than the first + arfgument to select(). So do select(maxfd + 1, ...) + */ + int maxfd; +}; +int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data, + struct distrend_client *client) +{ + if(client->state == DISTREND_CLIENT_DEAD) + return TRUE; + + if(client->sock > data->maxfd) + data->maxfd = client->sock; + FD_SET(client->sock, &data->readfds); + /** + Only select on an outgoing socket if we have something to + say. + */ + if(!q_empty(client->outmsgs)) + FD_SET(client->sock, &data->writefds); + + return TRUE; +} + struct distrend_accept_client_proc_data { fd_set *fdset; struct distrend_config *config; + enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode; }; int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data, struct distrend_client *client) { + struct distrend_packet *packet; + ssize_t written_amount; + + if(client->state == DISTREND_CLIENT_DEAD) + return TRUE; + if(!FD_ISSET(client->sock, data->fdset)) /** continue iteration through the list */ return TRUE; - fprintf(stderr, "%s:%d: My traversal says that sock %d has data waiting\n", - __FILE__, __LINE__, client->sock); - distrend_handleread(data->config, client); + fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n", + __FILE__, __LINE__, client->sock, + (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "write"); + + switch(data->mode) + { + case DISTREND_ACCEPT_CLIENT_WRITE: + if(q_empty(client->outmsgs)) + return TRUE; + + packet = q_front(client->outmsgs); + written_amount = write(client->sock, packet->data, packet->len); + /** + Disconnect in case of write error. + */ + if(written_amount == -1) + { + perror("write"); + /* + distrend_client_free(); + + Until liblist has the ability to delete nodes + during traversal, we'll have to free the client + during a client-list cleanup/pruning cycle. + */ + client->state = DISTREND_CLIENT_DEAD; + } + if(packet->len == written_amount) + { + q_dequeue(client->outmsgs); + distrend_packet_free(packet); + } + else + { + /** + shifting seems the simplest solution. + */ + packet->len -= written_amount; + } + break; + + case DISTREND_ACCEPT_CLIENT_READ: + distrend_handleread(data->config, client); + break; + } /** continue iteration through the list */ return TRUE; @@ -117,13 +257,21 @@ int distrend_accept_client_proc(struct d int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients) { int tmp; - fd_set readfds; - fd_set writefds; + int newclientsock; + + struct distrend_accept_fdset_prepare_data fdsets; + struct distrend_accept_client_proc_data travinfo; + struct distrend_client *newclient; - memcpy(&readfds, &clients->readfds, sizeof(fd_set)); - memcpy(&writefds, &clients->writefds, sizeof(fd_set)); + FD_ZERO(&fdsets.readfds); + FD_ZERO(&fdsets.writefds); + FD_SET(config->listens->sock, &fdsets.readfds); + fdsets.maxfd = config->listens->sock; - tmp = select(clients->nfds, &readfds, &writefds, NULL, (struct timeval *)NULL); + list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR); + + fprintf(stderr, "select()..."); + tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL); if(tmp == -1) { perror("select"); @@ -132,10 +280,57 @@ int distrend_accept(struct distrend_conf } /** - deal with all sockets that have data waiting + Deal first with data waiting to be sent and then + with input data. + */ + travinfo.config = config; + + travinfo.fdset = &fdsets.writefds; + travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE; + list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); + + travinfo.fdset = &fdsets.readfds; + travinfo.mode = DISTREND_ACCEPT_CLIENT_READ; + list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); + + /** + Handle new connections. */ - list_traverse(clients->clients, &readfds, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); + if(FD_ISSET(config->listens->sock, &fdsets.readfds)) + { + newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL); + if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH)) + { + fprintf(stderr, "error allocating client struct\n"); + return 1; + } + clients->nfds ++; + list_insert_after(clients->clients, newclient, 0); + fprintf(stderr, "accepted new connection\n"); + distrend_client_write(newclient, "hello\n", 6); + } + /** + Until liblist supports deleting nodes during a traversal, + we clean dead clients here: + */ + list_mvfront(clients->clients); + newclient = list_curr(clients->clients); + while(newclient) + { + if(newclient->state == DISTREND_CLIENT_DEAD) + { + distrend_client_free(newclient); + list_remove_curr(clients->clients); + fprintf(stderr, "removed dead connection\n"); + } + list_mvnext(clients->clients); + /* provide for termination of this loop */ + if(newclient == list_curr(clients->clients)) + newclient = NULL; + else + newclient = list_curr(clients->clients); + } return 0; } @@ -156,6 +351,15 @@ void remotio_send_to_client(struct distr int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state) { + int tmp; + + tmp = distrend_makesocknonblock(sock); + if(tmp) + { + fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n"); + return 1; + } + *client = malloc(sizeof(struct distrend_client)); if(!*client) { @@ -164,17 +368,87 @@ int distrend_client_new(struct distrend_ } (*client)->sock = sock; (*client)->state = state; - (*client)->inmsgs = list_init(); - (*client)->outmsgs = list_init(); + (*client)->inmsgs = q_init(); + (*client)->outmsgs = q_init(); return 0; } int distrend_client_free(struct distrend_client *client) { - list_free(client->inmsgs, (void *)LIST_DEALLOC); - list_free(client->outmsgs, (void *)LIST_DEALLOC); + q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free); + q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free); + + close(client->sock); + + return 0; +} + +void distrend_packet_free(struct distrend_packet *packet) +{ + free(packet->data); + free(packet); +} + +int distrend_makesocknonblock(int sock) +{ + int fdstatus; + int tmp; + + fdstatus = fcntl(sock, F_GETFL); + fdstatus |= O_NONBLOCK; + tmp = fcntl(sock, F_SETFL, fdstatus); + if(tmp == -1) + { + perror("fcntl"); + return 1; + } + + return 0; +} + + +int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen) +{ + struct distrend_packet *packet; - fprintf(stderr, "%s:%d: stub!", __FILE__, __LINE__); - return 1; + packet = malloc(sizeof(struct distrend_packet)); + if(!packet) + { + fprintf(stderr, "OOM\n"); + return 1; + } + + packet->len = msglen; + packet->data = malloc(msglen); + if(!packet->data) + { + free(packet); + fprintf(stderr, "OOM\n"); + return 1; + } + + memcpy(packet->data, towrite, msglen); + + q_enqueue(client->outmsgs, packet, 0); + + return 0; } + +int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread) +{ + struct distrend_packet *packet; + + *lenread = 0; + *toread = NULL; + + if(q_empty(client->inmsgs)) + return 1; + + packet = q_dequeue(client->inmsgs); + *lenread = packet->len; + *toread = packet->data; + free(packet); + + return 0; +} diff --git a/src/server/listen.h b/src/server/listen.h --- a/src/server/listen.h +++ b/src/server/listen.h @@ -26,13 +26,14 @@ struct distrend_client; #include "distrend.h" -#include +#include enum distrend_client_state { DISTREND_CLIENT_PREAUTH, DISTREND_CLIENT_GOODDOGGY, - DISTREND_CLIENT_BADBOY + DISTREND_CLIENT_BADBOY, + DISTREND_CLIENT_DEAD }; struct distrend_listen @@ -45,8 +46,8 @@ struct distrend_client { int sock; enum distrend_client_state state; - LIST *inmsgs; - LIST *outmsgs; + QUEUE *inmsgs; + QUEUE *outmsgs; }; @@ -73,6 +74,13 @@ int distrend_accept(struct distrend_conf int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients); /** + writes message to client. + @param towrite the caller is expected to free this string. This function will + strdup() it, in essence. + */ +int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen); + +/** This is probably just NOT a placeholder for remotio */ void remotio_send_to_client();