Changeset - 3efc48659d0b
[Not reviewed]
default
0 4 0
Nathan Brink (binki) - 15 years ago 2010-07-02 01:17:27
ohnobinki@ohnopublishing.net
Use poll() instead of select() (poll() is the thing to use nowadays ;-) ).
4 files changed with 508 insertions and 218 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -38,341 +38,355 @@
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* ******************* Structs ************************ */
 
struct general_info
 
{
 
  struct distrenjob head;
 
  distrend_mysql_conn_t conn;
 

	
 
  struct distrend_config *config;
 

	
 
  struct
 
  {
 
    /** general_info.xml */
 
    char *geninfo;
 
    
 
  } files;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 

	
 
/* **************Test Functions**************** */
 
int interactiveTest(int test, struct general_info *general_info);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  int tmp;
 
  struct general_info general_info;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  clientstatus = CLIENTSTATUS_UNINITIALIZED;
 
  // xmlinit();
 

	
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
	  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
    	  test = 1;
 
      }
 
    }
 

	
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config))
 
    return 1;
 

	
 
  /** preset paths */
 
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
 
		    general_info.config->datadir);
 

	
 
  /** MySQL Connection */
 
  fprintf(stderr,"Connecting to mysql...\n");
 
  if(mysqlConnect(&general_info.conn,
 
		  general_info.config->mysql_user,
 
		  general_info.config->mysql_host,
 
		  general_info.config->mysql_pass,
 
		  general_info.config->mysql_database) )
 
    {
 
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
 
      fprintf(stderr, "don't test mysql stuff\n");
 
      interactiveTest(test, &general_info);
 
      return 1;
 
    }
 
  fprintf(stderr,"Finished connecting!\n");
 

	
 
  /** Execute test function */
 
  interactiveTest(test, &general_info);
 

	
 
  if(distrend_listen(general_info.config, &clients))
 
  general_info.config->listens = distrend_listens_new(&general_info);
 
  if(!general_info.config->listens)
 
    {
 
      fprintf(stderr, "error listening\n");
 
      fprintf(stderr, "error initializing listens\n");
 
      return 1;
 
    }
 
  for(counter = 0; general_info.config->listen_ports[counter]; counter ++)
 
    {
 
      tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]);
 
      if(tmp)
 
	{
 
	  fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]);
 
	  return 1;
 
	}
 
    }
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)&general_info);
 
      distrend_accept(general_info.config->listens);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 

	
 
      struct frameset frame;
 
      struct distrenjob *job;
 
      distrenjob_new(&job);
 

	
 
      memset(&frame, '\0', sizeof(frame));
 

	
 
      /** If client requests work */
 
      if(clientrequest == DISTREN_REQUEST_GETWORK)
 
	{
 
	  int returnnum = find_jobframe(general_info.conn, slaveKey, &job->jobnum, &frame.num); // Finds a frame to render @FIXME: Slavenum :D
 
	  if(returnnum); /* No work on server */
 
	  else
 
	    remotio_send_to_client(frame.num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      else if(clientrequest == DISTREN_REQUEST_DONEFRAME)
 
	{
 
	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
	  finish_frame(general_info.conn, 0, job->jobnum, frame.num); // @TODO: Make sure this actually works.
 
	}
 
      distrenjob_free(&job);
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_listen_free(general_info.config->listens);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
{
 
  size_t counter;
 
  char *tmp_str;
 
  char fixedbuf[32];
 

	
 
  /* for response requests... if that makes any less sense ;-) */
 
  struct distren_request *newreq;
 

	
 
  fprintf(stderr, "I can haz data %d bytes long:", req->len);
 
  for(counter = 0; counter < req->len; counter ++)
 
    fputc(((char *)reqdata)[counter], stderr);
 
  fputc('\n', stderr);
 

	
 
  switch(req->type)
 
    {
 
    case DISTREN_REQUEST_VERSION:
 
      if(strlen(PACKAGE_STRING) == req->len
 
	 && !strncmp(PACKAGE_STRING, reqdata, req->len))
 
	{
 
	  /**
 
	     The client and I claim to be of the same version of distren :-D
 
	     Now we will mark the client as valid.
 
	   */
 
	  
 
	}
 
      else
 
	{
 
	  /**
 
	     The client claims to be of a different version of distren.
 
	     Now we will just send a disconnect packet and disconnect the client.
 
	   */
 
	  strncpy(fixedbuf, reqdata, 31);
 
	  fixedbuf[31] = '\0';
 
	  if(req->len < 31)
 
	    fixedbuf[req->len] = '\0';
 

	
 
	  _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, fixedbuf);
 
	  distrend_send_disconnect(client, tmp_str);
 
	  distrend_send_disconnect(listens, client, tmp_str);
 
	}
 

	
 
      distren_request_new(&newreq, strlen(VERSION), DISTREN_REQUEST_VERSION);
 
      distrend_client_write_request(client, newreq, VERSION);
 
      distrend_client_write_request(listens, client, newreq, VERSION);
 
      distren_request_free(newreq);
 
      break;
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
 
{
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
 
      CFG_SIMPLE_STR("mysql_user", NULL),
 
      CFG_SIMPLE_STR("mysql_host", NULL),
 
      CFG_SIMPLE_STR("mysql_pass", NULL),
 
      CFG_SIMPLE_STR("mysql_database", NULL),
 
      CFG_END()
 
    };
 

	
 
  cfg_t *cfg_listen;
 

	
 
  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 
  myopts[3].simple_value = &(*config)->mysql_user;
 
  myopts[4].simple_value = &(*config)->mysql_host;
 
  myopts[5].simple_value = &(*config)->mysql_pass;
 
  myopts[6].simple_value = &(*config)->mysql_database;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
      (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port");
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 
  (*config)->listen_ports[counter] = 0;
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 

	
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  distrend_listen_free(config->listens);
 
  options_free(config->options);
 
  free(config->listen_ports);
 
  free(config);
 

	
 
  return 0;
 
}
 

	
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
// except that it doesn't hold free_clients and rendering_clients
 
void update_general_info(struct general_info *geninfo)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 

	
 
  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"jobs_in_queue", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_finished_jobs);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_finished_jobs", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_frames_rendered);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_frames_rendered", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->highest_jobnum);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"highest_jobnum", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterEndDocument(writer);
 
  xmlFreeTextWriter(writer);
 
}
 

	
 
