Changeset - 7d1fd1fe48fc
[Not reviewed]
default
0 7 0
Nathan Brink (binki) - 15 years ago 2010-06-07 23:20:29
ohnobinki@ohnopublishing.net
basic TCP support for distren able to write and handle requests
7 files changed with 324 insertions and 58 deletions:
0 comments (0 inline, 0 general)
Makefile.am
Show inline comments
 
ACLOCAL_AMFLAGS = -I m4
 

	
 

	
 
AM_CPPFLAGS = -DSYSCONFDIR='"$(sysconfdir)"' \
 
	-DLOCALSTATEDIR='"$(localstatedir)"' \
 
	-I$(top_srcdir)/src
 
AM_CFLAGS = $(DISTLIBS_CFLAGS)
 
LIBS = $(DISTLIBS_LDADD) $(DISTLIBS_LDFLAGS)
 
LIBS = $(DISTLIBS_LIBS)
 
LDADD = libdistrencommon.la
 

	
 
bin_PROGRAMS = 
 
if ENABLE_SERVER
 
bin_PROGRAMS += distrend distrenslave distrensimpleslave
 
endif
 

	
 
pkglib_LTLIBRARIES = libdistrencommon.la
 

	
 
# libdistrencommon.la:
 
libdistrencommon_la_SOURCES = src/common/asprintf.c src/common/asprintf.h \
 
	src/common/execio.c src/common/execio.h \
etc/distrendaemon.conf.in
Show inline comments
 
@@ -2,19 +2,27 @@
 
 configuration for _this_ server
 
*/
 
daemon
 
{
 
  datadir = "@LOCALSTATEDIR@/@PACKAGE@"
 
  render_types = {"povray", "blender", "inkscape", "imagemagick", "dcraw"}
 

	
 
  mysql_user = "distren"
 
  mysql_pass = "HwLxuBqTvJ6J7FWj"
 
  mysql_host = "localhost"
 
  mysql_database = "distren"
 

	
 
/*
 
  listen {
 
         type = "unix"
 
         path = "@RUNSTATEDIR@/@PACKAGE@d.sock"
 
         port = "0770"
 
         port = 0770
 
  }
 
*/
 

	
 
  listen {
 
  	 type = "tcp"
 
	 path = "0.0.0.0"
 
	 port = 4050
 
  }
 
}
 
include("distrencommon.conf")
src/common/protocol.c
Show inline comments
 
@@ -11,40 +11,65 @@
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "protocol.h"
 

	
 
#include <malloc.h>
 
#include <string.h>
 

	
 
#define DISTREN_REQUEST_MAGIC (0x32423434)
 

	
 
int distren_request_new(struct distren_request **req, uint32_t len, enum distren_request_type type)
 
{
 
  struct distren_request *newreq;
 

	
 
  newreq = malloc(sizeof(struct distren_request));
 
  if(!newreq)
 
    {
 
      (*req) = NULL;
 
      return 1;
 
    }
 

	
 
  newreq->magic = DISTREN_REQUEST_MAGIC;
 
  newreq->len = len;
 
  newreq->enumsize = sizeof(enum distren_request_type);
 
  newreq->type = type;
 

	
 
  (*req) = newreq;
 
  return 0;
 
}
 

	
 
int distren_request_new_fromdata(struct distren_request **req, void *data, size_t len)
 
{
 
  struct distren_request *newreq;
 

	
 
  if(len < sizeof(struct distren_request))
 
    return 1;
 

	
 
  if( ((struct distren_request *)data)->magic != DISTREN_REQUEST_MAGIC )
 
    {
 
      fprintf(stderr, "packet doesn't match magic stuffs\n");
 
      return 1;
 
    }
 

	
 
  newreq = malloc(sizeof(struct distren_request));
 
  if(!newreq)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(newreq, data, sizeof(struct distren_request));
 
  (*req) = newreq;
 
  return 0;
 
}
 

	
 
int distren_request_free(struct distren_request *req)
 
{
 
  free(req);
 
  return 0;
 
}
src/common/protocol.h
Show inline comments
 
