Changeset - 3efc48659d0b
[Not reviewed]
default
0 4 0
Nathan Brink (binki) - 15 years ago 2010-07-02 01:17:27
ohnobinki@ohnopublishing.net
Use poll() instead of select() (poll() is the thing to use nowadays ;-) ).
4 files changed with 508 insertions and 218 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -80,13 +80,13 @@ struct general_info
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 
@@ -97,14 +97,14 @@ int interactiveTest(int test, struct gen
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  int tmp;
 
  struct general_info general_info;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
@@ -151,25 +151,36 @@ int main(int argc, char *argv[])
 
    }
 
  fprintf(stderr,"Finished connecting!\n");
 

	
 
  /** Execute test function */
 
  interactiveTest(test, &general_info);
 

	
 
  if(distrend_listen(general_info.config, &clients))
 
  general_info.config->listens = distrend_listens_new(&general_info);
 
  if(!general_info.config->listens)
 
    {
 
      fprintf(stderr, "error listening\n");
 
      fprintf(stderr, "error initializing listens\n");
 
      return 1;
 
    }
 
  for(counter = 0; general_info.config->listen_ports[counter]; counter ++)
 
    {
 
      tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]);
 
      if(tmp)
 
	{
 
	  fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]);
 
	  return 1;
 
	}
 
    }
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)&general_info);
 
      distrend_accept(general_info.config->listens);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 

	
 
      struct frameset frame;
 
      struct distrenjob *job;
 
@@ -191,13 +202,13 @@ int main(int argc, char *argv[])
 
	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
	  finish_frame(general_info.conn, 0, job->jobnum, frame.num); // @TODO: Make sure this actually works.
 
	}
 
      distrenjob_free(&job);
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_listen_free(general_info.config->listens);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
@@ -205,13 +216,13 @@ int main(int argc, char *argv[])
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
{
 
  size_t counter;
 
  char *tmp_str;
 
  char fixedbuf[32];
 

	
 
  /* for response requests... if that makes any less sense ;-) */
 
@@ -243,17 +254,17 @@ int distrend_handle_request(struct distr
 
	  strncpy(fixedbuf, reqdata, 31);
 
	  fixedbuf[31] = '\0';
 
	  if(req->len < 31)
 
	    fixedbuf[req->len] = '\0';
 

	
 
	  _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, fixedbuf);
 
	  distrend_send_disconnect(client, tmp_str);
 
	  distrend_send_disconnect(listens, client, tmp_str);
 
	}
 

	
 
      distren_request_new(&newreq, strlen(VERSION), DISTREN_REQUEST_VERSION);
 
      distrend_client_write_request(client, newreq, VERSION);
 
      distrend_client_write_request(listens, client, newreq, VERSION);
 
      distren_request_free(newreq);
 
      break;
 
    }
 

	
 
  return 0;
 
}
 