/**
 
   Reads general state information from general_info.xml
 
   into the general_info structure.
 
*/
 
int import_general_info(struct general_info *general_info)
 
{
 
  xmlDocPtr doc;
 
  xmlNodePtr cur;
 

	
 
  doc = xmlParseFile(general_info->files.geninfo);
 
  cur = xmlDocGetRootElement(doc);
 
  if (xmlStrcmp(cur->name, (xmlChar*)"general_info"))
 
    {
 
      fprintf(stderr, "xml document is wrong type");
 
@@ -437,151 +451,145 @@ int update_xml_joblist(struct general_in
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  geninfo->total_priority_pieces = 0;
 
  counter = 0;
 
  for(job = geninfo->head.next; job; job = job->next)
 
    {
 
      _distren_asprintf(&tmp, "%d", job->jobnum);
 
      xmlTextWriterWriteElement(writer, (xmlChar*)"jobnum", (xmlChar*)tmp);
 
      free(tmp);
 

	
 
      /**
 
        this is needed for the new frame finder to work
 
      
 
        Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + job->priority;
 

	
 
      counter++;
 
    }
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     close elements and end document
 
  */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk
 
  */
 
  xmlFreeTextWriter(writer);
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 

	
 

	
 
/** Interactive test for the queuing system */
 
/* @QUEUE: Test uses methods not present in C code using mysql web-based system */
 
int interactiveTest(int test, struct general_info *geninfo)
 
{
 
  int command;
 
  int32_t slaveKey = 1;
 
  jobnum_t jobKey = 0;
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 
  struct distrend_clientset *clients;
 

	
 
  fprintf(stderr,"Hello!\n");
 

	
 
  while(test == 1)
 
   {
 
     fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
     fprintf(stderr, "\t1 \tGet a frame to render\n");
 
     fprintf(stderr, "\t2 \tChange job priority\n");
 
     fprintf(stderr, "\t3 \tSet frame finished\n");
 
     fprintf(stderr, "\t4 \tSet frame started\n");
 
     fprintf(stderr, "\t5 \tStart listener\n");
 
     fprintf(stderr, "\t0 \tQuit\n");
 

	
 
     scanf("%d", &command);
 

	
 
     switch(command)
 
     {
 
     case 1:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &slaveKey);
 
       fprintf(stderr, "Got frame: ");
 
       if(find_jobframe(geninfo->conn, slaveKey, &jobKey, &frameNum))
 
         fprintf(stderr,"No frames available to render!\n");
 
       else if(jobKey == -1)
 
         fprintf(stderr,"Slave %d has no render power!", slaveKey);
 
       else
 
         fprintf(stderr, "jobKey: %d, frameNum: %d\n",jobKey,frameNum);
 
       break;
 
     case 2:
 
       fprintf(stderr,"Job key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"New priority: ");
 
       scanf("%d", &tmp);
 
       newPriority = tmp;
 

	
 
       change_job_priority(geninfo->conn, jobKey, newPriority);
 
       fprintf(stderr,"Changed!");
 
       break;
 
     case 3:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       finish_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Finished Frame!\n");
 
       break;
 
     case 4:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       start_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Started Frame!\n");
 
       break;
 

	
 
     case 5:
 
       if(distrend_listen(geninfo->config, &clients))
 
	 {
 
	   fprintf(stderr, "error listening\n");
 
	   return 1;
 
	 }
 
       while(1)
 
	 {
 
	   distrend_accept(geninfo->config, clients, (distrend_handle_request_t)&distrend_handle_request, (void *)geninfo);
 
	   distrend_accept(geninfo->config->listens);
 
	   /*
 
	     code for reading data from clients either goes here or in distrend_accept().
 
	     it might make sense for us to just pass the distrend_accept() function a
 
	     callback which can handle packets or to have a generic packet handling
 
	     subsystem which gathers data into groups defined by by packet.h and then
 
	     passed onto the callback.
 
	   */
 
	 }
 
       break;
 
       
 
     case 0:
 
       test = 0;
 
       break;
 
     default:
 
       fprintf(stderr, "Invalid input, please try again.\n");
 
       break;
 
     }
 
   }
 
  return 0;
 
}
src/server/distrend.h
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
struct distrend_config;
 

	
 
#ifndef _DISTREN_DISTREN_H_
 
#define _DISTREN_DISTREN_H_
 

	
 
#include <confuse.h>
 

	
 
#include "listen.h"
 

	
 
struct distrend_config
 
{
 
  cfg_t *mycfg;
 
  struct options_common *options;
 
  struct distrend_listen *listens; /*< Null terminated array of structs */
 
  struct distrend_listens *listens;
 
  int *listen_ports;
 
  char *datadir;
 

	
 
  int die;
 

	
 
  char *mysql_user;
 
  char *mysql_host;
 
  char *mysql_pass;
 
  char *mysql_database;
 
};
 

	
 
#endif
src/server/listen.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "listen.h"
 
#include "common/protocol.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <sys/select.h>
 
#include <poll.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
struct distrend_clientset
 
{
 
  LIST *clients;
 

	
 
  int nfds;
 
};
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
struct distrend_request_handler_info
 
{
 
  enum distren_request_type request_type;
 
  distrend_handle_request_func_t handler;
 
};
 

	
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
 

	
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
 

	
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
 
{
 
  struct distrend_listens *listens;
 

	
 
  listens = malloc(sizeof(struct distrend_listens));
 
  if(!listens)
 
    return NULL;
 

	
 
  listens->listen_socks = list_init();
 
  listens->request_handlers = list_init();
 
  listens->clients = list_init();
 
  listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */
 
  listens->pollfds_len = 2;
 
  listens->nfds = 0;
 
  listens->geninfo = geninfo;
 

	
 
  if(!listens->listen_socks
 
     || !listens->request_handlers
 
     || !listens->clients
 
     || !listens->pollfds)
 
    {
 
      if(listens->listen_socks)
 
	list_free(listens->listen_socks, LIST_NODEALLOC);
 
      if(listens->request_handlers)
 
	list_free(listens->request_handlers, LIST_NODEALLOC);
 
      if(listens->clients)
 
	list_free(listens->clients, LIST_NODEALLOC);
 
      free(listens->pollfds);
 
      free(listens);
 
      return NULL;
 
    }
 

	
 
  return listens;
 
}
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter)
 
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
 
{
 
  int sock;
 

	
 
  for(counter ++; counter > 0; counter --)
 
    {
 
      sock = config->listens[counter - 1].sock;
 
      if(sock >= 0)
 
	close(sock);
 
    }
 
  list_free(clients->clients, NULL);
 
  free(clients);
 
  sock = sockinfo->sock.sock;
 
  if(sock >= 0)
 
    close(sock);
 

	
 
  free(sockinfo);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
int distrend_listen_add(struct distrend_listens *listens, int port)
 
{
 
  int tmp;
 
  size_t counter;
 
  struct distrend_listen_sock *newsock;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  *clients = malloc(sizeof(struct distrend_clientset));
 
  sockaddr.sin6_port = htons(port);
 

	
 
  (*clients)->clients = list_init();
 
  (*clients)->nfds = 1;
 
  newsock = malloc(sizeof(struct distrend_listen_sock));
 
  if(!newsock)
 
    return 1;
 

	
 
  for(counter = 0; config->listens[counter].port; counter ++)
 
  newsock->port = port;
 
  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
    {
 
      sockaddr.sin6_port = htons(config->listens[counter].port);
 
      perror("bind");
 
      distrend_listen_add_unwind(newsock);
 

	
 
      config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0);
 
      tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
      if(tmp == -1)
 
	{
 
	  perror("bind");
 
	  distrend_listen_unwind(config, *clients, counter);
 
      return 1;
 
    }
 

	
 
	  return 1;
 
	}
 

	
 
      tmp = listen(config->listens->sock, 1);
 
      if(tmp == -1)
 
	{
 
	  perror("listen");
 
	  distrend_listen_unwind(config, *clients, counter);
 
  tmp = listen(newsock->sock.sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      distrend_listen_add_unwind(newsock);
 
      
 
	  return 1;
 
	}
 
      return 1;
 
    }
 

	
 
      tmp = distrend_makesocknonblock(config->listens->sock);
 
      if(tmp)
 
	{
 
	  /* error is already printed to user by distrend_makesocknonblock() */
 
	  distrend_listen_unwind(config, *clients, counter);
 
	  return 1;
 
	}
 
  tmp = distrend_makesocknonblock(newsock->sock.sock);
 
  if(tmp)
 
    {
 
      /* error is already printed to user by distrend_makesocknonblock() */
 
      distrend_listen_add_unwind(newsock);
 
      return 1;
 
    }
 

	
 
  distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM);
 
  list_insert_after(listens->listen_socks, newsock, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_config *config,
 
			struct distrend_client *client,
 
			distrend_handle_request_t handlereq,
 
			void *handlereqdata)
 
int distrend_handleread(struct distrend_listens *listens,
 
			struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  /**
 
     we have to check that there's new data so that
 
     we may call ourself recursively
 
  */
 
  fd_set readfd;
 
  struct timeval selecttimeout;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
  FD_ZERO(&readfd);
 
  FD_SET(client->sock, &readfd);
 
  memset(&selecttimeout, '\0', sizeof(selecttimeout));
 
  select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout);
 
  if(FD_ISSET(client->sock, &readfd))
 
    {
 
      readlen = read(client->sock, buf, sizeof(buf));
 
  //  if(revents & POLLRDNORM)
 
  //{
 
      readlen = read(client->sock.sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	  perror("read");
 
	  switch(errno)
 
	    {
 
	    case EINTR:
 
	    case EAGAIN:
 
	      break;
 

	
 
	    default:
 
	      client->state = DISTREND_CLIENT_DEAD;
 

	
 
	      free(packet);
 
	      return 1;
 
	    }
 
	}
 
      if(readlen == 0)
 
	/* handle EOF */
 
	{
 
	  client->state = DISTREND_CLIENT_DEAD;
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 

	
 
      packet->len = readlen;
 
      packet->data = malloc(readlen);
 
      if(!packet->data)
 
	{
 
	  fprintf(stderr, "OOM!\n");
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
      memcpy(packet->data, buf, readlen);
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 
    } /* if(FD_ISSET(client->sock, &readfd)) */
 
      //    } /*  */
 

	
 
  /**
 
     Manage input, etc.
 
   */
 
  if(client->expectlen == 0)
 
    {
 
      /* search out header from input so far */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(packet->len > sizeof(struct distren_request))
 
	{
 
	  if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	    {
 
	      fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
 

	
 
	      client->state = DISTREND_CLIENT_DEAD;
 
	      return 1;
 
	    }
 
	  client->expectlen = req->len + sizeof(struct distren_request);
 
	  distren_request_free(req);
 
	}
 
    }
 
  if(client->expectlen
 
     && client->inlen >= client->expectlen)
 
    {
 
      /** essentially un-queue-ize the queue ;-) */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	{
 
	  fprintf(stderr, "error handling data from client\n");
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      if(packet->len - sizeof(struct distren_request) < req->len)
 
	{
 
	  fprintf(stderr, "error handling some data from client\n");
 
	  distren_request_free(req);
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      reqdata = malloc(req->len);
 
      if(!reqdata)
 
	{
 
	  fprintf(stderr, "OOM\n");
 

	
 
	  distren_request_free(req);
 
	  return 1;
 
	}
 
      memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len);
 
      memmove(packet->data,
 
	      ((void *)packet->data) + client->expectlen,
 
	      packet->len - client->expectlen);
 
      packet->len -= client->expectlen;
 

	
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      (*handlereq)(client, req, reqdata, handlereqdata);
 
      distrend_dispatch_request(listens, client, req, reqdata);
 
      distren_request_free(req);
 
    }
 
  return 0;
 
}
 

	
 
struct distrend_accept_fdset_prepare_data
 
{
 
  fd_set readfds;
 
  fd_set writefds;
 
  /**
 
     stores the highest FD number which is one less than the first
 
   arfgument to select(). So do select(maxfd + 1, ...)
 
  */
 
  int maxfd;
 
};
 
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
 
				  struct distrend_client *client)
 
{
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(client->sock > data->maxfd)
 
    data->maxfd = client->sock;
 
  FD_SET(client->sock, &data->readfds);
 
  /**
 
     Only select on an outgoing socket if we have something to
 
     say.
 
   */
 
  if(!q_empty(client->outmsgs))
 
    FD_SET(client->sock, &data->writefds);
 
  
 
  return TRUE;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
  distrend_handle_request_t handlereq;
 
  void *handlereqdata;
 
  enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
 
  struct distrend_listens *listens;
 
  time_t current_time;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 
  short revents;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  /**
 
     Manage timing-out clients.
 
   */
 
  if(data->current_time > client->cleanup_time)
 
    switch(client->state)
 
      {
 
      case DISTREND_CLIENT_PREAUTH:
 
	distrend_send_disconnect(client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	break;
 

	
 
      case DISTREND_CLIENT_GOOD:
 
	distrend_send_disconnect(client, "Ping timeout :-p");
 
	distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
 
	break;
 

	
 
      case DISTREND_CLIENT_BAD:
 
	client->state = DISTREND_CLIENT_DEAD;
 
	return TRUE;
 

	
 
      default:
 
	break;
 
      }
 

	
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 
  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;
 
  if(revents & POLLRDNORM)
 
    /* handle reading */
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 
      distrend_handleread(data->listens, client);
 
    }
 
  if(revents & POLLWRNORM)
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
 
	  __FILE__, __LINE__, client->sock,
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ");
 

	
 
  switch(data->mode)
 
    {
 
    case DISTREND_ACCEPT_CLIENT_WRITE:
 
      if(q_empty(client->outmsgs))
 
	return TRUE;
 
	{
 
	  data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	  return TRUE;
 
	}
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock, packet->data, packet->len);
 
      written_amount = write(client->sock.sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
      if(written_amount == -1)
 
	{
 
	  perror("write");
 
	  /*
 
	    distrend_client_free();
 

	
 
	    Until liblist has the ability to delete nodes
 
	    during traversal, we'll have to free the client
 
	    during a client-list cleanup/pruning cycle.
 
	  */
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	}
 
      if(packet->len == written_amount)
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 

	
 
	  if(q_empty(client->outmsgs))
 
	    data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	  memmove(packet->data, packet->data + written_amount, packet->len);
 
	}
 
      break;
 

	
 
    case DISTREND_ACCEPT_CLIENT_READ:
 
      distrend_handleread(data->config, client, data->handlereq, data->handlereqdata);
 
      break;
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
 
{
 
  short revents;
 
  int newclientsock;
 

	
 
  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
 
  fprintf(stderr, "blah %d\n", (int)revents);
 
  if(revents & POLLRDNORM)
 
    {
 
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating/adding client struct\n");
 
	  return 1;
 
	}
 
      fprintf(stderr, "accepted new connection\n");
 
    }
 

	
 
  /**
 
     Check other listen blocks too.
 
   */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 
  int newclientsock;
 

	
 
  struct distrend_accept_fdset_prepare_data fdsets;
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  FD_ZERO(&fdsets.readfds);
 
  FD_ZERO(&fdsets.writefds);
 
  FD_SET(config->listens->sock, &fdsets.readfds);
 
  fdsets.maxfd = config->listens->sock;
 

	
 
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);
 

	
 
  fprintf(stderr, "select()...");
 
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
 
  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);
 
  poll(listens->pollfds, listens->nfds, -1);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.config = config;
 
  travinfo.listens = listens;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 

	
 
  travinfo.fdset = &fdsets.writefds;
 
  travinfo.handlereq = handlereq;
 
  travinfo.handlereqdata = handlereqdata;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  travinfo.fdset = &fdsets.readfds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
 
    {
 
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating client struct\n");
 
	  return 1;
 
	}
 
      clients->nfds ++;
 
      list_insert_after(clients->clients, newclient, 0);
 
      fprintf(stderr, "accepted new connection\n");
 
      distrend_client_write(newclient, "hello\n", 6);
 
    }
 

	
 
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(clients->clients);
 
  newclient = list_curr(clients->clients);
 
  list_mvfront(listens->clients);
 
  newclient = list_curr(listens->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_listen_poll_deletefd(listens, &newclient->sock);
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(clients->clients);
 
	  list_remove_curr(listens->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(clients->clients);
 
      list_mvnext(listens->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(clients->clients))
 
      if(newclient == list_curr(listens->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(clients->clients);
 
	newclient = list_curr(listens->clients);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
 
int distrend_listen_free(struct distrend_listens *listens)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
  return 1;
 
}
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
 
{
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
}
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
 
/**
 
   Adds a newly connected distrend_client to the listens->clients list
 
 */
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 
  struct distrend_client *client;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
  client = malloc(sizeof(struct distrend_client));
 
  if(!client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      close(sock);
 
      return 1;
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  (*client)->inlen = 0;
 
  (*client)->expectlen = 0;
 
  (*client)->inmsgs = q_init();
 
  (*client)->outmsgs = q_init();
 
  client->sock.sock = sock;
 
  distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
 

	
 
  client->state = state;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  client->inlen = 0;
 
  client->expectlen = 0;
 
  client->inmsgs = q_init();
 
  client->outmsgs = q_init();
 

	
 
  list_insert_after(listens->clients, client, 0);
 

	
 
  /**
 
     For those using netcat/telnet to debug their internets.
 
   */
 
  distrend_client_write(listens, client, "distren\n", 8);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   This function shall only be called after the appropriate
 
   distrend_listen_poll_deletefd() has been called
 
 */
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock);
 
  close(client->sock.sock);
 
  
 
  return 0;
 
}
 

	
 
void distrend_packet_free(struct distrend_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 
int distrend_makesocknonblock(int sock)
 
{
 
  int fdstatus;
 
  int tmp;
 

	
 
  fdstatus = fcntl(sock, F_GETFL);
 
  fdstatus |= O_NONBLOCK;
 
  tmp = fcntl(sock, F_SETFL, fdstatus);
 
  if(tmp == -1)
 
    {
 
      perror("fcntl");
 
      return 1;
 
    }
 
  
 
  return 0;
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = msglen;
 
  packet->data = malloc(msglen);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data)
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
  size_t msglen;
 

	
 
  msglen = sizeof(struct distren_request) + req->len;
 

	
 
  towrite = malloc(msglen);
 
  if(!towrite)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(client, towrite, msglen);
 
  ret = distrend_client_write(listens, client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
{
 
  struct distrend_packet *packet;
 

	
 
  *lenread = 0;
 
  *toread = NULL;
 

	
 
  if(q_empty(client->inmsgs))
 
    return 1;
 

	
 
  packet = q_dequeue(client->inmsgs);
 
  *lenread = packet->len;
 
  *toread = packet->data;
 
  free(packet);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   collapse a QUEUE of struct distrend_packet with a collective length
 
   of len into a single struct packet.
 
 */
 
int distrend_packets_collapse(QUEUE *queue, size_t len)
 
{
 
  struct distrend_packet *newpacket;
 
  struct distrend_packet *oldpacket;
 

	
 
  size_t copiedlen;
 

	
 
  newpacket = malloc(sizeof(struct distrend_packet));
 
  if(!newpacket)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  
 
  newpacket->data = malloc(len);
 
  if(!newpacket->data)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      perror("malloc");
 
      free(newpacket);
 

	
 
      return 1;
 
    }
 

	
 
  for(copiedlen = 0;
 
      copiedlen < len && !q_empty(queue);
 
      )
 
    {
 
      oldpacket = q_dequeue(queue);
 

	
 
      memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len);
 

	
 
      copiedlen += oldpacket->len;
 
      distrend_packet_free(oldpacket);
 
    }
 
  newpacket->len = copiedlen;
 

	
 
  if(copiedlen < len || !q_empty(queue))
 
    {
 
      fprintf(stderr, "inconsistency!\n");
 
      return 1;
 
    }
 

	
 
  q_enqueue(queue, newpacket, 0);
 

	
 
  return 0;
 
}
 

	
 

	
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg)
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg)
 
{
 
  struct distren_request *req;
 

	
 
  distren_request_new(&req, strlen(quit_msg), DISTREN_REQUEST_DISCONNECT);
 
  distrend_client_write_request(client, req, quit_msg);
 
  distrend_client_write_request(listens, client, req, quit_msg);
 
  distren_request_free(req);
 

	
 
  client->state = DISTREND_CLIENT_BAD;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_DISCONNECT_GRACE;
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler)
 
{
 
  struct distrend_request_handler_info *handler_info;
 

	
 
  handler_info = malloc(sizeof(struct distrend_request_handler_info));
 
  if(!handler_info)
 
    return 1;
 

	
 
  handler_info->request_type = type;
 
  handler_info->handler = handler;
 
  list_insert_after(listens->request_handlers, handler_info, 0);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_dispatch_request_data
 
{
 
  struct general_info *geninfo;
 
  struct distrend_client *client;
 
  struct distren_request *req;
 
  void *req_data;
 
};
 

	
 
/**
 
   traversal function for distrend_dispatch_request().
 
 */
 
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
 
{
 
  if(handler_info->request_type == data->req->type)
 
    (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data);
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
   helper for distrend_handleread() which looks up the correct
 
   request handler and handles handing the the request to the
 
   handler. :-p
 
 */
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata)
 
{
 
  struct distrend_dispatch_request_data data;
 

	
 
  data.geninfo = listens->geninfo;
 
  data.client = client;
 
  data.req = req;
 
  data.req_data = reqdata;
 

	
 
  list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Helper for the distrend_listen_add() and functions that
 
   call accept() to maintain the struct pollfd list in listens.
 

	
 
   @param listens The info related to listening to sockets.
 
   @param fd the FD to add to the list of sockets.
 
   @param poll_events The poll() events to register in the list of sockets.
 
   @param entry_offset Will be set to the index of the struct pollfd that is created. This is not a pointer to struct pollfd because the pointer may change whereas the index will only change under controlled circumstances.
 
 */
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events)
 
{
 
  size_t new_len;
 
  struct pollfd *new_pollfds;
 

	
 
  if(listens->nfds + 1 > listens->pollfds_len)
 
    {
 
      new_len = listens->pollfds_len * 2;
 
      new_pollfds = malloc(sizeof(struct pollfd) * new_len);
 
      if(!new_pollfds)
 
	return 1;
 

	
 
      memcpy(new_pollfds, listens->pollfds, sizeof(struct pollfd) * listens->pollfds_len);
 

	
 
      free(listens->pollfds);
 
      listens->pollfds_len = new_len;
 
      listens->pollfds = new_pollfds;
 
    }
 

	
 
  sock->pollfd_offset = listens->nfds;
 
  listens->pollfds[sock->pollfd_offset].fd = sock->sock;
 
  listens->pollfds[sock->pollfd_offset].events = poll_events;
 
  listens->nfds ++;
 

	
 
  fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   Removes a particular struct pollfd from the listens->pollfds list. This
 
   will likely also require remapping at least one other existing pollfd
 
   entry, so this will also traverse the list of clients and update the
 
   entry_offset of the client which has to be remapped.
 

	
 
   @param listens the struct distrend_listens
 
   @param sock the relevent struct containing the offset of the pollfd that must be removed.
 
   @return 0 on success, 1 on error.
 
 */
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock)
 
{
 
  /*
 
    the socket which is being moved from the end of the list
 
    to the middle to fill in the hole left by removing sock
 
  */
 
  struct distrend_polled_sock *displaced_sock;
 

	
 
  /*
 
    special case: we're deleting the last element and thus
 
    don't need to remap an existing entry.
 
   */
 
  if(sock->pollfd_offset == listens->nfds - 1)
 
    {
 
      listens->nfds --;
 
      return 0;
 
    }
 

	
 
  /*
 
    Shift whatever's at the end of the pollfds array into the position
 
    of the pollfd being removed. Then update that client's struct distrend_client
 
    entry to point to the new offset.
 
   */
 
  memcpy(&listens->pollfds[sock->pollfd_offset], &listens->pollfds[listens->nfds - 1], sizeof(struct pollfd));
 
  displaced_sock = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1);
 
  if(!displaced_sock)
 
    {
 
      fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n",
 
	      (int)listens->nfds - 1, (int)listens->nfds);
 
      listens->nfds --;
 
      return 1;
 
    }
 

	
 
  displaced_sock->pollfd_offset = sock->pollfd_offset;
 

	
 
  listens->nfds --;
 
  return 0;
 
}
 

	
 
/**
 
   For _distrend_polled_sock_get_by_offset_traverse()
 
 */
 
struct distrend_polled_sock_get_by_offset_data
 
{
 
  struct distrend_polled_sock *sock;
 
  size_t pollfd_offset;
 
};
 

	
 
/**
 
   List traversal helper function for distrend_polled_sock_get_by_offset().
 

	
 
   @param sock The second argument is casting either sturct distrend_client or
 
   struct distrend_listen_sock to a struct distrend_polled_sock which works
 
   because each of the two former structs as a sturct distrend_polled_sock
 
   as its first member.
 
 */
 
int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock)
 
{
 
  if(data->pollfd_offset == sock->pollfd_offset)
 
    {
 
      data->sock = sock;
 
      return FALSE;
 
    }
 
  return TRUE;
 
}
 

	
 
/**
 
   Returns a struct distrend_client based on its offset in the listens->pollfds
 
   array.
 

	
 
   @param listens the listens.
 
   @param pollfds_offset the index in the listens->pollfds array of the client we should find
 
   @return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list.
 
 */
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset)
 
