Changeset - 0fa8dda72d6a
[Not reviewed]
default
0 3 0
Nathan Brink (binki) - 15 years ago 2010-08-09 23:20:05
ohnobinki@ohnopublishing.net
Working job unpacking.
3 files changed with 37 insertions and 19 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "common/config.h"
 

	
 
#include "distrenjob.h"
 
#include "listen.h"
 
#include "mysql.h"
 
#include "slavefuncs.h"
 
#include "user_mgr.h"
 

	
 
#include "common/asprintf.h"
 
#include "common/execio.h"
 
#include "common/options.h"
 
#include "common/protocol.h"
 
#include "common/request.h"
 

	
 
#include <arpa/inet.h>
 
#include <confuse.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* ******************* Structs ************************ */
 
struct general_info
 
{
 
  struct distrenjob head;
 
  distrend_mysql_conn_t conn;
 

	
 
  struct distrend_config *config;
 

	
 
  struct
 
  {
 
    /** general_info.xml */
 
    char *geninfo;
 

	
 
    /**
 
     * \brief Where to store the user listing.
 
     */
 
    char *userlist;
 

	
 
    /**
 
     * where to store in-progress uploads or things that should
 
     * otherwise be on the same filesystem as the rest of the datadir
 
     * so that it may be rename()d.
 
     */
 
    char *tmpdir;
 
    
 
  } files;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 

	
 
  user_mgr_t user_mgr;
 

	
 
  /* my servertype, see protocol.h */
 
  uint8_t servertype;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config, multiio_context_t multiio);
 
int distrend_config_free(struct distrend_config *config);
 
int distrend_handle_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata, struct general_info *geninfo);
 

	
 
/**
 
 * client request handlers
 
 */
 
int distrend_handle_version(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 
int distrend_handle_pass(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 
int distrend_handle_file_post_start(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 
int distrend_handle_file_post(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 
int distrend_handle_file_post_finish(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 

	
 
/* functions of some generic sort ...ish */
 
int distrend_handle_successful_upload(struct distrend_client *client, struct distrend_client_file_post *client_file_post);
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 

	
 
/* **************Test Functions**************** */
 
int interactiveTest(int test, multiio_context_t multiio, struct general_info *general_info);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  int tmp;
 
  struct general_info general_info;
 
  multiio_context_t multiio;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  fprintf(stderr, PACKAGE_STRING "\n\
 
Nathan Phillip Brink <binki@ohnopub.net>\n\
 
Ethan Zonca <e@ethanzonca.com>\n\
 
\n");
 

	
 
#ifdef HAVE_LIST_BRAG
 
  fprintf(stderr, "Using %s\n", list_brag);
 
#endif
 

	
 
  clientstatus = CLIENTSTATUS_UNINITIALIZED;
 
  // xmlinit();
 

	
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
	  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
    	  test = 1;
 
      }
 
    }
 

	
 

	
 
  multiio = multiio_context_new();
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config, multiio))
 
    return 1;
 

	
 
  /** preset paths */
 
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
 
		    general_info.config->datadir);
 

	
 
  _distren_asprintf(&general_info.files.tmpdir, "%s/tmp",
 
		    general_info.config->datadir);
 
  distren_mkdir_recurse(general_info.files.tmpdir);
 

	
 
  _distren_asprintf(&general_info.files.userlist, "%s/users.xml",
 
		    general_info.config->datadir);
 

	
 
  /** configuraton stuff that depends on the paths being calculated, such as loading data */
 
  general_info.user_mgr = user_mgr_init(general_info.files.userlist);
 
@@ -214,484 +217,486 @@ Ethan Zonca <e@ethanzonca.com>\n\
 
  general_info.config->listens = distrend_listens_new(multiio, &general_info, general_info.config->options);
 
  if(!general_info.config->listens)
 
    {
 
      fprintf(stderr, "error initializing listens\n");
 
      return 1;
 
    }
 
  remoteio_generic_data_set(general_info.config->options->remoteio, general_info.config->listens);
 
  for(counter = 0; general_info.config->listen_ports[counter]; counter ++)
 
    {
 
      tmp = distrend_listen_add(general_info.config->listens, general_info.config->listen_ports[counter]);
 
      if(tmp)
 
	{
 
	  fprintf(stderr, "Error listening on port %d\n", general_info.config->listen_ports[counter]);
 
	  return 1;
 
	}
 
    }
 

	
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_VERSION, &distrend_handle_version, (uint8_t)DISTREND_CLIENT_PREVERSION);
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_PASS,
 
			      &distrend_handle_pass, (uint8_t)DISTREND_CLIENT_PREAUTH);
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_FILE_POST_START, &distrend_handle_file_post_start, (uint8_t)DISTREND_CLIENT_GOOD);
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_FILE_POST, &distrend_handle_file_post, (uint8_t)DISTREND_CLIENT_GOOD);
 
  distrend_listen_handler_add(general_info.config->listens, DISTREN_REQUEST_FILE_POST_FINISH, &distrend_handle_file_post_finish, (uint8_t)DISTREND_CLIENT_GOOD);
 

	
 
  /* Main Loop */
 
  general_info.config->die = 0;
 
  while(!general_info.config->die)
 
    {
 
      multiio_poll(multiio, 15000);
 

	
 
      tabletennis_serve(general_info.config->listens->tabletennis);
 

	
 
      /* Run the watchdog, @TODO: like every 10 mins or something */
 
      frame_watchdog(general_info.conn);
 
    }
 

	
 
  distrend_listen_free(general_info.config->listens);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  free(general_info.files.tmpdir);
 
  free(general_info.files.userlist);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