@@ -305,32 +316,35 @@ int distrend_do_config(int argc, char *a
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
      (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port");
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 
  (*config)->listen_ports[counter] = 0;
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 

	
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  distrend_listen_free(config->listens);
 
  options_free(config->options);
 
  free(config->listen_ports);
 
  free(config);
 

	
 
  return 0;
 
}
 

	
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
// except that it doesn't hold free_clients and rendering_clients
 
void update_general_info(struct general_info *geninfo)
 
{
 
@@ -479,13 +493,12 @@ int interactiveTest(int test, struct gen
 
  int command;
 
  int32_t slaveKey = 1;
 
  jobnum_t jobKey = 0;
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 
  struct distrend_clientset *clients;
 

	
 
  fprintf(stderr,"Hello!\n");
 

	
 
  while(test == 1)
 
   {
 
     fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
@@ -554,20 +567,15 @@ int interactiveTest(int test, struct gen
 

	
 
       start_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Started Frame!\n");
 
       break;
 

	
 
     case 5:
 
       if(distrend_listen(geninfo->config, &clients))
 
	 {
 
	   fprintf(stderr, "error listening\n");
 
	   return 1;
 
	 }
 
       while(1)
 
	 {
 
	   distrend_accept(geninfo->config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)geninfo);
 
	   distrend_accept(geninfo->config->listens);
 
	   /*
 
	     code for reading data from clients either goes here or in distrend_accept().
 
	     it might make sense for us to just pass the distrend_accept() function a
 
	     callback which can handle packets or to have a generic packet handling
 
	     subsystem which gathers data into groups defined by by packet.h and then
 
	     passed onto the callback.
src/server/distrend.h
Show inline comments
 
@@ -28,13 +28,14 @@ struct distrend_config;
 
#include "listen.h"
 

	
 
struct distrend_config
 
{
 
  cfg_t *mycfg;
 
  struct options_common *options;
 
  struct distrend_listen *listens; /*< Null terminated array of structs */
 
  struct distrend_listens *listens;
 
  int *listen_ports;
 
  char *datadir;
 

	
 
  int die;
 

	
 
  char *mysql_user;
 
  char *mysql_host;
src/server/listen.c
Show inline comments
 
@@ -15,155 +15,179 @@
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "listen.h"
 
#include "common/protocol.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <sys/select.h>
 
#include <poll.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
struct distrend_clientset
 
{
 
  LIST *clients;
 

	
 
  int nfds;
 
};
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
struct distrend_request_handler_info
 
{
 
  enum distren_request_type request_type;
 
  distrend_handle_request_func_t handler;
 
};
 

	
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
 

	
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
 

	
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
 
{
 
  struct distrend_listens *listens;
 

	
 
  listens = malloc(sizeof(struct distrend_listens));
 
  if(!listens)
 
    return NULL;
 

	
 
  listens->listen_socks = list_init();
 
  listens->request_handlers = list_init();
 
  listens->clients = list_init();
 
  listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */
 
  listens->pollfds_len = 2;
 
  listens->nfds = 0;
 
  listens->geninfo = geninfo;
 

	
 
  if(!listens->listen_socks
 
     || !listens->request_handlers
 
     || !listens->clients
 
     || !listens->pollfds)
 
    {
 
      if(listens->listen_socks)
 
	list_free(listens->listen_socks, LIST_NODEALLOC);
 
      if(listens->request_handlers)
 
	list_free(listens->request_handlers, LIST_NODEALLOC);
 
      if(listens->clients)
 
	list_free(listens->clients, LIST_NODEALLOC);
 
      free(listens->pollfds);
 
      free(listens);
 
      return NULL;
 
    }
 

	
 
  return listens;
 
}
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter)
 
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
 
{
 
  int sock;
 

	
 
  for(counter ++; counter > 0; counter --)
 
    {
 
      sock = config->listens[counter - 1].sock;
 
      if(sock >= 0)
 
	close(sock);
 
    }
 
  list_free(clients->clients, NULL);
 
  free(clients);
 
  sock = sockinfo->sock.sock;
 
  if(sock >= 0)
 
    close(sock);
 

	
 
  free(sockinfo);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
int distrend_listen_add(struct distrend_listens *listens, int port)
 
{
 
  int tmp;
 
  size_t counter;
 
  struct distrend_listen_sock *newsock;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  *clients = malloc(sizeof(struct distrend_clientset));
 
  sockaddr.sin6_port = htons(port);
 

	
 
  (*clients)->clients = list_init();
 
  (*clients)->nfds = 1;
 
  newsock = malloc(sizeof(struct distrend_listen_sock));
 
  if(!newsock)
 
    return 1;
 

	
 
  for(counter = 0; config->listens[counter].port; counter ++)
 
  newsock->port = port;
 
  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
    {
 
      sockaddr.sin6_port = htons(config->listens[counter].port);
 
      perror("bind");
 
      distrend_listen_add_unwind(newsock);
 

	
 
      config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0);
 
      tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
      if(tmp == -1)
 
	{
 
	  perror("bind");
 
	  distrend_listen_unwind(config, *clients, counter);
 
      return 1;
 
    }
 

	
 
	  return 1;
 
	}
 

	
 
      tmp = listen(config->listens->sock, 1);
 
      if(tmp == -1)
 
	{
 
	  perror("listen");
 
	  distrend_listen_unwind(config, *clients, counter);
 
  tmp = listen(newsock->sock.sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      distrend_listen_add_unwind(newsock);
 
      
 
	  return 1;
 
	}
 
      return 1;
 
    }
 

	
 
      tmp = distrend_makesocknonblock(config->listens->sock);
 
      if(tmp)
 
	{
 
	  /* error is already printed to user by distrend_makesocknonblock() */
 
	  distrend_listen_unwind(config, *clients, counter);
 
	  return 1;
 
	}
 
  tmp = distrend_makesocknonblock(newsock->sock.sock);
 
  if(tmp)
 
    {
 
      /* error is already printed to user by distrend_makesocknonblock() */
 
      distrend_listen_add_unwind(newsock);
 
      return 1;
 
    }
 

	
 
  distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM);
 
  list_insert_after(listens->listen_socks, newsock, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_config *config,
 
			struct distrend_client *client,
 
			distrend_handle_request_t handlereq,
 
			void *handlereqdata)
 
int distrend_handleread(struct distrend_listens *listens,
 
			struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  /**
 
     we have to check that there's new data so that
 
     we may call ourself recursively
 
  */
 
  fd_set readfd;
 
  struct timeval selecttimeout;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
  FD_ZERO(&readfd);
 
  FD_SET(client->sock, &readfd);
 
  memset(&selecttimeout, '\0', sizeof(selecttimeout));
 
  select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout);
 
  if(FD_ISSET(client->sock, &readfd))
 
    {
 
      readlen = read(client->sock, buf, sizeof(buf));
 
  //  if(revents & POLLRDNORM)
 
  //{
 
      readlen = read(client->sock.sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	  perror("read");
 
	  switch(errno)
 
	    {
 
	    case EINTR:
 
@@ -196,13 +220,13 @@ int distrend_handleread(struct distrend_
 
	  return 1;
 
	}
 
      memcpy(packet->data, buf, readlen);
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 
    } /* if(FD_ISSET(client->sock, &readfd)) */
 
      //    } /*  */
 

	
 
  /**
 
     Manage input, etc.
 
   */
 
  if(client->expectlen == 0)
 
    {
 
@@ -259,100 +283,76 @@ int distrend_handleread(struct distrend_
 
	      packet->len - client->expectlen);
 
      packet->len -= client->expectlen;
 

	
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      (*handlereq)(client, req, reqdata, handlereqdata);
 
      distrend_dispatch_request(listens, client, req, reqdata);
 
      distren_request_free(req);
 
    }
 
  return 0;
 
}
 

	
 
struct distrend_accept_fdset_prepare_data
 
{
 
  fd_set readfds;
 
  fd_set writefds;
 
  /**
 
     stores the highest FD number which is one less than the first
 
   arfgument to select(). So do select(maxfd + 1, ...)
 
  */
 
  int maxfd;
 
};
 
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
 
				  struct distrend_client *client)
 
{
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(client->sock > data->maxfd)
 
    data->maxfd = client->sock;
 
  FD_SET(client->sock, &data->readfds);
 
  /**
 
     Only select on an outgoing socket if we have something to
 
     say.
 
   */
 
  if(!q_empty(client->outmsgs))
 
    FD_SET(client->sock, &data->writefds);
 
  
 
  return TRUE;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
  distrend_handle_request_t handlereq;
 
  void *handlereqdata;
 
  enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
 
  struct distrend_listens *listens;
 
  time_t current_time;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 
  short revents;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  /**
 
     Manage timing-out clients.
 
   */
 
  if(data->current_time > client->cleanup_time)
 
    switch(client->state)
 
      {
 
      case DISTREND_CLIENT_PREAUTH:
 
	distrend_send_disconnect(client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	break;
 

	
 
      case DISTREND_CLIENT_GOOD:
 
	distrend_send_disconnect(client, "Ping timeout :-p");
 
	distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
 
	break;
 

	
 
      case DISTREND_CLIENT_BAD:
 
	client->state = DISTREND_CLIENT_DEAD;
 
	return TRUE;
 

	
 
      default:
 
	break;
 
      }
 

	
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 
  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;
 
  if(revents & POLLRDNORM)
 
    /* handle reading */
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 
      distrend_handleread(data->listens, client);
 
    }
 
  if(revents & POLLWRNORM)
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
 
	  __FILE__, __LINE__, client->sock,
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ");
 

	
 
  switch(data->mode)
 
    {
 
    case DISTREND_ACCEPT_CLIENT_WRITE:
 
      if(q_empty(client->outmsgs))
 
	return TRUE;
 
	{
 
	  data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	  return TRUE;
 
	}
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock, packet->data, packet->len);
 
      written_amount = write(client->sock.sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
      if(written_amount == -1)
 
	{
 
	  perror("write");
 
@@ -366,116 +366,110 @@ int distrend_accept_client_proc(struct d
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	}
 
      if(packet->len == written_amount)
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 

	
 
	  if(q_empty(client->outmsgs))
 
	    data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	  memmove(packet->data, packet->data + written_amount, packet->len);
 
	}
 
      break;
 

	
 
    case DISTREND_ACCEPT_CLIENT_READ:
 
      distrend_handleread(data->config, client, data->handlereq, data->handlereqdata);
 
      break;
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
 
{
 
  short revents;
 
  int newclientsock;
 

	
 
  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
 
  fprintf(stderr, "blah %d\n", (int)revents);
 
  if(revents & POLLRDNORM)
 
    {
 
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating/adding client struct\n");
 
	  return 1;
 
	}
 
      fprintf(stderr, "accepted new connection\n");
 
    }
 

	
 
  /**
 
     Check other listen blocks too.
 
   */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 
  int newclientsock;
 

	
 
  struct distrend_accept_fdset_prepare_data fdsets;
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  FD_ZERO(&fdsets.readfds);
 
  FD_ZERO(&fdsets.writefds);
 
  FD_SET(config->listens->sock, &fdsets.readfds);
 
  fdsets.maxfd = config->listens->sock;
 

	
 
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);
 

	
 
  fprintf(stderr, "select()...");
 
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
 
  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);
 
  poll(listens->pollfds, listens->nfds, -1);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.config = config;
 
  travinfo.listens = listens;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 

	
 
  travinfo.fdset = &fdsets.writefds;
 
  travinfo.handlereq = handlereq;
 
  travinfo.handlereqdata = handlereqdata;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  travinfo.fdset = &fdsets.readfds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
 
    {
 
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating client struct\n");
 
	  return 1;
 
	}
 
      clients->nfds ++;
 
      list_insert_after(clients->clients, newclient, 0);
 
      fprintf(stderr, "accepted new connection\n");
 
      distrend_client_write(newclient, "hello\n", 6);
 
    }
 

	
 
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(clients->clients);
 
  newclient = list_curr(clients->clients);
 
  list_mvfront(listens->clients);
 
  newclient = list_curr(listens->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_listen_poll_deletefd(listens, &newclient->sock);
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(clients->clients);
 
	  list_remove_curr(listens->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(clients->clients);
 
      list_mvnext(listens->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(clients->clients))
 
      if(newclient == list_curr(listens->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(clients->clients);
 
	newclient = list_curr(listens->clients);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
 
int distrend_listen_free(struct distrend_listens *listens)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
  return 1;
 
}
 
/**
 
@@ -483,46 +477,65 @@ int distrend_unlisten(struct distrend_li
 
*/
 
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
 
{
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
}
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
 
/**
 
   Adds a newly connected distrend_client to the listens->clients list
 
 */
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 
  struct distrend_client *client;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
  client = malloc(sizeof(struct distrend_client));
 
  if(!client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      close(sock);
 
      return 1;
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  (*client)->inlen = 0;
 
  (*client)->expectlen = 0;
 
  (*client)->inmsgs = q_init();
 
  (*client)->outmsgs = q_init();
 
  client->sock.sock = sock;
 
  distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
 

	
 
  client->state = state;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  client->inlen = 0;
 
  client->expectlen = 0;
 
  client->inmsgs = q_init();
 
  client->outmsgs = q_init();
 

	
 
  list_insert_after(listens->clients, client, 0);
 

	
 
  /**
 
     For those using netcat/telnet to debug their internets.
 
   */
 
  distrend_client_write(listens, client, "distren\n", 8);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   This function shall only be called after the appropriate
 
   distrend_listen_poll_deletefd() has been called
 
 */
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock);
 
  close(client->sock.sock);
 
  
 
  return 0;
 
}
 

	
 
void distrend_packet_free(struct distrend_packet *packet)
 
{
 
@@ -545,13 +558,13 @@ int distrend_makesocknonblock(int sock)
 
    }
 
  
 
  return 0;
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
@@ -569,16 +582,18 @@ int distrend_client_write(struct distren
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data)
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
  size_t msglen;
 

	
 
  msglen = sizeof(struct distren_request) + req->len;
 
@@ -590,13 +605,13 @@ int distrend_client_write_request(struct
 
      return 1;
 
    }
 

	
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(client, towrite, msglen);
 
  ret = distrend_client_write(listens, client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
@@ -667,19 +682,214 @@ int distrend_packets_collapse(QUEUE *que
 
  q_enqueue(queue, newpacket, 0);
 

	
 
  return 0;
 
}
 

	
 

	
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg)
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg)
 
{
 
  struct distren_request *req;
 

	
 
  distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT);
 
  distrend_client_write_request(client, req, quit_msg);
 
  distrend_client_write_request(listens, client, req, quit_msg);
 
  distren_request_free(req);
 

	
 
  client->state = DISTREND_CLIENT_BAD;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_DISCONNECT_GRACE;
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler)
 
{
 
  struct distrend_request_handler_info *handler_info;
 

	
 
  handler_info = malloc(sizeof(struct distrend_request_handler_info));
 
  if(!handler_info)
 
    return 1;
 

	
 
  handler_info->request_type = type;
 
  handler_info->handler = handler;
 
  list_insert_after(listens->request_handlers, handler_info, 0);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_dispatch_request_data
 
{
 
  struct general_info *geninfo;
 
  struct distrend_client *client;
 
  struct distren_request *req;
 
  void *req_data;
 
};
 

	
 
/**
 
   traversal function for distrend_dispatch_request().
 
 */
 
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
 
{
 
  if(handler_info->request_type == data->req->type)
 
    (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data);
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
   helper for distrend_handleread() which looks up the correct
 
   request handler and handles handing the the request to the
 
   handler. :-p
 
 */
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata)
 
{
 
  struct distrend_dispatch_request_data data;
 

	
 
  data.geninfo = listens->geninfo;
 
  data.client = client;
 
  data.req = req;
 
  data.req_data = reqdata;
 

	
 
  list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Helper for the distrend_listen_add() and functions that
 
   call accept() to maintain the struct pollfd list in listens.
 

	
 
   @param listens The info related to listening to sockets.
 
   @param fd the FD to add to the list of sockets.
 
   @param poll_events The poll() events to register in the list of sockets.
 
   @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances.
 
 */
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events)
 
{
 
  size_t new_len;
 
  struct pollfd *new_pollfds;
 

	
 
  if(listens->nfds + 1 > listens->pollfds_len)
 
    {
 
      new_len = listens->pollfds_len * 2;
 
      new_pollfds = malloc(sizeof(struct pollfd) * new_len);
 
      if(!new_pollfds)
 
	return 1;
 

	
 
      memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len);
 

	
 
      free(listens->pollfds);
 
      listens->pollfds_len = new_len;
 
      listens->pollfds = new_pollfds;
 
    }
 

	
 
  sock->pollfd_offset = listens->nfds;
 
  listens->pollfds[sock->pollfd_offset].fd = sock->sock;
 
  listens->pollfds[sock->pollfd_offset].events = poll_events;
 
  listens->nfds ++;
 

	
 
  fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   Removes a particular struct pollfd from the listens->pollfds list. This
 
   will likely also require remapping at least one other existing pollfd
 
   entry, so this will also traverse the list of clients and update the
 
   entry_offset of the client which has to be remapped.
 

	
 
   @param listens the struct distrend_listens
 
   @param sock the relevent struct containing the offset of the pollfd that must be removed.
 
   @return 0 on success, 1 on error.
 
 */
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock)
 
{
 
  /*
 
    the socket which is being moved from the end of the list
 
    to the middle to fill in the hole left by removing sock
 
  */
 
  struct distrend_polled_sock *displaced_sock;
 

	
 
  /*
 
    special case: we're deleting the last element and thus
 
    don't need to remap an existing entry.
 
   */
 
  if(sock->pollfd_offset == listens->nfds - 1)
 
    {
 
      listens->nfds --;
 
      return 0;
 
    }
 

	
 
  /*
 
    Shift whatever's at the end of the pollfds array into the position
 
    of the pollfd being removed. Then update that client's struct distrend_client
 
    entry to point to the new offset.
 
   */
 
  memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd));
 
  displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1);
 
  if(!displaced_sock)
 
    {
 
      fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n",
 
	      (int)listens->nfds - 1, (int)listens->nfds);
 
      listens->nfds --;
 
      return 1;
 
    }
 

	
 
  displaced_sock->pollfd_offset = sock->pollfd_offset;
 

	
 
  listens->nfds --;
 
  return 0;
 
}
 

	
 