@@ -11,24 +11,25 @@
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#ifndef DISTREN_PROTOCOL_H
 
#define DISTREN_PROTOCOL_H
 

	
 
#include <stddef.h>
 
#include <stdint.h>
 

	
 
/**
 
   Server types:
 
 */
 
#define DISTREN_SERVERTYPE_SUBMIT (0x1)
 
#define DISTREN_SERVERTYPE_DISTRIBUTE (0x2)
 
#define DISTREN_SERVERTYPE_RENDER (0x4)
 

	
 
/**
 
   This file defines the constants and structs that the client uses to talk to the server and that the servers use to talk to eachother.
 
 */
 
@@ -75,27 +76,33 @@ slave */
 
database */
 
    DISTREN_REQUEST_RESETFRAME = 15, /* sets a frame back to unassigned,
 
happens if the slave quits for some reason. server code should only allow
 
resetting of a frame assigned to the slave calling the request (see php
 
code)*/ 
 

	
 
  };
 

	
 
struct distren_request
 
{
 
  uint32_t magic;
 
  uint32_t len;
 
  uint8_t enumsize;
 
  enum distren_request_type type;
 
  /** treat type as an enum distren_request_type using casting */
 
  uint32_t /* enum distren_request_type */ type;
 
};
 

	
 
/**
 
   initializes request
 
   initializes and allocates request
 
 */
 
int distren_request_new(struct distren_request **req, uint32_t len, enum distren_request_type type);
 

	
 
/**
 
   initializes and allocates request based on raw input data
 
   which includes the headers of the request.
 
 */
 
int distren_request_new_fromdata(struct distren_request **req, void *data, size_t len);
 

	
 
/**
 
   frees request
 
 */
 
int distren_request_free(struct distren_request *req);
 

	
 
#endif
src/server/distrend.c
Show inline comments
 
@@ -74,25 +74,25 @@ struct general_info
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distren_request *req, void *reqdata, struct general_info *geninfo);
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 

	
 
/* **************Test Functions**************** */
 