int distrend_handle_version(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data)
 
{
 
  char *tmp_str;
 
  struct distren_request_version version;
 

	
 
  if(distren_request_parse_version(req, req_data, &version))
 
    {
 
      distrend_send_disconnect(client, "Invalid DISTREN_REQUEST_VERSION packet.");
 
      return 1;
 
    }
 

	
 
  if(client->state != DISTREND_CLIENT_PREVERSION)
 
    {
 
      distrend_send_disconnect(client, "You have already sent the VERSION command.");
 
      return 1;
 
    }
 
  if(!strncmp(PACKAGE_STRING, version.package_string, DISTREN_REQUEST_VERSION_PACKAGE_STRING_LEN))
 
    {
 
      /**
 
       * The client and I claim to be of the same version of distren :-D
 
       * Now we will mark the client as valid.
 
       *
 
       * We won't increment his time to live, though, because it shouldn't take
 
       * him that long to auth.
 
       */
 
      client->state = DISTREND_CLIENT_PREAUTH;
 
    }
 
  else
 
    {
 
      /**
 
       * The client claims to be of a different version of distren.
 
       * Now we will just send a disconnect packet and disconnect the client.
 
       */
 
      _distren_asprintf(&tmp_str, "You have tried to connect to a %s server when your client claims to be running %s. Bye ;-)\n", PACKAGE_STRING, version.package_string);
 
      if(tmp_str)
 
	{
 
	  distrend_send_disconnect(client, tmp_str);
 
	  free(tmp_str);
 
	}
 
      else
 
	distrend_send_disconnect(client, "Invalid PACKAGE_VERSION :-|.");
 
      return 1;
 
    }
 

	
 
  client->servertype = version.servertype;
 

	
 
  return 0;
 
}
 

	
 
/**
 
 * Handle a DISTREN_REQUEST_PASS request.
 
 */
 
int distrend_handle_pass(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data)
 
{
 
  struct distren_request_pass *pass_req;
 

	
 
  char username[DISTREN_REQUEST_PASS_USERNAME_LEN + 1];
 
  char pass[DISTREN_REQUEST_PASS_PASS_LEN + 1];
 

	
 
  struct user *user;
 

	
 
  if(req->len < sizeof(struct distren_request_pass))
 
    {
 
      distrend_send_disconnect(client, "You tried to send too short of a DISTREN_REQUEST_PASS.");
 
      return 1;
 
    }
 

	
 
  pass_req = req_data;
 
  memcpy(username, pass_req->username, DISTREN_REQUEST_PASS_USERNAME_LEN);
 
  username[DISTREN_REQUEST_PASS_USERNAME_LEN] = '\0';
 

	
 
  memcpy(pass, pass_req->pass, DISTREN_REQUEST_PASS_PASS_LEN);
 
  pass[DISTREN_REQUEST_PASS_PASS_LEN] = '\0';
 

	
 
  user = user_find(geninfo->user_mgr, username);
 
  if(!user
 
     || strcmp(user->username, username)
 
     || strcmp(user->pass, pass))
 
    {
 
      distrend_send_disconnect(client, "Invalid username or password.");
 
      return 1;
 
    }
 

	
 
  client->state = DISTREND_CLIENT_GOOD;
 

	
 
  return 0;
 
}
 

	
 
