# HG changeset patch # User Nathan Phillip Brink # Date 2010-07-15 08:02:07 # Node ID ef03a563218c3530ebfb3799350f42604f72a82f # Parent b58c8263d2f6fbba35044319bdd38945e664bf45 Created, added, and partially integrated into distrend a poll() wrapper, multiio. diff --git a/Makefile.am b/Makefile.am --- a/Makefile.am +++ b/Makefile.am @@ -19,6 +19,7 @@ pkglib_LTLIBRARIES = libdistrencommon.la libdistrencommon_la_SOURCES = src/common/asprintf.c src/common/asprintf.h \ src/common/execio.c src/common/execio.h \ src/common/misc.c src/common/misc.h \ + src/common/multiio.c src/common/multiio.h \ src/common/options.c src/common/options.h \ src/common/protocol.c src/common/protocol.h \ src/common/remoteio.h \ diff --git a/src/common/libremoteio.h b/src/common/libremoteio.h --- a/src/common/libremoteio.h +++ b/src/common/libremoteio.h @@ -20,10 +20,12 @@ #ifndef _DISTREN_LIBREMOTEIO_H #define _DISTREN_LIBREMOTEIO_H -#include "remoteio.h" +#include "common/multiio.h" +#include "common/remoteio.h" #include #include +#include /** private declarations for remoteio, to be included by options.c and remoteio.c @@ -37,7 +39,9 @@ enum remoteio_method #endif REMOTEIO_METHOD_TCP = 2, /* REMOETIO_METHOD_XMLRPC */ /*< maybe someday */ - REMOTEIO_METHOD_MAX = 3 /*< This is a number used to check the consitency of remoteio_server structs */ + /** method for the remoteio_open_socket() function, where we don't call open() ourselves: */ + REMOTEIO_METHOD_SOCKET = 3, + REMOTEIO_METHOD_MAX = 4 /*< This is a number used to check the consitency of remoteio_server structs */ }; struct remoteio_server @@ -53,15 +57,58 @@ struct remoteio_opts { char *ssh_command; list_t servers; /* type: (struct remoteio_server *) */ + /* store the multiio context for general use */ + multiio_context_t multiio; + /* store remoteio's socket_type */ + multiio_socket_type_t socket_type; + + /* an argument for the remoteio_read_handle_func_t */ + void *generic_handler_data; }; -struct remoteio { +/** + * Used to describe the nature of the data stored in the + * outbound and message queues that power remoteio_write() + * and remoteio_read() (?). + */ +struct remoteio_packet +{ + size_t len; + char *data; +}; + + +struct remoteio +{ enum remoteio_method method; struct remoteio_opts *opts; struct execio *execio; #ifndef WINDOWS int sock; #endif + + remoteio_read_handle_func_t read_handler; + /* for the read_handler */ + void *read_handler_data; + + /** + * Store a buffer of data waiting to be processed. + */ + struct remoteio_packet inbuf; + + /** + * Provide the asynchronosity abstraction by queuing outgoing messages. + */ + queue_t outmsgs; + + /** + * This is disappointingly hacky. If this variable is 0, then + * remoteio_close() will act normal. If set to 1, then + * remoteio_close() will not actually free this struct but instead + * increment this variable to 2. This is so that read_handler can + * call remoteio_close() without segfaulting us. + */ + short careful_free; }; diff --git a/src/common/multiio.c b/src/common/multiio.c new file mode 100644 --- /dev/null +++ b/src/common/multiio.c @@ -0,0 +1,378 @@ +/* + Copyright 2010 Nathan Phillip Brink + + This file is a part of DistRen. + + DistRen is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DistRen is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with DistRen. If not, see . + +*/ + +#include "common/multiio.h" + +#include +#include +#include +#include +#include +#include + +struct multiio_socket_info +{ + /* the type of socket */ + multiio_socket_type_t socket_type; + /* to be passed to the socket handler */ + void *socket_data; +}; + +struct multiio_socket_type_info +{ + /* type: struct multiio_socket_type_handler_info */ + list_t type_handlers; +}; + +/* + The basic concept is that pollfds is an array. Thus, we have to + have another array reflecting that array to hold socket-specific + information. + */ +struct multiio_context +{ + /* the array passed to poll() */ + struct pollfd *pollfds; + /* the number of entries pollfds could hold */ + nfds_t pollfds_len; + /* the number of entries that pollfds does hold */ + nfds_t nfds; + + /* + an array whose order mirrors pollfds containing information + about the fd mentioned in pollfds. + */ + struct multiio_socket_info *socket_infos; + + /* the number of socket types registered (equivilent to the type ID of the next registered type) */ + multiio_socket_type_t num_socket_types; + /* the information about each individual socket type keyed by the type ID */ + struct multiio_socket_type_info *socket_types; +}; + +/* + stores information about a handler associated with a socket type/class. + */ +struct multiio_socket_type_handler_info +{ + /* the poll() event to which this handler responds */ + short event; + /* the handler function to call when the event is matched */ + multiio_event_handler_func_t handler; + /* the pointer passed to multiio_event_handler_register() */ + void *handler_data; +}; + +/** + Finds the index in the context->pollfds array of a particular socket. + + @param context the multiio context + @param fd the socket to search for + @param index a pointer which will be filled with the index of the socket if found + @return 0 if the pollfds entry is found, 1 if the entry is not found + */ +int multiio_pollfd_index_by_fd(const multiio_context_t context, int fd, size_t *index); + +multiio_context_t multiio_context_new() +{ + struct multiio_context *context; + + context = malloc(sizeof(struct multiio_context)); + if(!context) + return NULL; + + context->pollfds = malloc(sizeof(struct pollfd) * 2); + if(!context->pollfds) + { + free(context); + return NULL; + } + context->pollfds_len = 2; + context->nfds = 0; + context->socket_infos = malloc(sizeof(struct multiio_socket_info) * 2); + + context->num_socket_types = 0; + context->socket_types = NULL; + + if(!context->pollfds + || !context->socket_infos) + { + free(context->pollfds); + free(context->socket_infos); + free(context); + + return NULL; + } + + return context; +} + +int multiio_context_free(multiio_context_t context) +{ + size_t counter; + /* + clean up pollfds and socket_infos + */ + free(context->pollfds); + free(context->socket_infos); + + /* + wipe up the socket_types + */ + for(counter = 0; counter < context->num_socket_types; counter ++) + list_free(context->socket_types[counter].type_handlers, LIST_DEALLOC); + free(context->socket_types); + + /* + bye! + */ + free(context); + + return 0; +} + +struct multiio_poll_travinfo +{ + multiio_context_t multiio; + struct multiio_socket_info *socket_info; + struct pollfd *pollfd; +}; +/** + list traverser for multiio_poll() + */ +int mulitiio_poll_invoke_handlers(struct multiio_poll_travinfo *travinfo, struct multiio_socket_type_handler_info *handler_info) +{ + short event; + + event = travinfo->pollfd->events & handler_info->event; + if(!event) + return TRUE; + + handler_info->handler(travinfo->multiio, + travinfo->pollfd->fd, + event, + handler_info->handler_data, + travinfo->socket_info->socket_data); + + return TRUE; +} + +int multiio_poll(multiio_context_t context) +{ + size_t counter; + struct multiio_poll_travinfo travinfo; + + poll(context->pollfds, context->nfds, -1); + + for(counter = 0; counter < context->nfds; counter ++) + if(context->pollfds[counter].revents) + { + travinfo.multiio = context; + travinfo.socket_info = &context->socket_infos[counter]; + travinfo.pollfd = &context->pollfds[counter]; + list_traverse(context->socket_types[context->socket_infos[counter].socket_type].type_handlers, + &travinfo, + (list_traverse_func_t)&mulitiio_poll_invoke_handlers, + LIST_FRNT | LIST_SAVE); + } + + return 0; +} + +int multiio_socket_type_register(multiio_context_t context, multiio_socket_type_t *new_type) +{ + struct multiio_socket_type_info *new_socket_types; + + new_socket_types = malloc(sizeof(struct multiio_socket_type_info) * (context->num_socket_types + 1)); + if(!new_socket_types) + return 1; + + *new_type = context->num_socket_types; + new_socket_types[*new_type].type_handlers = list_init(); + if(!new_socket_types[*new_type].type_handlers) + { + free(new_socket_types); + return 2; + } + + + if(context->num_socket_types) + { + memcpy(new_socket_types, context->socket_types, sizeof(struct multiio_socket_type_info) * context->num_socket_types); + free(context->socket_types); + } + + context->socket_types = new_socket_types; + context->num_socket_types ++; + + return 0; +} + +int multiio_event_handler_register(multiio_context_t context, multiio_socket_type_t socket_type, short event, multiio_event_handler_func_t handler_func, void *handler_data) +{ + struct multiio_socket_type_handler_info handler_info; + + if(socket_type >= context->num_socket_types) + return 1; + + if(!event) + return 2; + + handler_info.event = event; + handler_info.handler = handler_func; + handler_info.handler_data = handler_data; + + list_insert_after(context->socket_types[socket_type].type_handlers, &handler_info, sizeof(struct multiio_socket_type_handler_info)); + + return 0; +} + +int multiio_socket_add(multiio_context_t context, int fd, multiio_socket_type_t socket_type, void *socket_data, short events) +{ + struct pollfd *pollfds; + struct multiio_socket_info *socket_infos; + size_t new_pollfds_len; + + int fdstatus; + int tmp; + + fdstatus = fcntl(fd, F_GETFL); + fdstatus |= O_NONBLOCK; + tmp = fcntl(fd, F_SETFL, fdstatus); + if(tmp == -1) + { + perror("fcntl"); + return 1; + } + + if(socket_type >= context->num_socket_types) + return 1; + + /** + extend sockfds array + */ + if(context->nfds >= context->pollfds_len) + { + new_pollfds_len = (context->pollfds_len + 1) * 2; + + pollfds = malloc(sizeof(struct pollfd) * new_pollfds_len); + socket_infos = malloc(sizeof(struct multiio_socket_info) * new_pollfds_len); + + if(!pollfds + || !socket_infos) + return 1; + + memcpy(pollfds, context->pollfds, sizeof(struct pollfd) * context->pollfds_len); + memcpy(socket_infos, context->socket_infos, sizeof(struct multiio_socket_info) * context->pollfds_len); + + free(context->pollfds); + free(context->socket_infos); + + context->pollfds = pollfds; + context->socket_infos = socket_infos; + context->pollfds_len = new_pollfds_len; + } + + pollfds = context->pollfds; + socket_infos = context->socket_infos; + + pollfds[context->nfds].fd = fd; + pollfds[context->nfds].events = events; + socket_infos[context->nfds].socket_type = socket_type; + socket_infos[context->nfds].socket_data = socket_data; + + context->nfds ++; + + return 0; +} + +int multiio_socket_del(multiio_context_t context, int fd) +{ + size_t index; + int tmp; + + tmp = multiio_pollfd_index_by_fd(context, fd, &index); + if(tmp) + return 1; + + /** + * Special case: one left, if we tried filling in holes... segfault ;-) + */ + if(context->nfds == 1) + { + context->nfds --; + return 0; + } + + /** + * For now we'll unconditionally fill in holes. + * In fact, it looks simple enough that this method + * may be what we always use ;-). + */ + memcpy(&context->pollfds[index], &context->pollfds[context->nfds - 1], sizeof(struct pollfd)); + memcpy(&context->socket_infos[index], &context->socket_infos[context->nfds - 1], sizeof(struct multiio_socket_info)); + context->nfds --; + + return 0; +} + +int multiio_socket_event_enable(multiio_context_t context, int fd, short event) +{ + size_t index; + int tmp; + + tmp = multiio_pollfd_index_by_fd(context, fd, &index); + if(tmp) + return 1; + + context->pollfds[index].events |= event; + + return 0; +} + +int multiio_socket_event_disable(multiio_context_t context, int fd, short event) +{ + size_t index; + int tmp; + + tmp = multiio_pollfd_index_by_fd(context, fd, &index); + if(tmp) + return 1; + + context->pollfds[index].events &= ~event; + + return 0; +} + + + +int multiio_pollfd_index_by_fd(const multiio_context_t context, int fd, size_t *index) +{ + size_t counter; + + for(counter = 0; counter < context->nfds; counter ++) + if(context->pollfds[counter].fd == fd) + { + *index = counter; + return 0; + } + + return 1; +} diff --git a/src/common/multiio.h b/src/common/multiio.h new file mode 100644 --- /dev/null +++ b/src/common/multiio.h @@ -0,0 +1,161 @@ +/* + Copyright 2010 Nathan Phillip Brink + + This file is a part of DistRen. + + DistRen is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DistRen is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with DistRen. If not, see . + +*/ + +#ifndef DISTREN_MULTIIO_H +#define DISTREN_MULTIIO_H + +/** + @file multiio provides the event-oriented interface to accessing + sockets of different sorts. Its purpose is to provide a unified + interface around poll() which distrend and distren (client) would + both be able to use. This means supporting tracking file descriptors + from sockets of different types that must be handled differently, etc. + + This interface will be as simple as possible to provide greatest + flexibility and most ease of maintainence. + */ + +struct pollfd; + +/** + A socket type or classification. + + Each socket type is associated with a set of socket event handlers. + This allows, for example, a listen()/accept() socket to be distinguised + and reacted to differently from a read()/write() socket. It would also + allow one set of read()/write() sockets to be treated differently from + another set of read()/write() sockets. + */ +typedef int multiio_socket_type_t; + +struct multiio_context; +/** + The multiio_context a program is using. + + Each process should only use one multiio_context. Different types of events + should be handled through the use of different multiio_socket_type_t values. + */ +typedef struct multiio_context *multiio_context_t; + +/** + Initializes and returns a multiio_context. + */ +multiio_context_t multiio_context_new(); + +/** + Destroys and frees a multiio_context. + + @param context the context to destroy. + */ +int multiio_context_free(multiio_context_t context); + +/** + Call poll() on the registered sockets and react to events accordingly. + + @param context the context which sockets and handlers were registered with + */ +int multiio_poll(multiio_context_t context); + +/** + Registers a new socket type/classification for use with this multiio_context. + + @param context the multiio context to register this type with. + @param new_type a pointer to a type which this function will set to the value of + the newly registered type. The caller should record this type and use it for + future multiio_event_handler_register() calls, etc. + @return 0 on success, other on error + */ +int multiio_socket_type_register(multiio_context_t context, multiio_socket_type_t *new_type); + +/** + The basic event handler prototype. + + @param multiio the multiio context which is required for many multiio API functions. + @param fd the handle for which data was recieved. + @param revent the event that this handler was configured to respond to. + @param handler_data the same pointer named handler_data that was passed to multiio_event_handler_register(). This allows handlers to store general context information (i.e., pointer to a general settings struct) + @param socket_data the same pointer named socket_data that was passed to multiio_socket_add(). This allows handlers to access socket-specific information. + */ +typedef int(*multiio_event_handler_func_t)(multiio_context_t multiio, int fd, short revent, void *handler_data, void *socket_data); + +/** + Register a socket event handler for a specified socket type/class. + + Don't forget to register handlers for POLLERR, POLLHUP, and POLLNVAL. + You can probably just register one function to handle each of these + categories and clean up the socket. (and then don't forget to call + multiio_socket_del() :-p (or to free your per-socket pointers, etc.)). + + Handlers are called in the order they're registered. + + @param context the multiio_context + @param socket_type the type of socket this handler should be registered for. + @param event the poll() event on which this handler should be called. + @param handler_func the function to be called when event is matched. + @param handler_data a pointer that will be passed to handler_func whenever this event is matched for sockets of this socket_type. + */ +int multiio_event_handler_register(multiio_context_t context, multiio_socket_type_t socket_type, short event, multiio_event_handler_func_t handler_func, void *handler_data); + +/** + Register a socket of a specified type so that it may be checked for events + and the event handlers for its socket_type may be called. + + This function will set the socket into nonblocking mode. + + @param context the multiio context. + @param fd the socket. + @param socket_type the type/classification of this socket + @param socket_data a pointer to socket-specific data to pass to the multiio_event_handler_func_t. + @param events the poll() events with which this socket should be set to listen for. Certain events may be added or removed later using multiio_socket_event_enable() and multiio_socket_event_disable(). + */ +int multiio_socket_add(multiio_context_t context, int fd, multiio_socket_type_t socket_type, void *socket_data, short events); + +/** + Unregisters a socket from a multiio context + + @param context the context + @param fd the socket to remove + */ +int multiio_socket_del(multiio_context_t context, int fd); + +/** + Enable checking for the specified event for the specified socket. + + The specified socket must have been beforehand registered using multiio_socket_add(). + + @param context the multiio context + @param fd the socket for which checking of an event should be enabled + @param event the poll() event which should be enabled for this socket. + */ +int multiio_socket_event_enable(multiio_context_t context, int fd, short event); + +/** + Disable checking for the specified event for the specified socket. + + Disabling all events for a particular socket does not remove it from the + multiio context. + + @param context the multiio context + @param fd the socket for which checking of an event should be disabled + @param event the poll() event which should be disabled on this socket. + */ +int multiio_socket_event_disable(multiio_context_t context, int fd, short event); + +#endif /* DISTREN_MULTIIO_H */ diff --git a/src/common/options.c b/src/common/options.c --- a/src/common/options.c +++ b/src/common/options.c @@ -17,10 +17,11 @@ along with DistRen. If not, see . */ -#include "options.h" +#include "common/options.h" #include "common/asprintf.h" #include "common/misc.h" +#include "common/multiio.h" #include "common/libremoteio.h" #include "common/protocol.h" @@ -40,7 +41,7 @@ struct options_common_data Not reentrant because of call to getenv() @todo replace abort()s with something useful */ -int options_init(int argc, char *argv[], cfg_t **mycfg, cfg_opt_t *myopts, char *myname, struct options_common **allopts) +int options_init(int argc, char *argv[], cfg_t **mycfg, cfg_opt_t *myopts, char *myname, struct options_common **allopts, multiio_context_t multiio) { char *configfileprefix; char *configfile; @@ -153,8 +154,9 @@ int options_init(int argc, char *argv[], } memset((*allopts)->remoteio, '\0', sizeof(struct remoteio_opts)); (*allopts)->remoteio->ssh_command = strdup("ssh"); - /** @TODO: check for NULL return above */ - + /** @TODO: check for NULL return above -- a segfault dereferencing NULL is more fun than such a check ;-) */ + (*allopts)->remoteio->multiio = multiio; + multiio_socket_type_register(multiio, &(*allopts)->remoteio->socket_type); cfg_opt_t common_opts[] = { diff --git a/src/common/options.h b/src/common/options.h --- a/src/common/options.h +++ b/src/common/options.h @@ -27,6 +27,8 @@ #include +#include "common/multiio.h" + /** incomplete: maybe remoteio.h should define its own struct which is injected into this struct and have its own handlers that parse its section of the confuse config file... (it'd be more modular)? @@ -56,8 +58,9 @@ struct options_common \param allopts will be set to a pointer that many functions need for operation \param argc the argc that was passed to main() -- these will be used to determine the location of the config file \param argv the argv that was passed to main() + \param multiio the multiio to associate with remoteio's configuration */ -int options_init(int argc, char *argv[], cfg_t **mycfg, cfg_opt_t *myopts, char *myname, struct options_common **allopts); +int options_init(int argc, char *argv[], cfg_t **mycfg, cfg_opt_t *myopts, char *myname, struct options_common **allopts, multiio_context_t multiio); /** Frees the struct options_common. This should be run before the program exits. diff --git a/src/common/protocol.c b/src/common/protocol.c --- a/src/common/protocol.c +++ b/src/common/protocol.c @@ -48,9 +48,7 @@ int distren_request_new(struct distren_r int distren_request_send(struct remoteio *rem, struct distren_request *req, void *data) { void *packet; - void *packet_ptr; size_t len; - size_t byteswritten; int write_err; if(req->magic != DISTREN_REQUEST_MAGIC) @@ -67,15 +65,8 @@ int distren_request_send(struct remoteio memcpy(packet, req, sizeof(struct distren_request)); memcpy(packet + sizeof(struct distren_request), data, req->len); - write_err = 0; - packet_ptr = packet; - while(len - && !write_err) - { - write_err = remoteio_write(rem, packet_ptr, len, &byteswritten); - len -= byteswritten; - packet_ptr += byteswritten; - } + write_err = remoteio_write(rem, packet, len); + free(packet); return 0; diff --git a/src/common/protocol.h b/src/common/protocol.h --- a/src/common/protocol.h +++ b/src/common/protocol.h @@ -100,7 +100,7 @@ struct distren_request }; /** - initializes and allocates request + * initializes and allocates request */ int distren_request_new(struct distren_request **req, uint32_t len, enum distren_request_type type); @@ -118,14 +118,22 @@ struct remoteio; int distren_request_send(struct remoteio *rem, struct distren_request *req, void *data); /** - initializes and allocates request based on raw input data - which includes the headers of the request. + * initializes and allocates request based on raw input data + * which includes the headers of the request. */ int distren_request_new_fromdata(struct distren_request **req, void *data, size_t len); /** - frees request + * frees request */ int distren_request_free(struct distren_request *req); +/** + * An implementation of remoteio_read_handle_func_t for use with remoteio. + * + * To use this handler, first initialize a struct distren_request_remoteio_data. + * (to be continued... or not...? ;-) ) + */ +/* size_t distren_request_remoteio_handle(struct remoteio *rem, void *generic_data, void *buf, size_t len, void *data); */ + #endif diff --git a/src/common/remoteio.c b/src/common/remoteio.c --- a/src/common/remoteio.c +++ b/src/common/remoteio.c @@ -28,6 +28,7 @@ #ifndef _WIN32 #include #endif +#include #include #include #include @@ -39,6 +40,7 @@ #include #endif #include +#include #ifndef _WIN32 #include @@ -48,6 +50,17 @@ #define REMOTEIO_DEFAULT_PORT "4050" +int _remoteio_handle_write(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem); +int _remoteio_handle_read(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem); + int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server); int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread); int _remoteio_ssh_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten); @@ -83,11 +96,14 @@ struct remoteio_method_funcmap funcmap[] {REMOTEIO_METHOD_UNIX, &_remoteio_sock_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close, "unix"}, #endif {REMOTEIO_METHOD_TCP, &_remoteio_tcp_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_tcp_close, "tcp"}, + {REMOTEIO_METHOD_SOCKET, NULL, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close}, {REMOTEIO_METHOD_MAX, NULL, NULL, NULL, NULL, NULL} }; struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername); +void remoteio_packet_free(struct remoteio_packet *packet); + int remoteio_config(cfg_t *cfg, struct remoteio_opts *opts) { size_t numservers; @@ -96,6 +112,19 @@ int remoteio_config(cfg_t *cfg, struct r struct remoteio_server aserver; + multiio_socket_type_register(opts->multiio, &opts->socket_type); + + multiio_event_handler_register(opts->multiio, + opts->socket_type, + POLLWRNORM, + (multiio_event_handler_func_t)&_remoteio_handle_write, + opts); + multiio_event_handler_register(opts->multiio, + opts->socket_type, + POLLRDNORM, + (multiio_event_handler_func_t)&_remoteio_handle_read, + opts); + opts->servers = list_init(); if(!opts->servers) { @@ -139,9 +168,69 @@ int remoteio_config(cfg_t *cfg, struct r return 0; } +int remoteio_generic_data_set(struct remoteio_opts *opts, void *generic_data) +{ + opts->generic_handler_data = generic_data; + return 0; +} + +int remoteio_open_common(struct remoteio **remoteio, + enum remoteio_method method, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data) +{ + struct remoteio *rem; + + rem = malloc(sizeof(struct remoteio)); + if(!rem) + { + fprintf(stderr, "OOM\n"); + return 2; + } + + *remoteio = rem; -int remoteio_open(struct remoteio **remoteio, struct remoteio_opts *opts, const char *servername) + rem->execio = NULL; + rem->method = method; + rem->opts = opts; + rem->inbuf.data = NULL; + rem->inbuf.len = 0; + rem->outmsgs = q_init(); + rem->read_handler = read_handler; + rem->read_handler_data = read_handler_data; + /* + * the following initialization is very important... though the + * others are too, I suppose :-p. See remoteio_close() + */ + rem->careful_free = 0; + + return 0; +} + +int remoteio_open_socket(struct remoteio **remoteio, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + int fd) +{ + struct remoteio *rem; + if(remoteio_open_common(remoteio, REMOTEIO_METHOD_SOCKET, opts, read_handler, read_handler_data)) + return 1; + rem = *remoteio; + + rem->sock = fd; + multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLRDNORM); + + return 0; +} + +int remoteio_open_server(struct remoteio **remoteio, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + const char *servername) { struct remoteio_server *theserver; struct remoteio *rem; @@ -167,49 +256,261 @@ int remoteio_open(struct remoteio **remo fprintf(stderr, "%s:%d: Unsupported remoteio method %d\n\tThis is a bug, probably indicating memory corruption. This is, of course, probably my fault (not your hardware's) ;-)\n", __FILE__, __LINE__, theserver->method); return 1; } - rem = malloc(sizeof(struct remoteio)); - if(!rem) - { - fprintf(stderr, "OOM\n"); - return 2; - } - *remoteio = rem; - rem->method = theserver->method; - rem->opts = opts; + if(remoteio_open_common(remoteio, theserver->method, opts, read_handler, read_handler_data)) + return 1; + rem = *remoteio; tmp = funcmap[theserver->method].open_func(rem, theserver); if(tmp) { fprintf(stderr, "Error using method %s for server ``%s''", funcmap[theserver->method].name, servername); + free(rem->inbuf.data); + q_free(rem->outmsgs, QUEUE_NODEALLOC); free(rem); *remoteio = NULL; return tmp; } + + /** + * @todo make this code slightly more generic... able to handle + * execio's multi-sockets by letting execio register itself with + * multiio instead of us registering here perhaps + */ + multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLRDNORM); return 0; } -int remoteio_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread) +/** + * Implementation of multiio_event_handler_func_t + */ +int _remoteio_handle_read(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem) { - return funcmap[rem->method].read_func(rem, buf, len, bytesread); + struct remoteio_packet packet; + size_t readlen; + char buf[8192]; + + int tmp; + + packet.len = 0; + packet.data = NULL; + + if(rem->sock != fd) + fprintf(stderr, "%d != %d\n", rem->sock, fd); + + tmp = funcmap[rem->method].read_func(rem, buf, sizeof(buf), &readlen); + if(tmp) + { + remoteio_close(rem); + return 1; + } + + /* expand the input buffer */ + packet.len = rem->inbuf.len + readlen; + packet.data = malloc(rem->inbuf.len + readlen); + if(!packet.data) + { + fprintf(stderr, "OOM!\n"); + + return 1; + } + if(rem->inbuf.data) + memcpy(packet.data, rem->inbuf.data, rem->inbuf.len); + memcpy(packet.data + rem->inbuf.len, buf, readlen); + free(rem->inbuf.data); + memcpy(&rem->inbuf, &packet, sizeof(struct remoteio_packet)); + + /* + * readlen wil now keeps track of how many bytes the handler + * function has read. + * + * Call the read_handler. Set careful_free, see remoteio_close(), so + * that rem->read_handler() may call remoteio_close() without + * segfaulting us ;-). + */ + rem->careful_free = 1; + readlen = (*rem->read_handler)(rem, rem->opts->generic_handler_data, rem->inbuf.data, rem->inbuf.len, rem->read_handler_data); + if(rem->careful_free == 2) + { + rem->careful_free = 0; + remoteio_close(rem); + + return 0; + } + rem->careful_free = 0; + + memmove(rem->inbuf.data, rem->inbuf.data + readlen, rem->inbuf.len - readlen); + rem->inbuf.len -= readlen; + + return 0; } -int remoteio_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten) + +int remoteio_write(struct remoteio *rem, const void *buf, size_t len) { - return funcmap[rem->method].write_func(rem, buf, len, byteswritten); + struct remoteio_packet *packet; + struct pollfd pollfd; + ssize_t bytes_written; + + /** + * This is probably about the only optimization that exists in + * distren.... :-D + * + * Write to the client immediately if there are no other messages + * waiting and if the client will accept it. + */ + if(q_empty(rem->outmsgs)) + { + pollfd.fd = rem->sock; + pollfd.revents = POLLWRNORM; + pollfd.events = 0; + poll(&pollfd, 1, 0); + if(pollfd.events & POLLWRNORM) + { + bytes_written = write(rem->sock, buf, len); + if(bytes_written > 0) + { + len -= bytes_written; + buf += bytes_written; + } + } + } + + /** + * zero length is easy... and might be possible if the above + * optimization works ;-) + */ + if(!len) + return 0; + + packet = malloc(sizeof(struct remoteio_packet)); + if(!packet) + { + fprintf(stderr, "OOM\n"); + return 1; + } + + packet->len = len; + packet->data = malloc(len); + if(!packet->data) + { + free(packet); + fprintf(stderr, "OOM\n"); + return 1; + } + + memcpy(packet->data, buf, len); + + q_enqueue(rem->outmsgs, packet, 0); + multiio_socket_event_enable(rem->opts->multiio, rem->sock, POLLWRNORM); + + return 0; } +int _remoteio_handle_write(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem) +{ + struct remoteio_packet *packet; + size_t written_amount; + + int tmp; + + fprintf(stderr, "%s:%d: MySomeone else's traversal says that sock %d is available for writing\n", + __FILE__, __LINE__, fd); + + /* + * check if we're out of stuff to write. + */ + if(q_empty(rem->outmsgs)) + { + multiio_socket_event_disable(multiio, fd, POLLWRNORM); + return 0; + } + + packet = q_front(rem->outmsgs); + tmp = funcmap[rem->method].write_func(rem, packet->data, packet->len, &written_amount); + + /** + Disconnect in case of write error. + */ + if(tmp) + { + fprintf(stderr, __FILE__ ":%d: error handling for write() needs to be inserted into remoteio.... perhaps.. ;-)\n", __LINE__); + } + if(packet->len == written_amount) + { + q_dequeue(rem->outmsgs); + remoteio_packet_free(packet); + + if(q_empty(rem->outmsgs)) + multiio_socket_event_disable(multiio, fd, POLLWRNORM); + } + else + { + /** + * shifting seems the simplest solution. + */ + packet->len -= written_amount; + memmove(packet->data, packet->data + written_amount, packet->len); + } + + return 0; +} + + int remoteio_close(struct remoteio *rem) { int rtn; + /** + * See careful_free's and _remoteio_handle_read()'s docs. If + * careful_free is nonzero, then we shouldn't free it here because + * such a free would cause a segfault. However, whoever set + * rem->careful_free to nonzero will handle resetting + * rem->careful_free to zero and calling remoteio_close() if + * necessary. + */ + if(rem->careful_free) + { + rem->careful_free = 2; + return 0; + } + + /* cleanup multiiio stuff */ + multiio_socket_del(rem->opts->multiio, rem->sock); + + /* backend-specific cleanup */ rtn = funcmap[rem->method].close_func(rem); + + /* this part is normal ;-) */ + free(rem->inbuf.data); + q_free(rem->outmsgs, (list_dealloc_func_t)remoteio_packet_free); + free(rem); - + return rtn; } +/** + * Frees an entire packet, including the passed pointer. If you just + * want the contents of the packet free()ed, just do + * free(packet.data); + */ +void remoteio_packet_free(struct remoteio_packet *packet) +{ + free(packet->data); + free(packet); +} + + int _remoteio_getserver_traverse(char *servername, struct remoteio_server *aserver) { if(!strcmp(aserver->name, servername)) @@ -334,6 +635,8 @@ int _remoteio_sock_open(struct remoteio return 0; } +#endif /* _WIN32 */ + int _remoteio_sock_close(struct remoteio *rem) { close(rem->sock); @@ -341,8 +644,6 @@ int _remoteio_sock_close(struct remoteio return 0; } -#endif - int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread) { ssize_t readrtn; diff --git a/src/common/remoteio.h b/src/common/remoteio.h --- a/src/common/remoteio.h +++ b/src/common/remoteio.h @@ -22,32 +22,95 @@ #include -/* - RemoteIO provides an abstraction to the method of talking to a remote distrend. It is a layer on top of execio that should provide an equivalent interface. +/** + * @file RemoteIO provides an abstraction to the method of talking to a remote distrend. It is a layer on top of execio that should provide an equivalent interface. + * + * RemoteIO works on top of multiio, so for it to work you write your + * program in an event-oriented fashion and call multiio_poll(). */ struct remoteio_opts; struct remoteio; /** - Opens connection with remote distrend. Returns 1 on error. + * asynchronous read handler signature. + * + * This is the signature of the callback that is called when data is + * available for reading. The handler may process as much of the + * available data as it wants. When it has processed a chunk of data, + * it must return the length of the data that it processed. If the + * handler returns 0, the handler is essentially signalling ``I can't + * figure out what this means until I see more''. + * + * If you need to close a socket after reading certain data from it, + * you may call remoteio_close() from inside of this function. + * + * @param rem the associated remoteio handle + * @param generic_data a pointer that is stored in remoteio's struct remoteio_opts which isn't client-specific + * @param buf a pointer to the buffer containing data waiting to be processed + * @param len the size of buf that may be accessed + * @param data the pointer passed to remoteio_open. _NOT_ the data just received on the socket. + * @return the number of bytes that the function accepted and thus should be removed from the rem handle. + */ +typedef size_t(*remoteio_read_handle_func_t)(struct remoteio *rem, void *generic_data, void *buf, size_t len, void *data); - @todo should this be asynchronous? +/** + * Determines the value of generic_data which is passed to + * remoteio_read_handle_func_t + * + * @param opts the remoteio runtime options */ -int remoteio_open(struct remoteio **rem, struct remoteio_opts *opts, const char *servername); +int remoteio_generic_data_set(struct remoteio_opts *opts, void *generic_data); /** - non-blocking I/O. - @param len must be greater than 0 + Opens connection with to a remote distrend. Returns 1 on error. - @return 0 on success, 1 on failure. + @todo should this be asynchronous? YES! but optionally, perhaps + @param opts the configuration settings for remoteio gotten from options_init(). + @param read_handler the function to call when data has been read from the server. + @param read_handler_data the data to pass to the read_handler function. + @param servername the name of the configuration file entry for the server. + From this, information about how to make the outgoing connection is derived. */ -int remoteio_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread); -int remoteio_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten); +int remoteio_open_server(struct remoteio **rem, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + const char *servername); /** - Closes a remoteio session. - @return nonzero on error + * Initializes a remoteio instance for a socket that's already been floating + * around for a while. I.e., this socket probably came from accept(). + * + * @param rem a pointer to where the poiner to the newly allocated struct remoteio should be stored. + * @param opts remoteio's options + * @param read_handler the function to call when data has been read from the server. + * @param read_handler_data the data to pass to the read_handler function. + * @param opts self explanatory. + */ +int remoteio_open_socket(struct remoteio **rem, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + int fd); + +/** + * Queue bytes to be written to the remote host. + * + * @param rem the remoteio handle + * @param buf a buffer to be queued for writing. We will copy this, so the caller has to handle its memory (and free() it if necessary). + * @param len number of bytes to grab from buf + * @return 0 on success, 1 on failure + */ +int remoteio_write(struct remoteio *rem, const void *buf, size_t len); + +/** + * Closes a remoteio session. + * + * It is safe to call this function from within + * remoteio_read_handle_func_t. + * + * @return nonzero on error */ int remoteio_close(struct remoteio *rem); diff --git a/src/server/distrend.c b/src/server/distrend.c --- a/src/server/distrend.c +++ b/src/server/distrend.c @@ -81,7 +81,7 @@ struct general_info /* ************General Functions************* */ int distrend_do(); -int distrend_do_config(int argc, char *argv[], struct distrend_config **config); +int distrend_do_config(int argc, char *argv[], struct distrend_config **config, multiio_context_t multiio); int distrend_config_free(struct distrend_config *config); int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo); @@ -97,7 +97,7 @@ int import_general_info(struct general_i int update_xml_joblist(struct general_info *geninfo); /* **************Test Functions**************** */ -int interactiveTest(int test, struct general_info *general_info); +int interactiveTest(int test, multiio_context_t multiio, struct general_info *general_info); /* **************** Main ********************* */ int main(int argc, char *argv[]) @@ -107,6 +107,7 @@ int main(int argc, char *argv[]) int test = 0; /*< Interactive mode if 1 */ int tmp; struct general_info general_info; + multiio_context_t multiio; enum clientstatus { @@ -134,7 +135,9 @@ int main(int argc, char *argv[]) } - if(distrend_do_config(argc, argv, &general_info.config)) + multiio = multiio_context_new(); + + if(distrend_do_config(argc, argv, &general_info.config, multiio)) return 1; /** preset paths */ @@ -151,20 +154,21 @@ int main(int argc, char *argv[]) { fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__); fprintf(stderr, "don't test mysql stuff\n"); - interactiveTest(test, &general_info); + interactiveTest(test, multiio, &general_info); return 1; } fprintf(stderr,"Finished connecting!\n"); /** Execute test function */ - interactiveTest(test, &general_info); + interactiveTest(test, multiio, &general_info); - general_info.config->listens = distrend_listens_new(&general_info); + general_info.config->listens = distrend_listens_new(multiio, &general_info, general_info.config->options); if(!general_info.config->listens) { fprintf(stderr, "error initializing listens\n"); return 1; } + remoteio_generic_data_set(general_info.config->options->remoteio, general_info.config->listens); for(counter = 0; general_info.config->listen_ports[counter]; counter ++) { tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]); @@ -185,7 +189,8 @@ int main(int argc, char *argv[]) { int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */ - distrend_accept(general_info.config->listens); + multiio_poll(multiio); + /* distrend_accept(general_info.config->listens); */ /* Run the watchdog, @TODO: like every 10 mins or something */ frame_watchdog(general_info.conn); @@ -236,7 +241,7 @@ int distrend_handle_version(struct gener if(client->state != DISTREND_CLIENT_PREVERSION) { - distrend_send_disconnect(geninfo->config->listens, client, "You have already sent the VERSION command."); + distrend_send_disconnect(client, "You have already sent the VERSION command."); } if(strlen(PACKAGE_STRING) == req_len && !strncmp(PACKAGE_STRING, req_data, req_len)) @@ -251,7 +256,7 @@ int distrend_handle_version(struct gener client->state = DISTREND_CLIENT_PREAUTH; distren_request_new(&newreq, strlen(PACKAGE_STRING), DISTREN_REQUEST_VERSION); - distrend_client_write_request(geninfo->config->listens, client, newreq, PACKAGE_STRING); + distrend_client_write_request(client, newreq, PACKAGE_STRING); distren_request_free(newreq); } else @@ -266,7 +271,7 @@ int distrend_handle_version(struct gener fixedbuf[req_len] = '\0'; _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, fixedbuf); - distrend_send_disconnect(geninfo->config->listens, client, tmp_str); + distrend_send_disconnect(client, tmp_str); } return 0; @@ -277,14 +282,14 @@ int distrend_handle_ping(struct general_ struct distren_request *pong_req; if(req_len > 32) - distrend_send_disconnect(geninfo->config->listens, client, "You have tried to send a PING packet with a length longer than 32 bytes."); + distrend_send_disconnect(client, "You have tried to send a PING packet with a length longer than 32 bytes."); /** respond to the client using the data he sent in his PONG command. */ distren_request_new(&pong_req, req_len, DISTREN_REQUEST_PONG); - distrend_client_write_request(geninfo->config->listens, client, pong_req, req_data); + distrend_client_write_request(client, pong_req, req_data); distren_request_free(pong_req); return 0; @@ -299,7 +304,7 @@ int distrend_do() } /* Grabs config info from confs */ -int distrend_do_config(int argc, char *argv[], struct distrend_config **config) +int distrend_do_config(int argc, char *argv[], struct distrend_config **config, multiio_context_t multiio) { unsigned int counter; @@ -334,7 +339,7 @@ int distrend_do_config(int argc, char *a myopts[5].simple_value = &(*config)->mysql_pass; myopts[6].simple_value = &(*config)->mysql_database; - if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options)) + if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options, multiio)) return 1; /** @@ -509,7 +514,7 @@ int update_xml_joblist(struct general_in /** Interactive test for the queuing system */ /* @QUEUE: Test uses methods not present in C code using mysql web-based system */ -int interactiveTest(int test, struct general_info *geninfo) +int interactiveTest(int test, multiio_context_t multiio, struct general_info *geninfo) { int command; int32_t slaveKey = 1; @@ -593,14 +598,7 @@ int interactiveTest(int test, struct gen case 5: while(1) { - distrend_accept(geninfo->config->listens); - /* - code for reading data from clients either goes here or in distrend_accept(). - it might make sense for us to just pass the distrend_accept() function a - callback which can handle packets or to have a generic packet handling - subsystem which gathers data into groups defined by by packet.h and then - passed onto the callback. - */ + multiio_poll(multiio); } break; diff --git a/src/server/listen.c b/src/server/listen.c --- a/src/server/listen.c +++ b/src/server/listen.c @@ -19,9 +19,9 @@ #include "listen.h" #include "common/protocol.h" +#include "common/remoteio.h" #include -#include #include #include #include @@ -34,30 +34,46 @@ /* local */ -struct distrend_packet -{ - size_t len; - char *data; -}; - struct distrend_request_handler_info { enum distren_request_type request_type; distrend_handle_request_func_t handler; }; -int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state); +struct distrend_client *distrend_client_new(struct distrend_listens *listens, + enum distrend_client_state state, + struct remoteio *rem); int distrend_client_free(struct distrend_client *client); -void distrend_packet_free(struct distrend_packet *packet); -int distrend_makesocknonblock(int sock); -int distrend_packets_collapse(QUEUE *queue, size_t len); -int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata); +int distrend_dispatch_request(struct distrend_listens *listens, struct remoteio *rem, struct distrend_client *client, struct distren_request *req, void *reqdata); struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset); +size_t distrend_listen_read_handle(struct remoteio *rem, struct distrend_listens *listens, void *buf, size_t len, struct distrend_client *client); -int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events); -int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock); +int listen_handle_accept(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + int *port); +int listen_handle_error(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + int *port); -struct distrend_listens *distrend_listens_new(struct general_info *geninfo) +/*** TO BE MOVED TO REMOTEIO */ +int listen_handle_read(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + struct distrend_client *client); +int listen_handle_existence(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + struct distrend_client *client); + +struct distrend_listens *distrend_listens_new(multiio_context_t multiio, + struct general_info *geninfo, + struct options_common *opts) { struct distrend_listens *listens; @@ -65,54 +81,41 @@ struct distrend_listens *distrend_listen if(!listens) return NULL; - listens->listen_socks = list_init(); listens->request_handlers = list_init(); - listens->clients = list_init(); - listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */ - listens->pollfds_len = 2; - listens->nfds = 0; - listens->geninfo = geninfo; - - if(!listens->listen_socks - || !listens->request_handlers - || !listens->clients - || !listens->pollfds) + if(!listens->request_handlers) { - if(listens->listen_socks) - list_free(listens->listen_socks, LIST_NODEALLOC); - if(listens->request_handlers) - list_free(listens->request_handlers, LIST_NODEALLOC); - if(listens->clients) - list_free(listens->clients, LIST_NODEALLOC); - free(listens->pollfds); free(listens); return NULL; } - return listens; -} + listens->options = opts; + listens->geninfo = geninfo; + + /* multiio */ + listens->multiio = multiio; + + /* This type is used for accepting connections with accept() */ + multiio_socket_type_register(multiio, &listens->socket_type); -/** - clean up after a partially completed distrend_listen() - call which ends in error. - */ -int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo) -{ - int sock; + multiio_event_handler_register(multiio, + listens->socket_type, + POLLERR | POLLHUP | POLLNVAL, + (multiio_event_handler_func_t)&listen_handle_error, + listens); + multiio_event_handler_register(multiio, + listens->socket_type, + POLLRDNORM, + (multiio_event_handler_func_t)&listen_handle_accept, + listens); - sock = sockinfo->sock.sock; - if(sock >= 0) - close(sock); - - free(sockinfo); - - return 0; + return listens; } int distrend_listen_add(struct distrend_listens *listens, int port) { int tmp; - struct distrend_listen_sock *newsock; + int fd; + int *saved_port; struct sockaddr_in6 sockaddr = { @@ -125,119 +128,238 @@ int distrend_listen_add(struct distrend_ sockaddr.sin6_port = htons(port); - newsock = malloc(sizeof(struct distrend_listen_sock)); - if(!newsock) - return 1; - - newsock->port = port; - newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0); - tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); + fd = socket(AF_INET6, SOCK_STREAM, 0); + tmp = bind(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)); if(tmp == -1) { perror("bind"); - distrend_listen_add_unwind(newsock); + close(fd); return 1; } - tmp = listen(newsock->sock.sock, 1); + tmp = listen(fd, 1); if(tmp == -1) { perror("listen"); - distrend_listen_add_unwind(newsock); + close(fd); return 1; } - tmp = distrend_makesocknonblock(newsock->sock.sock); + saved_port = malloc(sizeof(int)); + if(!saved_port) + { + perror("malloc"); + close(fd); + + return 1; + } + *saved_port = port; + + tmp = multiio_socket_add(listens->multiio, fd, listens->socket_type, saved_port, POLLRDNORM); if(tmp) { - /* error is already printed to user by distrend_makesocknonblock() */ - distrend_listen_add_unwind(newsock); + close(fd); + return 1; } - distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM); - list_insert_after(listens->listen_socks, newsock, 0); + /** + * @todo perhaps we'll someday want the ability to unlisten()? :-p + * Then we have to store the fd somewheres so that we can call + * multiio_socket_del() on it. So far, however, that functionality + * isn't needed. + */ + + return 0; +} + +int listen_handle_error(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + int *port) +{ + fprintf(stderr, "Port %d experienced an error or is closed. Closing it.\n", *port); + multiio_socket_del(listens->multiio, fd); + close(fd); + + free(port); return 0; } -int distrend_handleread(struct distrend_listens *listens, - struct distrend_client *client) -{ - struct distrend_packet *packet; - ssize_t readlen; - char buf[8192]; - struct distren_request *req; - void *reqdata; - - packet = malloc(sizeof(struct distrend_packet)); - if(!packet) - { - fprintf(stderr, "OOM\n"); - return 1; - } - packet->len = 0; - packet->data = NULL; - - readlen = read(client->sock.sock, buf, sizeof(buf)); - if(readlen == -1) - { - perror("read"); - switch(errno) - { - case EINTR: - case EAGAIN: - break; +//int listen_handle_read(struct distrend_listens *listens, +// struct distrend_client *client) - default: - client->state = DISTREND_CLIENT_DEAD; - - free(packet); - return 1; - } - } - if(readlen == 0) - /* handle EOF */ - { - client->state = DISTREND_CLIENT_DEAD; - - free(packet); - return 1; - } - - packet->len = readlen; - packet->data = malloc(readlen); - if(!packet->data) - { - fprintf(stderr, "OOM!\n"); - - free(packet); - return 1; - } - memcpy(packet->data, buf, readlen); - q_enqueue(client->inmsgs, packet, 0); - client->inlen += packet->len; - packet = NULL; +/** + * an important thing to handle + * + * @deprecated to be replaced with table_tennis + */ +int listen_handle_existence(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + struct distrend_client *client) +{ + /** + * handle dead/closed sockets here! + */ + fprintf(stderr, __FILE__ ":%d: handle dead/closed sockets here!\n", __LINE__); /** - Manage input, etc. + * Manage timing-out clients. */ - if(client->expectlen == 0) - { - /* search out header from input so far */ - distrend_packets_collapse(client->inmsgs, client->inlen); - packet = q_front(client->inmsgs); + if(time(NULL) > client->cleanup_time) + switch(client->state) + { + case DISTREND_CLIENT_PREVERSION: + distrend_send_disconnect(client, "You have failed to present version information in a timely manner. Cya :-p"); + break; + case DISTREND_CLIENT_PREAUTH: + distrend_send_disconnect(client, "You have failed to present authentication information in a timely manner. Cya ;-)"); + break; + + case DISTREND_CLIENT_GOOD: + /* + * pings should be managed with two queues sharing this struct: + * struct pingpong_queue { struct distrend_client *client; time_t ping_time }; + * + * - queue to_ping: contains queue of clients to send pings to + * with the client whose ping time is earliest at the front + * + * - queue to_be_ponged: contains a queue of clients to clean up + * if they haven't recieved pongs. client with earliest cleanup + * time is at the front. If a PONG packet is received in time, the + * cleanup_time is bumped but the queue is left alone. When the + * queue's element is encountered, cleanup_time is checked and then + * the client is readded to the to_ping queue. + * + * Each queue shall be eaten as time passes and continue forever + * in circularity. + * + * data structures. + * fun. + * impossible to explain in text. + * easy to think up. + * impossible to determine the workings of existing ones. + * and... when you need them, screenshot utilities just aren't available :-/. + */ + distrend_send_disconnect(client, "Ping timeout :-p"); + break; + + case DISTREND_CLIENT_BAD: + fprintf(stderr, __FILE__ ":%d: aaarrrrgh!\n :-D\n", __LINE__); + break; + + default: + break; + } + return 0; +} + +struct distrend_accept_client_proc_data +{ + struct distrend_listens *listens; + time_t current_time; +}; + + +/** + * Handle new connections. + */ +int listen_handle_accept(multiio_context_t multiio, + int fd, + short revent, + struct distrend_listens *listens, + int *port) + //int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata) + { + struct distrend_client *newclient; + + int newclientsock; + struct remoteio *rem; + + newclientsock = accept(fd, (struct sockaddr *)NULL, (socklen_t *)NULL); + /* used to call int distrend_client_add(struct distrend_listens *listens, int sock, DISTREND_CLIENT_PREVERSION)*/ - if(packet->len > sizeof(struct distren_request)) - { - if(distren_request_new_fromdata(&req, packet->data, packet->len)) - { - fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n"); + newclient = distrend_client_new(listens, DISTREND_CLIENT_PREVERSION, NULL); + + if(remoteio_open_socket(&rem, listens->options->remoteio, (remoteio_read_handle_func_t)&distrend_listen_read_handle, newclient, newclientsock)) + { + fprintf(stderr, "error allocating/adding client struct\n"); + return 1; + } + newclient->rem = rem; + + fprintf(stderr, __FILE__ ":%d: This client SHOULD be registered in the table tennis system here\n", __LINE__); + fprintf(stderr, "accepted new connection; fd=%d\n", newclientsock); + + /** + * For those using netcat/telnet to debug their internets. + */ + #ifndef PACKAGE_URL + #define PACKAGE_URL "http://ohnopub.net/distren/" + #endif + #define DISTREN_GREETING PACKAGE_STRING " " PACKAGE_URL " : Nathan Phillip Brink && Ethan Michael Zonca\n" + /* using sizeof() - 1 because the sizeof() includes a NULL byte we want to ignore. */ + remoteio_write(newclient->rem, DISTREN_GREETING, sizeof(DISTREN_GREETING) - 1); - client->state = DISTREND_CLIENT_DEAD; + /* + list_mvfront(listens->clients); + newclient = list_curr(listens->clients); + while(newclient) + { + if(newclient->state == DISTREND_CLIENT_DEAD) + { + distrend_listen_poll_deletefd(listens, &newclient->sock); + distrend_client_free(newclient); + list_remove_curr(listens->clients); + fprintf(stderr, "removed dead connection\n"); + } + list_mvnext(listens->clients); + *//* provide for termination of this loop *//* + if(newclient == list_curr(listens->clients)) + newclient = NULL; + else + newclient = list_curr(listens->clients); + } + */ + return 0; + } + + /** + * Handle read events from remoteio, remoteio_read_handle_func_t. + * + * This func requires that someone called remoteio_generic_data_set(remoteio_opts, listens); + * + * @param client the client associated with this remoteio instance. + */ +size_t distrend_listen_read_handle(struct remoteio *rem, struct distrend_listens *listens, void *buf, size_t len, struct distrend_client *client) + { + struct distren_request *req; + void *reqdata; + + /** + * Manage input, etc. + */ + if(client->expectlen == 0) + { + /* search out header from input so far */ + if(len > sizeof(struct distren_request)) + { + if(distren_request_new_fromdata(&req, buf, len)) + { + fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n"); + + /* + * yes, this is safe and legal... because of hackishness + * in remoteio_close() ;-) + */ + remoteio_close(rem); return 1; } client->expectlen = req->len + sizeof(struct distren_request); @@ -245,24 +367,30 @@ int distrend_handleread(struct distrend_ } } if(client->expectlen - && client->inlen >= client->expectlen) + && len >= client->expectlen) { - /** essentially un-queue-ize the queue ;-) */ - distrend_packets_collapse(client->inmsgs, client->inlen); - packet = q_front(client->inmsgs); + if(distren_request_new_fromdata(&req, buf, len)) + { + if(client->state == DISTREND_CLIENT_PREAUTH) + remoteio_close(rem); + else + distrend_send_disconnect(client, "Protocol error."); - if(distren_request_new_fromdata(&req, packet->data, packet->len)) - { - fprintf(stderr, "error handling data from client\n"); - client->state = DISTREND_CLIENT_DEAD; return 1; } - if(packet->len - sizeof(struct distren_request) < req->len) + /* + * this really shouldn't happen... reparsing the same data with + * distren_request_new_fromdata() a second time shouldn't yeild + * a different req->len than it did before. + */ + if(len - sizeof(struct distren_request) < req->len) { - fprintf(stderr, "error handling some data from client\n"); + fprintf(stderr, "Unexpected error handling some data from client\n"); distren_request_free(req); - client->state = DISTREND_CLIENT_DEAD; + + /* but we should pay homage to W3C if the impossible happens */ + distrend_send_disconnect(client, "HTTP/1.1 503 Internal Server Error"); return 1; } @@ -274,193 +402,16 @@ int distrend_handleread(struct distrend_ distren_request_free(req); return 1; } - memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len); - memmove(packet->data, - ((void *)packet->data) + client->expectlen, - packet->len - client->expectlen); - packet->len -= client->expectlen; + memcpy(reqdata, ((void *)buf) + sizeof(struct distren_request), req->len); - client->inlen -= client->expectlen; client->expectlen = 0; - distrend_dispatch_request(listens, client, req, reqdata); + distrend_dispatch_request(listens, rem, client, req, reqdata); free(reqdata); distren_request_free(req); - } - return 0; -} -struct distrend_accept_client_proc_data -{ - struct distrend_listens *listens; - time_t current_time; -}; -int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data, - struct distrend_client *client) -{ - struct distrend_packet *packet; - ssize_t written_amount; - short revents; - - if(client->state == DISTREND_CLIENT_DEAD) - return TRUE; - - /** - Manage timing-out clients. - */ - if(data->current_time > client->cleanup_time) - switch(client->state) - { - case DISTREND_CLIENT_PREVERSION: - distrend_send_disconnect(data->listens, client, "You have failed to present version information in a timely manner. Cya :-p"); - break; - case DISTREND_CLIENT_PREAUTH: - distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)"); - break; - - case DISTREND_CLIENT_GOOD: - distrend_send_disconnect(data->listens, client, "Ping timeout :-p"); - break; - - case DISTREND_CLIENT_BAD: - client->state = DISTREND_CLIENT_DEAD; - return TRUE; - - default: - break; - } - - revents = data->listens->pollfds[client->sock.pollfd_offset].revents; - if(revents & POLLRDNORM) - /* handle reading */ - { - fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n", - __FILE__, __LINE__, client->sock.sock); - distrend_handleread(data->listens, client); - } - if(revents & POLLWRNORM) - { - fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n", - __FILE__, __LINE__, client->sock.sock); - - if(q_empty(client->outmsgs)) - { - data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM; - return TRUE; - } - - packet = q_front(client->outmsgs); - written_amount = write(client->sock.sock, packet->data, packet->len); - /** - Disconnect in case of write error. - */ - if(written_amount == -1) - { - perror("write"); - /* - distrend_client_free(); - - Until liblist has the ability to delete nodes - during traversal, we'll have to free the client - during a client-list cleanup/pruning cycle. - */ - client->state = DISTREND_CLIENT_DEAD; - } - if(packet->len == written_amount) - { - q_dequeue(client->outmsgs); - distrend_packet_free(packet); - - if(q_empty(client->outmsgs)) - data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM; - } - else - { - /** - shifting seems the simplest solution. - */ - packet->len -= written_amount; - memmove(packet->data, packet->data + written_amount, packet->len); - } - } - - /** continue iteration through the list */ - return TRUE; -} - -int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock) -{ - short revents; - int newclientsock; - - revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents; - if(revents & POLLRDNORM) - { - newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL); - if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREVERSION)) - { - fprintf(stderr, "error allocating/adding client struct\n"); - return 1; - } - fprintf(stderr, "accepted new connection\n"); - } - - /** - Check other listen blocks too. - */ - return TRUE; -} - -int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata) -{ - int tmp; - - struct distrend_accept_client_proc_data travinfo; - struct distrend_client *newclient; - - fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds); - poll(listens->pollfds, listens->nfds, -1); - if(tmp == -1) - { - perror("select"); - - return 1; - } - - /** - Deal first with data waiting to be sent and then - with input data. - */ - travinfo.listens = listens; - travinfo.current_time = time(NULL); /*< cache the time */ - - list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE); - - /** - Handle new connections. - */ - list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE); - /** - Until liblist supports deleting nodes during a traversal, - we clean dead clients here: - */ - list_mvfront(listens->clients); - newclient = list_curr(listens->clients); - while(newclient) - { - if(newclient->state == DISTREND_CLIENT_DEAD) - { - distrend_listen_poll_deletefd(listens, &newclient->sock); - distrend_client_free(newclient); - list_remove_curr(listens->clients); - fprintf(stderr, "removed dead connection\n"); - } - list_mvnext(listens->clients); - /* provide for termination of this loop */ - if(newclient == list_curr(listens->clients)) - newclient = NULL; - else - newclient = list_curr(listens->clients); + /* I actually just used recursion in non-LISP code! :-D */ + return req->len + distrend_listen_read_handle(rem, listens, buf + req->len, len - req->len, client); } return 0; @@ -481,121 +432,46 @@ void remotio_send_to_client(struct distr } /** - Adds a newly connected distrend_client to the listens->clients list + * Allocates and initializes a struct distrend_client. + * + * */ -int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state) +struct distrend_client *distrend_client_new(struct distrend_listens *listens, enum distrend_client_state state, struct remoteio *rem) { - int tmp; struct distrend_client *client; - tmp = distrend_makesocknonblock(sock); - if(tmp) - { - fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n"); - close(sock); - return 1; - } - client = malloc(sizeof(struct distrend_client)); if(!client) { fprintf(stderr, "OOM\n"); - close(sock); - return 1; + + return NULL; } - client->sock.sock = sock; - distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */ - client->state = state; client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME; client->inlen = 0; client->expectlen = 0; - client->inmsgs = q_init(); - client->outmsgs = q_init(); - - list_insert_after(listens->clients, client, 0); + client->rem = rem; - /** - For those using netcat/telnet to debug their internets. - */ -#ifndef PACKAGE_URL -#define PACKAGE_URL "http://ohnopub.net/distren/" -#endif -#define DISTREN_GREETING PACKAGE_STRING " " PACKAGE_URL " : Nathan Phillip Brink && Ethan Michael Zonca\n" - /* using sizeof() - 1 because the sizeof() includes a NULL byte we want to ignore. */ - distrend_client_write(listens, client, DISTREN_GREETING, sizeof(DISTREN_GREETING) - 1); - - return 0; + return client; } /** This function shall only be called after the appropriate distrend_listen_poll_deletefd() has been called + + @deprecated blah + @todo kill this func? */ int distrend_client_free(struct distrend_client *client) { - q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free); - q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free); - - close(client->sock.sock); + fprintf(stderr, " Kaaaaaaa bOOOOOOOOMNMMMM!\n"); + abort(); return 0; } -void distrend_packet_free(struct distrend_packet *packet) -{ - free(packet->data); - free(packet); -} - -int distrend_makesocknonblock(int sock) -{ - int fdstatus; - int tmp; - - fdstatus = fcntl(sock, F_GETFL); - fdstatus |= O_NONBLOCK; - tmp = fcntl(sock, F_SETFL, fdstatus); - if(tmp == -1) - { - perror("fcntl"); - return 1; - } - - return 0; -} - - -int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen) -{ - struct distrend_packet *packet; - - packet = malloc(sizeof(struct distrend_packet)); - if(!packet) - { - fprintf(stderr, "OOM\n"); - return 1; - } - - packet->len = msglen; - packet->data = malloc(msglen); - if(!packet->data) - { - free(packet); - fprintf(stderr, "OOM\n"); - return 1; - } - - memcpy(packet->data, towrite, msglen); - - q_enqueue(client->outmsgs, packet, 0); - - listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM; - - return 0; -} - -int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data) +int distrend_client_write_request(struct distrend_client *client, const struct distren_request *req, const void *data) { char *towrite; int ret; @@ -613,12 +489,13 @@ int distrend_client_write_request(struct memcpy(towrite, req, sizeof(struct distren_request)); memcpy(towrite + sizeof(struct distren_request), data, req->len); - ret = distrend_client_write(listens, client, towrite, msglen); + ret = remoteio_write(client->rem, towrite, msglen); free(towrite); return ret; } +/* int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread) { struct distrend_packet *packet; @@ -636,66 +513,15 @@ int distrend_client_read(struct distrend return 0; } - -/** - collapse a QUEUE of struct distrend_packet with a collective length - of len into a single struct packet. - */ -int distrend_packets_collapse(QUEUE *queue, size_t len) -{ - struct distrend_packet *newpacket; - struct distrend_packet *oldpacket; - - size_t copiedlen; - - newpacket = malloc(sizeof(struct distrend_packet)); - if(!newpacket) - { - fprintf(stderr, "OOM\n"); - return 1; - } - - newpacket->data = malloc(len); - if(!newpacket->data) - { - fprintf(stderr, "OOM\n"); - perror("malloc"); - free(newpacket); - - return 1; - } - - for(copiedlen = 0; - copiedlen < len && !q_empty(queue); - ) - { - oldpacket = q_dequeue(queue); - - memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len); - - copiedlen += oldpacket->len; - distrend_packet_free(oldpacket); - } - newpacket->len = copiedlen; - - if(copiedlen < len || !q_empty(queue)) - { - fprintf(stderr, "inconsistency!\n"); - return 1; - } - - q_enqueue(queue, newpacket, 0); - - return 0; -} +*/ -int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg) +int distrend_send_disconnect(struct distrend_client *client, const char *quit_msg) { struct distren_request *req; distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT); - distrend_client_write_request(listens, client, req, quit_msg); + distrend_client_write_request(client, req, quit_msg); distren_request_free(req); client->state = DISTREND_CLIENT_BAD; @@ -739,11 +565,11 @@ int _distrend_dispatch_request_trav(stru } /** - helper for distrend_handleread() which looks up the correct + helper for listen_handle_read() which looks up the correct request handler and handles handing the the request to the handler. :-p */ -int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata) +int distrend_dispatch_request(struct distrend_listens *listens, struct remoteio *rem, struct distrend_client *client, struct distren_request *req, void *reqdata) { struct distrend_dispatch_request_data data; @@ -757,144 +583,3 @@ int distrend_dispatch_request(struct dis return 0; } -/** - Helper for the distrend_listen_add() and functions that - call accept() to maintain the struct pollfd list in listens. - - @param listens The info related to listening to sockets. - @param fd the FD to add to the list of sockets. - @param poll_events The poll() events to register in the list of sockets. - @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances. - */ -int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events) -{ - size_t new_len; - struct pollfd *new_pollfds; - - if(listens->nfds + 1 > listens->pollfds_len) - { - new_len = listens->pollfds_len * 2; - new_pollfds = malloc(sizeof(struct pollfd) * new_len); - if(!new_pollfds) - return 1; - - memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len); - - free(listens->pollfds); - listens->pollfds_len = new_len; - listens->pollfds = new_pollfds; - } - - sock->pollfd_offset = listens->nfds; - listens->pollfds[sock->pollfd_offset].fd = sock->sock; - listens->pollfds[sock->pollfd_offset].events = poll_events; - listens->nfds ++; - - fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events); - - return 0; -} - - -/** - Removes a particular struct pollfd from the listens->pollfds list. This - will likely also require remapping at least one other existing pollfd - entry, so this will also traverse the list of clients and update the - entry_offset of the client which has to be remapped. - - @param listens the struct distrend_listens - @param sock the relevent struct containing the offset of the pollfd that must be removed. - @return 0 on success, 1 on error. - */ -int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock) -{ - /* - the socket which is being moved from the end of the list - to the middle to fill in the hole left by removing sock - */ - struct distrend_polled_sock *displaced_sock; - - /* - special case: we're deleting the last element and thus - don't need to remap an existing entry. - */ - if(sock->pollfd_offset == listens->nfds - 1) - { - listens->nfds --; - return 0; - } - - /* - Shift whatever's at the end of the pollfds array into the position - of the pollfd being removed. Then update that client's struct distrend_client - entry to point to the new offset. - */ - memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd)); - displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1); - if(!displaced_sock) - { - fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n", - (int)listens->nfds - 1, (int)listens->nfds); - listens->nfds --; - return 1; - } - - displaced_sock->pollfd_offset = sock->pollfd_offset; - - listens->nfds --; - return 0; -} - -/** - For _distrend_polled_sock_get_by_offset_traverse() - */ -struct distrend_polled_sock_get_by_offset_data -{ - struct distrend_polled_sock *sock; - size_t pollfd_offset; -}; - -/** - List traversal helper function for distrend_polled_sock_get_by_offset(). - - @param sock The second argument is casting either sturct distrend_client or - struct distrend_listen_sock to a struct distrend_polled_sock which works - because each of the two former structs as a sturct distrend_polled_sock - as its first member. - */ -int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock) -{ - if(data->pollfd_offset == sock->pollfd_offset) - { - data->sock = sock; - return FALSE; - } - return TRUE; -} - -/** - Returns a struct distrend_client based on its offset in the listens->pollfds - array. - - @param listens the listens. - @param pollfds_offset the index in the listens->pollfds array of the client we should find - @return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list. - */ -struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset) -{ - struct distrend_polled_sock_get_by_offset_data data; - - data.sock = NULL; - data.pollfd_offset = pollfds_offset; - - /** - These traversals both depend on struct distrend_polled_sock being - the first entry of struct distrend_listens and struct distrend_client. - */ - list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE); - if(data.sock == NULL) - list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE); - - return data.sock; -} - diff --git a/src/server/listen.h b/src/server/listen.h --- a/src/server/listen.h +++ b/src/server/listen.h @@ -17,6 +17,14 @@ along with DistRen. If not, see . */ +/** + @file listen provides the ability to set up a listening socket + through multiio's poll() interface. This means that if the listen()-ish + libc calls we use are incompatible with windows, we don't have to + throw ifdefs all throughout src/common. We just put that code in the + only place that needs it: in src/server. + */ + struct general_info; struct distrend_listens; struct distrend_client; @@ -25,9 +33,12 @@ struct distrend_client; #define _DISTREN_LISTEN_H #include "distrend.h" -#include "common/protocol.h" -#include +#include "common/options.h" +#include "common/multiio.h" +#include "common/protocol.h" +#include "common/remoteio.h" + #include #include @@ -67,50 +78,27 @@ enum distrend_client_state disconnect message/error before being dumped). */ DISTREND_CLIENT_BAD, - /** - The socket used to communicate with the client is closed. Its entry - in the client list should be removed on the next garbage clean-up round. - */ - DISTREND_CLIENT_DEAD }; -/** - inheritence in C !?!? - */ -struct distrend_polled_sock -{ - int sock; - /* this socket's offset in the listens->pollfds array. */ - size_t pollfd_offset; -}; - -struct distrend_listen_sock -{ - /** - sock must be first, see distrend_polled_socket_get_by_offset() - */ - struct distrend_polled_sock sock; - int port; -}; - struct distrend_listens { - /* of type (struct distrend_listen_sock *) */ - list_t listen_socks; /* of type (struct distrend_request_handler_info) */ list_t request_handlers; - /* the data to pass onto all request handlers */ + /* the data to pass on to all request handlers */ struct general_info *geninfo; + /* the distrend config */ + struct options_common *options; - /* of type (struct distrend_client) */ - list_t clients; + /* of type (struct distrend_client) (multiio stores a pointer per socket, we'll store each strut distrend_client as that pointer instead of using this list) */ + /* list_t clients; */ - /* the array passed to poll() */ - struct pollfd *pollfds; - /* the number of entries pollfds could hold */ - nfds_t pollfds_len; - /* the number of entries that pollfds does hold */ - nfds_t nfds; + /* the multiio context the listening interface should use/initialize */ + multiio_context_t multiio; + /* + * the socket type reserved for us, i.e., the listen()/accept() + * socket type whose events we handle. + */ + multiio_socket_type_t socket_type; }; @@ -120,11 +108,6 @@ struct distrend_listens */ struct distrend_client { - /** - sock must be first, see distrend_polled_socket_get_by_offset() - */ - struct distrend_polled_sock sock; - enum distrend_client_state state; /** @@ -137,9 +120,13 @@ struct distrend_client size_t inlen; /*< number of bytes waiting to be processed */ size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */ - QUEUE *inmsgs; - QUEUE *outmsgs; + /** + * Yes, even though the remoteio will have a void *pointer to this + * struct, we need a reverse pointer for something like + * distrend_client_write() to work. + */ + struct remoteio *rem; }; @@ -157,10 +144,11 @@ typedef int(*distrend_handle_request_fun /** Initializes the listens member of struct distrend_config. + @param multiio the multiio context in which we should register a new socket type and insert records for clients who connect. @param geninfo general info to apss to the request handler. @return Must be free()d with distrend_listen_free(); */ -struct distrend_listens *distrend_listens_new(struct general_info *geninfo); +struct distrend_listens *distrend_listens_new(multiio_context_t multiio, struct general_info *geninfo, struct options_common *opts); /** Adds a socket and configures it to listen(). @@ -171,44 +159,26 @@ struct distrend_listens *distrend_listen int distrend_listen_add(struct distrend_listens *listens, int port); /** - Register a request handler with the listener. - - @param config distrend's configuration - @param type the request type for which this handler should be called - @param handler the handler to call when a request of type type is received. + * Register a request handler with the listener. + * + * @param config distrend's configuration + * @param type the request type for which this handler should be called + * @param handler the handler to call when a request of type type is received. */ int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler); /** - checks states of the sockets I'm managing. If there are any new connections, - or activity on any sockets, I'll call the appropriate function. - I will block until there is some sort of activity, including - signals. If you want to cleanly shut down, it's best to register - signal handlers somewhere - - @param listens the config->listens after being initialized with distrend_listen() - */ -int distrend_accept(struct distrend_listens *listens); - -/** - cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub. + * cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub. */ int distrend_listen_free(struct distrend_listens *listens); /** - writes message to client. - @param towrite the caller is expected to free this string. This function will - strdup() it, in essence. - */ -int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen); - -/** writes request to client. @param client client to write to @param req the request struct. caller must free this. @param data the data of the request which is req->len bytes long. caller must free this. */ -int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data); +int distrend_client_write_request(struct distrend_client *client, const struct distren_request *req, const void *data); /** This is probably just NOT a placeholder for remotio @@ -216,10 +186,10 @@ int distrend_client_write_request(struct void remotio_send_to_client(); /** - Queue a DISTREN_REQUEST_DISCONNECT and prepare a client - to be disconnected. + * Queue a DISTREN_REQUEST_DISCONNECT and prepare a client + * to be disconnected. */ -int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg); +int distrend_send_disconnect(struct distrend_client *client, const char *quit_msg); #endif diff --git a/src/server/simpleslave.c b/src/server/simpleslave.c --- a/src/server/simpleslave.c +++ b/src/server/simpleslave.c @@ -40,6 +40,8 @@ int main(int argc, char *argv[]) char *password; char *hostname; + multiio_context_t multiio; + cfg_opt_t myopts[] = { CFG_SIMPLE_STR("username", &username), CFG_SIMPLE_STR("password", &password), @@ -64,6 +66,7 @@ int main(int argc, char *argv[]) char curopt; int runBenchmark = 0; int updateConf = 0; + multiio = multiio_context_new(); while(((char)-1) != (curopt = getopt(argc, argv, "u:p:rh"))) { @@ -109,7 +112,7 @@ if(updateConf){ return 0; } /* Get conf data */ - options_init(argc, argv, &my_cfg, myopts, "slave", &commonopts); + options_init(argc, argv, &my_cfg, myopts, "slave", &commonopts, multiio); if(!datadir) { diff --git a/src/server/slave.c b/src/server/slave.c --- a/src/server/slave.c +++ b/src/server/slave.c @@ -21,6 +21,7 @@ #include "slavefuncs.h" #include "common/asprintf.h" +#include "common/multiio.h" #include "common/options.h" #include "common/protocol.h" #include "common/remoteio.h" @@ -36,6 +37,7 @@ int main(int argc, char *argv[]) { + multiio_context_t multiio; char *datadir; char *server; @@ -71,6 +73,8 @@ int main(int argc, char *argv[]) char curopt; int runBenchmark = 0; + multiio = multiio_context_new(); + while(((char)-1) != (curopt = getopt(argc, argv, "u:rh"))) { if(curopt == ':') @@ -104,7 +108,7 @@ int main(int argc, char *argv[]) } /* Get conf data */ - options_init(argc, argv, &my_cfg, myopts, "slave", &commonopts); + options_init(argc, argv, &my_cfg, myopts, "slave", &commonopts, multiio); if(!datadir) { @@ -142,7 +146,7 @@ int main(int argc, char *argv[]) } fprintf(stderr, "Connecting to server...\n"); - if(remoteio_open(&comm_slave, commonopts->remoteio, server)) + if(remoteio_open_server(&comm_slave, commonopts->remoteio, NULL, NULL, server)) { fprintf(stderr, "Error connecting to server; exiting\n"); return 1; diff --git a/src/server/slavefuncs.c b/src/server/slavefuncs.c --- a/src/server/slavefuncs.c +++ b/src/server/slavefuncs.c @@ -836,25 +836,15 @@ int login_user(char *username) } /** - Sends the server a single request (see protocol.h) - ohnobinki: This should hopefully work, maybe ;D + * Sends the server a single request (see protocol.h) + * ohnobinki: This should hopefully work, maybe ;D + * + * @deprecated THIS FUNCTION SHOULD DIE VERY, VERY SOON! + * (and painfully :-p) */ int sendSignal(struct remoteio *rem, char signal) { - size_t written; - size_t towrite; - char *ssignal; - - _distren_asprintf(&ssignal, "%c", signal); - towrite = strlen(ssignal); - while( towrite - && !remoteio_write(rem, ssignal, towrite, &written)) - { - fprintf(stderr, "Sending request...\n"); - towrite -= written; - } - if(written) - return 0; + remoteio_write(rem, &signal, 1); /** if remoteio_write returned 1, the connection @@ -864,33 +854,24 @@ int sendSignal(struct remoteio *rem, cha } /** - Sends the server an extended signal (request + data) - ohnobinki: I have no clue how you really want to handle this. Please clarify/edit -*/ + * Sends the server an extended signal (request + data) + * ohnobinki: I have no clue how you really want to handle this. Please clarify/edit + * normaldotcom: I see more and more how clueless you are, I hope to get to his soon ;-) + * + * @deprecated lol + */ int sendExtSignal(struct remoteio *rem, char signal, char *data) { - size_t written; - size_t towrite; + size_t len; char *ssignal; /** - Just append the data FIXME: We should do this differently - */ + * Just append the data FIXME: We should do this differently + */ _distren_asprintf(&ssignal, "%c%s", signal, data); - towrite = strlen(ssignal); - while( towrite - && !remoteio_write(rem, ssignal, towrite, &written)) - { - fprintf(stderr, "Sending request...\n"); - towrite -= written; - } - if(written) - return 0; + len = strlen(ssignal); + remoteio_write(rem, ssignal, len); - /** - if remoteio_write returned 1, the connection - is probably dead or there was a real error - */ - return 1; + return 0; }