int interactiveTest(int test, struct general_info *general_info);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
@@ -198,27 +198,44 @@ int main(int argc, char *argv[])
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_request(struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
int distrend_handle_request(struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo)
 
{
 
  fprintf(stderr, "I can haz data %d bytes long\n", req->len);
 
  size_t counter;
 

	
 
  /* for response requests... if that makes any less sense ;-) */
 
  struct distren_request *newreq;
 

	
 
  fprintf(stderr, "I can haz data %d bytes long:", req->len);
 
  for(counter = 0; counter < req->len; counter ++)
 
    fputc(((char *)reqdata)[counter], stderr);
 
  fputc('\n', stderr);
 

	
 
  switch(req->type)
 
    {
 
    case DISTREN_REQUEST_VERSION:
 
      distren_request_new(&newreq, strlen(VERSION), DISTREN_REQUEST_VERSION);
 
      distrend_client_write_request(client, newreq, VERSION);
 
      distren_request_free(newreq);
 
      break;
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
src/server/listen.c
Show inline comments
 
@@ -41,127 +41,239 @@ struct distrend_clientset
 
};
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 
int distrend_packets_collapse(QUEUE *queue, size_t len);
 

	
 
/**
 
   clean up after a partially completed distrend_listen()
 
   call which ends in error.
 
 */
 
int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter)
 
{
 
  int sock;
 

	
 
  for(counter ++; counter > 0; counter --)
 
    {
 
      sock = config->listens[counter - 1].sock;
 
      if(sock >= 0)
 
	close(sock);
 
    }
 
  list_free(clients->clients, NULL);
 
  free(clients);
 

	
 
  return 0;
 
}
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
{
 
  int tmp;
 
  size_t counter;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  *clients = malloc(sizeof(struct distrend_clientset));
 

	
 
  (*clients)->clients = list_init();
 
  (*clients)->nfds = 1;
 

	
 
  sockaddr.sin6_port = htons(4050);
 

	
 
  config->listens->sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(config->listens->sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
  for(counter = 0; config->listens[counter].port; counter ++)
 
    {
 
      perror("bind");
 
      free(*clients);
 
      sockaddr.sin6_port = htons(config->listens[counter].port);
 

	
 
      return 1;
 
    }
 
      config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0);
 
      tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
      if(tmp == -1)
 
	{
 
	  perror("bind");
 
	  distrend_listen_unwind(config, *clients, counter);
 

	
 
	  return 1;
 
	}
 

	
 
  tmp = listen(config->listens->sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      free(*clients);
 
      tmp = listen(config->listens->sock, 1);
 
      if(tmp == -1)
 
	{
 
	  perror("listen");
 
	  distrend_listen_unwind(config, *clients, counter);
 
      
 
      return 1;
 
    }
 
	  return 1;
 
	}
 

	
 
  tmp = distrend_makesocknonblock(config->listens->sock);
 
  if(tmp)
 
    {
 
      free(*clients);
 
      return 1;
 
      tmp = distrend_makesocknonblock(config->listens->sock);
 
      if(tmp)
 
	{
 
	  /* error is already printed to user by distrend_makesocknonblock() */
 
	  distrend_listen_unwind(config, *clients, counter);
 
	  return 1;
 
	}
 
    }
 

	
 
  return 0;
 
}
 

	
 
int distrend_handleread(struct distrend_config *config,
 
		    struct distrend_client *client)
 
			struct distrend_client *client,
 
			distrend_handle_request_t handlereq,
 
			void *handlereqdata)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  struct distren_request *req;
 
  void *reqdata;
 

	
 
  /**
 
     we have to check that there's new data so that
 
     we may call ourself recursively
 
  */
 
  fd_set readfd;
 
  struct timeval selecttimeout;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
  readlen = read(client->sock, buf, sizeof(buf));
 
  if(readlen == -1)
 
  FD_ZERO(&readfd);
 
  FD_SET(client->sock, &readfd);
 
  memset(&selecttimeout, '\0', sizeof(selecttimeout));
 
  select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout);
 
  if(FD_ISSET(client->sock, &readfd))
 
    {
 
      perror("read");
 
      switch(errno)
 
      readlen = read(client->sock, buf, sizeof(buf));
 
      if(readlen == -1)
 
	{
 
	case EINTR:
 
	case EAGAIN:
 
	  break;
 
	  perror("read");
 
	  switch(errno)
 
	    {
 
	    case EINTR:
 
	    case EAGAIN:
 
	      break;
 

	
 
	default:
 
	    default:
 
	      client->state = DISTREND_CLIENT_DEAD;
 

	
 
	      free(packet);
 
	      return 1;
 
	    }
 
	}
 
      if(readlen == 0)
 
	/* handle EOF */
 
	{
 
	  client->state = DISTREND_CLIENT_DEAD;
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
    }
 
  if(readlen == 0)
 
    /* handle EOF */
 

	
 
      packet->len = readlen;
 
      packet->data = malloc(readlen);
 
      if(!packet->data)
 
	{
 
	  fprintf(stderr, "OOM!\n");
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
      memcpy(packet->data, buf, readlen);
 
      q_enqueue(client->inmsgs, packet, 0);
 
      client->inlen += packet->len;
 
      packet = NULL;
 
    } /* if(FD_ISSET(client->sock, &readfd)) */
 

	
 
  /**
 
     Manage input, etc.
 
   */
 
  if(client->expectlen == 0)
 
    {
 
      client->state = DISTREND_CLIENT_DEAD;
 
      /* search out header from input so far */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      free(packet);
 
      return 1;
 
    }
 
      if(packet->len > sizeof(struct distren_request))
 
	{
 
	  if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	    {
 
	      fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
 

	
 
  packet->len = readlen;
 
  packet->data = malloc(readlen);
 
  if(!packet->data)
 
	      client->state = DISTREND_CLIENT_DEAD;
 
	      return 1;
 
	    }
 
	  client->expectlen = req->len + sizeof(struct distren_request);
 
	  distren_request_free(req);
 
	}
 
    }
 
  if(client->expectlen
 
     && client->inlen >= client->expectlen)
 
    {
 
      fprintf(stderr, "OOM!\n");
 
      /** essentially un-queue-ize the queue ;-) */
 
      distrend_packets_collapse(client->inmsgs, client->inlen);
 
      packet = q_front(client->inmsgs);
 

	
 
      if(distren_request_new_fromdata(&req, packet->data, packet->len))
 
	{
 
	  fprintf(stderr, "error handling data from client\n");
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      free(packet);
 
      return 1;
 
      if(packet->len - sizeof(struct distren_request) < req->len)
 
	{
 
	  fprintf(stderr, "error handling some data from client\n");
 
	  distren_request_free(req);
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	  return 1;
 
	}
 

	
 
      reqdata = malloc(req->len);
 
      if(!reqdata)
 
	{
 
	  fprintf(stderr, "OOM\n");
 

	
 
	  distren_request_free(req);
 
	  return 1;
 
	}
 
      memcpy(reqdata, ((void *)packet->data) + sizeof(struct distren_request), req->len);
 
      memmove(packet->data,
 
	      ((void *)packet->data) + client->expectlen,
 
	      packet->len - client->expectlen);
 
      packet->len -= client->expectlen;
 

	
 
      client->inlen -= client->expectlen;
 
      client->expectlen = 0;
 

	
 
      (*handlereq)(client, req, reqdata, handlereqdata);
 
      distren_request_free(req);
 
    }
 
  memcpy(packet->data, buf, readlen);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_accept_fdset_prepare_data
 
{
 
  fd_set readfds;
 
  fd_set writefds;
 
  /**
 
     stores the highest FD number which is one less than the first
 
   arfgument to select(). So do select(maxfd + 1, ...)
 
  */
 
  int maxfd;
 
@@ -180,42 +292,44 @@ int distrend_accept_fdset_prepare(struct
 
     say.
 
   */
 
  if(!q_empty(client->outmsgs))
 
    FD_SET(client->sock, &data->writefds);
 
  
 
  return TRUE;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
  distrend_handle_request_t handlereq;
 
  void *handlereqdata;
 
  enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
 
	  __FILE__, __LINE__, client->sock,
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "write");
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ");
 

	
 
  switch(data->mode)
 
    {
 
    case DISTREND_ACCEPT_CLIENT_WRITE:
 
      if(q_empty(client->outmsgs))
 
	return TRUE;
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
@@ -237,33 +351,33 @@ int distrend_accept_client_proc(struct d
 
	  distrend_packet_free(packet);
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	}
 
      break;
 

	
 
    case DISTREND_ACCEPT_CLIENT_READ:
 
      distrend_handleread(data->config, client);
 
      distrend_handleread(data->config, client, data->handlereq, data->handlereqdata);
 
      break;
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients)
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
 
{
 
  int tmp;
 
  int newclientsock;
 

	
 
  struct distrend_accept_fdset_prepare_data fdsets;
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  FD_ZERO(&fdsets.readfds);
 
  FD_ZERO(&fdsets.writefds);
 
  FD_SET(config->listens->sock, &fdsets.readfds);
 
  fdsets.maxfd = config->listens->sock;
 
@@ -277,24 +391,26 @@ int distrend_accept(struct distrend_conf
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.config = config;
 

	
 
  travinfo.fdset = &fdsets.writefds;
 
  travinfo.handlereq = handlereq;
 
  travinfo.handlereqdata = handlereqdata;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  travinfo.fdset = &fdsets.readfds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
 
    {
 
@@ -359,24 +475,26 @@ int distrend_client_new(struct distrend_
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      return 1;
 
    }
 

	
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->inlen = 0;
 
  (*client)->expectlen = 0;
 
  (*client)->inmsgs = q_init();
 
  (*client)->outmsgs = q_init();
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock);
 
@@ -426,29 +544,105 @@ int distrend_client_write(struct distren
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data)
 
{
 
  char *towrite;
 
  int ret;
 
  size_t msglen;
 

	
 
  msglen = sizeof(struct distren_request) + req->len;
 

	
 
  towrite = malloc(msglen);
 
  if(!towrite)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(towrite, req, sizeof(struct distren_request));
 
  memcpy(towrite + sizeof(struct distren_request), data, req->len);
 

	
 
  ret = distrend_client_write(client, towrite, msglen);
 

	
 
  free(towrite);
 
  return ret;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
{
 
  struct distrend_packet *packet;
 

	
 
  *lenread = 0;
 
  *toread = NULL;
 

	
 
  if(q_empty(client->inmsgs))
 
    return 1;
 

	
 
  packet = q_dequeue(client->inmsgs);
 
  *lenread = packet->len;
 
  *toread = packet->data;
 
  free(packet);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   collapse a QUEUE of struct distrend_packet with a collective length
 
   of len into a single struct packet.
 
 */
 
int distrend_packets_collapse(QUEUE *queue, size_t len)
 
{
 
  struct distrend_packet *newpacket;
 
  struct distrend_packet *oldpacket;
 

	
 
  size_t copiedlen;
 

	
 
  newpacket = malloc(sizeof(struct distrend_packet));
 
  if(!newpacket)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  
 
  newpacket->data = malloc(len);
 
  if(!newpacket->data)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      perror("malloc");
 
      free(newpacket);
 

	
 
      return 1;
 
    }
 

	
 
  for(copiedlen = 0;
 
      copiedlen < len && !q_empty(queue);
 
      )
 
    {
 
      oldpacket = q_dequeue(queue);
 

	
 
      memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len);
 

	
 
      copiedlen += oldpacket->len;
 
      distrend_packet_free(oldpacket);
 
    }
 
  newpacket->len = copiedlen;
 

	
 
  if(copiedlen < len || !q_empty(queue))
 
    {
 
      fprintf(stderr, "inconsistency!\n");
 
      return 1;
 
    }
 

	
 
  q_enqueue(queue, newpacket, 0);
 

	
 
  return 0;
 
}
src/server/listen.h
Show inline comments
 
@@ -16,73 +16,88 @@
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
struct distrend_clientset;
 
struct distrend_listen;
 
struct distrend_client;
 

	
 
#ifndef _DISTREN_LISTEN_H
 
#define _DISTREN_LISTEN_H
 

	
 
#include "distrend.h"
 
#include "common/protocol.h"
 

	
 
#include <queue.h>
 

	
 
enum distrend_client_state
 
  {
 
    DISTREND_CLIENT_PREAUTH,
 
    DISTREND_CLIENT_GOODDOGGY,
 
    DISTREND_CLIENT_BADBOY,
 
    DISTREND_CLIENT_DEAD
 
  };
 

	
 
struct distrend_listen
 
{
 
  int port;
 
  int sock;
 
};
 

	
 
struct distrend_client
 
{
 
  int sock;
 
  enum distrend_client_state state;
 

	
 
  size_t inlen; /*< number of bytes waiting to be processed */
 
  size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */
 
  QUEUE *inmsgs;
 
  QUEUE *outmsgs;
 
};
 

	
 

	
 

	
 
typedef int(*distrend_handle_request_t)(struct distrend_client *client, struct distren_request *req, void *reqdata, void *data);
 

	
 

	
 
/**
 
   initializes the listens and clientset
 
   @param config the configuration from distrend
 
   @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset
 
 */
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients);
 

	
 
/**
 
   checks states of the sockets I'm managing. If there are any new connections,
 
   or activity on any sockets, I'll call the appropriate function.
 
   I will block until there is some sort of activity, including
 
   signals. If you want to cleanly shut down, it's best to register
 
   signal handlers somewhere
 
 */
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients);
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata);
 

	
 
/**
 
   cleans listening socket. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 

	
 
/**
 
   writes message to client.
 
   @param towrite the caller is expected to free this string. This function will
 
   strdup() it, in essence.
 
 */
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen);
 

	
 
/**
 
   writes request to client.
 
   @param client client to write to
 
   @param req the request struct. caller must free this.
 
   @param data the data of the request which is req->len bytes long. caller must free this.
 
 */
 
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
 

	
 
#endif
0 comments (0 inline, 0 general)