/**
 
   For _distrend_polled_sock_get_by_offset_traverse()
 
 */
 
struct distrend_polled_sock_get_by_offset_data
 
{
 
  struct distrend_polled_sock *sock;
 
  size_t pollfd_offset;
 
};
 

	
 
/**
 
   List traversal helper function for distrend_polled_sock_get_by_offset().
 

	
 
   @param sock The second argument is casting either sturct distrend_client or
 
   struct distrend_listen_sock to a struct distrend_polled_sock which works
 
   because each of the two former structs as a sturct distrend_polled_sock
 
   as its first member.
 
 */
 
int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock)
 
{
 
  if(data->pollfd_offset == sock->pollfd_offset)
 
    {
 
      data->sock = sock;
 
      return FALSE;
 
    }
 
  return TRUE;
 
}
 

	
 
/**
 
   Returns a struct distrend_client based on its offset in the listens->pollfds
 
   array.
 

	
 
   @param listens the listens.
 
   @param pollfds_offset the index in the listens->pollfds array of the client we should find
 
   @return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list.
 
 */
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset)
 
{
 
  struct distrend_polled_sock_get_by_offset_data data;
 

	
 
  data.sock = NULL;
 
  data.pollfd_offset = pollfds_offset;
 

	
 
  /**
 
     These traversals both depend on struct distrend_polled_sock being
 
     the first entry of struct distrend_listens and struct distrend_client.
 
   */
 
  list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 
  if(data.sock == NULL)
 
    list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 

	
 
  return data.sock;
 
}
 

	
src/server/listen.h
Show inline comments
 
