Changeset - 3efc48659d0b
[Not reviewed]
default
0 4 0
Nathan Brink (binki) - 15 years ago 2010-07-02 01:17:27
ohnobinki@ohnopublishing.net
Use poll() instead of select() (poll() is the thing to use nowadays ;-) ).
4 files changed with 492 insertions and 202 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -83,7 +83,7 @@ struct general_info
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 

	
 
/* **************XML Functions**************** */
 
@@ -100,8 +100,8 @@ int main(int argc, char *argv[])
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  int tmp;
 
  struct general_info general_info;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
@@ -154,19 +154,30 @@ int main(int argc, char *argv[])
 
  /** Execute test function */
 
  interactiveTest(test, &general_info);
 

	
 
  if(distrend_listen(general_info.config, &clients))
 
  general_info.config->listens = distrend_listens_new(&general_info);
 
  if(!general_info.config->listens)
 
    {
 
      fprintf(stderr, "error listening\n");
 
      fprintf(stderr, "error initializing listens\n");
 
      return 1;
 
    }
 
  for(counter = 0; general_info.config->listen_ports[counter]; counter ++)
 
    {
 
      tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]);
 
      if(tmp)
 
	{
 
	  fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]);
 
	  return 1;
 
	}
 
    }
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)&general_info);
 
      distrend_accept(general_info.config->listens);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 
@@ -194,7 +205,7 @@ int main(int argc, char *argv[])
 
      distrenjob_free(&job);
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_listen_free(general_info.config->listens);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 
@@ -208,7 +219,7 @@ int main(int argc, char *argv[])
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
{
 
  size_t counter;
 
  char *tmp_str;
 
@@ -246,11 +257,11 @@ int distrend_handle_request(struct distr
 
	    fixedbuf[req->len] = '\0';
 

	
 
	  _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, fixedbuf);
 
	  distrend_send_disconnect(client, tmp_str);
 
	  distrend_send_disconnect(listens, client, tmp_str);
 
	}
 

	
 
      distren_request_new(&newreq, strlen(VERSION), DISTREN_REQUEST_VERSION);
 
      distrend_client_write_request(client, newreq, VERSION);
 
      distrend_client_write_request(listens, client, newreq, VERSION);
 
      distren_request_free(newreq);
 
      break;
 
    }
 
