Changeset - 27eb6c6418f6
[Not reviewed]
default
0 9 0
Nathan Brink (binki) - 15 years ago 2010-07-21 23:03:26
ohnobinki@ohnopublishing.net
Handle resolving hostnames gracefully.
Fix support for hostname:port in remoteio.
Added timeout parameter to multiio_poll().
Fix stupid mistakes in distrenslave's read handler.
The server and client may not sit and play table tennis at last :-D.
9 files changed with 55 insertions and 34 deletions:
0 comments (0 inline, 0 general)
src/common/multiio.c
Show inline comments
 
@@ -165,32 +165,32 @@ int multiio_poll_invoke_handlers(struct 
 
  if(!event)
 
    return TRUE;
 

	
 
  handler_info->handler(travinfo->multiio,
 
			travinfo->pollfd->fd,
 
			event,
 
			handler_info->handler_data,
 
			travinfo->socket_info->socket_data);
 

	
 
  return TRUE;
 
}
 

	
 
int multiio_poll(multiio_context_t context)
 
int multiio_poll(multiio_context_t context, int timeout)
 
{
 
  size_t counter;
 
  struct multiio_poll_travinfo travinfo;
 

	
 
  int ret;
 

	
 
  ret = poll(context->pollfds, context->nfds, -1);
 
  ret = poll(context->pollfds, context->nfds, timeout);
 
  if(ret == -1)
 
    {
 
      perror("poll");
 
    }
 

	
 
  for(counter = 0; counter < context->nfds; counter ++)
 
    if(context->pollfds[counter].revents)
 
      {
 
	travinfo.multiio = context;
 
	travinfo.socket_info = &context->socket_infos[counter];
 
	travinfo.pollfd = &context->pollfds[counter];
 
	list_traverse(context->socket_types[context->socket_infos[counter].socket_type].type_handlers,
src/common/multiio.h
Show inline comments
 
@@ -58,29 +58,33 @@ typedef struct multiio_context *multiio_
 
   Initializes and returns a multiio_context.
 
 */
 
multiio_context_t multiio_context_new();
 

	
 
/**
 
   Destroys and frees a multiio_context.
 

	
 
   @param context the context to destroy.
 
 */
 
int multiio_context_free(multiio_context_t context);
 

	
 
/**
 
   Call poll() on the registered sockets and react to events accordingly.
 

	
 
   @param context the context which sockets and handlers were registered with
 
 * Call poll() on the registered sockets and react to events accordingly.
 
 *
 
 * @param context the context which sockets and handlers were registered with
 
 *
 
 * @param timeout see poll(3p). The number of milliseconds to wait for
 
 * a request before returning. 0 for immediate return if no activity
 
 * and -1 for waiting until some activity happens before returning.
 
 */
 
int multiio_poll(multiio_context_t context);
 
int multiio_poll(multiio_context_t context, int timeout);
 

	
 
/**
 
   Registers a new socket type/classification for use with this multiio_context.
 

	
 
   @param context the multiio context to register this type with.
 
   @param new_type a pointer to a type which this function will set to the value of
 
   the newly registered type. The caller should record this type and use it for
 
   future multiio_event_handler_register() calls, etc.
 
   @return 0 on success, other on error
 
 */
 
int multiio_socket_type_register(multiio_context_t context, multiio_socket_type_t *new_type);
 

	
src/common/protocol.c
Show inline comments
 
@@ -67,30 +67,45 @@ int distren_request_send(struct remoteio
 

	
 
  write_err = remoteio_write(rem, packet, len);
 

	
 
  free(packet);
 

	
 
  return 0;
 
}
 

	
 
int distren_request_new_fromdata(struct distren_request **req, void *data, size_t len)
 
{
 
  struct distren_request *newreq;
 

	
 
#if 0
 
  size_t counter;
 
  uint32_t debugtmp;
 
#endif
 

	
 
  if(len < sizeof(struct distren_request))
 
    return 1;
 

	
 
  if( ((struct distren_request *)data)->magic != DISTREN_REQUEST_MAGIC )
 
    {
 
      fprintf(stderr, "packet doesn't match magic stuffs\n");
 
      fprintf(stderr, "packet doesn't match magic stuffs");
 
#if 0
 
      fputs("\n\tmagic=`", stderr);
 
      debugtmp = DISTREN_REQUEST_MAGIC;
 
      for(counter = 0; counter < sizeof(uint32_t); counter ++)
 
	putc(((char *)&debugtmp)[counter], stderr);
 
      fputs("'\n\t`", stderr);
 
      for(counter = 0; counter < sizeof(struct distren_request); counter ++)
 
	putc(((char *)data)[counter], stderr);
 
      fputs("'\n", stderr);
 
#endif
 
      return 1;
 
    }
 

	
 
  newreq = malloc(sizeof(struct distren_request));
 
  if(!newreq)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(newreq, data, sizeof(struct distren_request));
 
  (*req) = newreq;
src/common/remoteio.c
Show inline comments
 
@@ -730,55 +730,61 @@ int _remoteio_tcp_open(struct remoteio *
 
{
 
  int tmp;
 
  int tmp2;
 

	
 
  int sock;
 

	
 
  char *hostname;
 
  char *port;
 

	
 
  struct addrinfo addrinfo_hints;
 
  struct addrinfo *addrinfo_res;
 

	
 
  static char *default_port = REMOTEIO_DEFAULT_PORT;
 

	
 
  /**
 
     only hostname should be free()-ed, not port,
 
     because both are from the same block of malloc()-ed
 
     memory
 
   */
 
  hostname = strdup(server->hostname);
 
  for(port = hostname;
 
      *port && *port != ':';
 
      port ++)
 
    ;
 
  if(*port)
 
    {
 
      *port = '\0';
 
      port ++;
 
    }
 
  else
 
    port = REMOTEIO_DEFAULT_PORT;
 
    port = default_port;
 

	
 
  memset(&addrinfo_hints, '\0', sizeof(struct addrinfo));
 
  addrinfo_hints.ai_family = AF_UNSPEC;
 
#ifdef _WIN32
 
  /* windows lacks stuff documented in POSIX, I guess :-( */
 
  addrinfo_hints.ai_flags = 0;
 
#else
 
  addrinfo_hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
 
#endif
 
  addrinfo_hints.ai_socktype = SOCK_STREAM;
 

	
 
  tmp = getaddrinfo(server->hostname, port, &addrinfo_hints, &addrinfo_res);
 
  tmp = getaddrinfo(hostname, port, &addrinfo_hints, &addrinfo_res);
 
  if(tmp)
 
    {
 
    fprintf(stderr, "error resolving %s:%s: %s\n", server->hostname, port, gai_strerror(tmp));
 
      free(hostname);
 
      return 1;
 
    }
 
  fprintf(stderr, "connecting to %s[%s]:%s\n", server->hostname, addrinfo_res->ai_canonname, port);
 

	
 
  free(hostname);
 

	
 
  sock = socket(addrinfo_res->ai_family, SOCK_STREAM, addrinfo_res->ai_protocol);
 
  if(sock == -1)
 
    {
 
      perror("socket");
 
      freeaddrinfo(addrinfo_res);
 
  
 
      return 1;
 
    }
src/common/request.c
Show inline comments
 
@@ -19,26 +19,26 @@
 

	
 
#include "common/protocol.h"
 

	
 
#include <stdlib.h>
 
#include <string.h>
 

	
 
int distren_request_free_with_data(struct distren_request *req, void *data)
 
{
 
  free(data);
 
  return distren_request_free(req);
 
}
 

	
 
uint32_t distren_request_poing(struct distren_request **req, void **data, short is_ping, const void *poing_cookie, size_t poing_data_len)
 
int distren_request_poing(struct distren_request **req, void **data, short is_ping, const void *poing_cookie, size_t poing_data_len)
 
{
 
  enum distren_request_type type;
 

	
 
  if(is_ping)
 
    type = DISTREN_REQUEST_PING;
 
  else
 
    type = DISTREN_REQUEST_PONG;
 
  distren_request_new(req, poing_data_len, type);
 
  (*data) = malloc(poing_data_len);
 
  memcpy(*data, poing_cookie, poing_data_len);
 

	
 
  return (uint32_t)poing_data_len;
 
  return 0;
 
}
src/server/distrend.c
Show inline comments
 
@@ -178,25 +178,25 @@ int main(int argc, char *argv[])
 
	}
 
    }
 

	
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_VERSION, &distrend_handle_version);
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      int clientrequest = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      multiio_poll(multiio);
 
      multiio_poll(multiio, 15000);
 

	
 
      tabletennis_serve(general_info.config->listens->tabletennis);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 

	
 
      struct frameset frame;
 
      struct distrenjob *job;
 
      distrenjob_new(&job);
 

	
 
      memset(&frame, '\0', sizeof(frame));
 

	
 
@@ -570,25 +570,25 @@ int interactiveTest(int test, multiio_co
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       start_frame(geninfo->conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Started Frame!\n");
 
       break;
 

	
 
     case 5:
 
       while(1)
 
	 {
 
	   multiio_poll(multiio);
 
	   multiio_poll(multiio, 15000);
 
	   tabletennis_serve(geninfo->config->listens->tabletennis);
 
	 }
 
       break;
 
       
 
     case 0:
 
       test = 0;
 
       break;
 
     default:
 
       fprintf(stderr, "Invalid input, please try again.\n");
 
       break;
 
     }
 
   }
src/server/listen.c
Show inline comments
 
@@ -52,29 +52,24 @@ static void distrend_listen_remoteio_han
 
int listen_handle_accept(multiio_context_t multiio,
 
			 int fd,
 
			 short revent,
 
			 struct distrend_listens *listens,
 
			 int *port);
 
int listen_handle_error(multiio_context_t multiio,
 
			int fd,
 
			short revent,
 
			struct distrend_listens *listens,
 
			int *port);
 

	
 
/*** TO BE MOVED TO REMOTEIO */
 
int listen_handle_read(multiio_context_t multiio,
 
		       int fd,
 
		       short revent,
 
		       struct distrend_listens *listens,
 
		       struct distrend_client *client);
 
int listen_handle_existence(multiio_context_t multiio,
 
			    int fd,
 
			    short revent,
 
			    struct distrend_listens *listens,
 
			    struct distrend_client *client);
 

	
 
struct distrend_listens *distrend_listens_new(multiio_context_t multiio,
 
					      struct general_info *geninfo,
 
					      struct options_common *opts)
 
{
 
  struct distrend_listens *listens;
 

	
 
@@ -186,27 +181,24 @@ int listen_handle_error(multiio_context_
 
			int *port)
 
{
 
  fprintf(stderr, "Port %d experienced an error or is closed. Closing it.\n", *port);
 
  multiio_socket_del(listens->multiio, fd);
 
  close(fd);
 

	
 
  free(port);
 

	
 
  return 0;
 
}
 

	
 

	
 
//int listen_handle_read(struct distrend_listens *listens,
 
//			struct distrend_client *client)
 

	
 
/**
 
 * an important thing to handle
 
 *
 
 * @deprecated to be replaced with table_tennis
 
 */
 
int listen_handle_existence(multiio_context_t multiio,
 
			    int fd,
 
			    short revent,
 
			    struct distrend_listens *listens,
 
			    struct distrend_client *client)
 
{
 
  /**
 
@@ -340,24 +332,27 @@ int listen_handle_accept(multiio_context
 
 /**
 
  * Handle read events from remoteio, remoteio_read_handle_func_t.
 
  *
 
  * This func requires that someone called remoteio_generic_data_set(remoteio_opts, listens);
 
  *
 
  * @param client the client associated with this remoteio instance.
 
  */
 
size_t distrend_listen_read_handle(struct remoteio *rem, struct distrend_listens *listens, void *buf, size_t len, struct distrend_client *client)
 
 {
 
   struct distren_request *req;
 
   void *reqdata;
 

	
 
   size_t used_len;
 

	
 
   used_len = 0;
 
   /**
 
    * Manage input, etc.
 
    */
 
   if(client->expectlen == 0)
 
     {
 
       /* search out header from input so far */
 
       if(len > sizeof(struct distren_request))
 
	 {
 
	   if(distren_request_new_fromdata(&req, buf, len))
 
	     {
 
	       fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
 

	
 
@@ -402,34 +397,35 @@ size_t distrend_listen_read_handle(struc
 

	
 
      reqdata = malloc(req->len);
 
      if(!reqdata)
 
	{
 
	  fprintf(stderr, "OOM\n");
 

	
 
	  distren_request_free(req);
 
	  return 1;
 
	}
 
      memcpy(reqdata, ((void *)buf) + sizeof(struct distren_request), req->len);
 

	
 
      client->expectlen = 0;
 
      used_len = sizeof(struct distren_request) + req->len;
 

	
 
      distrend_dispatch_request(listens, rem, client, req, reqdata);
 
      free(reqdata);
 
      distren_request_free(req);
 

	
 
      /* I actually just used recursion in non-LISP code! :-D */
 
      return req->len + distrend_listen_read_handle(rem, listens, buf + req->len, len - req->len, client);
 
      return used_len + distrend_listen_read_handle(rem, listens, buf + req->len, len - req->len, client);
 
    }
 

	
 
  return 0;
 
  return used_len;
 
}
 

	
 
/**
 
 * Handle cleaning up after remoteio_close() has been called. This includes cleaning up the struct distrend_client and stuffs
 
 */
 
void distrend_listen_remoteio_handle_close(struct distrend_listens *listens, struct distrend_client *client)
 
{
 
  /*
 
   * remoteio handles removing itself from multiio for us. We just
 
   * have to clean up tabletennis and the struct itself.
 
   */
 

	
 
@@ -563,30 +559,29 @@ struct distrend_dispatch_request_data
 
/**
 
   traversal function for distrend_dispatch_request().
 
 */
 
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
 
{
 
  if(handler_info->request_type == data->req->type)
 
    (*handler_info->handler)(data->geninfo, data->client, data->req->len, data->req_data);
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
   helper for listen_handle_read() which looks up the correct
 
   helper for distrend_listen_read_handle() which looks up the correct
 
   request handler and handles handing the the request to the
 
   handler. :-p
 
 */
 
int distrend_dispatch_request(struct distrend_listens *listens, struct remoteio *rem, struct distrend_client *client, struct distren_request *req, void *reqdata)
 
{
 
  struct distrend_dispatch_request_data data;
 

	
 
  data.geninfo = listens->geninfo;
 
  data.client = client;
 
  data.req = req;
 
  data.req_data = reqdata;
 

	
 
  list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
 

	
 
  return 0;
 
}
 

	
src/server/slave.c
Show inline comments
 
@@ -221,25 +221,25 @@ int main(int argc, char *argv[])
 
          _web_setrenderpower(slavekey, password, renderPower);
 
          return 0;
 
        }
 
    }
 

	
 
  if(!DEBUG)
 
    fprintf(stderr, "Running..");
 

	
 

	
 
  // Main loop
 
  while(!slave_state.quit)
 
    {
 
      multiio_poll(multiio);
 
      multiio_poll(multiio, 15000);
 

	
 
      if(slave_state.quit)
 
	break;
 

	
 
    // request work
 
    fprintf(stderr, "Waiting...\n");
 
    haveWork = getwork(slave_state.rem, &jobnum, &framenum);
 

	
 
    /* If we got a frame */
 
    if(haveWork)
 
      {
 
        fprintf(stderr,"Got work from server...\n");
 
@@ -384,24 +384,25 @@ static size_t distren_slave_remoteio_rea
 
						 void *buf,
 
						 size_t len,
 
						 void *data)
 
{
 
  struct slave_state *slave_state = (struct slave_state *)data;
 

	
 
  struct distren_request *req, *my_req;
 
  void *req_data, *my_req_data;
 

	
 
  size_t to_return;
 

	
 
  size_t counter;
 
  fprintf(stderr, "expected to eat %d bytes\n", len);
 

	
 
  /* to_return shall record how much of buf we've eaten already */
 
  to_return = 0;
 

	
 
  if(!slave_state->copyright_done)
 
    {
 
      putchar('\n');
 
      /* we have to flush through data until we hit a newline */
 
      for(counter = 0;
 
	  counter + slave_state->expectlen < 256 && counter < len && ((char *)buf)[counter] != '\n';
 
	  counter ++)
 
	{
 
@@ -424,76 +425,77 @@ static size_t distren_slave_remoteio_rea
 
	  len -= counter;
 
	}
 
      if(!slave_state->copyright_done)
 
	return to_return;
 
    }
 

	
 
  while(1)
 
    {
 
      /* if we haven't read a full header yet: */
 
      if(!slave_state->expectlen)
 
	{
 
	  if(len < sizeof(struct distren_request))
 
	    return 0;
 
	    return to_return;
 

	
 
	  /* figure out how much we need to read in before we can get anywhere */
 
	  if(distren_request_new_fromdata(&req, buf, len))
 
	    {
 
	      fprintf(stderr, "Failing to interpret data from server, exiting\n");
 
	      slave_state->quit = 1;
 
	      return 0;
 
	      return to_return;
 
	    }
 
	  slave_state->expectlen = sizeof(struct distren_request) + req->len;
 
	  distren_request_free(req);
 
	}
 

	
 
      if(slave_state->expectlen
 
	    && slave_state->expectlen <= len)
 
	{
 
	  distren_request_new_fromdata(&req, buf, len);
 
	  req_data = buf + sizeof(struct distren_request);
 

	
 
	  switch((enum distren_request_type)req->type)
 
	    {
 
	    case DISTREN_REQUEST_VERSION:
 
	      fprintf(stderr, "The server runs ");
 
	      for(counter = 0; counter < req->len; counter ++)
 
		putc(((char *)req_data)[counter], stderr);
 
	      putc('\n', stderr);
 
	      break;
 

	
 
	    case DISTREN_REQUEST_PING:
 
	      fprintf(stderr, "PONG ! :-D\n");
 

	
 
	      distren_request_poing(&my_req, &my_req_data, 0, req_data, req->len);
 
	      remoteio_write(slave_state->rem, &my_req, sizeof(struct distren_request));
 
	      remoteio_write(slave_state->rem, &my_req_data, req->len);
 
	      remoteio_write(slave_state->rem, my_req, sizeof(struct distren_request));
 
	      remoteio_write(slave_state->rem, my_req_data, req->len);
 
	      distren_request_free_with_data(my_req, my_req_data);
 
	      break;
 

	
 
	    case DISTREN_REQUEST_DISCONNECT:
 
	      /* hopefully this ends up being a useful message... */
 
	      printf("You have been disconnected: \"");
 
	      for(counter = 0; counter < req->len; counter ++)
 
		putchar(((char *)buf)[counter]);
 
	      putchar('"');
 
	      putchar('\n');
 
	      break;
 

	
 
	    default:
 
	      fprintf(stderr, "something\n");
 
	      break;
 
	    }
 

	
 
	  counter = req->len + sizeof(struct distren_request);
 

	
 
	  distren_request_free(req);
 
	  slave_state->expectlen = 0;
 

	
 
	  counter = req->len + sizeof(struct distren_request);
 

	
 
	  len -= counter;
 
	  buf += counter;
 
	  to_return += counter;
 
	}
 
    }
 

	
 
  fprintf(stderr, "ate %d bytes\n", to_return);
 
  return to_return;
 
}
src/server/tabletennis.c
Show inline comments
 
@@ -74,37 +74,36 @@ int tabletennis_add_client(tabletennis_t
 

	
 
  return 0;
 
}
 

	
 
int tabletennis_serve(tabletennis_t tabletennis)
 
{
 
  struct timespec time_now;
 
  struct distrend_client *client;
 
  time_t time_next_check;
 

	
 
  struct distren_request *req;
 
  void *req_data;
 
  size_t req_len;
 

	
 
  clock_gettime(CLOCK_MONOTONIC, &time_now);
 

	
 
  time_next_check = time_now.tv_sec + tabletennis->pong_time;
 
  for(client = q_front(tabletennis->clients_to_ping);
 
      client && client->tabletennis_client.time_next_check < time_now.tv_sec;
 
      client = q_front(tabletennis->clients_to_ping))
 
    {
 
      q_dequeue(tabletennis->clients_to_ping);
 

	
 
      /* use time_next_check as the ping data */
 
      req_len = distren_request_poing(&req, &req_data, 1, &time_next_check, sizeof(time_next_check));
 
      distren_request_poing(&req, &req_data, 1, &time_next_check, sizeof(time_next_check));
 
      distrend_client_write_request(client, req, req_data);
 
      distren_request_free_with_data(req, req_data);
 

	
 
      client->tabletennis_client.state = TABLETENNIS_NEED_PONG;
 

	
 
      client->tabletennis_client.time_next_check = time_next_check;
 

	
 
      q_enqueue(tabletennis->clients_need_pong, client, 0);
 
    }
 

	
 
  time_next_check = time_now.tv_sec + tabletennis->ping_interval;
 
  for(client = q_front(tabletennis->clients_need_pong);
0 comments (0 inline, 0 general)