Changeset - e22839d03ac9
[Not reviewed]
default
0 2 0
Nathan Brink (binki) - 15 years ago 2010-07-02 12:13:57
ohnobinki@ohnopublishing.net
Add DISTREN_REQUEST_PING handler to distrend and fix one of many memory leaks.
2 files changed with 21 insertions and 0 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "distrenjob.h"
 
#include "listen.h"
 
#include "slavefuncs.h"
 
#include "mysql.h"
 

	
 
#include "common/asprintf.h"
 
#include "common/execio.h"
 
#include "common/options.h"
 
#include "common/protocol.h"
 

	
 
#include <confuse.h>
 
#include <malloc.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* ******************* Structs ************************ */
 
struct general_info
 
{
 
  struct distrenjob head;
 
  distrend_mysql_conn_t conn;
 

	
 
  struct distrend_config *config;
 

	
 
  struct
 
  {
 
    /** general_info.xml */
 
    char *geninfo;
 
    
 
  } files;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 
/**
 
   client request handlers
 
 */
 
int distrend_handle_version(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data);
 
int distrend_handle_ping(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data);
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 

	
 
/* **************Test Functions**************** */
 
int interactiveTest(int test, struct general_info *general_info);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  int tmp;
 
  struct general_info general_info;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  clientstatus = CLIENTSTATUS_UNINITIALIZED;
 
  // xmlinit();
 

	
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
	  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
    	  test = 1;
 
      }
 
    }
 

	
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config))
 
    return 1;
 

	
 
  /** preset paths */
 
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
 
		    general_info.config->datadir);
 

	
 
  /** MySQL Connection */
 
  fprintf(stderr,"Connecting to mysql...\n");
 
  if(mysqlConnect(&general_info.conn,
 
		  general_info.config->mysql_user,
 
		  general_info.config->mysql_host,
 
		  general_info.config->mysql_pass,
 
		  general_info.config->mysql_database) )
 
    {
 
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
 
      fprintf(stderr, "don't test mysql stuff\n");
 
      interactiveTest(test, &general_info);
 
      return 1;
 
    }
 
  fprintf(stderr,"Finished connecting!\n");
 

	
 
  /** Execute test function */
 
  interactiveTest(test, &general_info);
 

	
 
  general_info.config->listens = distrend_listens_new(&general_info);
 
  if(!general_info.config->listens)
 
    {
 
      fprintf(stderr, "error initializing listens\n");
 
      return 1;
 
    }
 
  for(counter = 0; general_info.config->listen_ports[counter]; counter ++)
 
    {
 
      tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]);
 
      if(tmp)
 
	{
 
	  fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]);
 
	  return 1;
 
	}
 
    }
 

	
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_VERSION, &distrend_handle_version);
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_PING, &distrend_handle_ping);
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config->listens);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 

	
 
      struct frameset frame;
 
      struct distrenjob *job;
 
      distrenjob_new(&job);
 

	
 
      memset(&frame, '\0', sizeof(frame));
 

	
 
      /** If client requests work */
 
      if(clientrequest == DISTREN_REQUEST_GETWORK)
 
	{
 
	  int returnnum = find_jobframe(general_info.conn, slaveKey, &job->jobnum, &frame.num); // Finds a frame to render @FIXME: Slavenum :D
 
	  if(returnnum); /* No work on server */
 
	  else
 
	    remotio_send_to_client(frame.num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      else if(clientrequest == DISTREN_REQUEST_DONEFRAME)
 
	{
 
	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
	  finish_frame(general_info.conn, 0, job->jobnum, frame.num); // @TODO: Make sure this actually works.
 
	}
 
      distrenjob_free(&job);
 
    }
 

	
 
  distrend_listen_free(general_info.config->listens);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_version(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data)
 
{
 
  char *tmp_str;
 
  char fixedbuf[32];
 

	
 
  struct distren_request *newreq;
 

	
 
  if(client->state != DISTREND_CLIENT_PREVERSION)
 
    {
 
      distrend_send_disconnect(geninfo->config->listens, client, "You have already sent the VERSION command.");
 
    }
 
  if(strlen(PACKAGE_STRING) == req_len
 
     && !strncmp(PACKAGE_STRING, req_data, req_len))
 
    {
 
      /**
 
	 The client and I claim to be of the same version of distren :-D
 
	 Now we will mark the client as valid.
 

	
 
	 We won't increment his time to live, though, because it shouldn't take
 
	 him that long to auth.
 
      */
 
      client->state = DISTREND_CLIENT_PREAUTH;
 

	
 
      distren_request_new(&newreq, strlen(PACKAGE_STRING), DISTREN_REQUEST_VERSION);
 
      distrend_client_write_request(geninfo->config->listens, client, newreq, PACKAGE_STRING);
 
      distren_request_free(newreq);
 
    }
 
  else
 
    {
 
      /**
 
	 The client claims to be of a different version of distren.
 
	 Now we will just send a disconnect packet and disconnect the client.
 
      */
 
      strncpy(fixedbuf, req_data, 31);
 
      fixedbuf[31] = '\0';
 
      if(req_len < 31)
 
	fixedbuf[req_len] = '\0';
 

	
 
      _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, fixedbuf);
 
      distrend_send_disconnect(geninfo->config->listens, client, tmp_str);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_handle_ping(struct general_info *geninfo, struct distrend_client *client, size_t req_len, void *req_data)
 
{
 
  struct distren_request *pong_req;
 

	
 
  if(req_len > 32)
 
    distrend_send_disconnect(geninfo->config->listens, client, "You have tried to send a PING packet with a length longer than 32 bytes.");
 

	
 
  /**
 
      respond to the client using the data he sent in his PONG
 
      command.
 
   */
 
  distren_request_new(&pong_req, req_len, DISTREN_REQUEST_PONG);
 
  distrend_client_write_request(geninfo->config->listens, client, pong_req, req_data);
 
  distren_request_free(pong_req);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
 
{
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
 
      CFG_SIMPLE_STR("mysql_user", NULL),
 
      CFG_SIMPLE_STR("mysql_host", NULL),
 
      CFG_SIMPLE_STR("mysql_pass", NULL),
 
      CFG_SIMPLE_STR("mysql_database", NULL),
 
      CFG_END()
 
    };
 

	
 
  cfg_t *cfg_listen;
 

	
 
  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 
  myopts[3].simple_value = &(*config)->mysql_user;
 
  myopts[4].simple_value = &(*config)->mysql_host;
 
  myopts[5].simple_value = &(*config)->mysql_pass;
 
  myopts[6].simple_value = &(*config)->mysql_database;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port");
 
    }
 
  (*config)->listen_ports[counter] = 0;
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 

	
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  distrend_listen_free(config->listens);
 
  options_free(config->options);
 
  free(config->listen_ports);
 
  free(config);
 

	
 
  return 0;
 
}
 

	
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
// except that it doesn't hold free_clients and rendering_clients
 
void update_general_info(struct general_info *geninfo)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 

	
 
  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"jobs_in_queue", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_finished_jobs);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_finished_jobs", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_frames_rendered);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_frames_rendered", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->highest_jobnum);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"highest_jobnum", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterEndDocument(writer);
 
  xmlFreeTextWriter(writer);
 
}
 

	
 