@@ -308,26 +319,29 @@ int distrend_do_config(int argc, char *a
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
      (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port");
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 
  (*config)->listen_ports[counter] = 0;
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 

	
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  distrend_listen_free(config->listens);
 
  options_free(config->options);
 
  free(config->listen_ports);
 
  free(config);
 

	
 
  return 0;
 
}
 

	
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
@@ -482,7 +496,6 @@ int interactiveTest(int test, struct gen
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 
  struct distrend_clientset *clients;
 

	
 
  fprintf(stderr,"Hello!\n");
 

	
 
@@ -557,14 +570,9 @@ int interactiveTest(int test, struct gen
 
       break;
 

	
 
     case 5:
 
       if(distrend_listen(geninfo->config, &clients))
 
	 {
 
	   fprintf(stderr, "error listening\n");
 
	   return 1;
 
	 }
 
       while(1)
 
	 {
 
	   distrend_accept(geninfo->config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)geninfo);
 
	   distrend_accept(geninfo->config->listens);
 
	   /*
 
	     code for reading data from clients either goes here or in distrend_accept().
 
	     it might make sense for us to just pass the distrend_accept() function a
src/server/distrend.h
Show inline comments
 
@@ -31,7 +31,8 @@ struct distrend_config
 
{
 
  cfg_t *mycfg;
 
  struct options_common *options;
 
  struct distrend_listen *listens; /*< Null terminated array of structs */
 
  struct distrend_listens *listens;
 
  int *listen_ports;
 
  char *datadir;
 

	
 
  int die;
src/server/listen.c
Show inline comments
 
@@ -18,6 +18,7 @@
 
*/
 

	
 
#include "listen.h"
 
#include "common/protocol.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
@@ -27,55 +28,91 @@
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <sys/select.h>
 
#include <poll.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
struct distrend_clientset
 
{
 
  LIST *clients;
 

	
 
  int nfds;
 
};
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
struct distrend_request_handler_info
 
{
 
  enum distren_request_type request_type;
 
  distrend_handle_request_func_t handler;
 
};
 

	
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
 

	
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
 

	
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
 
{
 
  struct distrend_listens *listens;
 

	
 
  listens = malloc(sizeof(struct distrend_listens));
 
  if(!listens)
 
    return NULL;
 

	
 
  listens->listen_socks = list_init();
 
  listens->request_handlers = list_init();
 
  listens->clients = list_init();
 
  listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */
 
  listens->pollfds_len = 2;
 
  listens->nfds = 0;
 
  listens->geninfo = geninfo;
 

	
 
  if(!listens->listen_socks
 
     || !listens->request_handlers
 
     || !listens->clients
 
     || !listens->pollfds)
 
    {
 
      if(listens->listen_socks)
 
	list_free(listens->listen_socks, LIST_NODEALLOC);
 
      if(listens->request_handlers)
 
	list_free(listens->request_handlers, LIST_NODEALLOC);
 
      if(listens->clients)
 
	list_free(listens->clients, LIST_NODEALLOC);
 
      free(listens->pollfds);
 
      free(listens);
 
      return NULL;
 
    }
 

	
 
  return listens;
 
}
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter)
 
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
 
{
 
  int sock;
 

	
 
  for(counter ++; counter > 0; counter --)
 
    {
 
      sock = config->listens[counter - 1].sock;
 
  sock = sockinfo->sock.sock;
 
      if(sock >= 0)
 
	close(sock);
 
    }
 
  list_free(clients->clients, NULL);
 
  free(clients);
 

	
 
  free(sockinfo);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
int distrend_listen_add(struct distrend_listens *listens, int port)
 
{
 
  int tmp;
 
  size_t counter;
 
  struct distrend_listen_sock *newsock;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
@@ -86,50 +123,48 @@ int distrend_listen(struct distrend_conf
 
      .sin6_scope_id = 0
 
    };
 

	
 
  *clients = malloc(sizeof(struct distrend_clientset));
 

	
 
  (*clients)->clients = list_init();
 
  (*clients)->nfds = 1;
 
  sockaddr.sin6_port = htons(port);
 

	
 
  for(counter = 0; config->listens[counter].port; counter ++)
 
    {
 
      sockaddr.sin6_port = htons(config->listens[counter].port);
 
  newsock = malloc(sizeof(struct distrend_listen_sock));
 
  if(!newsock)
 
    return 1;
 

	
 
      config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0);
 
      tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  newsock->port = port;
 
  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
      if(tmp == -1)
 
	{
 
	  perror("bind");
 
	  distrend_listen_unwind(config, *clients, counter);
 
      distrend_listen_add_unwind(newsock);
 

	
 
	  return 1;
 
	}
 

	
 
      tmp = listen(config->listens->sock, 1);
 
  tmp = listen(newsock->sock.sock, 1);
 
      if(tmp == -1)
 
	{
 
	  perror("listen");
 
	  distrend_listen_unwind(config, *clients, counter);
 
      distrend_listen_add_unwind(newsock);
 
      
 
	  return 1;
 
	}
 

	
 
      tmp = distrend_makesocknonblock(config->listens->sock);
 
  tmp = distrend_makesocknonblock(newsock->sock.sock);
 
      if(tmp)
 
	{
 
	  /* error is already printed to user by distrend_makesocknonblock() */
 
	  distrend_listen_unwind(config, *clients, counter);
 
      distrend_listen_add_unwind(newsock);
 
	  return 1;
 
	}
 
    }
 

	
 
  distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM);
 
  list_insert_after(listens->listen_socks, newsock, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_config *config,
 
			struct distrend_client *client,
 
			distrend_handle_request_t handlereq,
 
			void *handlereqdata)
 
int distrend_handleread(struct distrend_listens *listens,
 
			struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
@@ -138,13 +173,6 @@ int distrend_handleread(struct distrend_
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  /**
 
     we have to check that there's new data so that
 
     we may call ourself recursively
 
  */
 
  fd_set readfd;
 
  struct timeval selecttimeout;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
@@ -154,13 +182,9 @@ int distrend_handleread(struct distrend_
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
  FD_ZERO(&readfd);
 
  FD_SET(client->sock, &readfd);
 
  memset(&selecttimeout, '\0', sizeof(selecttimeout));
 
  select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout);
 
  if(FD_ISSET(client->sock, &readfd))
 
    {
 
      readlen = read(client->sock, buf, sizeof(buf));
 
  //  if(revents & POLLRDNORM)
 
  //{
 
      readlen = read(client->sock.sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	  perror("read");
 
@@ -199,7 +223,7 @@ int distrend_handleread(struct distrend_
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 
    } /* if(FD_ISSET(client->sock, &readfd)) */
 
      //    } /*  */
 

	
 
  /**
 
     Manage input, etc.
 
@@ -262,48 +286,15 @@ int distrend_handleread(struct distrend_
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      (*handlereq)(client, req, reqdata, handlereqdata);
 
      distrend_dispatch_request(listens, client, req, reqdata);
 
      distren_request_free(req);
 
    }
 
  return 0;
 
}
 

	
 
struct distrend_accept_fdset_prepare_data
 
{
 
  fd_set readfds;
 
  fd_set writefds;
 
  /**
 
     stores the highest FD number which is one less than the first
 
   arfgument to select(). So do select(maxfd + 1, ...)
 
  */
 
  int maxfd;
 
};
 
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
 
				  struct distrend_client *client)
 
{
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(client->sock > data->maxfd)
 
    data->maxfd = client->sock;
 
  FD_SET(client->sock, &data->readfds);
 
  /**
 
     Only select on an outgoing socket if we have something to
 
     say.
 
   */
 
  if(!q_empty(client->outmsgs))
 
    FD_SET(client->sock, &data->writefds);
 
  
 
  return TRUE;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
  distrend_handle_request_t handlereq;
 
  void *handlereqdata;
 
  enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
 
  struct distrend_listens *listens;
 
  time_t current_time;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
@@ -311,19 +302,23 @@ int distrend_accept_client_proc(struct d
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 
  short revents;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  /**
 
     Manage timing-out clients.
 
   */
 
  if(data->current_time > client->cleanup_time)
 
    switch(client->state)
 
      {
 
      case DISTREND_CLIENT_PREAUTH:
 
	distrend_send_disconnect(client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	break;
 

	
 
      case DISTREND_CLIENT_GOOD:
 
	distrend_send_disconnect(client, "Ping timeout :-p");
 
	distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
 
	break;
 

	
 
      case DISTREND_CLIENT_BAD:
 
@@ -334,22 +329,27 @@ int distrend_accept_client_proc(struct d
 
	break;
 
      }
 

	
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 
  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;
 
  if(revents & POLLRDNORM)
 
    /* handle reading */
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 
      distrend_handleread(data->listens, client);
 
    }
 
  if(revents & POLLWRNORM)
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
 
	  __FILE__, __LINE__, client->sock,
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ");
 

	
 
  switch(data->mode)
 
      if(q_empty(client->outmsgs))
 
    {
 
    case DISTREND_ACCEPT_CLIENT_WRITE:
 
      if(q_empty(client->outmsgs))
 
	  data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	return TRUE;
 
	}
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock, packet->data, packet->len);
 
      written_amount = write(client->sock.sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
@@ -369,6 +369,9 @@ int distrend_accept_client_proc(struct d
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 

	
 
	  if(q_empty(client->outmsgs))
 
	    data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	}
 
      else
 
	{
 
@@ -376,36 +379,47 @@ int distrend_accept_client_proc(struct d
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	  memmove(packet->data, packet->data + written_amount, packet->len);
 
	}
 
      break;
 

	
 
    case DISTREND_ACCEPT_CLIENT_READ:
 
      distrend_handleread(data->config, client, data->handlereq, data->handlereqdata);
 
      break;
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
 
{
 
  short revents;
 
  int newclientsock;
 

	
 
  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
 
  fprintf(stderr, "blah %d\n", (int)revents);
 
  if(revents & POLLRDNORM)
 
    {
 
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating/adding client struct\n");
 
	  return 1;
 
	}
 
      fprintf(stderr, "accepted new connection\n");
 
    }
 

	
 
  /**
 
     Check other listen blocks too.
 
   */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 
  int newclientsock;
 

	
 
  struct distrend_accept_fdset_prepare_data fdsets;
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  FD_ZERO(&fdsets.readfds);
 
  FD_ZERO(&fdsets.writefds);
 
  FD_SET(config->listens->sock, &fdsets.readfds);
 
  fdsets.maxfd = config->listens->sock;
 

	
 
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);
 

	
 
  fprintf(stderr, "select()...");
 
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
 
  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);
 
  poll(listens->pollfds, listens->nfds, -1);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 
@@ -417,62 +431,42 @@ int distrend_accept(struct distrend_conf
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.config = config;
 
  travinfo.listens = listens;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 

	
 
  travinfo.fdset = &fdsets.writefds;
 
  travinfo.handlereq = handlereq;
 
  travinfo.handlereqdata = handlereqdata;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  travinfo.fdset = &fdsets.readfds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
 
    {
 
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating client struct\n");
 
	  return 1;
 
	}
 
      clients->nfds ++;
 
      list_insert_after(clients->clients, newclient, 0);
 
      fprintf(stderr, "accepted new connection\n");
 
      distrend_client_write(newclient, "hello\n", 6);
 
    }
 

	
 
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(clients->clients);
 
  newclient = list_curr(clients->clients);
 
  list_mvfront(listens->clients);
 
  newclient = list_curr(listens->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_listen_poll_deletefd(listens, &newclient->sock);
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(clients->clients);
 
	  list_remove_curr(listens->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(clients->clients);
 
      list_mvnext(listens->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(clients->clients))
 
      if(newclient == list_curr(listens->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(clients->clients);
 
	newclient = list_curr(listens->clients);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
 
int distrend_listen_free(struct distrend_listens *listens)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
@@ -486,40 +480,59 @@ void remotio_send_to_client(struct distr
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
}
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
 
/**
 
   Adds a newly connected distrend_client to the listens->clients list
 
 */
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 
  struct distrend_client *client;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
  client = malloc(sizeof(struct distrend_client));
 
  if(!client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      close(sock);
 
      return 1;
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  (*client)->inlen = 0;
 
  (*client)->expectlen = 0;
 
  (*client)->inmsgs = q_init();
 
  (*client)->outmsgs = q_init();
 
  client->sock.sock = sock;
 
  distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
 

	
 
  client->state = state;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  client->inlen = 0;
 
  client->expectlen = 0;
 
  client->inmsgs = q_init();
 
  client->outmsgs = q_init();
 

	
 
  list_insert_after(listens->clients, client, 0);
 

	
 
  /**
 
     For those using netcat/telnet to debug their internets.
 
   */
 
  distrend_client_write(listens, client, "distren\n", 8);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   This function shall only be called after the appropriate
 
   distrend_listen_poll_deletefd() has been called
 
 */
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock);
 
  close(client->sock.sock);
 
  
 
  return 0;
 
}
 
@@ -548,7 +561,7 @@ int distrend_makesocknonblock(int sock)
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
@@ -572,10 +585,12 @@ int distrend_client_write(struct distren
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data)
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
@@ -593,7 +608,7 @@ int distrend_client_write_request(struct
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(client, towrite, msglen);
 
  ret = distrend_client_write(listens, client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
@@ -670,12 +685,12 @@ int distrend_packets_collapse(QUEUE *que
 
}
 

	
 

	
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg)
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg)
 
{
 
  struct distren_request *req;
 

	
 
  distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT);
 
  distrend_client_write_request(client, req, quit_msg);
 
  distrend_client_write_request(listens, client, req, quit_msg);
 
  distren_request_free(req);
 

	
 
  client->state = DISTREND_CLIENT_BAD;
 
@@ -683,3 +698,198 @@ int distrend_send_disconnect(struct dist
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler)
 
{
 
  struct distrend_request_handler_info *handler_info;
 

	
 
  handler_info = malloc(sizeof(struct distrend_request_handler_info));
 
  if(!handler_info)
 
    return 1;
 

	
 
  handler_info->request_type = type;
 
  handler_info->handler = handler;
 
  list_insert_after(listens->request_handlers, handler_info, 0);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_dispatch_request_data
 
{
 
  struct general_info *geninfo;
 
  struct distrend_client *client;
 
  struct distren_request *req;
 
  void *req_data;
 
};
 

	
 
/**
 
   traversal function for distrend_dispatch_request().
 
 */
 
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
 
{
 
  if(handler_info->request_type == data->req->type)
 
    (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data);
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
   helper for distrend_handleread() which looks up the correct
 
   request handler and handles handing the the request to the
 
   handler. :-p
 
 */
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata)
 
{
 
  struct distrend_dispatch_request_data data;
 

	
 
  data.geninfo = listens->geninfo;
 
  data.client = client;
 
  data.req = req;
 
  data.req_data = reqdata;
 

	
 
  list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Helper for the distrend_listen_add() and functions that
 
   call accept() to maintain the struct pollfd list in listens.
 

	
 
   @param listens The info related to listening to sockets.
 
   @param fd the FD to add to the list of sockets.
 
   @param poll_events The poll() events to register in the list of sockets.
 
   @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances.
 
 */
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events)
 
{
 
  size_t new_len;
 
  struct pollfd *new_pollfds;
 

	
 
  if(listens->nfds + 1 > listens->pollfds_len)
 
    {
 
      new_len = listens->pollfds_len * 2;
 
      new_pollfds = malloc(sizeof(struct pollfd) * new_len);
 
      if(!new_pollfds)
 
	return 1;
 

	
 
      memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len);
 

	
 
      free(listens->pollfds);
 
      listens->pollfds_len = new_len;
 
      listens->pollfds = new_pollfds;
 
    }
 

	
 
  sock->pollfd_offset = listens->nfds;
 
  listens->pollfds[sock->pollfd_offset].fd = sock->sock;
 
  listens->pollfds[sock->pollfd_offset].events = poll_events;
 
  listens->nfds ++;
 

	
 
  fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   Removes a particular struct pollfd from the listens->pollfds list. This
 
   will likely also require remapping at least one other existing pollfd
 
   entry, so this will also traverse the list of clients and update the
 
   entry_offset of the client which has to be remapped.
 

	
 
   @param listens the struct distrend_listens
 
   @param sock the relevent struct containing the offset of the pollfd that must be removed.
 
   @return 0 on success, 1 on error.
 
 */
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock)
 
{
 
  /*
 
    the socket which is being moved from the end of the list
 
    to the middle to fill in the hole left by removing sock
 
  */
 
  struct distrend_polled_sock *displaced_sock;
 

	
 
  /*
 
    special case: we're deleting the last element and thus
 
    don't need to remap an existing entry.
 
   */
 
  if(sock->pollfd_offset == listens->nfds - 1)
 
    {
 
      listens->nfds --;
 
      return 0;
 
    }
 

	
 
  /*
 
    Shift whatever's at the end of the pollfds array into the position
 
    of the pollfd being removed. Then update that client's struct distrend_client
 
    entry to point to the new offset.
 
   */
 
  memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd));
 
  displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1);
 
  if(!displaced_sock)
 
    {
 
      fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n",
 
	      (int)listens->nfds - 1, (int)listens->nfds);
 
      listens->nfds --;
 
      return 1;
 
    }
 

	
 
  displaced_sock->pollfd_offset = sock->pollfd_offset;
 

	
 
  listens->nfds --;
 
  return 0;
 
}
 

	
 
/**
 
   For _distrend_polled_sock_get_by_offset_traverse()
 
 */
 
struct distrend_polled_sock_get_by_offset_data
 
{
 
  struct distrend_polled_sock *sock;
 
  size_t pollfd_offset;
 
};
 

	
 
/**
 
   List traversal helper function for distrend_polled_sock_get_by_offset().
 

	
 
   @param sock The second argument is casting either sturct distrend_client or
 
   struct distrend_listen_sock to a struct distrend_polled_sock which works
 
   because each of the two former structs as a sturct distrend_polled_sock
 
   as its first member.
 
 */
 
int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock)
 
{
 
  if(data->pollfd_offset == sock->pollfd_offset)
 
    {
 
      data->sock = sock;
 
      return FALSE;
 
    }
 
  return TRUE;
 
}
 

	
 
/**
 
   Returns a struct distrend_client based on its offset in the listens->pollfds
 
   array.
 

	
 
   @param listens the listens.
 
   @param pollfds_offset the index in the listens->pollfds array of the client we should find
 
   @return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list.
 
 */
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset)
 
{
 
  struct distrend_polled_sock_get_by_offset_data data;
 

	
 
  data.sock = NULL;
 
  data.pollfd_offset = pollfds_offset;
 

	
 
  /**
 
     These traversals both depend on struct distrend_polled_sock being
 
     the first entry of struct distrend_listens and struct distrend_client.
 
   */
 
  list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 
  if(data.sock == NULL)
 
    list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 

	
 
  return data.sock;
 
}
 

	
src/server/listen.h
Show inline comments
 
@@ -17,8 +17,8 @@
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
struct distrend_clientset;
 
struct distrend_listen;
 
struct general_info;
 
struct distrend_listens;
 
struct distrend_client;
 

	
 
#ifndef _DISTREN_LISTEN_H
 
@@ -27,6 +27,7 @@ struct distrend_client;
 
#include "distrend.h"
 
#include "common/protocol.h"
 

	
 
#include <poll.h>
 
#include <queue.h>
 
#include <time.h>
 

	
 
@@ -69,15 +70,57 @@ enum distrend_client_state
 
    DISTREND_CLIENT_DEAD
 
  };
 

	
 
struct distrend_listen
 
/**
 
   inheritence in C !?!?
 
 */
 
struct distrend_polled_sock
 
{
 
  int sock;
 
  /* this socket's offset in the listens->pollfds array. */
 
  size_t pollfd_offset;
 
};
 

	
 
struct distrend_listen_sock
 
{
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 
  int port;
 
  int sock;
 
};
 

	
 
struct distrend_listens
 
{
 
  /* of type (struct distrend_listen_sock *) */
 
  list_t listen_socks;
 
  /* of type (struct distrend_request_handler_info) */
 
  list_t request_handlers;
 
  /* the data to pass onto all request handlers */
 
  struct general_info *geninfo;
 

	
 
  /* of type (struct distrend_client) */
 
  list_t clients;
 

	
 
  /* the array passed to poll() */
 
  struct pollfd *pollfds;
 
  /* the number of entries pollfds could hold */
 
  nfds_t pollfds_len;
 
  /* the number of entries that pollfds does hold */
 
  nfds_t nfds;
 
};
 

	
 

	
 
/**
 
   The information necessary to recieve data from and send data
 
   to a client.
 
 */
 
struct distrend_client
 
{
 
  int sock;
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 

	
 
  enum distrend_client_state state;
 

	
 
  /**
 
@@ -92,19 +135,45 @@ struct distrend_client
 
  size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */
 
  QUEUE *inmsgs;
 
  QUEUE *outmsgs;
 

	
 
};
 

	
 

	
 

	
 
typedef int(*distrend_handle_request_t)(struct distrend_client *client, struct distren_request *req, void *reqdata, void *data);
 
/**
 
   A function signature that may be registered as a client
 
   request handler.
 

	
 
   @param client the client that sent the request
 
   @param len the length of the message in bytes
 
   @param data the message received from the client
 
 */
 
typedef int(*distrend_handle_request_func_t)(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data);
 

	
 
/**
 
   initializes the listens and clientset
 
   @param config the configuration from distrend
 
   @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset
 
   Initializes the listens member of struct distrend_config.
 

	
 
   @param geninfo general info to apss to the request handler.
 
   @return Must be free()d with distrend_listen_free();
 
 */
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients);
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo);
 

	
 