{
 
  struct distrend_polled_sock_get_by_offset_data data;
 

	
 
  data.sock = NULL;
 
  data.pollfd_offset = pollfds_offset;
 

	
 
  /**
 
     These traversals both depend on struct distrend_polled_sock being
 
     the first entry of struct distrend_listens and struct distrend_client.
 
   */
 
  list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 
  if(data.sock == NULL)
 
    list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
 

	
 
  return data.sock;
 
}
 

	
src/server/listen.h
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
struct distrend_clientset;
 
struct distrend_listen;
 
struct general_info;
 
struct distrend_listens;
 
struct distrend_client;
 

	
 
#ifndef _DISTREN_LISTEN_H
 
#define _DISTREN_LISTEN_H
 

	
 
#include "distrend.h"
 
#include "common/protocol.h"
 

	
 
#include <poll.h>
 
#include <queue.h>
 
#include <time.h>
 

	
 
/**
 
   How long a client has after connecting to send
 
   authentication information before his connection is cleaned
 
   up.
 
 */
 
#define DISTREND_LISTEN_AUTHTIME 32
 

	
 
/**
 
   How long a client has when in DISTREND_CLIENT_BAD before
 
   his connection is dropped. This grace time is intended so that
 
   the client will actually see his disconnect message instead of
 
   just having his connection reset.
 
 */
 
