Changeset - 5b7ab20c5306
[Not reviewed]
default
0 4 0
Nathan Brink (binki) - 16 years ago 2009-12-20 16:13:12
ohnobinki@ohnopublishing.net
style fixes, move distrenjob_unserialize() to distrenjob.*, more struct geninfo use
4 files changed with 186 insertions and 125 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -17,96 +17,96 @@
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "asprintf.h"
 
#include "distrenjob.h"
 
#include "execio.h"
 
#include "listen.h"
 
#include "options.h"
 
#include "protocol.h"
 
#include "slavefuncs.h"
 

	
 
#include <confuse.h>
 
#include <malloc.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/xmlmemory.h>
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/encoding.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 
#include <libxml/xmlreader.h>
 

	
 
/* local defs */
 
#define NUMBER_ELEVEN 11
 

	
 
/* ******************* Structs ************************ */
 

	
 
struct general_info
 
{
 
  struct distrenjob head;
 

	
 
  struct distrend_config *config;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 

	
 
/*
 
  internally defined funcs's prototypes @TODO: Make all functions nice and proper 
 
*/
 
void distrenjob_remove(struct general_info *, struct distrenjob *bj);
 

	
 
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
 
int distrenjob_enqueue(struct general_info *, struct distrenjob *job);
 
int mortition(struct general_info *, struct distrenjob *job);
 
int distrenjob_serialize(struct general_info *geninfo, struct distrenjob *job);
 
int update_xml_joblist(struct general_info *);
 
int createQueueFromXML(struct distrenjob *head);
 
int reCreateQueueFromXML(struct distrenjob *head, xmlDocPtr doc, xmlNodePtr current);
 
int createQueueFromXML(struct general_info*);
 
int reCreateQueueFromXML(struct general_info*, xmlDocPtr doc, xmlNodePtr current);
 
void update_general_info(struct general_info*);
 
int import_general_info(struct general_info*);
 
int updateJobStatsXML(struct distrenjob *job);
 

	
 
char *job_getserialfilename(struct general_info *, unsigned int jobnum);
 

	
 
/* ********************** Functions ************************* */
 

	
 
/** Dumps all data in RAM to an xml file (such as current jobs, etc) which is parsed by start_data. Remember to invoke this before shutting down! */
 
int xml_dump()
 
{
 
  return 0;
 
}
 
/**
 
   Performs command stored in a client's request.
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 
/**
 
   Frees the action
 
*/
 
void distrend_action_free()
 
{
 

	
 
}
 

	
 
/** 
 
@@ -123,182 +123,198 @@ int start_data(struct general_info *gene
 
  general_info->head.priority = 0;
 

	
 
  general_info->jobs_in_queue = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->total_finished_jobs = 0;
 
  general_info->total_frames_rendered = 0;
 
  general_info->highest_jobnum = 0;
 
  general_info->hibernate = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  fprintf(stderr, "Initialized default global and queue states\n");
 

	
 
  if(stat("general_info.xml", &buffer) == 0)
 
    {
 
      fprintf(stderr, "previous state file found, loading:\n");
 

	
 
      fprintf(stderr, "Parsing XML files and restoring previous state...\n");
 
      if(import_general_info(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "Restoring queue...\n");
 
      if(createQueueFromXML(&general_info->head))
 
      if(createQueueFromXML(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "done\n");
 
    }
 
  
 
  return 0;
 
}
 

	
 
/** Finish-Setter: Sets a frame to the "completed" status.*/
 
void finish_frame(struct general_info *geninfo, struct distrenjob *distrenjob, int frame)
 
{
 
  distrenjob->frameset[frame].status = FRAMESETSTATUS_DONE;
 
  distrenjob->total_render_time = distrenjob->total_render_time + (clock() - distrenjob->frameset[frame].start_time);
 
  distrenjob->completed_frames ++;
 
  distrenjob->assigned_frames --;
 
  geninfo->total_frames_rendered ++; /*< Increase total frames var for stats */
 

	
 
  update_general_info(geninfo);
 
  updateJobStatsXML(distrenjob);
 
}
 

	
 
/**
 
   checks to see if a job is actually done.
 
   - scans the folder of the job to make sure all output files are present
 
*/
 
int mortition(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1; // set isJobDone to true
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format);
 
      _distren_asprintf(&path_and_number, "%s/stor/job%d/out/%d.%s",
 
			geninfo->config->datadir,
 
			job->jobnum,
 
			job->frameset[counter].num,
 
			job->output_format);
 
