diff --git a/src/common/remoteio.c b/src/common/remoteio.c --- a/src/common/remoteio.c +++ b/src/common/remoteio.c @@ -28,6 +28,7 @@ #ifndef _WIN32 #include #endif +#include #include #include #include @@ -39,6 +40,7 @@ #include #endif #include +#include #ifndef _WIN32 #include @@ -48,6 +50,17 @@ #define REMOTEIO_DEFAULT_PORT "4050" +int _remoteio_handle_write(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem); +int _remoteio_handle_read(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem); + int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server); int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread); int _remoteio_ssh_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten); @@ -83,11 +96,14 @@ struct remoteio_method_funcmap funcmap[] {REMOTEIO_METHOD_UNIX, &_remoteio_sock_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close, "unix"}, #endif {REMOTEIO_METHOD_TCP, &_remoteio_tcp_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_tcp_close, "tcp"}, + {REMOTEIO_METHOD_SOCKET, NULL, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close}, {REMOTEIO_METHOD_MAX, NULL, NULL, NULL, NULL, NULL} }; struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername); +void remoteio_packet_free(struct remoteio_packet *packet); + int remoteio_config(cfg_t *cfg, struct remoteio_opts *opts) { size_t numservers; @@ -96,6 +112,19 @@ int remoteio_config(cfg_t *cfg, struct r struct remoteio_server aserver; + multiio_socket_type_register(opts->multiio, &opts->socket_type); + + multiio_event_handler_register(opts->multiio, + opts->socket_type, + POLLWRNORM, + (multiio_event_handler_func_t)&_remoteio_handle_write, + opts); + multiio_event_handler_register(opts->multiio, + opts->socket_type, + POLLRDNORM, + (multiio_event_handler_func_t)&_remoteio_handle_read, + opts); + opts->servers = list_init(); if(!opts->servers) { @@ -139,9 +168,69 @@ int remoteio_config(cfg_t *cfg, struct r return 0; } +int remoteio_generic_data_set(struct remoteio_opts *opts, void *generic_data) +{ + opts->generic_handler_data = generic_data; + return 0; +} + +int remoteio_open_common(struct remoteio **remoteio, + enum remoteio_method method, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data) +{ + struct remoteio *rem; + + rem = malloc(sizeof(struct remoteio)); + if(!rem) + { + fprintf(stderr, "OOM\n"); + return 2; + } + + *remoteio = rem; -int remoteio_open(struct remoteio **remoteio, struct remoteio_opts *opts, const char *servername) + rem->execio = NULL; + rem->method = method; + rem->opts = opts; + rem->inbuf.data = NULL; + rem->inbuf.len = 0; + rem->outmsgs = q_init(); + rem->read_handler = read_handler; + rem->read_handler_data = read_handler_data; + /* + * the following initialization is very important... though the + * others are too, I suppose :-p. See remoteio_close() + */ + rem->careful_free = 0; + + return 0; +} + +int remoteio_open_socket(struct remoteio **remoteio, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + int fd) +{ + struct remoteio *rem; + if(remoteio_open_common(remoteio, REMOTEIO_METHOD_SOCKET, opts, read_handler, read_handler_data)) + return 1; + rem = *remoteio; + + rem->sock = fd; + multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLRDNORM); + + return 0; +} + +int remoteio_open_server(struct remoteio **remoteio, + struct remoteio_opts *opts, + remoteio_read_handle_func_t read_handler, + void *read_handler_data, + const char *servername) { struct remoteio_server *theserver; struct remoteio *rem; @@ -167,49 +256,261 @@ int remoteio_open(struct remoteio **remo fprintf(stderr, "%s:%d: Unsupported remoteio method %d\n\tThis is a bug, probably indicating memory corruption. This is, of course, probably my fault (not your hardware's) ;-)\n", __FILE__, __LINE__, theserver->method); return 1; } - rem = malloc(sizeof(struct remoteio)); - if(!rem) - { - fprintf(stderr, "OOM\n"); - return 2; - } - *remoteio = rem; - rem->method = theserver->method; - rem->opts = opts; + if(remoteio_open_common(remoteio, theserver->method, opts, read_handler, read_handler_data)) + return 1; + rem = *remoteio; tmp = funcmap[theserver->method].open_func(rem, theserver); if(tmp) { fprintf(stderr, "Error using method %s for server ``%s''", funcmap[theserver->method].name, servername); + free(rem->inbuf.data); + q_free(rem->outmsgs, QUEUE_NODEALLOC); free(rem); *remoteio = NULL; return tmp; } + + /** + * @todo make this code slightly more generic... able to handle + * execio's multi-sockets by letting execio register itself with + * multiio instead of us registering here perhaps + */ + multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLRDNORM); return 0; } -int remoteio_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread) +/** + * Implementation of multiio_event_handler_func_t + */ +int _remoteio_handle_read(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem) { - return funcmap[rem->method].read_func(rem, buf, len, bytesread); + struct remoteio_packet packet; + size_t readlen; + char buf[8192]; + + int tmp; + + packet.len = 0; + packet.data = NULL; + + if(rem->sock != fd) + fprintf(stderr, "%d != %d\n", rem->sock, fd); + + tmp = funcmap[rem->method].read_func(rem, buf, sizeof(buf), &readlen); + if(tmp) + { + remoteio_close(rem); + return 1; + } + + /* expand the input buffer */ + packet.len = rem->inbuf.len + readlen; + packet.data = malloc(rem->inbuf.len + readlen); + if(!packet.data) + { + fprintf(stderr, "OOM!\n"); + + return 1; + } + if(rem->inbuf.data) + memcpy(packet.data, rem->inbuf.data, rem->inbuf.len); + memcpy(packet.data + rem->inbuf.len, buf, readlen); + free(rem->inbuf.data); + memcpy(&rem->inbuf, &packet, sizeof(struct remoteio_packet)); + + /* + * readlen wil now keeps track of how many bytes the handler + * function has read. + * + * Call the read_handler. Set careful_free, see remoteio_close(), so + * that rem->read_handler() may call remoteio_close() without + * segfaulting us ;-). + */ + rem->careful_free = 1; + readlen = (*rem->read_handler)(rem, rem->opts->generic_handler_data, rem->inbuf.data, rem->inbuf.len, rem->read_handler_data); + if(rem->careful_free == 2) + { + rem->careful_free = 0; + remoteio_close(rem); + + return 0; + } + rem->careful_free = 0; + + memmove(rem->inbuf.data, rem->inbuf.data + readlen, rem->inbuf.len - readlen); + rem->inbuf.len -= readlen; + + return 0; } -int remoteio_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten) + +int remoteio_write(struct remoteio *rem, const void *buf, size_t len) { - return funcmap[rem->method].write_func(rem, buf, len, byteswritten); + struct remoteio_packet *packet; + struct pollfd pollfd; + ssize_t bytes_written; + + /** + * This is probably about the only optimization that exists in + * distren.... :-D + * + * Write to the client immediately if there are no other messages + * waiting and if the client will accept it. + */ + if(q_empty(rem->outmsgs)) + { + pollfd.fd = rem->sock; + pollfd.revents = POLLWRNORM; + pollfd.events = 0; + poll(&pollfd, 1, 0); + if(pollfd.events & POLLWRNORM) + { + bytes_written = write(rem->sock, buf, len); + if(bytes_written > 0) + { + len -= bytes_written; + buf += bytes_written; + } + } + } + + /** + * zero length is easy... and might be possible if the above + * optimization works ;-) + */ + if(!len) + return 0; + + packet = malloc(sizeof(struct remoteio_packet)); + if(!packet) + { + fprintf(stderr, "OOM\n"); + return 1; + } + + packet->len = len; + packet->data = malloc(len); + if(!packet->data) + { + free(packet); + fprintf(stderr, "OOM\n"); + return 1; + } + + memcpy(packet->data, buf, len); + + q_enqueue(rem->outmsgs, packet, 0); + multiio_socket_event_enable(rem->opts->multiio, rem->sock, POLLWRNORM); + + return 0; } +int _remoteio_handle_write(multiio_context_t multiio, + int fd, + short revent, + struct remoteio_opts *opts, + struct remoteio *rem) +{ + struct remoteio_packet *packet; + size_t written_amount; + + int tmp; + + fprintf(stderr, "%s:%d: MySomeone else's traversal says that sock %d is available for writing\n", + __FILE__, __LINE__, fd); + + /* + * check if we're out of stuff to write. + */ + if(q_empty(rem->outmsgs)) + { + multiio_socket_event_disable(multiio, fd, POLLWRNORM); + return 0; + } + + packet = q_front(rem->outmsgs); + tmp = funcmap[rem->method].write_func(rem, packet->data, packet->len, &written_amount); + + /** + Disconnect in case of write error. + */ + if(tmp) + { + fprintf(stderr, __FILE__ ":%d: error handling for write() needs to be inserted into remoteio.... perhaps.. ;-)\n", __LINE__); + } + if(packet->len == written_amount) + { + q_dequeue(rem->outmsgs); + remoteio_packet_free(packet); + + if(q_empty(rem->outmsgs)) + multiio_socket_event_disable(multiio, fd, POLLWRNORM); + } + else + { + /** + * shifting seems the simplest solution. + */ + packet->len -= written_amount; + memmove(packet->data, packet->data + written_amount, packet->len); + } + + return 0; +} + + int remoteio_close(struct remoteio *rem) { int rtn; + /** + * See careful_free's and _remoteio_handle_read()'s docs. If + * careful_free is nonzero, then we shouldn't free it here because + * such a free would cause a segfault. However, whoever set + * rem->careful_free to nonzero will handle resetting + * rem->careful_free to zero and calling remoteio_close() if + * necessary. + */ + if(rem->careful_free) + { + rem->careful_free = 2; + return 0; + } + + /* cleanup multiiio stuff */ + multiio_socket_del(rem->opts->multiio, rem->sock); + + /* backend-specific cleanup */ rtn = funcmap[rem->method].close_func(rem); + + /* this part is normal ;-) */ + free(rem->inbuf.data); + q_free(rem->outmsgs, (list_dealloc_func_t)remoteio_packet_free); + free(rem); - + return rtn; } +/** + * Frees an entire packet, including the passed pointer. If you just + * want the contents of the packet free()ed, just do + * free(packet.data); + */ +void remoteio_packet_free(struct remoteio_packet *packet) +{ + free(packet->data); + free(packet); +} + + int _remoteio_getserver_traverse(char *servername, struct remoteio_server *aserver) { if(!strcmp(aserver->name, servername)) @@ -334,6 +635,8 @@ int _remoteio_sock_open(struct remoteio return 0; } +#endif /* _WIN32 */ + int _remoteio_sock_close(struct remoteio *rem) { close(rem->sock); @@ -341,8 +644,6 @@ int _remoteio_sock_close(struct remoteio return 0; } -#endif - int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread) { ssize_t readrtn;