#define DISTREND_LISTEN_DISCONNECT_GRACE 8
 

	
 
enum distrend_client_state
 
  {
 
    /**
 
       We don't yet know the client. It may only use authentication
 
       commands.
 
     */
 
    DISTREND_CLIENT_PREAUTH,
 
    /**
 
       The client is authenticated, etc.
 
     */
 
    DISTREND_CLIENT_GOOD,
 
    /**
 
       The client is queued to be disconnected. (This state exists
 
       so that the client at least has a chance to recieve its
 
       disconnect message/error before being dumped).
 
     */
 
    DISTREND_CLIENT_BAD,
 
    /**
 
       The socket used to communicate with the client is closed. Its entry
 
       in the client list should be removed on the next garbage clean-up round.
 
     */
 
    DISTREND_CLIENT_DEAD
 
  };
 

	
 
struct distrend_listen
 
/**
 
   inheritence in C !?!?
 
 */
 
struct distrend_polled_sock
 
{
 
  int sock;
 
  /* this socket's offset in the listens->pollfds array. */
 
  size_t pollfd_offset;
 
};
 

	
 
struct distrend_listen_sock
 
{
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 
  int port;
 
  int sock;
 
};
 

	
 
struct distrend_listens
 
{
 
  /* of type (struct distrend_listen_sock *) */
 