/**
 
 * Traversal helper for distrend_client_find_post().
 
 */
 
int distrend_client_find_post_traverse(uint32_t *post_id, struct distrend_client_file_post *client_file_post)
 
{
 
  if(*post_id == client_file_post->post_id)
 
    return FALSE;
 

	
 
  return TRUE;
 
}
 

	
 
/**
 
 * Find the record for an in-progress client's file posting.
 
 */
 
struct distrend_client_file_post *distrend_client_find_post(struct distrend_client *client, uint32_t post_id)
 
{
 
  if(list_traverse(client->file_post_list, &post_id, (list_traverse_func_t)&distrend_client_find_post_traverse, LIST_ALTR | LIST_FORW | LIST_FRNT) != LIST_EXTENT)
 
    return list_curr(client->file_post_list);
 
  return NULL;
 
}
 

	
 
/**
 
 * Finds a post_context based on the post_id and client.
 
 *
 
 * Compatible the distren_request_parse_file_post_find_context_func_t.
 
 */
 
static distren_request_file_post_context_t distrend_client_find_file_post_context(uint32_t post_id, void *client)
 
{
 
  struct distrend_client_file_post *client_file_post;
 

	
 
  client_file_post = distrend_client_find_post(client, post_id);
 
  if(client_file_post)
 
    return client_file_post->post_context;
 
  return NULL;
 
}
 

	
 
/**
 
 * Clean up and free a client_file_post
 
 *
 
 * Whenever calling this functino, you almost _always_ have to call
 
 * list_remove_element(client->file_post_list, client_file_post);
 
 * first.
 
 */
 
void distrend_client_file_post_free(struct distrend_client_file_post *client_file_post)
 
{
 
  if(client_file_post->fd)
 
  fclose(client_file_post->fd);
 
  free(client_file_post->filename);
 
  unlink(client_file_post->file_save_path);
 
  free(client_file_post->file_save_path);
 

	
 
  free(client_file_post);
 
}
 

	
 
int distrend_handle_file_post_start(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data)
 
{
 
  struct distrend_client_file_post *client_file_post;
 

	
 
  distren_request_file_post_context_t post_context;
 
  uint32_t post_id;
 
  char *filename;
 

	
 
  char *str_tmp;
 

	
 
  int ret;
 

	
 
  /**
 
   * @todo access check!
 
   */
 
  fprintf(stderr, __FILE__ ":%d:distrend_handle_file_post_start(): You need to check if a client is actually allowed to upload files somehow!\n", __LINE__);
 

	
 
  /*
 
   * other servers should be excluded from this check, but we don't
 
   * store the servertype in client yet.
 
   */
 
  if(list_size(client->file_post_list) > 1)
 
    {
 
      distrend_send_disconnect(client, "You are trying to upload too many files at once!");
 
      return 1;
 
    }
 

	
 
  ret = distren_request_parse_file_post_start(req, req_data, &post_context, &post_id, &filename);
 
  if(ret)
 
    {
 
      distrend_send_disconnect(client, "You sent me an invalid DISTREN_REQUEST_FILE_POST_START packet");
 
      return 1;
 
    }
 

	
 
  if(distrend_client_find_post(client, post_id))
 
    {
 
      _distren_asprintf(&str_tmp, "Err accepting file: You are trying to upload using post_id=%d while you have already started another upload using the same post_id", post_id);
 
      distrend_send_disconnect(client, str_tmp);
 
      free(str_tmp);
 

	
 
      distren_request_file_post_context_free(post_context);
 

	
 
      return 1;
 
    }
 

	
 
  client_file_post = malloc(sizeof(struct distrend_client_file_post));
 
  if(!client_file_post)
 
    {
 
      distrend_send_disconnect(client, "Error accepting file: out of memory");
 

	
 
      distren_request_file_post_context_free(post_context);
 

	
 
      return 1;
 
    }
 

	
 
  client_file_post->post_context = post_context;
 
  client_file_post->post_id = post_id;
 
  client_file_post->filename = filename;
 
  _distren_asprintf(&client_file_post->file_save_path, "%s/conn-%d_file_post-%d",
 
		   geninfo->files.tmpdir, client->connection_id, post_id);
 
  client_file_post->fd = fopen(client_file_post->file_save_path, "w");
 
  if(!client_file_post->fd)
 
    {
 
      perror("fopen");
 

	
 
      fprintf(stderr, "error: Unable to open ``%s''. See above ``fopen'' error for more details.\n", client_file_post->file_save_path);
 
      distrend_send_disconnect(client, "Error accepting file: unable to store the file n disk");
 

	
 
      distren_request_file_post_context_free(post_context);
 

	
 
      list_remove_element(client->file_post_list, client_file_post);
 
      distrend_client_file_post_free(client_file_post);
 
      return 1;
 
    }
 

	
 
  list_insert_after(client->file_post_list, client_file_post, 0);
 

	
 
  return 0;
 
}
 

	
 