/**
 
   Reads general state information from general_info.xml
 
   into the general_info structure.
 
*/
 
int import_general_info(struct general_info *general_info)
 
{
 
  xmlDocPtr doc;
 
  xmlNodePtr cur;
 

	
 
  doc = xmlParseFile(general_info->files.geninfo);
 
  cur = xmlDocGetRootElement(doc);
 
  if (xmlStrcmp(cur->name, (xmlChar*)"general_info"))
 
    {
 
      fprintf(stderr, "xml document is wrong type");
 
      xmlFreeDoc(doc);
 
      return 1;
 
    }
 

	
 
  cur = cur->xmlChildrenNode;
 
  general_info->jobs_in_queue = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 

	
 
  general_info->hibernate = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   updates job_list.xml which lists all the jobs in the queue
 
   @return 0 means success
 
*/
 
// @QUERY: Likely obselete (don't remove at request of ohnobinki)
 
int update_xml_joblist(struct general_info *geninfo)
 
{
 
  struct distrenjob *job;
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  int counter;
 

	
 
  /**
 
     update timestamp
 
  */
 
  geninfo->timestamp ++;
 
  if(geninfo->timestamp > 65530)
 
    geninfo->timestamp = 0;
 

	
 
  _distren_asprintf(&tmp, "%s/job_list.xml",
 
                    geninfo->config->datadir);
 
  writer = xmlNewTextWriterFilename(tmp, 0);
 
  free(tmp);
 

	
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     create root element job_list
 
  */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job_list");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->timestamp);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  geninfo->total_priority_pieces = 0;
 
  counter = 0;
 
  for(job = geninfo->head.next; job; job = job->next)
 
    {
 
      _distren_asprintf(&tmp, "%d", job->jobnum);
 
      xmlTextWriterWriteElement(writer, (xmlChar*)"jobnum", (xmlChar*)tmp);
 
      free(tmp);
 

	
 
      /**
 
        this is needed for the new frame finder to work
 
      
 
        Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + job->priority;
 

	
 
      counter++;
 
    }
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     close elements and end document
 
  */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk
 
  */
 
  xmlFreeTextWriter(writer);
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 

	
 

	
 
/** Interactive test for the queuing system */
 
/* @QUEUE: Test uses methods not present in C code using mysql web-based system */
 
int interactiveTest(int test, struct general_info *geninfo)
 
{
 
  int command;
 
  int32_t slaveKey = 1;
 
  jobnum_t jobKey = 0;
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 

	
 
  fprintf(stderr,"Hello!\n");
 

	
 
  while(test == 1)
 
   {
 
     fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
     fprintf(stderr, "\t1 \tGet a frame to render\n");
 
     fprintf(stderr, "\t2 \tChange job priority\n");
 
     fprintf(stderr, "\t3 \tSet frame finished\n");
 
     fprintf(stderr, "\t4 \tSet frame started\n");
 
     fprintf(stderr, "\t5 \tStart listener\n");
 
     fprintf(stderr, "\t0 \tQuit\n");
 

	
 
     scanf("%d", &command);
 

	
 
     switch(command)
 
     {
 
     case 1:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &slaveKey);
 
       fprintf(stderr, "Got frame: ");
 
       if(find_jobframe(geninfo->conn, slaveKey, &jobKey, &frameNum))
 
         fprintf(stderr,"No frames available to render!\n");
 
       else if(jobKey == -1)
 
         fprintf(stderr,"Slave %d has no render power!", slaveKey);
 
       else
 
         fprintf(stderr, "jobKey: %d, frameNum: %d\n",jobKey,frameNum);
 
       break;
 
     case 2:
 
       fprintf(stderr,"Job key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"New priority: ");
 
       scanf("%d", &tmp);
 
       newPriority = tmp;
 

	
 
       change_job_priority(geninfo->conn, jobKey, newPriority);
 
       fprintf(stderr,"Changed!");
 
       break;
 
     case 3:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       finish_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Finished Frame!\n");
 
       break;
 
     case 4:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       start_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Started Frame!\n");
 
       break;
 

	
 
     case 5:
 
       while(1)
 
	 {
 
	   distrend_accept(geninfo->config->listens);
 
	   /*
 
	     code for reading data from clients either goes here or in distrend_accept().
 
	     it might make sense for us to just pass the distrend_accept() function a
 
	     callback which can handle packets or to have a generic packet handling
 
	     subsystem which gathers data into groups defined by by packet.h and then
 
	     passed onto the callback.
 
	   */
 
	 }
 
       break;
 
       
 
     case 0:
 
       test = 0;
 
       break;
 
     default:
 
       fprintf(stderr, "Invalid input, please try again.\n");
 
       break;
 
     }
 
   }
 
  return 0;
 
}
src/server/listen.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "listen.h"
 
#include "common/protocol.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <poll.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
struct distrend_request_handler_info
 
{
 
  enum distren_request_type request_type;
 
  distrend_handle_request_func_t handler;
 
};
 

	
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
 
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
 

	
 
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
 
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
 

	
 
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
 
{
 
  struct distrend_listens *listens;
 

	
 
  listens = malloc(sizeof(struct distrend_listens));
 
  if(!listens)
 
    return NULL;
 

	
 
  listens->listen_socks = list_init();
 
  listens->request_handlers = list_init();
 
  listens->clients = list_init();
 
  listens->pollfds = malloc(sizeof(struct pollfd) * 2); /*< what's this? a hardcoded value? well, it's an insane value... */
 
  listens->pollfds_len = 2;
 
  listens->nfds = 0;
 
  listens->geninfo = geninfo;
 

	
 
  if(!listens->listen_socks
 
     || !listens->request_handlers
 
     || !listens->clients
 
     || !listens->pollfds)
 
    {
 
      if(listens->listen_socks)
 
	list_free(listens->listen_socks, LIST_NODEALLOC);
 
      if(listens->request_handlers)
 
	list_free(listens->request_handlers, LIST_NODEALLOC);
 
      if(listens->clients)
 
	list_free(listens->clients, LIST_NODEALLOC);
 
      free(listens->pollfds);
 
      free(listens);
 
      return NULL;
 
    }
 

	
 
  return listens;
 
}
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
 
{
 
  int sock;
 

	
 
  sock = sockinfo->sock.sock;
 
  if(sock >= 0)
 
    close(sock);
 

	
 
  free(sockinfo);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_add(struct distrend_listens *listens, int port)
 
{
 
  int tmp;
 
  struct distrend_listen_sock *newsock;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  sockaddr.sin6_port = htons(port);
 

	
 
  newsock = malloc(sizeof(struct distrend_listen_sock));
 
  if(!newsock)
 
    return 1;
 

	
 
  newsock->port = port;
 
  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
    {
 
      perror("bind");
 
      distrend_listen_add_unwind(newsock);
 

	
 
      return 1;
 
    }
 

	
 
  tmp = listen(newsock->sock.sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      distrend_listen_add_unwind(newsock);
 
      
 
      return 1;
 
    }
 

	
 
  tmp = distrend_makesocknonblock(newsock->sock.sock);
 
  if(tmp)
 
    {
 
      /* error is already printed to user by distrend_makesocknonblock() */
 
      distrend_listen_add_unwind(newsock);
 
      return 1;
 
    }
 

	
 
  distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM);
 
  list_insert_after(listens->listen_socks, newsock, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_listens *listens,
 
			struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
      readlen = read(client->sock.sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	  perror("read");
 
	  switch(errno)
 
	    {
 
	    case EINTR:
 
	    case EAGAIN:
 
	      break;
 

	
 
	    default:
 
	      client->state = DISTREND_CLIENT_DEAD;
 

	
 
	      free(packet);
 
	      return 1;
 
	    }
 
	}
 
      if(readlen == 0)
 
	/* handle EOF */
 
	{
 
	  client->state = DISTREND_CLIENT_DEAD;
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 

	
 
      packet->len = readlen;
 
      packet->data = malloc(readlen);
 
      if(!packet->data)
 
	{
 
	  fprintf(stderr, "OOM!\n");
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
      memcpy(packet->data, buf, readlen);
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 

	
 
  /**
 
     Manage input, etc.
 
   */
 
  if(client->expectlen == 0)
 
    {
 
      /* search out header from input so far */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(packet->len > sizeof(struct distren_request))
 
	{
 
	  if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	    {
 
	      fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
 

	
 
	      client->state = DISTREND_CLIENT_DEAD;
 
	      return 1;
 
	    }
 
	  client->expectlen = req->len + sizeof(struct distren_request);
 
	  distren_request_free(req);
 
	}
 
    }
 
  if(client->expectlen
 
     && client->inlen >= client->expectlen)
 
    {
 
      /** essentially un-queue-ize the queue ;-) */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	{
 
	  fprintf(stderr, "error handling data from client\n");
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      if(packet->len - sizeof(struct distren_request) < req->len)
 
	{
 
	  fprintf(stderr, "error handling some data from client\n");
 
	  distren_request_free(req);
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      reqdata = malloc(req->len);
 
      if(!reqdata)
 
	{
 
	  fprintf(stderr, "OOM\n");
 

	
 
	  distren_request_free(req);
 
	  return 1;
 
	}
 
      memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len);
 
      memmove(packet->data,
 
	      ((void *)packet->data) + client->expectlen,
 
	      packet->len - client->expectlen);
 
      packet->len -= client->expectlen;
 

	
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      distrend_dispatch_request(listens, client, req, reqdata);
 
      free(reqdata);
 
      distren_request_free(req);
 
    }
 
  return 0;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  struct distrend_listens *listens;
 
  time_t current_time;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 
  short revents;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  /**
 
     Manage timing-out clients.
 
   */
 
  if(data->current_time > client->cleanup_time)
 
    switch(client->state)
 
      {
 
      case DISTREND_CLIENT_PREVERSION:
 
	distrend_send_disconnect(data->listens, client, "You have failed to present version information in a timely manner. Cya :-p");
 
	break;
 
      case DISTREND_CLIENT_PREAUTH:
 
	distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
 
	break;
 

	
 
      case DISTREND_CLIENT_GOOD:
 
	distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
 
	break;
 

	
 
      case DISTREND_CLIENT_BAD:
 
	client->state = DISTREND_CLIENT_DEAD;
 
	return TRUE;
 

	
 
      default:
 
	break;
 
      }
 

	
 
  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;
 
  if(revents & POLLRDNORM)
 
    /* handle reading */
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 
      distrend_handleread(data->listens, client);
 
    }
 
  if(revents & POLLWRNORM)
 
    {
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
 
	      __FILE__, __LINE__, client->sock.sock);
 

	
 
      if(q_empty(client->outmsgs))
 
	{
 
	  data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	  return TRUE;
 
	}
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock.sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
      if(written_amount == -1)
 
	{
 
	  perror("write");
 
	  /*
 
	    distrend_client_free();
 

	
 
	    Until liblist has the ability to delete nodes
 
	    during traversal, we'll have to free the client
 
	    during a client-list cleanup/pruning cycle.
 
	  */
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	}
 
      if(packet->len == written_amount)
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 

	
 
	  if(q_empty(client->outmsgs))
 
	    data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	  memmove(packet->data, packet->data + written_amount, packet->len);
 
	}
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
 
{
 
  short revents;
 
  int newclientsock;
 

	
 
  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
 
  fprintf(stderr, "blah %d\n", (int)revents);
 
  if(revents & POLLRDNORM)
 
    {
 
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREVERSION))
 
	{
 
	  fprintf(stderr, "error allocating/adding client struct\n");
 
	  return 1;
 
	}
 
      fprintf(stderr, "accepted new connection\n");
 
    }
 

	
 
  /**
 
     Check other listen blocks too.
 
   */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_listens *listens)//, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 

	
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);
 
  poll(listens->pollfds, listens->nfds, -1);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.listens = listens;
 
  travinfo.current_time = time(NULL); /*< cache the time */
 

	
 
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(listens->clients);
 
  newclient = list_curr(listens->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_listen_poll_deletefd(listens, &newclient->sock);
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(listens->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(listens->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(listens->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(listens->clients);
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen_free(struct distrend_listens *listens)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
  return 1;
 
}
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
 
{
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
}
 

	
 
/**
 
   Adds a newly connected distrend_client to the listens->clients list
 
 */
 
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 
  struct distrend_client *client;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  client = malloc(sizeof(struct distrend_client));
 
  if(!client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      close(sock);
 
      return 1;
 
    }
 
  client->sock.sock = sock;
 
  distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
 

	
 
  client->state = state;
 
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
 
  client->inlen = 0;
 
  client->expectlen = 0;
 
  client->inmsgs = q_init();
 
  client->outmsgs = q_init();
 

	
 
  list_insert_after(listens->clients, client, 0);
 

	
 
  /**
 
     For those using netcat/telnet to debug their internets.
 
   */
 
#ifndef PACKAGE_URL
 
#define PACKAGE_URL "http://ohnopub.net/distren/"
 
#endif
 
#define DISTREN_GREETING PACKAGE_STRING " " PACKAGE_URL " : Nathan Phillip Brink && Ethan Michael Zonca\n"
 
  /* using sizeof() - 1 because the sizeof() includes a NULL byte we want to ignore. */
 
  distrend_client_write(listens, client, DISTREN_GREETING, sizeof(DISTREN_GREETING) - 1);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   This function shall only be called after the appropriate
 
   distrend_listen_poll_deletefd() has been called
 
 */
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock.sock);
 
  
 
  return 0;
 
}
 

	
 
void distrend_packet_free(struct distrend_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 
int distrend_makesocknonblock(int sock)
 
{
 
  int fdstatus;
 
  int tmp;
 

	
 
  fdstatus = fcntl(sock, F_GETFL);
 
  fdstatus |= O_NONBLOCK;
 
  tmp = fcntl(sock, F_SETFL, fdstatus);
 
  if(tmp == -1)
 
    {
 
      perror("fcntl");
 
      return 1;
 
    }
 
  
 
  return 0;
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = msglen;
 
  packet->data = malloc(msglen);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
  size_t msglen;
 

	
 
  msglen = sizeof(struct distren_request) + req->len;
 

	
 
  towrite = malloc(msglen);
 
  if(!towrite)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(listens, client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
{
 
  struct distrend_packet *packet;
 

	
 
  *lenread = 0;
 
  *toread = NULL;
 

	
 
  if(q_empty(client->inmsgs))
 
    return 1;
 

	
 
  packet = q_dequeue(client->inmsgs);
 
  *lenread = packet->len;
 
  *toread = packet->data;
 
  free(packet);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   collapse a QUEUE of struct distrend_packet with a collective length
 
   of len into a single struct packet.
 
 */
 
int distrend_packets_collapse(QUEUE *queue, size_t len)
 
{
 
  struct distrend_packet *newpacket;
 
  struct distrend_packet *oldpacket;
 

	
 
  size_t copiedlen;
 

	
 
  newpacket = malloc(sizeof(struct distrend_packet));
 
  if(!newpacket)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  
 
  newpacket->data = malloc(len);
 
  if(!newpacket->data)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      perror("malloc");
 
      free(newpacket);
 

	
 
      return 1;
 
    }
 

	
 
  for(copiedlen = 0;
 
      copiedlen < len && !q_empty(queue);
 
      )
0 comments (0 inline, 0 general)