  list_t listen_socks;
 
  /* of type (struct distrend_request_handler_info) */
 
  list_t request_handlers;
 
  /* the data to pass onto all request handlers */
 
  struct general_info *geninfo;
 

	
 
  /* of type (struct distrend_client) */
 
  list_t clients;
 

	
 
  /* the array passed to poll() */
 
  struct pollfd *pollfds;
 
  /* the number of entries pollfds could hold */
 
  nfds_t pollfds_len;
 
  /* the number of entries that pollfds does hold */
 
  nfds_t nfds;
 
};
 

	
 

	
 
/**
 
   The information necessary to recieve data from and send data
 
   to a client.
 
 */
 
struct distrend_client
 
{
 
  int sock;
 
  /**
 
     sock must be first, see distrend_polled_socket_get_by_offset()
 
   */
 
  struct distrend_polled_sock sock;
 

	
 
  enum distrend_client_state state;
 

	
 
  /**
 
     The absolute time at which this client's entry in the client list will be
 
     expired, closed, and marked as dead so that it may be cleaned up. This is
 
     used to implement ping timeouts (if state == DISTREND_CLIENT_GOOD) and 
 
     disconnect message grace time (if state == DISTREND_CLIENT_BAD).
 
   */
 
  time_t cleanup_time;
 

	
 
