Changeset - 974e60d6a71b
[Not reviewed]
default
0 3 0
Nathan Brink (binki) - 15 years ago 2010-03-25 11:08:38
ohnobinki@ohnopublishing.net
Server-side sockets partial implementation
3 files changed with 321 insertions and 30 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -511,7 +511,16 @@ int interactiveTest(int test, struct gen
 
     case 5:
 
       distrend_listen(geninfo->config, &clients);
 
       while(1)
 
	 {
 
	 distrend_accept(geninfo->config, clients);
 
	   /*
 
	     code for reading data from clients either goes here or in distrend_accept().
 
	     it might make sense for us to just pass the distrend_accept() function a
 
	     callback which can handle packets or to have a generic packet handling
 
	     subsystem which gathers data into groups defined by by packet.h and then
 
	     passed onto the callback.
 
	   */
 
	 }
 
       break;
 
       
 
     case 0:
src/server/listen.c
Show inline comments
 
@@ -19,6 +19,8 @@
 

	
 
#include "listen.h"
 

	
 
#include <errno.h>
 
#include <fcntl.h>
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
@@ -27,6 +29,7 @@
 
#include <sys/types.h>
 
#include <sys/select.h>
 
#include <sys/socket.h>
 
#include <unistd.h>
 

	
 
/* local */
 

	
 
@@ -34,16 +37,19 @@ struct distrend_clientset
 
{
 
  LIST *clients;
 

	
 
  /*
 
    for select()
 
   */
 
  fd_set readfds;
 
  fd_set writefds;
 
  int nfds;
 
};
 

	
 
struct distrend_packet
 
{
 
  size_t len;
 
  char *data;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 
void distrend_packet_free(struct distrend_packet *packet);
 
int distrend_makesocknonblock(int sock);
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
{
 
@@ -61,8 +67,9 @@ int distrend_listen(struct distrend_conf
 
  *clients = malloc(sizeof(struct distrend_clientset));
 

	
 
  (*clients)->clients = list_init();
 
  (*clients)->nfds = 1;
 

	
 
  sockaddr.sin6_port = htonl(4050);
 
  sockaddr.sin6_port = htons(4050);
 

	
 
  config->listens->sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(config->listens->sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
@@ -75,12 +82,20 @@ int distrend_listen(struct distrend_conf
 
    }
 

	
 
  tmp = listen(config->listens->sock, 1);
 
  if(tmp == -1)
 
    {
 
      perror("listen");
 
      free(*clients);
 

	
 
  FD_ZERO(&(*clients)->readfds);
 
  FD_ZERO(&(*clients)->writefds);
 
      return 1;
 
    }
 

	
 
  /** accept()able sockets are considered ``readable'' */
 
  FD_SET(config->listens->sock, &(*clients)->readfds);
 
  tmp = distrend_makesocknonblock(config->listens->sock);
 
  if(tmp)
 
    {
 
      free(*clients);
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 
@@ -88,27 +103,152 @@ int distrend_listen(struct distrend_conf
 
int distrend_handleread(struct distrend_config *config,
 
		    struct distrend_client *client)
 
{
 
  fprintf(stderr, "%s:%d: STUB: I'm supposed to read data from the client\n",
 
	  __FILE__, __LINE__);
 
  struct distrend_packet *packet;
 
  ssize_t readlen;
 
  char buf[8192];
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  packet->len = 0;
 
  packet->data = NULL;
 

	
 
  readlen = read(client->sock, buf, sizeof(buf));
 
  if(readlen == -1)
 
    {
 
      perror("read");
 
      switch(errno)
 
	{
 
	case EINTR:
 
	case EAGAIN:
 
	  break;
 

	
 
	default:
 
	  client->state = DISTREND_CLIENT_DEAD;
 

	
 
	  free(packet);
 
	  return 1;
 
	}
 
    }
 
  if(readlen == 0)
 
    /* handle EOF */
 
    {
 
      client->state = DISTREND_CLIENT_DEAD;
 

	
 
      free(packet);
 
      return 1;
 
    }
 

	
 
  packet->len = readlen;
 
  packet->data = malloc(readlen);
 
  if(!packet->data)
 
    {
 
      fprintf(stderr, "OOM!\n");
 

	
 
      free(packet);
 
      return 1;
 
    }
 
  memcpy(packet->data, buf, readlen);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_accept_fdset_prepare_data
 
{
 
  fd_set readfds;
 
  fd_set writefds;
 
  /**
 
     stores the highest FD number which is one less than the first
 
   arfgument to select(). So do select(maxfd + 1, ...)
 
  */
 
  int maxfd;
 
};
 
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
 
				  struct distrend_client *client)
 
{
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(client->sock > data->maxfd)
 
    data->maxfd = client->sock;
 
  FD_SET(client->sock, &data->readfds);
 
  /**
 
     Only select on an outgoing socket if we have something to
 
     say.
 
   */
 
  if(!q_empty(client->outmsgs))
 
    FD_SET(client->sock, &data->writefds);
 
  
 
  return TRUE;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
  enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  struct distrend_packet *packet;
 
  ssize_t written_amount;
 

	
 
  if(client->state == DISTREND_CLIENT_DEAD)
 
    return TRUE;
 

	
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d has data waiting\n",
 
	  __FILE__, __LINE__, client->sock);
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
 
	  __FILE__, __LINE__, client->sock,
 
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "write");
 

	
 
  switch(data->mode)
 
    {
 
    case DISTREND_ACCEPT_CLIENT_WRITE:
 
      if(q_empty(client->outmsgs))
 
	return TRUE;
 

	
 
      packet = q_front(client->outmsgs);
 
      written_amount = write(client->sock, packet->data, packet->len);
 
      /**
 
	 Disconnect in case of write error.
 
       */
 
      if(written_amount == -1)
 
	{
 
	  perror("write");
 
	  /*
 
	    distrend_client_free();
 

	
 
	    Until liblist has the ability to delete nodes
 
	    during traversal, we'll have to free the client
 
	    during a client-list cleanup/pruning cycle.
 
	  */
 
	  client->state = DISTREND_CLIENT_DEAD;
 
	}
 
      if(packet->len == written_amount)
 
	{
 
	  q_dequeue(client->outmsgs);
 
	  distrend_packet_free(packet);
 
	}
 
      else
 
	{
 
	  /**
 
	     shifting seems the simplest solution.
 
	   */
 
	  packet->len -= written_amount;
 
	}
 
      break;
 

	
 
    case DISTREND_ACCEPT_CLIENT_READ:
 
  distrend_handleread(data->config, client);
 
      break;
 
    }
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
@@ -117,13 +257,21 @@ int distrend_accept_client_proc(struct d
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients)
 
{
 
  int tmp;
 
  fd_set readfds;
 
  fd_set writefds;
 
  int newclientsock;
 

	
 
  struct distrend_accept_fdset_prepare_data fdsets;
 
  struct distrend_accept_client_proc_data travinfo;
 
  struct distrend_client *newclient;
 

	
 
  memcpy(&readfds, &clients->readfds, sizeof(fd_set));
 
  memcpy(&writefds, &clients->writefds, sizeof(fd_set));
 
  FD_ZERO(&fdsets.readfds);
 
  FD_ZERO(&fdsets.writefds);
 
  FD_SET(config->listens->sock, &fdsets.readfds);
 
  fdsets.maxfd = config->listens->sock;
 

	
 
  tmp = select(clients->nfds, &readfds, &writefds, NULL, (struct timeval *)NULL);
 
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);
 

	
 
  fprintf(stderr, "select()...");
 
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 
@@ -132,10 +280,57 @@ int distrend_accept(struct distrend_conf
 
    }
 

	
 
  /**
 
     deal with all sockets that have data waiting
 
     Deal first with data waiting to be sent and then
 
     with input data.
 
   */
 
  travinfo.config = config;
 

	
 
  travinfo.fdset = &fdsets.writefds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  travinfo.fdset = &fdsets.readfds;
 
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
 
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 
  /**
 
     Handle new connections.
 
   */
 
  list_traverse(clients->clients, &readfds, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
 
    {
 
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
 
      if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
 
	{
 
	  fprintf(stderr, "error allocating client struct\n");
 
	  return 1;
 
	}
 
      clients->nfds ++;
 
      list_insert_after(clients->clients, newclient, 0);
 
      fprintf(stderr, "accepted new connection\n");
 
      distrend_client_write(newclient, "hello\n", 6);
 
    }
 

	
 
  /**
 
     Until liblist supports deleting nodes during a traversal,
 
     we clean dead clients here:
 
   */
 
  list_mvfront(clients->clients);
 
  newclient = list_curr(clients->clients);
 
  while(newclient)
 
    {
 
      if(newclient->state == DISTREND_CLIENT_DEAD)
 
	{
 
	  distrend_client_free(newclient);
 
	  list_remove_curr(clients->clients);
 
	  fprintf(stderr, "removed dead connection\n");
 
	}
 
      list_mvnext(clients->clients);
 
      /* provide for termination of this loop */
 
      if(newclient == list_curr(clients->clients))
 
	newclient = NULL;
 
      else
 
	newclient = list_curr(clients->clients);
 
    }
 

	
 
  return 0;
 
}
 
@@ -156,6 +351,15 @@ void remotio_send_to_client(struct distr
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
 
{
 
  int tmp;
 

	
 
  tmp = distrend_makesocknonblock(sock);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
 
      return 1;
 
    }
 

	
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
    {
 
@@ -164,17 +368,87 @@ int distrend_client_new(struct distrend_
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->inmsgs = list_init();
 
  (*client)->outmsgs = list_init();
 
  (*client)->inmsgs = q_init();
 
  (*client)->outmsgs = q_init();
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  list_free(client->inmsgs, (void *)LIST_DEALLOC);
 
  list_free(client->outmsgs, (void *)LIST_DEALLOC);
 
  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
 
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
 

	
 
  close(client->sock);
 
  
 
  return 0;
 
}
 

	
 
  fprintf(stderr, "%s:%d: stub!", __FILE__, __LINE__);
 
void distrend_packet_free(struct distrend_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 
int distrend_makesocknonblock(int sock)
 
{
 
  int fdstatus;
 
  int tmp;
 

	
 
  fdstatus = fcntl(sock, F_GETFL);
 
  fdstatus |= O_NONBLOCK;
 
  tmp = fcntl(sock, F_SETFL, fdstatus);
 
  if(tmp == -1)
 
    {
 
      perror("fcntl");
 
  return 1;
 
}
 
  
 
  return 0;
 
}
 

	
 

	
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
 
{
 
  struct distrend_packet *packet;
 

	
 
  packet = malloc(sizeof(struct distrend_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = msglen;
 
  packet->data = malloc(msglen);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, towrite, msglen);
 

	
 
  q_enqueue(client->outmsgs, packet, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
 
{
 
  struct distrend_packet *packet;
 

	
 
  *lenread = 0;
 
  *toread = NULL;
 

	
 
  if(q_empty(client->inmsgs))
 
    return 1;
 

	
 
  packet = q_dequeue(client->inmsgs);
 
  *lenread = packet->len;
 
  *toread = packet->data;
 
  free(packet);
 

	
 
  return 0;
 
}
src/server/listen.h
Show inline comments
 
@@ -26,13 +26,14 @@ struct distrend_client;
 

	
 
#include "distrend.h"
 

	
 
#include <list.h>
 
#include <queue.h>
 

	
 
enum distrend_client_state
 
  {
 
    DISTREND_CLIENT_PREAUTH,
 
    DISTREND_CLIENT_GOODDOGGY,
 
    DISTREND_CLIENT_BADBOY
 
    DISTREND_CLIENT_BADBOY,
 
    DISTREND_CLIENT_DEAD
 
  };
 

	
 
struct distrend_listen
 
@@ -45,8 +46,8 @@ struct distrend_client
 
{
 
  int sock;
 
  enum distrend_client_state state;
 
  LIST *inmsgs;
 
  LIST *outmsgs;
 
  QUEUE *inmsgs;
 
  QUEUE *outmsgs;
 
};
 

	
 

	
 
@@ -73,6 +74,13 @@ int distrend_accept(struct distrend_conf
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 

	
 
/**
 
   writes message to client.
 
   @param towrite the caller is expected to free this string. This function will
 
   strdup() it, in essence.
 
 */
 
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
0 comments (0 inline, 0 general)