      if(stat(path_and_number, &buffer) == -1)
 
        {
 
          job->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 
          job->completed_frames--;
 
          geninfo->total_frames_rendered--;
 
          isJobDone = 0; // if a missing frame is found, set isJobDone to false
 
	  /** if a missing frame is found, set isJobDone to false */
 
          isJobDone = 0;
 
        }
 
      free(path_and_number);
 
    }
 

	
 
  if(isJobDone) // if all frames were accounted for
 
  /** if all frames were accounted for */
 
  if(isJobDone)
 
    {
 
      distrenjob_remove(geninfo, job);
 
      distrenjob_free(&job);
 
      geninfo->jobs_in_queue --;
 
      update_xml_joblist(geninfo);
 
    }
 
  else{
 
    job->prev_frame_index = -1; // if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned
 
  }
 
  else
 
    /**
 
       if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned 
 
    */
 
    job->prev_frame_index = -1;
 

	
 
  update_general_info(geninfo);
 

	
 
  return 0;
 
}
 

	
 
/** scans the frames of a job to initialize a job after server */
 
/* returns 1 if the job is completely done and there are no missing frames */
 
/* returns 0 if a missing frame is found */
 
int restoreJobState(struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1;
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format);
 
      _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format); /*< @TODO this path is used in multiple places, construct/build/determine it in a central function */
 
      if(stat(path_and_number, &buffer) == 0)
 
        {
 
          job->frameset[counter].status = FRAMESETSTATUS_ASSIGNED;
 
          job->completed_frames++;
 
        }
 
      else
 
        isJobDone = 0;
 
      free(path_and_number);
 
    }
 

	
 
  return isJobDone;
 
}
 

	
 
/** creates a structure from starting data, then calls another function to actually add that struct to the queue */
 
int prepare_distrenjob(struct general_info *geninfo, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame, int width, int height)
 
