/* Copyright 2010 Nathan Phillip Brink This file is a part of DistRen. DistRen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DistRen is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with DistRen. If not, see . */ #include "listen.h" #include #include #include #include #include #include #include #include #include #include #include /* local */ struct distrend_clientset { LIST *clients; int nfds; }; struct distrend_packet { size_t len; char *data; }; int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state); int distrend_client_free(struct distrend_client *client); void distrend_packet_free(struct distrend_packet *packet); int distrend_makesocknonblock(int sock); int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients) { int tmp; struct sockaddr_in6 sockaddr = { .sin6_family = AF_INET6, .sin6_port = 0, .sin6_flowinfo = 0, .sin6_addr = IN6ADDR_ANY_INIT, .sin6_scope_id = 0 }; *clients = malloc(sizeof(struct distrend_clientset)); (*clients)->clients = list_init(); (*clients)->nfds = 1; sockaddr.sin6_port = htons(4050); config->listens->sock = socket(AF_INET6, SOCK_STREAM, 0); tmp = bind(config->listens->sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); if(tmp == -1) { perror("bind"); free(*clients); return 1; } tmp = listen(config->listens->sock, 1); if(tmp == -1) { perror("listen"); free(*clients); return 1; } tmp = distrend_makesocknonblock(config->listens->sock); if(tmp) { free(*clients); return 1; } return 0; } int distrend_handleread(struct distrend_config *config, struct distrend_client *client) { struct distrend_packet *packet; ssize_t readlen; char buf[8192]; packet = malloc(sizeof(struct distrend_packet)); if(!packet) { fprintf(stderr, "OOM\n"); return 1; } packet->len = 0; packet->data = NULL; readlen = read(client->sock, buf, sizeof(buf)); if(readlen == -1) { perror("read"); switch(errno) { case EINTR: case EAGAIN: break; default: client->state = DISTREND_CLIENT_DEAD; free(packet); return 1; } } if(readlen == 0) /* handle EOF */ { client->state = DISTREND_CLIENT_DEAD; free(packet); return 1; } packet->len = readlen; packet->data = malloc(readlen); if(!packet->data) { fprintf(stderr, "OOM!\n"); free(packet); return 1; } memcpy(packet->data, buf, readlen); return 0; } struct distrend_accept_fdset_prepare_data { fd_set readfds; fd_set writefds; /** stores the highest FD number which is one less than the first arfgument to select(). So do select(maxfd + 1, ...) */ int maxfd; }; int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data, struct distrend_client *client) { if(client->state == DISTREND_CLIENT_DEAD) return TRUE; if(client->sock > data->maxfd) data->maxfd = client->sock; FD_SET(client->sock, &data->readfds); /** Only select on an outgoing socket if we have something to say. */ if(!q_empty(client->outmsgs)) FD_SET(client->sock, &data->writefds); return TRUE; } struct distrend_accept_client_proc_data { fd_set *fdset; struct distrend_config *config; enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode; }; int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data, struct distrend_client *client) { struct distrend_packet *packet; ssize_t written_amount; if(client->state == DISTREND_CLIENT_DEAD) return TRUE; if(!FD_ISSET(client->sock, data->fdset)) /** continue iteration through the list */ return TRUE; fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n", __FILE__, __LINE__, client->sock, (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "write"); switch(data->mode) { case DISTREND_ACCEPT_CLIENT_WRITE: if(q_empty(client->outmsgs)) return TRUE; packet = q_front(client->outmsgs); written_amount = write(client->sock, packet->data, packet->len); /** Disconnect in case of write error. */ if(written_amount == -1) { perror("write"); /* distrend_client_free(); Until liblist has the ability to delete nodes during traversal, we'll have to free the client during a client-list cleanup/pruning cycle. */ client->state = DISTREND_CLIENT_DEAD; } if(packet->len == written_amount) { q_dequeue(client->outmsgs); distrend_packet_free(packet); } else { /** shifting seems the simplest solution. */ packet->len -= written_amount; } break; case DISTREND_ACCEPT_CLIENT_READ: distrend_handleread(data->config, client); break; } /** continue iteration through the list */ return TRUE; } int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients) { int tmp; int newclientsock; struct distrend_accept_fdset_prepare_data fdsets; struct distrend_accept_client_proc_data travinfo; struct distrend_client *newclient; FD_ZERO(&fdsets.readfds); FD_ZERO(&fdsets.writefds); FD_SET(config->listens->sock, &fdsets.readfds); fdsets.maxfd = config->listens->sock; list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR); fprintf(stderr, "select()..."); tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL); if(tmp == -1) { perror("select"); return 1; } /** Deal first with data waiting to be sent and then with input data. */ travinfo.config = config; travinfo.fdset = &fdsets.writefds; travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE; list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); travinfo.fdset = &fdsets.readfds; travinfo.mode = DISTREND_ACCEPT_CLIENT_READ; list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR); /** Handle new connections. */ if(FD_ISSET(config->listens->sock, &fdsets.readfds)) { newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL); if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH)) { fprintf(stderr, "error allocating client struct\n"); return 1; } clients->nfds ++; list_insert_after(clients->clients, newclient, 0); fprintf(stderr, "accepted new connection\n"); distrend_client_write(newclient, "hello\n", 6); } /** Until liblist supports deleting nodes during a traversal, we clean dead clients here: */ list_mvfront(clients->clients); newclient = list_curr(clients->clients); while(newclient) { if(newclient->state == DISTREND_CLIENT_DEAD) { distrend_client_free(newclient); list_remove_curr(clients->clients); fprintf(stderr, "removed dead connection\n"); } list_mvnext(clients->clients); /* provide for termination of this loop */ if(newclient == list_curr(clients->clients)) newclient = NULL; else newclient = list_curr(clients->clients); } return 0; } int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients) { fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__); return 1; } /** This is probably just NOT a placeholder for remotio */ void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len) { fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__); } int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state) { int tmp; tmp = distrend_makesocknonblock(sock); if(tmp) { fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n"); return 1; } *client = malloc(sizeof(struct distrend_client)); if(!*client) { fprintf(stderr, "OOM\n"); return 1; } (*client)->sock = sock; (*client)->state = state; (*client)->inmsgs = q_init(); (*client)->outmsgs = q_init(); return 0; } int distrend_client_free(struct distrend_client *client) { q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free); q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free); close(client->sock); return 0; } void distrend_packet_free(struct distrend_packet *packet) { free(packet->data); free(packet); } int distrend_makesocknonblock(int sock) { int fdstatus; int tmp; fdstatus = fcntl(sock, F_GETFL); fdstatus |= O_NONBLOCK; tmp = fcntl(sock, F_SETFL, fdstatus); if(tmp == -1) { perror("fcntl"); return 1; } return 0; } int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen) { struct distrend_packet *packet; packet = malloc(sizeof(struct distrend_packet)); if(!packet) { fprintf(stderr, "OOM\n"); return 1; } packet->len = msglen; packet->data = malloc(msglen); if(!packet->data) { free(packet); fprintf(stderr, "OOM\n"); return 1; } memcpy(packet->data, towrite, msglen); q_enqueue(client->outmsgs, packet, 0); return 0; } int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread) { struct distrend_packet *packet; *lenread = 0; *toread = NULL; if(q_empty(client->inmsgs)) return 1; packet = q_dequeue(client->inmsgs); *lenread = packet->len; *toread = packet->data; free(packet); return 0; }