int distrend_handle_file_post(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data)
 
{
 
  struct distrend_client_file_post *client_file_post;
 

	
 
  void *file_data;
 
  size_t file_data_len;
 

	
 
  uint32_t post_id;
 
  char *tmp_str;
 

	
 
  size_t written_len;
 

	
 
  int ret;
 

	
 
  ret = distren_request_parse_file_post(req, req_data, &post_id,
 
					&distrend_client_find_file_post_context, client,
 
					&file_data, &file_data_len);
 
  if(ret)
 
    {
 
      distrend_send_disconnect(client, "You sent me an invalid DISTREN_REQUEST_FILE_POST packet");
 
      return 1;
 
    }
 

	
 
  client_file_post = distrend_client_find_post(client, post_id);
 

	
 
  if(!client_file_post)
 
    {
 
      _distren_asprintf(&tmp_str, "You are attempting to upload post_id=%d when you haven't given a DISTREN_REQUEST_FILE_POST_START packet", post_id);
 
      distrend_send_disconnect(client, tmp_str);
 
      free(tmp_str);
 
      return 1;
 
    }
 

	
 
  written_len = fwrite(file_data, 1, file_data_len, client_file_post->fd);
 
  if(written_len < file_data_len)
 
    {
 
      distrend_send_disconnect(client, "Error saving upload: error while writing to the temporary upload file");
 

	
 
      list_remove_element(client->file_post_list, client_file_post);;
 
      /* closes the file being written, and free()s everything, unlinks the file */
 
      distrend_client_file_post_free(client_file_post);
 

	
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
 * here be magic
 
 */
 
int distrend_handle_file_post_finish(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data)
 
{
 
  struct distrend_client_file_post *client_file_post;
 

	
 
  uint32_t post_id;
 
  int ret;
 

	
 
  ret = distren_request_parse_file_post_finish(req, req_data, &post_id,
 
					       &distrend_client_find_file_post_context, client);
 
  if(ret)
 
    {
 
      switch(ret)
 
	{
 
	case 2:
 
	  /* client asked to cancel a file upload */
 
	  client_file_post = distrend_client_find_post(client, post_id);
 
	  if(client_file_post)
 
	    {
 
	      list_remove_element(client->file_post_list, client_file_post);
 
	      distrend_client_file_post_free(client_file_post);
 
	    }
 
	  return 0;
 

	
 
	case 3:
 
	  /* checksuming failed */
 
	  distrend_send_disconnect(client, "You have uploaded a file that doesn't match its checksum somehow... which should be pretty much impossible");
 
	  client_file_post = distrend_client_find_post(client, post_id);
 
	  if(client_file_post)
 
	    {
 
	      list_remove_element(client->file_post_list, client_file_post);
 
	      distrend_client_file_post_free(client_file_post);
 
	    }
 
	  return 1;
 

	
 
	default:
 
	  distrend_send_disconnect(client, "You sent me an invalid DISTREN_REQUEST_FILE_POST_FINISH packet");
 
	}
 
      return 1;
 
    }
 

	
 
  client_file_post = distrend_client_find_post(client, post_id);
 
  if(!client_file_post)
 
    return 1;
 

	
 
  /*
 
   * Here it is... manage a file being submitted for rendering... or
 
   * for whatever purpose it was uploaded for somehow...
 
   */
 
  fclose(client_file_post->fd);
 
  client_file_post->fd = NULL;
 
  distrend_handle_successful_upload(client, client_file_post);
 

	
 
  list_remove_element(client->file_post_list, client_file_post);
 
  distrend_client_file_post_free(client_file_post);
 

	
 
  return 0;
 
}
 

	
 
/**
 
 * Does stuff with file uploads after they've been successfully acquired.
 
 *
 
 * @todo this should probably be genericized to handle 1. file uploads and 2. server-initiated file _downloads_, i.e., using libcurl. In that case, struct distrend_client_file_post shouldn't be passed directly to ourself but some common struct might be passed here.
 
 */
 
int distrend_handle_successful_upload(struct distrend_client *client, struct distrend_client_file_post *client_file_post)
 
{
 
  fprintf(stderr, __FILE__ ":%d: STUB: I don't know what to do with %s[%s] :-/\n", __LINE__,
 
	  client_file_post->filename, client_file_post->file_save_path);
 

	
 
  unpackJob("/home/ohnobinki/var/distren/tmp", client_file_post->file_save_path);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config, multiio_context_t multiio)
 
{
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
 
      CFG_SIMPLE_STR("mysql_user", NULL),
 
      CFG_SIMPLE_STR("mysql_host", NULL),
 
      CFG_SIMPLE_STR("mysql_pass", NULL),
 
      CFG_SIMPLE_STR("mysql_database", NULL),
 
      CFG_END()
 
    };
 

	
 
  cfg_t *cfg_listen;
 

	
 
  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 
  myopts[3].simple_value = &(*config)->mysql_user;
 
  myopts[4].simple_value = &(*config)->mysql_host;
 
  myopts[5].simple_value = &(*config)->mysql_pass;
 
  myopts[6].simple_value = &(*config)->mysql_database;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options, multiio))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listen_ports = malloc(sizeof(int) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listen_ports[counter] = cfg_getint(cfg_listen, "port");
 
    }
 
  (*config)->listen_ports[counter] = 0;
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 

	
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  distrend_listen_free(config->listens);
 
  options_free(config->options);
 
  free(config->listen_ports);
 
  free(config);
 

	
 
  return 0;
 
}
 

	
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
// except that it doesn't hold free_clients and rendering_clients
 
