Changeset - 1c0bf5189204
[Not reviewed]
default
0 2 0
Nathan Brink (binki) - 15 years ago 2010-07-02 16:29:50
ohnobinki@ohnopublishing.net
Minor coments and cleanup.
2 files changed with 7 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/common/remoteio.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
#include "libremoteio.h"
 
#include "execio.h"
 
#include "asprintf.h"
 

	
 
#include <list.h>
 

	
 
#include <errno.h>
 
#ifndef _WIN32
 
#include <netdb.h>
 
#endif
 
#include <stdlib.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#ifdef _WIN32
 
#include <winsock2.h>
 
#include <ws2tcpip.h>
 
#else
 
#include <sys/socket.h>
 
#endif
 
#include <unistd.h>
 

	
 
#ifndef _WIN32
 
#include <sys/un.h>
 
#endif
 

	
 
/* local */
 

	
 
#define REMOTEIO_DEFAULT_PORT "4050"
 

	
 
int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread);
 
int _remoteio_ssh_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten);
 
int _remoteio_ssh_close(struct remoteio *rem);
 

	
 
/**
 
   ``named pipes''
 
 */
 
#ifndef _WIN32
 
int _remoteio_sock_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_sock_close(struct remoteio *rem);
 
#endif
 
int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread);
 
int _remoteio_sock_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten);
 

	
 
/**
 
   These borrow from _remoteio_sock_read() and _remoteio_sock_write().
 
 */
 
int _remoteio_tcp_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_tcp_close(struct remoteio *rem);
 

	
 
/**
 
  lookup table for different methods of remoteio:
 
  the enum remoteio_method is the index of the entry to use for that method. 
 
  Regardless, a NULL terminator is required because the configuration function
 
  searches through this table for the method specified in the config file.
 
*/
 
struct remoteio_method_funcmap funcmap[] = 
 
  {
 
    /* [REMOTEIO_METHOD_SSH] */
 
    {REMOTEIO_METHOD_SSH, &_remoteio_ssh_open, &_remoteio_ssh_read, &_remoteio_ssh_write, &_remoteio_ssh_close, "ssh"},
 
#ifndef _WIN32
 
    {REMOTEIO_METHOD_UNIX, &_remoteio_sock_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close, "unix"},
 
#endif
 
    {REMOTEIO_METHOD_TCP, &_remoteio_tcp_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_tcp_close, "tcp"},
 
    {REMOTEIO_METHOD_MAX, NULL, NULL, NULL, NULL, NULL}
 
  };
 

	
 
struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername);
 

	
 
int remoteio_config(cfg_t *cfg, struct remoteio_opts *opts)
 
{
 
  size_t numservers;
 
  size_t counter, counter2;
 
  static int haslisted_methods = 0;
 
  
 
  struct remoteio_server aserver;
 

	
 
  opts->servers = list_init();
 
  if(!opts->servers)
 
    {
 
      fprintf(stderr, "@todo cleanup!\n");
 
      abort();
 
    }
 
  
 
  numservers = cfg_size(cfg, "server");
 
  for(counter = 0; counter < numservers; counter ++)
 
    {
 
      cfg_t *cfg_aserver;
 
      char *method;
 
      
 
      cfg_aserver = cfg_getnsec(cfg, "server", counter);
 
      
 
      aserver.name = strdup(cfg_title(cfg_aserver));
 
      aserver.hostname = strdup(cfg_getstr(cfg_aserver, "hostname"));
 
      aserver.username = strdup(cfg_getstr(cfg_aserver, "username"));
 

	
 
      aserver.method = REMOTEIO_METHOD_MAX;
 
      method = cfg_getstr(cfg_aserver, "method");
 
      for(counter2 = 0; funcmap[counter2].name; counter2 ++)
 
	if(strcmp(method, funcmap[counter2].name) == 0)
 
	  aserver.method = funcmap[counter2].method;
 
      if(aserver.method == REMOTEIO_METHOD_MAX)
 
	{
 
	  fprintf(stderr, "No such method as %s\n", method);
 
	  if(!haslisted_methods)
 
	    {
 
	      fprintf(stderr, "Available methods:\n");
 
	      for(counter2 = 0; funcmap[counter2].name; counter2 ++)
 
		fprintf(stderr, "\t%s\n", funcmap[counter2].name);
 
	      
 
	      haslisted_methods ++;
 
	    }
 
	  abort();
 
	}
 
      list_insert_after(opts->servers, &aserver, sizeof(struct remoteio_server));
 
    }
 
  
 
  return 0;
 
}
 

	
 

	
 

	
 
