/* Copyright 2010 Nathan Phillip Brink This file is a part of DistRen. DistRen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DistRen is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with DistRen. If not, see . */ #include "listen.h" #include #include #include #include #include #include #include #include #include #include #include /* local */ struct distrend_clientset { LIST *clients; int nfds; }; struct distrend_packet { size_t len; char *data; }; int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state); int distrend_client_free(struct distrend_client *client); void distrend_packet_free(struct distrend_packet *packet); int distrend_makesocknonblock(int sock); int distrend_packets_collapse(QUEUE *queue, size_t len); /** clean up after a partially completed distrend_listen() call which ends in error. */ int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter) { int sock; for(counter ++; counter > 0; counter --) { sock = config->listens[counter - 1].sock; if(sock >= 0) close(sock); } list_free(clients->clients, NULL); free(clients); return 0; } int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients) { int tmp; size_t counter; struct sockaddr_in6 sockaddr = { .sin6_family = AF_INET6, .sin6_port = 0, .sin6_flowinfo = 0, .sin6_addr = IN6ADDR_ANY_INIT, .sin6_scope_id = 0 }; *clients = malloc(sizeof(struct distrend_clientset)); (*clients)->clients = list_init(); (*clients)->nfds = 1; for(counter = 0; config->listens[counter].port; counter ++) { sockaddr.sin6_port = htons(config->listens[counter].port); config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0); tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); if(tmp == -1) { perror("bind"); distrend_listen_unwind(config, *clients, counter); return 1; } tmp = listen(config->listens->sock, 1); if(tmp == -1) { perror("listen"); distrend_listen_unwind(config, *clients, counter); return 1; } tmp = distrend_makesocknonblock(config->listens->sock); if(tmp) { /* error is already printed to user by distrend_makesocknonblock() */ distrend_listen_unwind(config, *clients, counter); return 1; } } return 0; } int distrend_handleread(struct distrend_config *config, struct distrend_client *client, distrend_handle_request_t handlereq, void *handlereqdata) { struct distrend_packet *packet; ssize_t readlen; char buf[8192]; struct distren_request *req; void *reqdata; /** we have to check that there's new data so that we may call ourself recursively */ fd_set readfd; struct timeval selecttimeout; packet = malloc(sizeof(struct distrend_packet)); if(!packet) { fprintf(stderr, "OOM\n"); return 1; } packet->len = 0; packet->data = NULL; FD_ZERO(&readfd); FD_SET(client->sock, &readfd); memset(&selecttimeout, '\0', sizeof(selecttimeout)); select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout); if(FD_ISSET(client->sock, &readfd)) { readlen = read(client->sock, buf, sizeof(buf)); if(readlen == -1) { perror("read"); switch(errno) { case EINTR: case EAGAIN: break; default: client->state = DISTREND_CLIENT_DEAD; free(packet); return 1; } } if(readlen == 0) /* handle EOF */ { client->state = DISTREND_CLIENT_DEAD; free(packet); return 1; } packet->len = readlen; packet->data = malloc(readlen); if(!packet->data) { fprintf(stderr, "OOM!\n"); free(packet); return 1; } memcpy(packet->data, buf, readlen); q_enqueue(client->inmsgs, packet, 0); client->inlen += packet->len; packet = NULL; } /* if(FD_ISSET(client->sock, &readfd)) */ /** Manage input, etc. */ if(client->expectlen == 0) { /* search out header from input so far */ distrend_packets_collapse(client->inmsgs, client->inlen); packet = q_front(client->inmsgs); if(packet->len > sizeof(struct distren_request)) { if(distren_request_new_fromdata(&req, packet->data, packet->len)) { fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n"); client->state = DISTREND_CLIENT_DEAD; return 1; } client->expectlen = req->len + sizeof(struct distren_request); distren_request_free(req); } } if(client->expectlen && client->inlen >= client->expectlen) { /** essentially un-queue-ize the queue ;-) */ distrend_packets_collapse(client->inmsgs, client->inlen); packet = q_front(client->inmsgs); if(distren_request_new_fromdata(&req, packet->data, packet->len)) { fprintf(stderr, "error handling data from client\n"); client->state = DISTREND_CLIENT_DEAD; return 1; } if(packet->len - sizeof(struct distren_request) < req->len) { fprintf(stderr, "error handling some data from client\n"); distren_request_free(req); client->state = DISTREND_CLIENT_DEAD; return 1; } reqdata = malloc(req->len); if(!reqdata) { fprintf(stderr, "OOM\n"); distren_request_free(req); return 1; } memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len); memmove(packet->data, ((void *)packet->data) + client->expectlen, packet->len - client->expectlen); packet->len -= client->expectlen; client->inlen -= client->expectlen; client->expectlen = 0; (*handlereq)(client, req, reqdata, handlereqdata); distren_request_free(req); } return 0; } struct distrend_accept_fdset_prepare_data { fd_set readfds; fd_set writefds; /** stores the highest FD number which is one less than the first arfgument to select(). So do select(maxfd + 1, ...) */ int maxfd; }; int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data, struct distrend_client *client) { if(client->state == DISTREND_CLIENT_DEAD) return TRUE; if(client->sock > data->maxfd) data->maxfd = client->sock; FD_SET(client->sock, &data->readfds); /** Only select on an outgoing socket if we have something to say. */ if(!q_empty(client->outmsgs)) FD_SET(client->sock, &data->writefds); return TRUE; } struct distrend_accept_client_proc_data { fd_set *fdset; struct distrend_config *config; distrend_handle_request_t handlereq; void *handlereqdata; enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode; }; int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data, struct distrend_client *client) { struct distrend_packet *packet; ssize_t written_amount; if(client->state == DISTREND_CLIENT_DEAD) return TRUE; if(!FD_ISSET(client->sock, data->fdset)) /** continue iteration through the list */ return TRUE; fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n", __FILE__, __LINE__, client->sock, (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ"); switch(data->mode) { case DISTREND_ACCEPT_CLIENT_WRITE: if(q_empty(client->outmsgs)) return TRUE; packet = q_front(client->outmsgs); written_amount = write(client->sock, packet->data, packet->len); /** Disconnect in case of write error. */ if(written_amount == -1) { perror("write"); /* distrend_client_free(); Until liblist has the ability to delete nodes during traversal, we'll have to free the client during a client-list cleanup/pruning cycle. */ client->state = DISTREND_CLIENT_DEAD; } if(packet->len == written_amount) { q_dequeue(client->outmsgs); distrend_packet_free(packet); } else { /** shifting seems the simplest solution. */ packet->len -= written_amount; } break; case DISTREND_ACCEPT_CLIENT_READ: distrend_handleread(data->config, client, data->handlereq, data->handlereqdata); break; } /** continue iteration through the list */ return TRUE; } int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata) { int tmp; int newclientsock; struct distrend_accept_fdset_prepare_data fdsets; struct distrend_accept_client_proc_data travinfo; struct distrend_client *newclient; FD_ZERO(&fdsets.readfds); FD_ZERO(&fdsets.writefds); FD_SET(config->listens->sock, &fdsets.readfds); fdsets.maxfd = config->listens->sock; list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR); fprintf(stderr, "select()..."); tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL); if(tmp == -1) { perror("select"); return 1; } /** Deal first with data waiting to be sent and then with input data. */ travinfo.config = config; travinfo.fdset = &fdsets.writefds; travinfo.handlereq = handlereq; travinfo.handlereqdata = handlereqdata; travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE; list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); travinfo.fdset = &fdsets.readfds; travinfo.mode = DISTREND_ACCEPT_CLIENT_READ; list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); /** Handle new connections. */ if(FD_ISSET(config->listens->sock, &fdsets.readfds)) { newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL); if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH)) { fprintf(stderr, "error allocating client struct\n"); return 1; } clients->nfds ++; list_insert_after(clients->clients, newclient, 0); fprintf(stderr, "accepted new connection\n"); distrend_client_write(newclient, "hello\n", 6); } /** Until liblist supports deleting nodes during a traversal, we clean dead clients here: */ list_mvfront(clients->clients); newclient = list_curr(clients->clients); while(newclient) { if(newclient->state == DISTREND_CLIENT_DEAD) { distrend_client_free(newclient); list_remove_curr(clients->clients); fprintf(stderr, "removed dead connection\n"); } list_mvnext(clients->clients); /* provide for termination of this loop */ if(newclient == list_curr(clients->clients)) newclient = NULL; else newclient = list_curr(clients->clients); } return 0; } int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients) { fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__); return 1; } /** This is probably just NOT a placeholder for remotio */ void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len) { fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__); } int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state) { int tmp; tmp = distrend_makesocknonblock(sock); if(tmp) { fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n"); return 1; } *client = malloc(sizeof(struct distrend_client)); if(!*client) { fprintf(stderr, "OOM\n"); return 1; } (*client)->sock = sock; (*client)->state = state; (*client)->inlen = 0; (*client)->expectlen = 0; (*client)->inmsgs = q_init(); (*client)->outmsgs = q_init(); return 0; } int distrend_client_free(struct distrend_client *client) { q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free); q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free); close(client->sock); return 0; } void distrend_packet_free(struct distrend_packet *packet) { free(packet->data); free(packet); } int distrend_makesocknonblock(int sock) { int fdstatus; int tmp; fdstatus = fcntl(sock, F_GETFL); fdstatus |= O_NONBLOCK; tmp = fcntl(sock, F_SETFL, fdstatus); if(tmp == -1) { perror("fcntl"); return 1; } return 0; } int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen) { struct distrend_packet *packet; packet = malloc(sizeof(struct distrend_packet)); if(!packet) { fprintf(stderr, "OOM\n"); return 1; } packet->len = msglen; packet->data = malloc(msglen); if(!packet->data) { free(packet); fprintf(stderr, "OOM\n"); return 1; } memcpy(packet->data, towrite, msglen); q_enqueue(client->outmsgs, packet, 0); return 0; } int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data) { char *towrite; int ret; size_t msglen; msglen = sizeof(struct distren_request) + req->len; towrite = malloc(msglen); if(!towrite) { fprintf(stderr, "OOM\n"); return 1; } memcpy(towrite, req, sizeof(struct distren_request)); memcpy(towrite + sizeof(struct distren_request), data, req->len); ret = distrend_client_write(client, towrite, msglen); free(towrite); return ret; } int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread) { struct distrend_packet *packet; *lenread = 0; *toread = NULL; if(q_empty(client->inmsgs)) return 1; packet = q_dequeue(client->inmsgs); *lenread = packet->len; *toread = packet->data; free(packet); return 0; } /** collapse a QUEUE of struct distrend_packet with a collective length of len into a single struct packet. */ int distrend_packets_collapse(QUEUE *queue, size_t len) { struct distrend_packet *newpacket; struct distrend_packet *oldpacket; size_t copiedlen; newpacket = malloc(sizeof(struct distrend_packet)); if(!newpacket) { fprintf(stderr, "OOM\n"); return 1; } newpacket->data = malloc(len); if(!newpacket->data) { fprintf(stderr, "OOM\n"); perror("malloc"); free(newpacket); return 1; } for(copiedlen = 0; copiedlen < len && !q_empty(queue); ) { oldpacket = q_dequeue(queue); memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len); copiedlen += oldpacket->len; distrend_packet_free(oldpacket); } newpacket->len = copiedlen; if(copiedlen < len || !q_empty(queue)) { fprintf(stderr, "inconsistency!\n"); return 1; } q_enqueue(queue, newpacket, 0); return 0; }