@@ -14,22 +14,23 @@
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
struct distrend_clientset;
 
struct distrend_listen;
 
struct general_info;
 
struct distrend_listens;
 
struct distrend_client;
 

	
 
#ifndef _DISTREN_LISTEN_H
 
#define _DISTREN_LISTEN_H
 

	
 
#include "distrend.h"
 
#include "common/protocol.h"
 

	
 
#include <poll.h>
 
#include <queue.h>
 
#include <time.h>
 

	
 
/**
 
   How long a client has after connecting to send
 
   authentication information before his connection is cleaned
 
@@ -66,21 +67,63 @@ enum distrend_client_state
 
       The socket used to communicate with the client is closed. Its entry
 
       in the client list should be removed on the next garbage clean-up round.
 
     */
 
    DISTREND_CLIENT_DEAD
 
  };
 

	
 
struct distrend_listen
 
/**
 
   inheritence in C !?!?
 
 */
 
struct distrend_polled_sock
 
{
 
  int sock;
 
  /* this socket's offset in the listens->pollfds array. */
 
  size_t pollfd_offset;
 
};
 

	
 
struct distrend_listen_sock
 
{
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 
  int port;
 
  int sock;
 
};
 

	
 
struct distrend_listens
 
{
 
  /* of type (struct distrend_listen_sock *) */
 