int remoteio_open(struct remoteio **remoteio, struct remoteio_opts *opts, const char *servername)
 
{
 
  struct remoteio_server *theserver;
 
  struct remoteio *rem;
 

	
 
  int tmp;
 

	
 
  if(!opts)
 
    {
 
      fprintf(stderr, "%s:%d: no null opts!\n\tThis is a bug, please report it (after making sure it isn't already reported)\n", __FILE__, __LINE__);
 
      return 1;
 
    }
 

	
 
  theserver = remoteio_getserver(opts, servername);
 
  if(!theserver)
 
    {
 
      fprintf(stderr, "%s:%d: Could not find server named ``%s''\n", __FILE__, __LINE__, servername);
 
      return 1;
 
    }
 

	
 
  if(theserver->method >= REMOTEIO_METHOD_MAX
 
     || theserver->method < 0)
 
    {
 
      fprintf(stderr, "%s:%d: Unsupported remoteio method %d\n\tThis is a bug, probably indicating memory corruption. This is, of course, probably my fault (not your hardware's) ;-)\n", __FILE__, __LINE__, theserver->method);
 
      return 1;
 
    }
 
  rem = malloc(sizeof(struct remoteio));
 
  if(!rem)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 2;
 
    }
 
  *remoteio = rem;
 

	
 
  rem->method = theserver->method;
 
  rem->opts = opts;
 

	
 
  tmp = funcmap[theserver->method].open_func(rem, theserver);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Error using method %s for server ``%s''", funcmap[theserver->method].name, servername);
 
      free(rem);
 
      *remoteio = NULL;
 
      return tmp;
 
    }
 
  
 
  return 0;
 
}
 

	
 
int remoteio_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread)
 
{
 
  return funcmap[rem->method].read_func(rem, buf, len, bytesread);
 
}
 

	
 
int remoteio_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten)
 
{
 
  return funcmap[rem->method].write_func(rem, buf, len, byteswritten);
 
}
 

	
 
int remoteio_close(struct remoteio *rem)
 
{
 
  int rtn;
 
  
 
  rtn = funcmap[rem->method].close_func(rem);
 
  free(rem);
 
  
 
  return rtn;
 
}
 

	
 
int _remoteio_getserver_traverse(char *servername, struct remoteio_server *aserver)
 
{
 
  if(!strcmp(aserver->name, servername))
 
    return FALSE; /* stop traversal */
 

	
 
  return TRUE;
 
}
 

	
 
struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername)
 
{
 
  int traversal_result;
 
  char *dispensible_servername;
 

	
 
  dispensible_servername = strdup(servername); /* for the sake of constness... */
 
  traversal_result = list_traverse(opts->servers, dispensible_servername, (list_traverse_func_t)&_remoteio_getserver_traverse, LIST_FRNT | LIST_ALTR);
 
  free(dispensible_servername);
 

	
 
  if(traversal_result == LIST_OK)
 
    return (struct remoteio_server *)list_curr(opts->servers);
 

	
 
  return (struct remoteio_server *)NULL;
 
}
 

	
 

	
 

	
 

	
 
/**
 
   different remoteio methods' implementations:
 
 */
 

	
 
/*
 
  SSH, via execio
 
*/
 

	
 
int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  char *userhost;
 
  char *sshargs[] = {rem->opts->ssh_command, NULL /* userhost */, "distrend", "-d", (char *)NULL};
 

	
 
  int rtn;
 

	
 
  if(server->username)
 
    _distren_asprintf(&userhost, "%s@%s", server->username, server->hostname);
 
  else
 
    userhost = strdup(server->hostname);
 
  sshargs[1] = userhost;
 

	
 
  rtn = execio_open( &rem->execio, "ssh", sshargs);
 
  if(rtn)
 
    {
 
      fprintf(stderr, "error opening remoteio channel to ssh userhost ``%s''\n" , userhost);
 
      free(userhost);
 
      return 1;
 
    }
 
  free(userhost);
 
  
 
  return 0;
 
}
 

	
 