void update_general_info(struct general_info *geninfo)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 

	
 
  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"jobs_in_queue", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_finished_jobs);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_finished_jobs", (xmlChar*)tmp);
src/server/listen.h
Show inline comments
 
@@ -36,192 +36,197 @@ struct distrend_client;
 
#include "tabletennis.h"
 

	
 
#include "common/options.h"
 
#include "common/multiio.h"
 
#include "common/protocol.h"
 
#include "common/remoteio.h"
 
#include "common/request.h"
 

	
 
#include <queue.h>
 
#include <time.h>
 

	
 
/**
 
   How long a client has after connecting to send
 
   authentication information before his connection is cleaned
 
   up.
 
 */
 
#define DISTREND_LISTEN_AUTHTIME 32
 

	
 
/**
 
   How long a client has when in DISTREND_CLIENT_BAD before
 
   his connection is dropped. This grace time is intended so that
 
   the client will actually see his disconnect message instead of
 
   just having his connection reset.
 
 */
 
#define DISTREND_LISTEN_DISCONNECT_GRACE 8
 

	
 
/**
 
 * Numbers are explicitly given so that commands can be restricted to
 
 * clients in certain states.
 
 */
 
enum distrend_client_state
 
  {
 
    /**
 
       The client hasn't yet given us its version.
 
     */
 
    DISTREND_CLIENT_PREVERSION = 1,
 
    /**
 
       We don't yet know the client. It may only use authentication
 
       commands.
 
     */
 
    DISTREND_CLIENT_PREAUTH = 2,
 
    /**
 
       The client is authenticated, etc.
 
     */
 
    DISTREND_CLIENT_GOOD = 4,
 
    /**
 
       The client is queued to be disconnected. (This state exists
 
       so that the client at least has a chance to recieve its
 
       disconnect message/error before being dumped).
 
     */
 
    DISTREND_CLIENT_BAD = 8,
 
  };
 

	
 
struct distrend_listens
 
{
 
  /* of type (struct distrend_request_handler_info) */
 
  list_t request_handlers;
 
  /* the data to pass on to all request handlers */
 
  struct general_info *geninfo;
 
  /* the distrend config */
 
  struct options_common *options;
 

	
 
  tabletennis_t tabletennis;
 

	
 
  /* of type (struct distrend_client)  (multiio stores a pointer per socket, we'll store each strut distrend_client as that pointer instead of using this list) */
 
  /* list_t clients; */
 

	
 
  /* the multiio context the listening interface should use/initialize */
 
  multiio_context_t multiio;
 