  list_t listen_socks;
 
  /* of type (struct distrend_request_handler_info) */
 
  list_t request_handlers;
 
  /* the data to pass onto all request handlers */
 
  struct general_info *geninfo;
 

	
 
  /* of type (struct distrend_client) */
 
  list_t clients;
 

	
 
  /* the array passed to poll() */
 
  struct pollfd *pollfds;
 
  /* the number of entries pollfds could hold */
 
  nfds_t pollfds_len;
 
  /* the number of entries that pollfds does hold */
 
  nfds_t nfds;
 
};
 

	
 

	
 
/**
 
   The information necessary to recieve data from and send data
 
   to a client.
 
 */
 
struct distrend_client
 
{
 
  int sock;
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 

	
 
  enum distrend_client_state state;
 

	
 
  /**
 
     The absolute time at which this client's entry in the client list will be
 
     expired, closed, and marked as dead so that it may be cleaned up. This is
 
     used to implement ping timeouts (if state == DISTREND_CLIENT_GOOD) and 
 
@@ -89,62 +132,90 @@ struct distrend_client
 
  time_t cleanup_time;
 

	
 
  size_t inlen; /*< number of bytes waiting to be processed */
 
  size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */
 
  QUEUE *inmsgs;
 
  QUEUE *outmsgs;
 

	
 
};
 

	
 

	
 

	
 