{
 
  int counter2;
 
  int counter;
 
  int tmp;
 
  char *path_with_num;
 
  char *serialfile;
 

	
 
  struct distrenjob *distrenjob;
 
  tmp = distrenjob_new(&distrenjob);
 
  if(tmp)
 
    return 1;
 

	
 
  geninfo->highest_jobnum ++;
 
  distrenjob->jobnum = geninfo->highest_jobnum;
 

	
 
  distrenjob->type = 1;
 
  distrenjob->name = name;
 
  distrenjob->submitter = submitter;
 
  distrenjob->email = email;
 
  distrenjob->priority = priority;
 
  distrenjob->width = width;
 
  distrenjob->height = height;
 
  distrenjob->total_frames = (end_frame - start_frame + 1); /* sets the total number of frames in animation for status purposes */
 
  distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames);
 
  if(!distrenjob->frameset)
 
    {
 
      distrenjob_free(&distrenjob);
 
      return 1;
 
    }
 

	
 
  /* prepares all the frames by setting that status to "unassigned" */
 
  counter2 = start_frame;
 
  for(counter = 0; counter < distrenjob->total_frames; counter++){
 
    distrenjob->frameset[counter].num = counter2;
 
    distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 

	
 
    counter2++;
 
  }
 

	
 
  _distren_asprintf(&path_with_num, "stor/job%d/out/", distrenjob->jobnum);
 
  fprintf(stderr, "creating dir '%s'", path_with_num);
 
  fprintf(stderr, ">>NOT<< creating dir ``%s''\n", path_with_num); /*< @TODO recursively create job directory */
 
  serialfile = job_getserialfilename(geninfo, distrenjob->jobnum);
 

	
 
  free(path_with_num);
 

	
 
  /* add job to queue */
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_serialize()\n");
 
  distrenjob_serialize(geninfo, distrenjob);
 
  distrenjob_serialize(distrenjob, serialfile);
 
  free(serialfile);
 

	
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_enqueue()\n");
 
  distrenjob_enqueue(geninfo, distrenjob);
 

	
 
  geninfo->jobs_in_queue ++;
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_xml_joblist()\n");
 
  update_xml_joblist(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_general_info()\n");
 
  update_general_info(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting updateJobStatsXML()\n");
 
  updateJobStatsXML(distrenjob);
 

	
 
  return 0;
 
}
 

	
 

	
 
/** distrenjob_enqueue: This function adds the job to the queue based on its priority */
 
int distrenjob_enqueue(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  struct distrenjob *prev_job;
 
  struct distrenjob *current_job;
 
  struct distrenjob *head;
 

	
 
  head = &geninfo->head;
 
  prev_job = head;
 
@@ -306,79 +322,84 @@ int distrenjob_enqueue(struct general_in
 
  // iterate through linked list of jobs
 
  for(current_job = head->next; 1; current_job = current_job->next)
 
  {
 
	fprintf(stderr, "enqueue loop iteration\n");
 
    if(current_job == NULL){ // if it has reached the end of the list, add job there
 
      prev_job->next = job;
 
      fprintf(stderr, "adding job at end of queue\n");
 
      break;
 
    }
 
    else if(job->priority < current_job->priority){ // if job's priority is less than current_job's priority, insert job
 
      prev_job->next = job;                        // keep in mind 1 is the highest priority given to jobs, head has a
 
      job->next = current_job;                     // priority of zero so it will always be before other jobs
 
      fprintf(stderr, "adding job before jobname: %s\n", current_job->name);
 
      break;
 
    }
 

	
 
    prev_job = current_job;
 
  } /* for(current_job) */
 

	
 
  return 0;
 
}
 

	
 
/** Changes the priority of an existing (and maybe running) job. @arg head I may end up changing the head if job == head */
 
int change_job_priority(struct general_info *geninfo, struct distrenjob *job, int new_priority){
 
  struct distrenjob *current_job;
 
  struct distrenjob *prev_job;
 
  char *serialname;
 

	
 
  distrenjob_remove(geninfo, job);
 
  job->priority = new_priority;
 
  struct distrenjob *current_job;
 
  struct distrenjob *prev_job;
 

	
 
  prev_job = &geninfo->head;
 

	
 
  if(job->frameset[0].status == FRAMESETSTATUS_UNASSIGNED)
 
    /**
 
       if job was not yet started
 
    */
 
    {
 
      distrenjob_enqueue(geninfo, job);
 
      return 0;
 
    }
 

	
 
  /**
 
     iterate through linked list of jobs 
 
  */
 
    for(current_job = &geninfo->head;
 
	current_job != NULL
 
	  && job->priority > current_job->priority;
 
	current_job = current_job->next)
 
      prev_job = current_job;
 

	
 
    prev_job->next = job;
 
    job->next = current_job;
 

	
 

	
 
  update_xml_joblist(geninfo);
 
  distrenjob_serialize(geninfo, job); // because priority is changed
 
  /** reserialize after changes */
 
  serialname = job_getserialfilename(geninfo, job->jobnum);
 
  distrenjob_serialize(job, serialname);
 
  free(serialname);
 

	
 
  return 0;
 
}
 

	
 
/**
 
  Frame Finder: matches your computer up with a lovely frame to render, starts looking at oldest job first
 
  @TODO: We must return both jobnum and framenum
 
  @TODO: Add calls in main()
 
  @return 0 success, other: error
 
*/
 
int find_jobframe(struct general_info *geninfo, struct distrenjob **job, struct frameset **frame)
 
{
 
  if(geninfo->hibernate)
 
    return 1;
 

	
 
  unsigned int frame_counter;
 
  unsigned short int found;
 

	
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  found = 0;
 
  /* iterate through jobs from first to last */
 
  for(distrenjob_ptr = geninfo->head.next; distrenjob_ptr && !distrenjob_ptr->hibernate; distrenjob_ptr = distrenjob_ptr->next)
 
  {
 
@@ -613,53 +634,53 @@ int distrend_do_config(int argc, char *a
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_END()
 
    };
 

	
 
  struct distrenjob *distrenjob;
 
  cfg_t *cfg_listen;
 

	
 
  int tmp;
 

	
 
  /*
 
   * test xml2distrenjob()
 
   * test distrenjob_unserialize()
 
   */
 
  tmp = xml2distrenjob(&distrenjob, "distrenjob.xml.example");
 
  tmp = distrenjob_unserialize(&distrenjob, "distrenjob.xml.example");
 
  if(tmp)
 
    fprintf(stderr, "xml2distrenjob() returned %d. Try to cd to distren/doc if you want to test out the xml2distrenjob() function. (This will only fix this error if the error is due to an inability of the xml library to access distrenjob.xml.example)\n\n", tmp);
 
    fprintf(stderr, "distrenjob_unserialize() returned %d. Try to cd to distren/doc if you want to test out the distrenjob_unserialize() function. (This will only fix this error if the error is due to an inability of the xml library to access distrenjob.xml.example)\n\n", tmp);
 
  else
 
    fprintf(stderr, "using email ``%s'' for user ``%s'' -- reading in XML files and pulling data from them using libxml2+XPath works!!!\n", distrenjob->email, distrenjob->submitter);
 

	
 
  fprintf(stderr, "%s:%d running config\n", __FILE__, __LINE__);
 

	
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 
@@ -715,173 +736,91 @@ int import_general_info(struct general_i
 
      return 1;
 
    }
 

	
 
  cur = cur->xmlChildrenNode;
 
  general_info->jobs_in_queue = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 

	
 
  general_info->hibernate = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   serializes a distrenjob into XML
 
*/
 
int distrenjob_serialize(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  char *tmpfile;
 
  char *outfile;
 
  int tmprtn;
 

	
 
  _distren_asprintf(&tmpfile, "%s/stor/job%d/.job_info.xml", geninfo->config->datadir, job->jobnum);
 

	
 
  /* create xml document at the location tmp with no compression */
 
  writer = xmlNewTextWriterFilename(tmpfile, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     write distrenjob element and add its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job");
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"name", (xmlChar*)job->name);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"submitter", (xmlChar*)job->submitter);
 
  _distren_asprintf(&tmp, "%d", job->priority);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"priority", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  /**
 
     write resolution element and add its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"resolution");
 
  _distren_asprintf(&tmp, "%d", job->width);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"width", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", job->height);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"height", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     write video element and its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"video");
 
  _distren_asprintf(&tmp, "%d", job->frameset[0].num);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"start_frame", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", job->frameset[(job->total_frames - 1)].num);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"end_fame", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"output_format", (xmlChar*)job->output_format);
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     write watchdog forgiveness element */
 
  _distren_asprintf(&tmp, "%d", job->watchdog_forgiveness);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"wd_forgiveness", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  /**
 
     end document */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk */
 
  xmlFreeTextWriter(writer);
 

	
 
  _distren_asprintf(&outfile, "%s/stor/job%d/job_info.xml", geninfo->config->datadir, job->jobnum);
 
  tmprtn = rename(tmpfile, outfile);
 
  if(tmprtn == -1)
 
    {
 
      fprintf(stderr, "%s:%d: Error renaming ``%s'' to ``%s''\n",
 
	      __FILE__, __LINE__,
 
	      tmpfile, outfile);
 
      perror("rename");
 
      tmprtn = 1;
 
    }
 
  free(tmpfile);
 
  free(outfile);
 

	
 
  return tmprtn;
 
}
 

	
 
// extracts data from the xml created by above function and creates a job from it
 
// it returns a pointer to the created job
 
struct distrenjob *createJobFromXML(int job_number)
 
struct distrenjob *createJobFromXML(struct general_info *geninfo, unsigned int jobnum)
 
{
 
  xmlDocPtr doc;
 
  xmlNodePtr cur;
 
  char *file_name;
 
  struct distrenjob *distrenjob;
 
  int start_frame;
 
  int counter;
 
  int counter2;
 

	
 
  _distren_asprintf(&file_name, "stor/job%d/job_info.xml", job_number);
 
  file_name = job_getserialfilename(geninfo, jobnum);
 

	
 
  doc = xmlParseFile(file_name);
 
  if(!doc)
 
    return NULL;
 

	
 
  distrenjob_new(&distrenjob);
 

	
 
  cur = xmlDocGetRootElement(doc);
 

	
 
  distrenjob->name = (char*)xmlGetProp(cur, (xmlChar*)"name");
 
  distrenjob->submitter = (char*)xmlGetProp(cur, (xmlChar*)"submitter");
 
  distrenjob->priority = atoi((char*)xmlGetProp(cur, (xmlChar*)"priority"));
 

	
 
  cur = cur->xmlChildrenNode;
 
  distrenjob->width = atoi((char*)xmlGetProp(cur, (xmlChar*)"width"));
 
  distrenjob->height = atoi((char*)xmlGetProp(cur, (xmlChar*)"number"));
 

	
 
  cur = cur->next;
 
  start_frame = atoi((char*)xmlGetProp(cur, (xmlChar*)"start_frame"));
 
  distrenjob->total_frames = atoi((char*)xmlGetProp(cur, (xmlChar*)"end_frame")) - start_frame + 1;
 
  distrenjob->output_format = (char*)xmlGetProp(cur, (xmlChar*)"output_format");
 

	
 
  cur = cur->next;
 
  distrenjob->watchdog_forgiveness = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 

	
 
  xmlFreeDoc(doc);
 

	
 
  // load up stats.xml file to retrieve data
 
  _distren_asprintf(&file_name, "stor/job%d/stats.xml", job_number);
 
  /** load up stats.xml file to retrieve data */
 
  _distren_asprintf(&file_name, "stor/job%d/stats.xml", jobnum);
 

	
 
  doc = xmlParseFile(file_name);
 
  cur = xmlDocGetRootElement(doc);
 

	
 
  distrenjob->total_render_time = (time_t)atol((char*)xmlGetProp(cur, (xmlChar*)"total_render_time"));
 

	
 
  xmlFreeDoc(doc);
 

	
 
  distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames);
 

	
 
  // change the status of all frames to "unassigned"
 
  counter2 = start_frame;
 
  for(counter = 0; counter <= distrenjob->total_frames; counter++){
 
    distrenjob->frameset[counter].num = counter2;
 
    distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 

	
 
    counter2++;
 
  }
 

	
 
  restoreJobState(distrenjob);
 
  return distrenjob;
 
}
 

	
 
int updateJobStatsXML(struct distrenjob *job)
 
@@ -945,114 +884,127 @@ int update_xml_joblist(struct general_in
 

	
 
      /*
 
	this is needed for the new frame finder to work 
 
      
 
	Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + (NUMBER_ELEVEN - job->priority);
 

	
 
      counter++;
 
    }
 

	
 
  // close elements and end document
 
  xmlTextWriterEndDocument(writer);
 

	
 
  // free writer and save xml file to disk
 
  xmlFreeTextWriter(writer);
 
  return 1;
 
}
 

	
 
/**
 
 returns 0 if completed successfully
 
 this reads a list of jobs in the queue before DistRen was shut down
 
 and then adds the jobs to the queue, as if it were never shut down
 
*/
 
int createQueueFromXML(struct distrenjob *head)
 
int createQueueFromXML(struct general_info *geninfo)
 
{
 
  int tmp;
 

	
 
  xmlDocPtr doc;  // holds xml document in memory
 
  xmlNodePtr cur; // points to the current xml element node
 

	
 
  // load xml document
 
  doc = xmlParseFile("job_list.xml");
 
  if(doc == NULL)
 
    {
 
      fprintf(stderr, "createQueueFromXML: document not found\n");
 
      return 1;
 
    }
 

	
 
  // have cur point to the root element of the xml document pointed to by doc
 
  cur = xmlDocGetRootElement(doc);
 
  if(cur == NULL)
 
    {
 
      fprintf(stderr, "createQueueFromXML: document empty\n");
 
      return 1;
 
    }
 

	
 
  if(xmlStrcmp(cur->name, (const xmlChar*)"job_list"))
 
    {
 
      fprintf(stderr, "createQueueFromXML: incorrect root element (%s)", (char*)cur->name);
 
    }
 

	
 
  // moves into the children elements of job_list
 
  cur = cur->xmlChildrenNode;
 

	
 
  tmp = reCreateQueueFromXML(head, doc, cur);
 
  tmp = reCreateQueueFromXML(geninfo, doc, cur);
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return tmp;
 
}
 

	
 
// inserts jobs at front of queue, starting with the last job in the job_list xml file
 
// to preserve the order of the queue
 
int reCreateQueueFromXML(struct distrenjob *head, xmlDocPtr doc, xmlNodePtr current)
 
/**
 
   inserts jobs at front of queue, starting
 
   with the last job in the job_list xml file
 
   to preserve the order of the queue
 
*/
 
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current)
 
{
 
  struct distrenjob *holder; // holds the job that "was" after head, so that the new struct can be inserted after head
 
  struct distrenjob *job; // job to be added
 
  xmlChar *tmp;
 
  int job_num;
 
  if(current == NULL) // base case, if element doesn't exist then don't do anything
 
    return 0;
 

	
 
  // recursively call itself so that the next job in the queue is added to the front before the current one
 
  if(reCreateQueueFromXML(head, doc, current->next))
 
  /**
 
     recursively call itself so that the next job in
 
     the queue is added to the front before the current one
 
  */
 
  if(reCreateQueueFromXML(geninfo, doc, current->next))
 
    /**
 
       bail if failure recursevely
 
        ^-- is this the best thing to do in this case?
 
	@TODO bail-out cleanup
 
     */
 
    return 1;
 

	
 
  // now actual work is done, adding the job to the front of the queue
 
  holder = head->next; // initialize holder
 
  /** actual work is done now: add the job to the front of the queue */
 
  holder = geninfo->head.next;
 

	
 
  tmp = xmlNodeListGetString(doc, current->xmlChildrenNode, 1); // get job number
 
  job_num = atoi((char*)tmp);
 
  xmlFree(tmp);
 

	
 
  job = createJobFromXML(job_num);
 
  job = createJobFromXML(geninfo, job_num);
 
  if(job == NULL)
 
    return 1;
 

	
 
  // insert job at front of the queue
 
  head->next = job;
 
  /**
 
     insert job at front of the queue
 
  */
 
  geninfo->head.next = job;
 
  job->next = holder;
 

	
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 
int printFrameInfo(struct frameset *frame)
 
{
 
  char *status;
 

	
 
  status = NULL;
 

	
 
  switch(frame->status)
 
  {
 
    case FRAMESETSTATUS_UNASSIGNED:
 
      _distren_asprintf(&status, "%s", "unassigned");
 
      break;
 
    case FRAMESETSTATUS_ASSIGNED:
 
      _distren_asprintf(&status, "%s", "assigned");
 
      break;
 
    case FRAMESETSTATUS_DONE:
 
      _distren_asprintf(&status, "%s", "completed");
 
      break;
 
    case FRAMESETSTATUS_CANCELED:
 
@@ -1280,24 +1232,39 @@ int main(int argc, char *argv[])
 
	  if(returnnum)
 
	    {
 
	      fprintf(stderr,"No frames are available to render at this time. Idling...\n");
 
	      sleep(10);
 
	    }
 
	  else
 
	    remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      	if(clientsays == DISTREN_REQUEST_DONEFRAME){
 
      	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
      	  finish_frame(&general_info, job, frame->num); // @TODO: Make sure this actually works.
 
      	}
 

	
 
      distrend_action_free(action);
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  return 0;
 
}
 

	
 
/**
 
   constructs the filename for a distrenjob's serialized XML
 
   to be stored/retrieved from/in.
 
 */
 
char *job_getserialfilename(struct general_info *geninfo, unsigned int jobnum)
 
{
 
  char *filename;
 

	
 
  _distren_asprintf(&filename, "%s/stor/job/%d/distrenjob.xml",
 
		    geninfo->config->datadir,
 
		    jobnum);
 

	
 
  return filename;
 
}
src/server/distrenjob.c
Show inline comments
 
@@ -2,48 +2,49 @@
 
  Copyright 2009 Nathan Phillip Brink <ohnobinki@ohnopublishing.net>
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "asprintf.h"
 
#include "distrenjob.h"
 
#include "slavefuncs.h"
 

	
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlwriter.h>
 

	
 
void distrenjob_free(struct distrenjob **distrenjob)
 
{
 
  struct distrenjob *dj;
 

	
 
  dj = *distrenjob;
 
  xmlFree(dj->name);
 
  xmlFree(dj->submitter);
 
  xmlFree(dj->email);
 

	
 
  free(dj->frameset);
 

	
 
  free(dj);
 
  *distrenjob = NULL;
 
}
 

	
 
int distrenjob_new(struct distrenjob **distrenjob)
 
{
 
  struct distrenjob *dj;
 

	
 
  dj = malloc(sizeof(struct distrenjob));
 
  if(!dj)
 
    {
 
      /* try to catch code that doesn't respect return values
 
@@ -55,49 +56,49 @@ int distrenjob_new(struct distrenjob **d
 

	
 
  dj->next = NULL;
 
  dj->name = (char *)NULL;
 
  dj->submitter = (char *)NULL;
 
  dj->email = (char *)NULL;
 
  dj->jobnum = 0; /*< @todo there should be a central jobnum allocator and a way to save the maximum jobnumber allocated */
 
  dj->priority = 0;
 
  dj->width = 0;
 
  dj->height = 0;
 
  dj->completed_frames = 0;
 
  dj->assigned_frames = 0;
 
  dj->total_render_time = 0;
 
  dj->hibernate = 0;
 
  dj->prev_frame_index = -1;
 
  dj->assigned_render_power = 0;
 
  dj->watchdog_forgiveness = 3600; // initialize watchdog forgiveness at 1 hour
 
  dj->frameset = (struct frameset *)NULL; /*< @todo does frameset need to be initialized here? */
 

	
 
  return 0;
 
}
 

	
 
/**
 
   writes struct from xml
 
*/
 
int xml2distrenjob(struct distrenjob **distrenjob, char *pathtoxml)
 
int distrenjob_unserialize(struct distrenjob **distrenjob, char *pathtoxml)
 
{
 
  struct distrenjob *dj;
 

	
 
  xmlDocPtr xmldoc;
 
  xmlNodePtr xmlnode;
 
  xmlChar *xmlchar;
 

	
 
  xmlXPathContextPtr xmlxpathcontext;
 

	
 
  int tmp;
 

	
 
  if(distrenjob_new(distrenjob))
 
    return 1;
 
  dj = *distrenjob;
 

	
 
  xmldoc = xmlReadFile(pathtoxml, NULL, XML_PARSE_PEDANTIC);
 
  if(!xmldoc)
 
    {
 
      /**
 
	 @todo are we able to depend on libxml2's printed errors or
 
	 channel them into syslog output (eventually)? Currently,
 
	 this error is repetitious of a libxml2 error printed on stderr
 
	 for us.
 
       */
 
@@ -134,24 +135,108 @@ int xml2distrenjob(struct distrenjob **d
 
  tmp = _distren_asprintf((char **)&xmlchar, "/distren/submitters/submitter[attribute::name=\"%s\"]", dj->submitter);
 
  if(tmp <= 0
 
     || !xmlchar)
 
    {
 
      fprintf(stderr, "error using _distren_asprintf\n");
 
      distrenjob_free(distrenjob);
 
      return 6;
 
    }
 
  xmlnode = xml_quickxpath(xmlxpathcontext, xmlchar);
 
  free(xmlchar);
 
  dj->email = (char *)xmlGetProp(xmlnode, (xmlChar *)"email");
 
  if(!dj->email)
 
    {
 
      fprintf(stderr, "error getting email for user ``%s'' from ``%s''\n",
 
	      dj->submitter, pathtoxml);
 
      distrenjob_free(distrenjob);
 
      return 7;
 
    }
 

	
 
  xmlXPathFreeContext(xmlxpathcontext);
 
  xmlFreeDoc(xmldoc);
 
  return 0;
 
}
 

	
 
int distrenjob_serialize(struct distrenjob *job, char *outfile)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  char *tmpfile;
 
  int tmprtn;
 

	
 
  /**
 
     transactional FS access for POSIX systems...
 
     (probably not implemented correctly)
 
   */
 
  _distren_asprintf(&tmpfile, "%s_", outfile);
 

	
 
  /* create xml document at the location tmp with no compression */
 
  writer = xmlNewTextWriterFilename(tmpfile, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     write distrenjob element and add its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job");
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"name", (xmlChar*)job->name);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"submitter", (xmlChar*)job->submitter);
 
  _distren_asprintf(&tmp, "%d", job->priority);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"priority", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  /**
 
     write resolution element and add its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"resolution");
 
  _distren_asprintf(&tmp, "%d", job->width);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"width", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", job->height);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"height", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     write video element and its attributes */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"video");
 
  _distren_asprintf(&tmp, "%d", job->frameset[0].num);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"start_frame", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", job->frameset[(job->total_frames - 1)].num);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"end_fame", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"output_format", (xmlChar*)job->output_format);
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     write watchdog forgiveness element */
 
  _distren_asprintf(&tmp, "%d", job->watchdog_forgiveness);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"wd_forgiveness", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  /**
 
     end document */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk */
 
  xmlFreeTextWriter(writer);
 

	
 
  /**
 
     This is the key to transactioanl POSIX
 
     FS programming.
 
   */
 
  tmprtn = rename(tmpfile, outfile);
 
  if(tmprtn == -1)
 
    {
 
      fprintf(stderr, "%s:%d: Error renaming ``%s'' to ``%s''\n",
 
	      __FILE__, __LINE__,
 
	      tmpfile, outfile);
 
      perror("rename");
 
      tmprtn = 1;
 
    }
 
  free(tmpfile);
 

	
 
  return tmprtn;
 
}
src/server/distrenjob.h
Show inline comments
 
@@ -63,42 +63,51 @@ enum framesetstatus
 
  {
 
    FRAMESETSTATUS_CANCELED, /*< The use has canceled this frame */
 
    FRAMESETSTATUS_UNASSIGNED, /*< The frame has not been assigned */
 
    FRAMESETSTATUS_ASSIGNED, /*< The frame has been assigned */
 
    FRAMESETSTATUS_DONE /*< The frame has completed rendering and the slave has returned the product to me */
 
  };
 

	
 
struct frameset {
 
  int num; /*< frame number to render */
 
  char slave_name; /*< user that frame is assigned to */
 
  enum framesetstatus status; /*< status of frame, 0= unassigned, 1= taken, 2= done */
 
  clock_t start_time; /*< time the frame was started */
 
};
 

	
 
/*
 
  related functions
 
*/
 

	
 
/**
 

	
 
   @param distrenjob the address where we will store the pointer of a malloc()ed
 
   distrenjob struct.
 
   @param pathtoxml filename/pathname of the xml file to be read into a distrenjob struct
 
 */
 
int xml2distrenjob(struct distrenjob **distrenjob, char *pathtoxml);
 
int distrenjob_unserialize(struct distrenjob **distrenjob, char *pathtoxml);
 

	
 
/**
 
   support function for xml2distrenjob() to help cleaning up a
 
   serialize a distrenjob into XML
 
   @param distrenjob the address where we will store the pointer of a malloc()ed
 
   distrenjob struct.
 
   @param xmloutput filename/pathname of the xml file to be written
 
 */
 
int distrenjob_serialize(struct distrenjob *distrenjob, char *xmloutputpath);
 

	
 

	
 
/**
 
   support function for distrenjob_unserialize() to help cleaning up a
 
   struct distrenjob when it is incompletely initialized.
 
   Also acts as a general-purpose struct distrenjob free()er ;-)
 
 */
 
void distrenjob_free(struct distrenjob **distrenjob);
 

	
 
/**
 
   initializes an empty, pointless struct distrenjob. This
 
   DOES run malloc() for you. It could return nonzero on error,
 
   but there are no errors to have yet (except for nomem). This
 
   sets all char* to NULL.
 
 */
 
int distrenjob_new(struct distrenjob **distrenjob);
 

	
 
#endif
src/server/slave.c
Show inline comments
 
@@ -183,54 +183,54 @@ int main(int argc, char *argv[])
 
            // Download the Tar
 
            if( curlget(urltoTar, pathtoTar) == 0){
 
              fprintf(stderr, "Job data retrieved successfully\n");
 
              free(urltoTar);
 
            }
 
            else {
 
              fprintf(stderr, "Downloading job data from %s failed. Check your network connection.\n",urltoTar);
 
              return 1; // Eventually make a retry loop
 
            }
 
          }
 
        else
 
          fprintf(stderr, "Using cached job file...\n");
 

	
 
        _distren_asprintf(&outdir, "/tmp/distren/job%d", jobnum);
 
        mkdir("/tmp/distren", 0750); /* @FIXME: Change to tmpdir once it exists */
 
        mkdir(outdir, 0750);
 

	
 
        _distren_asprintf(&tarcmd, "tar -xvf \"%s\" -C \"%s\"", pathtoTar, outdir); /* @TODO:Use a lib here! */
 
        system(tarcmd);
 
        free(tarcmd);
 
        free(pathtoTar);
 
        free(outdir);
 

	
 
        /* Parses a job's XML file, puts data in the myjob struct */
 
        if(xml2distrenjob(&myjob, pathtoXml) == 0)
 
        if(distrenjob_unserialize(&myjob, pathtoXml) == 0)
 
          {
 
            fprintf(stderr, "Well, the XML craziness may have worked. Maybe. \n");
 
            free(pathtoXml);
 

	
 
            /* Frees things up if it was successful. xml2distrenjob() really (usually) only fails if malloc'ing inside it fails */
 
            /* Frees things up if it was successful. distrenjob_unserialize() really (usually) only fails if malloc'ing inside it fails */
 
            distrenjob_free(&myjob);
 
          }
 
        else
 
          {
 
            fprintf(stderr, "XML job data parsing has failed. Please contact a developer, or try again.\n");
 
            return 1;
 
          }
 

	
 
        /* Variable-fillers which require XML */
 
        outputExt = myjob->output_format;
 
        /* Prepares the path to the jobfile */
 
        _distren_asprintf(&pathtoOutput, "%s/job%d/output/job%d-frame%d.%s", datadir, jobnum, jobnum, framenum, outputExt );
 
        free(outputExt);
 

	
 
        /* Execute blender */
 
        if(exec_blender(pathtoJobfile, pathtoOutput, framenum))
 
          {
 
            fprintf(stderr,"Error running Blender. Check your installation and/or your PATH.\n");
 
            return 1;
 
          }
 
        free(pathtoJobfile);
 

	
 
        /* Post-execution */
 
        fprintf(stderr, "Finished frame %d in job %d, uploading...\n", framenum, jobnum);
0 comments (0 inline, 0 general)