  /*
 
   * the socket type reserved for us, i.e., the listen()/accept()
 
   * socket type whose events we handle.
 
  */
 
  multiio_socket_type_t socket_type;
 
};
 

	
 
struct distrend_client_file_post
 
{
 
  distren_request_file_post_context_t post_context;
 
  uint32_t post_id;
 
  /** user-friendly filename */
 
  char *filename;
 
  /** on-disk filename */
 
  char *file_save_path;
 
  FILE *fd;
 
};
 

	
 
/**
 
   The information necessary to recieve data from and send data
 
   to a client.
 
 */
 
struct distrend_client
 
{
 
  enum distrend_client_state state;
 

	
 
  /**
 
   * \see protocol.h for DISTREN_SERVERTYPE_ constants.
 
   */
 
  uint8_t servertype;
 

	
 
  /**
 
     The absolute time at which this client's entry in the client list will be
 
     expired, closed, and marked as dead so that it may be cleaned up. This is
 
     used to implement ping timeouts (if state == DISTREND_CLIENT_GOOD) and 
 
     disconnect message grace time (if state == DISTREND_CLIENT_BAD).
 
   */
 
  time_t cleanup_time;
 

	
 
  size_t inlen; /*< number of bytes waiting to be processed */
 
  size_t expectlen; /*< number of bytes that inlen has to be for a complete request to be had, 0 when waiting on header */
 

	
 
  struct tabletennis_client tabletennis_client;
 

	
 
  /**
 
   * File postings (uploads) are per-connection, and thus stored
 
   * here. Of type struct distrend_client_file_post.
 
   */
 
  list_t file_post_list;
 

	
 
  /**
 
   * Yes, in some places we need a unique key for this connection.
 
   */
 
  int connection_id;
 

	
 
  /**
 
   * Yes, even though the remoteio will have a void *pointer to this
 
   * struct, we need a reverse pointer for something like
 
   * distrend_client_write() to work.
 
   */
 
  struct remoteio *rem;
 
};
 

	
 

	
 

	
 
/**
 
   A function signature that may be registered as a client
 
   request handler.
 

	
 
   @param client the client that sent the request.
 
   @param req the distren request header.
 
   @param data the message received from the client.
 
 */
 
typedef int(*distrend_handle_request_func_t)(struct general_info *geninfo, struct distrend_client *client, struct distren_request *req, void *req_data);
 

	
 
/**
 
   Initializes the listens member of struct distrend_config.
 

	
 
   @param multiio the multiio context in which we should register a new socket type and insert records for clients who connect.
 
   @param geninfo general info to apss to the request handler.
 
   @return Must be free()d with distrend_listen_free();
 
*/
 
struct distrend_listens *distrend_listens_new(multiio_context_t multiio, struct general_info *geninfo, struct options_common *opts);
 

	
 
/**
 
   Adds a socket and configures it to listen().
 

	
 

	
 
   @param listens The handle for this set of listens, obtained via distrend_listen_init().
 
 */
 
int distrend_listen_add(struct distrend_listens *listens, int port);
 

	
 
/**
 
 * Register a request handler with the listener.
 
 *
 
 * \param config distrend's configuration.
 
 * \param type The request type for which this handler should be
 
 *   called
 
 * \param handler The handler to call when a request of type type is
 
 *   received.
 
 * \param client_state_mask A mask of enum distrend_client_state
 
 *   values that are allowed to use this command.
 
 */
 
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler, uint8_t client_state_mask);
 

	
 
/**
 
 * cleans listening sockets/frees main struct. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_listen_free(struct distrend_listens *listens);
 

	
 
/**
 
   writes request to client.
 
   @param client client to write to
 
   @param req the request struct. caller must free this.
 
   @param data the data of the request which is req->len bytes long. caller must free this.
 
 */
 
int distrend_client_write_request(struct distrend_client *client, const struct distren_request *req, const void *data);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
 

	
 
/**
 
 * Queue a DISTREN_REQUEST_DISCONNECT and prepare a client
 
 * to be disconnected.
 
 */
 
int distrend_send_disconnect(struct distrend_client *client, const char *quit_msg);
src/server/slavefuncs.c
Show inline comments
 