typedef int(*distrend_handle_request_t)(struct distrend_client *client, struct distren_request *req, void *reqdata, void *data);
 
/**
 
   A function signature that may be registered as a client
 
   request handler.
 

	
 
   @param client the client that sent the request
 
   @param len the length of the message in bytes
 
   @param data the message received from the client
 
 */
 
typedef int(*distrend_handle_request_func_t)(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data);
 

	
 
/**
 
   initializes the listens and clientset
 
   @param config the configuration from distrend
 
   @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset
 
   Initializes the listens member of struct distrend_config.
 

	
 
   @param geninfo general info to apss to the request handler.
 
   @return Must be free()d with distrend_listen_free();
 
*/
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo);
 

	
 
/**
 
   Adds a socket and configures it to listen().
 

	
 

	
 
   @param listens The handle for this set of listens, obtained via distrend_listen_init().
 
 */
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients);
 
int distrend_listen_add(struct distrend_listens *listens, int port);
 

	
 
/**
 
   Register a request handler with the listener.
 

	
 
   @param config distrend's configuration
 
   @param type the request type for which this handler should be called
 
   @param handler the handler to call when a request of type type is received.
 
 */
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler);
 

	
 
/**
 
   checks states of the sockets I'm managing. If there are any new connections,
 
   or activity on any sockets, I'll call the appropriate function.
 
   I will block until there is some sort of activity, including
 
   signals. If you want to cleanly shut down, it's best to register
 
   signal handlers somewhere
 

	
 
   @param listens the config->listens after being initialized with distrend_listen()
 
 */
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata);
 
int distrend_accept(struct distrend_listens *listens);
 

	
 
/**
 
   cleans listening socket. Unnecessary for a working server, currently a stub.
 
   cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 
int distrend_listen_free(struct distrend_listens *listens);
 

	
 
/**
 
   writes message to client.
 
   @param towrite the caller is expected to free this string. This function will
 
   strdup() it, in essence.
 
 */
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen);
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen);
 

	
 
/**
 
   writes request to client.
 
   @param client client to write to
 
   @param req the request struct. caller must free this.
 
   @param data the data of the request which is req->len bytes long. caller must free this.
 
 */
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data);
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
 

	
 
/**
 
   Queue a DISTREN_REQUEST_DISCONNECT and prepare a client
 
   to be disconnected.
 
 */
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg);
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg);
 

	
 

	
 
#endif
0 comments (0 inline, 0 general)