int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread)
 
{
 
  return execio_read(rem->execio, buf, len, bytesread);
 
}
 

	
 
int _remoteio_ssh_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten)
 
{
 
  return execio_write(rem->execio, buf, len, byteswritten);
 
}
 

	
 
int _remoteio_ssh_close(struct remoteio *rem)
 
{
 
  int rtn;
 
  
 
  rtn = execio_close(rem->execio);
 
  if(rtn)
 
    fprintf(stderr, "%s:%d: error closing execio\n", __FILE__, __LINE__);
 
  
 
  return rtn;
 
}
 

	
 
#ifndef _WIN32
 
/*
 
  local sockets implementation (``named pipes''), unix-only
 
 */
 
int _remoteio_sock_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  int sock;
 
  struct sockaddr_un sockaddr;
 

	
 
  /*
 
    The POSIX docs pretty much say that I can't depend on sockpath being able to be longer than 
 
    some proprietary length. So, if the compiler specifies a long path for RUNSTATEDIR, it could
 
    cause a buffer overflow.
 
   */
 
  char *sockpath = RUNSTATEDIR "/distrend.sock";
 
  unsigned int sockaddr_len;
 

	
 
  sock = socket(AF_UNIX, SOCK_STREAM, 0);
 
  if(sock == -1)
 
    {
 
      perror("socket");
 
      return 1;
 
    }
 

	
 
  sockaddr.sun_family = AF_UNIX;
 
  /*
 
    The terminating NULL should not be included in what's copied to sun_path,
 
    although it won't hurt as long as strlen(sockpath) < max socket length
 
   */
 
  for(sockaddr_len = 0; sockpath[sockaddr_len]; sockaddr_len ++)
 
    sockaddr.sun_path[sockaddr_len] = sockpath[sockaddr_len];
 

	
 
  if(connect(sock, (struct sockaddr *)&sockaddr, sockaddr_len) == -1)
 
    {
 
      perror("connect");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  rem->sock = sock;
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_sock_close(struct remoteio *rem)
 
{
 
  close(rem->sock);
 

	
 
  return 0;
 
}
 

	
 
#endif
 

	
 
int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread)
 