@@ -718,216 +718,224 @@ int slaveBenchmark(char *datadir, int *b
 
  _distren_asprintf(&realOutput, "%s/benchmark%d.jpg", datadir, frameToRender); // Where to save benchmark output
 

	
 

	
 
  char *input;
 
  _distren_asprintf(&input, "%s/benchmark.blend", datadir); // Input file
 

	
 
  fprintf(stderr,"Downloading benchmark data...\n");
 
  curlget("http://data.distren.org/benchmark.blend", input); // Download to input file location
 

	
 
  char *command = "blender"; // @TODO: We currently expect this to be in PATH
 
  char *cmd[] = { command, "-b", input, "-o", output, "-f", frame_str, "-t", "0", (char *)NULL }; // arguments for blender
 

	
 
  // fprintf(stderr,"Preparing to execute command: %s -b %s -o %s -f %s\n", command, input, output, frame_str);
 
  fprintf(stderr,"Running benchmark...\n");
 

	
 
  long startTime;
 
  long endTime;
 

	
 
  time(&startTime);
 

	
 
  char buf[20];
 
  struct execio *testrem;
 
  size_t readlen;
 

	
 
  ret = execio_open(&testrem, command, cmd);
 
  if(ret)
 
    {
 
      fprintf(stderr, "Error executing blender\n");
 
      return 1;
 
    }
 

	
 
  buf[19] = '\0';
 
  while(!execio_read(testrem, buf, 19, &readlen))
 
    {
 
       buf[readlen] = '\0';
 
       fprintf(stderr, "read \"%s\"\n", buf);
 
    }
 
  execio_close(testrem);
 

	
 
  time(&endTime);
 

	
 
  struct stat buffer;
 
  int ostatus = stat(realOutput, &buffer);
 
  if(ostatus == -1){
 
    ret = 1; // Return error if output wasn't generated
 
  }
 
  else
 
    remove(output);
 

	
 
  *benchmarkTime = abs(difftime(startTime,endTime));
 
  float tmp = *benchmarkTime;
 
  tmp = (1/tmp) * 50000;
 
  *renderPower = (int)tmp;
 
  
 
free(realOutput);
 
  free(frame_str);
 
  free(input);
 
  free(output);
 
  return ret;
 
}
 

	
 

	
 

	
 
/** *********************************************************************************************************/
 
/** Why hello ohnobinki! Normaldotcom has graciously prepared this portion of code for you to work on! Yay! */
 
/** *********************************************************************************************************/
 

	
 

	
 
/**
 
   Extracts archive to the specified directory, creating this directory
 
   if necessary (but this directory will not be recursively created).
 
   @param outdir output directory
 
   @param pathtoTar filename of the archive to extract
 

	
 
   ohnobinki: please make me work :-\
 
 */
 
int unpackJob(char *outdir, char *pathtoTar)
 