/**
 
   Adds a socket and configures it to listen().
 

	
 

	
 
   @param listens The handle for this set of listens, obtained via distrend_listen_init().
 
 */
 
int distrend_listen_add(struct distrend_listens *listens, int port);
 

	
 
/**
 
   Register a request handler with the listener.
 

	
 
   @param config distrend's configuration
 
   @param type the request type for which this handler should be called
 
   @param handler the handler to call when a request of type type is received.
 
 */
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler);
 

	
 
/**
 
   checks states of the sockets I'm managing. If there are any new connections,
 
@@ -112,20 +181,22 @@ int distrend_listen(struct distrend_conf
 
   I will block until there is some sort of activity, including
 
   signals. If you want to cleanly shut down, it's best to register
 
   signal handlers somewhere
 

	
 
   @param listens the config->listens after being initialized with distrend_listen()
 
 */
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata);
 
int distrend_accept(struct distrend_listens *listens);
 

	
 
/**
 
   cleans listening socket. Unnecessary for a working server, currently a stub.
 
   cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 
int distrend_listen_free(struct distrend_listens *listens);
 

	
 
/**
 
   writes message to client.
 
   @param towrite the caller is expected to free this string. This function will
 
   strdup() it, in essence.
 
 */
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen);
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen);
 

	
 
/**
 
   writes request to client.
 
@@ -133,7 +204,7 @@ int distrend_client_write(struct distren
 
   @param req the request struct. caller must free this.
 
   @param data the data of the request which is req->len bytes long. caller must free this.
 
 */
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data);
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
@@ -144,7 +215,7 @@ void remotio_send_to_client();
 
   Queue a DISTREN_REQUEST_DISCONNECT and prepare a client
 
   to be disconnected.
 
 */
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg);
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg);
 

	
 

	
 
#endif
0 comments (0 inline, 0 general)