  size_t inlen; /*< number of bytes waiting to be processed */
 
  size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */
 
  QUEUE *inmsgs;
 
  QUEUE *outmsgs;
 

	
 
};
 

	
 

	
 

	
 
typedef int(*distrend_handle_request_t)(struct distrend_client *client, struct distren_request *req, void *reqdata, void *data);
 
/**
 
   A function signature that may be registered as a client
 
   request handler.
 

	
 
   @param client the client that sent the request
 
   @param len the length of the message in bytes
 
   @param data the message received from the client
 
 */
 
typedef int(*distrend_handle_request_func_t)(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data);
 

	
 
/**
 
   initializes the listens and clientset
 
   @param config the configuration from distrend
 
   @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset
 
   Initializes the listens member of struct distrend_config.
 

	
 
   @param geninfo general info to apss to the request handler.
 
   @return Must be free()d with distrend_listen_free();
 
*/
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo);
 

	
 
/**
 
   Adds a socket and configures it to listen().
 

	
 

	
 
   @param listens The handle for this set of listens, obtained via distrend_listen_init().
 
 */
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients);
 
int distrend_listen_add(struct distrend_listens *listens, int port);
 

	
 
/**
 
   Register a request handler with the listener.
 

	
 
   @param config distrend's configuration
 
   @param type the request type for which this handler should be called
 
   @param handler the handler to call when a request of type type is received.
 
 */
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler);
 

	
 