{
 
  ssize_t readrtn;
 

	
 
  readrtn = read(rem->sock, buf, len);
 
  /*
 
    The following is valid for blocking sockets:
 
   */
 
  if(readrtn == -1)
 
    {
 
      /*
 
	in this case, we may have been interrupted by a signal and errno == EINTR
 
	or the connection was reset and errno = ECONNRESET
 

	
 
	Some of these are not error conditions:
 
       */
 
      perror("read");
 
      *bytesread = 0;
 

	
 
      if(errno != EINTR)
 
	{
 
	  fprintf(stderr, "error reading socket in remoteio_sock_read\n");
 
	  return 1;
 
	}
 

	
 
      return 0;
 
    }
 

	
 
  *bytesread = readrtn;
 
  if(!readrtn)
 
    {
 
      /*
 
	means EOF except when FD is in ``message-nondiscard'' or ``message-discard''
 
	modes.
 
       */
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_sock_write(struct remoteio *rem, void *buf, size_t len, size_t *byteswritten)
 
{
 
  ssize_t writertn;
 

	
 
  writertn = write(rem->sock, buf, len);
 

	
 
  if(writertn == -1)
 
    {
 
      perror("write");
 
      if(errno != EINTR)
 
	{
 
	  fprintf(stderr, "error writing to socket in remoteio_sock_write()\n");
 
	  return 1;
 
	}
 
    }
 

	
 
  *byteswritten = writertn;
 
  if(!writertn)
 
    {
 
      /*
 
	should we consider this an error? I'm pretty
 
	sure we should :-/
 
       */
 
      fprintf(stderr, "write() returned 0 in remoteio_sock_write()\n");
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
   TCP, taking advantage of the two generic socks read/write functions:
 
 */
 
int _remoteio_tcp_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  int tmp;
 
  int tmp2;
 

	
 
  int sock;
 

	
 
  char *hostname;
 
  char *port;
 

	
 
  struct addrinfo addrinfo_hints;
 
  struct addrinfo *addrinfo_res;
 

	
 
  /**
 
     only hostname should be free()-ed, not port,
 
     because both are from the same block of malloc()-ed
 
     memory
 
   */
 
  hostname = strdup(server->hostname);
 
  for(port = hostname;
 
      *port && *port != ':';
 
      port ++)
 
    ;
 
  if(*port)
 
    {
 
      *port = '\0';
 
      port ++;
 
    }
 
  else
 
    port = REMOTEIO_DEFAULT_PORT;
 

	
 
  memset(&addrinfo_hints, '\0', sizeof(struct addrinfo));
 
  addrinfo_hints.ai_family = AF_UNSPEC;
src/server/listen.c
Show inline comments
 
@@ -13,853 +13,852 @@
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "listen.h"
 
#include "common/protocol.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <poll.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
struct distrend_request_handler_info
 
{
 
  enum distren_request_type request_type;
 
  distrend_handle_request_func_t handler;
 
};
 

	
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
 

	
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
 

	
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
 
{
 
  struct distrend_listens *listens;
 

	
 
  listens = malloc(sizeof(struct distrend_listens));
 
  if(!listens)
 
    return NULL;
 

	
 
  listens->listen_socks = list_init();
 
  listens->request_handlers = list_init();
 
  listens->clients = list_init();
 
  listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */
 
  listens->pollfds_len = 2;
 
  listens->nfds = 0;
 
  listens->geninfo = geninfo;
 

	
 
  if(!listens->listen_socks
 
     || !listens->request_handlers
 
     || !listens->clients
 
     || !listens->pollfds)
 
    {
 
      if(listens->listen_socks)
 
	list_free(listens->listen_socks, LIST_NODEALLOC);
 
      if(listens->request_handlers)
 
	list_free(listens->request_handlers, LIST_NODEALLOC);
 
      if(listens->clients)
 
	list_free(listens->clients, LIST_NODEALLOC);
 
      free(listens->pollfds);
 
      free(listens);
 
      return NULL;
 
    }
 

	
 
  return listens;
 
}
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
 
{
 
  int sock;
 

	
 
  sock = sockinfo->sock.sock;
 
  if(sock >= 0)
 
    close(sock);
 

	
 
  free(sockinfo);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_add(struct distrend_listens *listens, int port)
 
{
 
  int tmp;
 
  struct distrend_listen_sock *newsock;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  sockaddr.sin6_port = htons(port);
 

	
 
  newsock = malloc(sizeof(struct distrend_listen_sock));
 
  if(!newsock)
 
    return 1;
 

	
 
  newsock->port = port;
 
  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
    {
 
      perror("bind");
 
      distrend_listen_add_unwind(newsock);
 

	
 
      return 1;
 
    }
 

	
 
  tmp = listen(newsock->sock.sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      distrend_listen_add_unwind(newsock);
 
      
 
      return 1;
 
    }
 

	
 
  tmp = distrend_makesocknonblock(newsock->sock.sock);
 
  if(tmp)
 
    {
 
      /* error is already printed to user by distrend_makesocknonblock() */
 
      distrend_listen_add_unwind(newsock);
 
      return 1;
 
    }
 

	
 
  distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM);
 
  list_insert_after(listens->listen_socks, newsock, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_listens *listens,
 
			struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
      readlen = read(client->sock.sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	  perror("read");
 
	  switch(errno)
 
	    {
 
	    case EINTR:
 
	    case EAGAIN:
 
	      break;
 

	
 
	    default:
 
	      client->state = DISTREND_CLIENT_DEAD;
 

	
 
	      free(packet);
 
	      return 1;
 
	    }
 
	}
 
      if(readlen == 0)
 
	/* handle EOF */
 
	{
 
	  client->state = DISTREND_CLIENT_DEAD;
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 

	
 
      packet->len = readlen;
 
      packet->data = malloc(readlen);
 
      if(!packet->data)
 
	{
 
	  fprintf(stderr, "OOM!\n");
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
      memcpy(packet->data, buf, readlen);
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 

	
 
  /**
 
     Manage input, etc.
 
   */
 
  if(client->expectlen == 0)
 
    {
 
      /* search out header from input so far */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(packet->len > sizeof(struct distren_request))
 
	{
 
	  if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	    {
 
	      fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
 

	
 
	      client->state = DISTREND_CLIENT_DEAD;
 
	      return 1;
 
	    }
 
	  client->expectlen = req->len + sizeof(struct distren_request);
 
	  distren_request_free(req);
 
	}
 
    }
 
  if(client->expectlen
 
     && client->inlen >= client->expectlen)
 
    {
 
      /** essentially un-queue-ize the queue ;-) */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	{
 
	  fprintf(stderr, "error handling data from client\n");
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      if(packet->len - sizeof(struct distren_request) < req->len)
 
	{
 
	  fprintf(stderr, "error handling some data from client\n");
 
	  distren_request_free(req);
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      reqdata = malloc(req->len);
 
      if(!reqdata)
 
	{
 
	  fprintf(stderr, "OOM\n");
 

	
 
	  distren_request_free(req);
 
	  return 1;
 
	}
 
      memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len);
 
      memmove(packet->data,
 
	      ((void *)packet->data) + client->expectlen,
 
	      packet->len - client->expectlen);
 
      packet->len -= client->expectlen;
 

	
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      distrend_dispatch_request(listens, client, req, reqdata);
 
      free(reqdata);
 
      distren_request_free(req);
 
    }
 
  return 0;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  struct distrend_listens *listens;
 
  time_t current_time;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 
  short revents;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  /**
 
     Manage timing-out clients.
 
   */
 
  if(data->current_time > client->cleanup_time)
 
    switch(client->state)
 
      {
 
      case DISTREND_CLIENT_PREVERSION:
 
	distrend_send_disconnect(data->listens, client, "You have failed to present version information in a timely manner. Cya :-p");
 
	break;
 
      case DISTREND_CLIENT_PREAUTH:
 
	distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	break;
 

	
 
      case DISTREND_CLIENT_GOOD:
 
	distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
 
	break;
 

	
 
      case DISTREND_CLIENT_BAD:
 
	client->state = DISTREND_CLIENT_DEAD;
 
	return TRUE;
 

	
 
      default:
 
	break;
 
      }
 

	
 
  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;
 
  if(revents & POLLRDNORM)
 
    /* handle reading */
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 
      distrend_handleread(data->listens, client);
 
    }
 
  if(revents & POLLWRNORM)
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 

	
 
      if(q_empty(client->outmsgs))
 
	{
 
	  data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	  return TRUE;
 
	}
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock.sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
      if(written_amount == -1)
 
	{
 
	  perror("write");
 
	  /*
 
	    distrend_client_free();
 

	
 
	    Until liblist has the ability to delete nodes
 
	    during traversal, we'll have to free the client
 
	    during a client-list cleanup/pruning cycle.
 
	  */
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	}
 
      if(packet->len == written_amount)
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 

	
 
	  if(q_empty(client->outmsgs))
 
	    data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	  memmove(packet->data, packet->data + written_amount, packet->len);
 
	}
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
 
{
 
  short revents;
 
  int newclientsock;
 

	
 
  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
 
  fprintf(stderr, "blah %d\n", (int)revents);
 
  if(revents & POLLRDNORM)
 
    {
 
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREVERSION))
 
	{
 
	  fprintf(stderr, "error allocating/adding client struct\n");
 
	  return 1;
 
	}
 
      fprintf(stderr, "accepted new connection\n");
 
    }
 

	
 
  /**
 
     Check other listen blocks too.
 
   */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 

	
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);
 
  poll(listens->pollfds, listens->nfds, -1);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.listens = listens;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 

	
 
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(listens->clients);
 
  newclient = list_curr(listens->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_listen_poll_deletefd(listens, &newclient->sock);
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(listens->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(listens->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(listens->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(listens->clients);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_free(struct distrend_listens *listens)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
  return 1;
 
}
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
 
{
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client.... or should I? :-p\n", __FILE__, __LINE__);
 
}
 

	
 
/**
 
   Adds a newly connected distrend_client to the listens->clients list
 
 */
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 
  struct distrend_client *client;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  client = malloc(sizeof(struct distrend_client));
 
  if(!client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      close(sock);
 
      return 1;
 
    }
 
  client->sock.sock = sock;
 
  distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
 

	
 
  client->state = state;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  client->inlen = 0;
 
  client->expectlen = 0;
 
  client->inmsgs = q_init();
 
  client->outmsgs = q_init();
 

	
 
  list_insert_after(listens->clients, client, 0);
 

	
 
  /**
 
     For those using netcat/telnet to debug their internets.
 
   */
 
#ifndef PACKAGE_URL
 
#define PACKAGE_URL "http://ohnopub.net/distren/"
 
#endif
 
#define DISTREN_GREETING PACKAGE_STRING " " PACKAGE_URL " : Nathan Phillip Brink && Ethan Michael Zonca\n"
 
  /* using sizeof() - 1 because the sizeof() includes a NULL byte we want to ignore. */
 
  distrend_client_write(listens, client, DISTREN_GREETING, sizeof(DISTREN_GREETING) - 1);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   This function shall only be called after the appropriate
 
   distrend_listen_poll_deletefd() has been called
 
 */
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock.sock);
 
  
 
  return 0;
 
}
 

	
 
void distrend_packet_free(struct distrend_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 
int distrend_makesocknonblock(int sock)
 
{
 
  int fdstatus;
 
  int tmp;
 

	
 
  fdstatus = fcntl(sock, F_GETFL);
 
  fdstatus |= O_NONBLOCK;
 
  tmp = fcntl(sock, F_SETFL, fdstatus);
 
  if(tmp == -1)
 
    {
 
      perror("fcntl");
 
      return 1;
 
    }
 
  
 
  return 0;
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = msglen;
 
  packet->data = malloc(msglen);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
  size_t msglen;
 

	
 
  msglen = sizeof(struct distren_request) + req->len;
 

	
 
  towrite = malloc(msglen);
 
  if(!towrite)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(listens, client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
{
 
  struct distrend_packet *packet;
 

	
 
  *lenread = 0;
 
  *toread = NULL;
 

	
 
  if(q_empty(client->inmsgs))
 
    return 1;
 

	
 
  packet = q_dequeue(client->inmsgs);
 
  *lenread = packet->len;
 
  *toread = packet->data;
 
  free(packet);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   collapse a QUEUE of struct distrend_packet with a collective length
 
   of len into a single struct packet.
 
 */
 
int distrend_packets_collapse(QUEUE *queue, size_t len)
 
{
 
  struct distrend_packet *newpacket;
 
  struct distrend_packet *oldpacket;
 

	
 
  size_t copiedlen;
 

	
 
  newpacket = malloc(sizeof(struct distrend_packet));
 
  if(!newpacket)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  
 
  newpacket->data = malloc(len);
 
  if(!newpacket->data)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      perror("malloc");
 
      free(newpacket);
 

	
 
      return 1;
 
    }
 

	
 
  for(copiedlen = 0;
 
      copiedlen < len && !q_empty(queue);
 
      )
 
    {
 
      oldpacket = q_dequeue(queue);
 

	
 
      memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len);
 

	
 
      copiedlen += oldpacket->len;
 
      distrend_packet_free(oldpacket);
 
    }
 
  newpacket->len = copiedlen;
 

	
 
  if(copiedlen < len || !q_empty(queue))
 
    {
 
      fprintf(stderr, "inconsistency!\n");
 
      return 1;
 
    }
 

	
 
  q_enqueue(queue, newpacket, 0);
 

	
 
  return 0;
 
}
 

	
 

	
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg)
 
{
 
  struct distren_request *req;
 

	
 
  distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT);
 
  distrend_client_write_request(listens, client, req, quit_msg);
 
  distren_request_free(req);
 

	
 
  client->state = DISTREND_CLIENT_BAD;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_DISCONNECT_GRACE;
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler)
 
{
 
  struct distrend_request_handler_info *handler_info;
 

	
 
  handler_info = malloc(sizeof(struct distrend_request_handler_info));
 
  if(!handler_info)
 
    return 1;
 

	
 
  handler_info->request_type = type;
 
  handler_info->handler = handler;
 
  list_insert_after(listens->request_handlers, handler_info, 0);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_dispatch_request_data
 
{
 
  struct general_info *geninfo;
 
  struct distrend_client *client;
 
  struct distren_request *req;
 
  void *req_data;
 
};
 

	
 
/**
 
   traversal function for distrend_dispatch_request().
 
 */
 
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
 
{
 
  if(handler_info->request_type == data->req->type)
 
    (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data);
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
   helper for distrend_handleread() which looks up the correct
 
   request handler and handles handing the the request to the
 
   handler. :-p
 
 */
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata)
 
{
 
  struct distrend_dispatch_request_data data;
 

	
 
  data.geninfo = listens->geninfo;
 
  data.client = client;
 
  data.req = req;
 
  data.req_data = reqdata;
 

	
 
  list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Helper for the distrend_listen_add() and functions that
 
   call accept() to maintain the struct pollfd list in listens.
 

	
 
   @param listens The info related to listening to sockets.
 
   @param fd the FD to add to the list of sockets.
 
   @param poll_events The poll() events to register in the list of sockets.
 
   @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances.
 
 */
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events)
 
{
 
  size_t new_len;
 
  struct pollfd *new_pollfds;
 

	
 
  if(listens->nfds + 1 > listens->pollfds_len)
 
    {
 
      new_len = listens->pollfds_len * 2;
 
      new_pollfds = malloc(sizeof(struct pollfd) * new_len);
 
      if(!new_pollfds)
 
	return 1;
 

	
 
      memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len);
 

	
 
      free(listens->pollfds);
 
      listens->pollfds_len = new_len;
 
      listens->pollfds = new_pollfds;
 
    }
 

	
 
  sock->pollfd_offset = listens->nfds;
 
  listens->pollfds[sock->pollfd_offset].fd = sock->sock;
 
  listens->pollfds[sock->pollfd_offset].events = poll_events;
 
  listens->nfds ++;
 

	
 
  fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   Removes a particular struct pollfd from the listens->pollfds list. This
 
   will likely also require remapping at least one other existing pollfd
 
   entry, so this will also traverse the list of clients and update the
 
   entry_offset of the client which has to be remapped.
 

	
 
   @param listens the struct distrend_listens
 
   @param sock the relevent struct containing the offset of the pollfd that must be removed.
 
   @return 0 on success, 1 on error.
 
 */
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock)
 
{
 
  /*
 
    the socket which is being moved from the end of the list
 
    to the middle to fill in the hole left by removing sock
 
  */
 
  struct distrend_polled_sock *displaced_sock;
 

	
 
  /*
 
    special case: we're deleting the last element and thus
 
    don't need to remap an existing entry.
 
   */
 
  if(sock->pollfd_offset == listens->nfds - 1)
 
    {
 
      listens->nfds --;
 
      return 0;
 
    }
 

	
 
  /*
 
    Shift whatever's at the end of the pollfds array into the position
 
    of the pollfd being removed. Then update that client's struct distrend_client
 
    entry to point to the new offset.
 
   */
 
  memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd));
 
  displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1);
 
  if(!displaced_sock)
 
    {
 
      fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n",
 
	      (int)listens->nfds - 1, (int)listens->nfds);
 
      listens->nfds --;
 
      return 1;
 
    }
 

	
 
  displaced_sock->pollfd_offset = sock->pollfd_offset;
 

	
 
  listens->nfds --;
 
  return 0;
 
}
 

	
 
/**
 
   For _distrend_polled_sock_get_by_offset_traverse()
 
 */
 
struct distrend_polled_sock_get_by_offset_data
 
{
 
  struct distrend_polled_sock *sock;
 
  size_t pollfd_offset;
 
};
 

	
 
/**
 
   List traversal helper function for distrend_polled_sock_get_by_offset().
 

	
 
   @param sock The second argument is casting either sturct distrend_client or
 
   struct distrend_listen_sock to a struct distrend_polled_sock which works
 
   because each of the two former structs as a sturct distrend_polled_sock
 
   as its first member.
 
 */
0 comments (0 inline, 0 general)