{
 
  int ret;
 

	
 
  struct archive *a;
 
  struct archive_entry *ae;
 
  int astatus;
 

	
 
  /* ignore return because directory may exist already */
 
  mkdir(outdir, 0700);
 
  ret = chdir(outdir);
 
  if(ret == -1)
 
    {
 
      perror("chdir");
 
      return 1;
 
    }
 

	
 
  a = archive_read_new();
 
  ae = archive_entry_new();
 

	
 
  archive_read_support_compression_all(a);
 
  archive_read_support_format_raw(a);
 
  astatus = archive_read_open_filename(a, pathtoTar, 8192);
 
  if (astatus != ARCHIVE_OK)
 
  if(!a
 
     || !ae)
 
    {
 
      fprintf(stderr, "Error opening archive!\n");
 
      if(a)
 
	archive_read_finish(a);
 
      if(ae)
 
	archive_entry_free(ae);
 
      return 1;
 
    }
 

	
 
  for(astatus = ARCHIVE_OK;
 
      astatus == ARCHIVE_OK
 
        || astatus == ARCHIVE_WARN;
 
      )
 
  fprintf(stderr, "Trying to unpack %s into %s\n", pathtoTar, outdir);
 

	
 
  archive_read_support_compression_all(a);
 
  archive_read_support_format_all(a);
 
  astatus = archive_read_open_filename(a, pathtoTar, 1);
 
  if (astatus != ARCHIVE_OK)
 
    {
 
      astatus = archive_read_next_header2(a, ae);
 
      if(astatus == ARCHIVE_WARN)
 
        fprintf(stderr, "Encountered nonfatal read error somehow!\n");
 
      fprintf(stderr, "Error opening archive!\n");
 
      archive_read_finish(a);
 
      archive_entry_free(ae);
 
      return 1;
 
    }
 

	
 
      if(astatus == ARCHIVE_OK
 
         || astatus == ARCHIVE_WARN)
 
  while((astatus = archive_read_next_header2(a, ae)) == ARCHIVE_OK)
 
    {
 
        astatus = archive_read_extract(a, ae,
 
                                       ARCHIVE_EXTRACT_NO_OVERWRITE
 
                                       | ARCHIVE_EXTRACT_SECURE_SYMLINKS
 
                                       | ARCHIVE_EXTRACT_SECURE_NODOTDOT);
 
      if(astatus != ARCHIVE_OK)
 
	fprintf(stderr, "Encountered error or warning when attempting to extract file: %s\n",
 
		archive_entry_pathname(ae));
 
    }
 
  archive_entry_free(ae);
 
  archive_read_finish(a);
 

	
 
  if(astatus != ARCHIVE_EOF)
 
    {
 
      fprintf(stderr, "Error reading archive!\n");
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 

	
 
/** Logs the user into the server after ensuring that keys exist
 
    ohnobinki: I assume you could use this for remoteio, or just kill it
 
*/
 
int login_user(char *username)
 
{
 
  // @TODO: Put some telnet-style auth code here unless this is obselete
 
  return 1; // success
 
}
 

	
 
/**
 
 * Sends the server a single request (see protocol.h)
 
 * ohnobinki: This should hopefully work, maybe ;D
 
 *
 
 * @deprecated THIS FUNCTION SHOULD DIE VERY, VERY SOON!
 
 * (and painfully :-p)
 
*/
 
int sendSignal(struct remoteio *rem, char signal)
 
{
 

	
 
  fprintf(stderr, __FILE__ ":%d: Ignoring call to DEPRECATED and WRONG sendSignal() function. See similar message for sendExtSignal() for details\n", __LINE__);
 
  return 1;
 
  remoteio_write(rem, &signal, 1);
 

	
 
  /**
 
     if remoteio_write returned 1, the connection
 
     is probably dead or there was a real error
 
   */
 
  return 1;
 
}
 

	
 
/**
 
 * Sends the server an extended signal (request + data)
 
 * ohnobinki: I have no clue how you really want to handle this. Please clarify/edit
 
 * normaldotcom: I see more and more how clueless you are, I hope to get to his soon ;-)
 
 *
 
 * @deprecated lol
 
 */
 
int sendExtSignal(struct remoteio *rem, char signal, char *data)
 
{
 
  size_t len;
 
  char *ssignal;
 

	
 
  fprintf(stderr, __FILE__ ":%d: Ignoring call to DEPRECATED and WRONG sendExtSignal() function. First of all, this function's name is camelCase. Secondly, it doesn't use protocol.h properly. I'll show how later. Thirdly, when protocol.h and request.h (still being written) are used properly, you don't need sendExtSignal() at all.\n", __LINE__);
 
  return 1;
 

	
 
  /**
 
   * Just append the data FIXME: We should do this differently
 
   */
 
  _distren_asprintf(&ssignal, "%c%s", signal, data);
 
  len = strlen(ssignal);
 
  remoteio_write(rem, ssignal, len);
 

	
 
  return 0;
 
}
 

	
 

	
 
/* Port of web functions for standard code
 

	
 
   Currently, most functions are stubs due to lack
 
   of socket reading code
 

	
 
   ohnobinki: I can take care of a fair amount of this, but the remotio reading and writing is where you should really lay down some code.
 
*/
 

	
 
/** marks frame finished on server */
 
void finishframe(struct remoteio *rem, int jobnum, int framenum){
 
  char* data;
 
  _distren_asprintf(&data, "%d%d", jobnum, framenum);
 
  sendExtSignal(rem, DISTREN_REQUEST_STATUS, data);
 
}
 

	
 
/** resets frame to unassigned on server */
 
void resetframe(struct remoteio *rem, int jobnum, int framenum){
 
  fprintf(stderr,"Resetting frame %d in job %d on server... ",framenum,jobnum);
 
  char* data;
 
  _distren_asprintf(&data, "%d%d", jobnum, framenum);
 
 sendExtSignal(rem, DISTREN_REQUEST_STATUS, data);
 

	
 
}
 

	
 
/** marks frame assigned on server */
 
void startframe(struct remoteio *rem, int jobnum, int framenum){
0 comments (0 inline, 0 general)