/**
 
   checks states of the sockets I'm managing. If there are any new connections,
 
   or activity on any sockets, I'll call the appropriate function.
 
   I will block until there is some sort of activity, including
 
   signals. If you want to cleanly shut down, it's best to register
 
   signal handlers somewhere
 

	
 
   @param listens the config->listens after being initialized with distrend_listen()
 
 */
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata);
 
int distrend_accept(struct distrend_listens *listens);
 

	
 
/**
 
   cleans listening socket. Unnecessary for a working server, currently a stub.
 
   cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 
int distrend_listen_free(struct distrend_listens *listens);
 

	
 
/**
 
   writes message to client.
 
   @param towrite the caller is expected to free this string. This function will
 
   strdup() it, in essence.
 
 */
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen);
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen);
 

	
 
/**
 
   writes request to client.
 
   @param client client to write to
 
   @param req the request struct. caller must free this.
 
   @param data the data of the request which is req->len bytes long. caller must free this.
 
 */
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data);
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
 

	
 
/**
 
   Queue a DISTREN_REQUEST_DISCONNECT and prepare a client
 
   to be disconnected.
 
 */
 
int distrend_send_disconnect(struct distrend_client *client, char *quit_msg);
 
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg);
 

	
 

	
 
#endif
0 comments (0 inline, 0 general)