Changeset - 716f6fbdba6f
[Not reviewed]
default
0 1 0
Nathan Brink (binki) - 16 years ago 2009-12-20 16:25:49
ohnobinki@ohnopublishing.net
fix comments and style, fix distrenjob_enqueue()
1 file changed with 47 insertions and 23 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "asprintf.h"
 
#include "distrenjob.h"
 
#include "execio.h"
 
#include "listen.h"
 
#include "options.h"
 
#include "protocol.h"
 
#include "slavefuncs.h"
 

	
 
#include <confuse.h>
 
#include <malloc.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* local defs */
 
#define NUMBER_ELEVEN 11
 

	
 
/* ******************* Structs ************************ */
 

	
 
struct general_info
 
{
 
  struct distrenjob head;
 

	
 
  struct distrend_config *config;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 

	
 
/*
 
  internally defined funcs's prototypes @TODO: Make all functions nice and proper 
 
*/
 
void distrenjob_remove(struct general_info *, struct distrenjob *bj);
 

	
 
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
 
int distrenjob_enqueue(struct general_info *, struct distrenjob *job);
 
int mortition(struct general_info *, struct distrenjob *job);
 
int update_xml_joblist(struct general_info *);
 
int createQueueFromXML(struct general_info*);
 
int reCreateQueueFromXML(struct general_info*, xmlDocPtr doc, xmlNodePtr current);
 
void update_general_info(struct general_info*);
 
int import_general_info(struct general_info*);
 
int updateJobStatsXML(struct distrenjob *job);
 

	
 
char *job_getserialfilename(struct general_info *, unsigned int jobnum);
 

	
 
/* ********************** Functions ************************* */
 

	
 
/** Dumps all data in RAM to an xml file (such as current jobs, etc) which is parsed by start_data. Remember to invoke this before shutting down! */
 
int xml_dump()
 
{
 
  return 0;
 
}
 
/**
 
   Performs command stored in a client's request.
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 
/**
 
   Frees the action
 
*/
 
void distrend_action_free()
 
{
 

	
 
}
 

	
 
/** 
 
    Fill variables at startup from XML dumps or defaults
 
 */
 
int start_data(struct general_info *general_info, char *datadir)
 
{
 
  struct stat buffer;
 

	
 
  /**
 
     defaults
 
   */
 
  memset(&general_info->head, '\0', sizeof(struct distrenjob));
 
  general_info->head.priority = 0;
 

	
 
  general_info->jobs_in_queue = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->total_finished_jobs = 0;
 
  general_info->total_frames_rendered = 0;
 
  general_info->highest_jobnum = 0;
 
  general_info->hibernate = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  fprintf(stderr, "Initialized default global and queue states\n");
 

	
 
  if(stat("general_info.xml", &buffer) == 0)
 
    {
 
      fprintf(stderr, "previous state file found, loading:\n");
 

	
 
      fprintf(stderr, "Parsing XML files and restoring previous state...\n");
 
      if(import_general_info(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "Restoring queue...\n");
 
      if(createQueueFromXML(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "done\n");
 
    }
 
  
 
  return 0;
 
}
 

	
 
/** Finish-Setter: Sets a frame to the "completed" status.*/
 
void finish_frame(struct general_info *geninfo, struct distrenjob *distrenjob, int frame)
 
{
 
  distrenjob->frameset[frame].status = FRAMESETSTATUS_DONE;
 
  distrenjob->total_render_time = distrenjob->total_render_time + (clock() - distrenjob->frameset[frame].start_time);
 
  distrenjob->completed_frames ++;
 
  distrenjob->assigned_frames --;
 
  geninfo->total_frames_rendered ++; /*< Increase total frames var for stats */
 

	
 
  update_general_info(geninfo);
 
  updateJobStatsXML(distrenjob);
 
}
 

	
 
/**
 
   checks to see if a job is actually done.
 
   - scans the folder of the job to make sure all output files are present
 
*/
 
int mortition(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1; // set isJobDone to true
 
  isJobDone = 1;
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "%s/stor/job%d/out/%d.%s",
 
			geninfo->config->datadir,
 
			job->jobnum,
 
			job->frameset[counter].num,
 
			job->output_format);
 
      if(stat(path_and_number, &buffer) == -1)
 
        {
 
	  /**
 
	     missing frame found
 
	   */
 
          job->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 
          job->completed_frames--;
 
          geninfo->total_frames_rendered--;
 
	  /** if a missing frame is found, set isJobDone to false */
 
          isJobDone = 0;
 
        }
 
      free(path_and_number);
 
    }
 

	
 
  /** if all frames were accounted for */
 
  if(isJobDone)
 
    {
 
      /**
 
	 all frames were accounted for
 
      */
 
      distrenjob_remove(geninfo, job);
 
      distrenjob_free(&job);
 
      geninfo->jobs_in_queue --;
 
      update_xml_joblist(geninfo);
 
    }
 
  else
 
    /**
 
       if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned 
 
    */
 
    job->prev_frame_index = -1;
 

	
 
  update_general_info(geninfo);
 

	
 
  return 0;
 
}
 

	
 
/** scans the frames of a job to initialize a job after server */
 
/* returns 1 if the job is completely done and there are no missing frames */
 
/* returns 0 if a missing frame is found */
 
/**
 
   scans the frames of a job to initialize a job after server
 
   @return 0 if the job is completely done and there are no missing frames,
 
           1 if missing frames are found.
 
*/
 
int restoreJobState(struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1;
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format); /*< @TODO this path is used in multiple places, construct/build/determine it in a central function */
 
      if(stat(path_and_number, &buffer) == 0)
 
        {
 
          job->frameset[counter].status = FRAMESETSTATUS_ASSIGNED;
 
          job->completed_frames++;
 
        }
 
      else
 
        isJobDone = 0;
 
      free(path_and_number);
 
    }
 

	
 
  return isJobDone;
 
  return !isJobDone;
 
}
 

	
 
/** creates a structure from starting data, then calls another function to actually add that struct to the queue */
 
/**
 
   creates a structure from starting data, then calls another
 
   function to actually add that struct to the queue.
 
*/
 
int prepare_distrenjob(struct general_info *geninfo, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame, int width, int height)
 
{
 
  int counter2;
 
  int counter;
 
  int tmp;
 
  char *path_with_num;
 
  char *serialfile;
 

	
 
  struct distrenjob *distrenjob;
 
  tmp = distrenjob_new(&distrenjob);
 
  if(tmp)
 
    return 1;
 

	
 
  geninfo->highest_jobnum ++;
 
  distrenjob->jobnum = geninfo->highest_jobnum;
 

	
 
  distrenjob->type = 1;
 
  distrenjob->name = name;
 
  distrenjob->submitter = submitter;
 
  distrenjob->email = email;
 
  distrenjob->priority = priority;
 
  distrenjob->width = width;
 
  distrenjob->height = height;
 
  distrenjob->total_frames = (end_frame - start_frame + 1); /* sets the total number of frames in animation for status purposes */
 
  /** sets the total number of frames in animation for status purposes */
 
  distrenjob->total_frames = (end_frame - start_frame + 1);
 
  distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames);
 
  if(!distrenjob->frameset)
 
    {
 
      distrenjob_free(&distrenjob);
 
      return 1;
 
    }
 

	
 
  /* prepares all the frames by setting that status to "unassigned" */
 
  /** prepares all the frames by setting that status to "unassigned" */
 
  counter2 = start_frame;
 
  for(counter = 0; counter < distrenjob->total_frames; counter++){
 
    distrenjob->frameset[counter].num = counter2;
 
    distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 

	
 
    counter2++;
 
  }
 

	
 
  _distren_asprintf(&path_with_num, "stor/job%d/out/", distrenjob->jobnum);
 
  fprintf(stderr, ">>NOT<< creating dir ``%s''\n", path_with_num); /*< @TODO recursively create job directory */
 
  serialfile = job_getserialfilename(geninfo, distrenjob->jobnum);
 

	
 
  free(path_with_num);
 

	
 
  /* add job to queue */
 
  /**
 
     add job to queue
 
  */
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_serialize()\n");
 
  distrenjob_serialize(distrenjob, serialfile);
 
  free(serialfile);
 

	
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_enqueue()\n");
 
  distrenjob_enqueue(geninfo, distrenjob);
 

	
 
  geninfo->jobs_in_queue ++;
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_xml_joblist()\n");
 
  update_xml_joblist(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_general_info()\n");
 
  update_general_info(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting updateJobStatsXML()\n");
 
  updateJobStatsXML(distrenjob);
 

	
 
  return 0;
 
}
 

	
 

	
 
/** distrenjob_enqueue: This function adds the job to the queue based on its priority */
 
/**
 
   distrenjob_enqueue: This function adds the job to the queue based on its priority
 
*/
 
int distrenjob_enqueue(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  struct distrenjob *prev_job;
 
  struct distrenjob *current_job;
 
  struct distrenjob *head;
 

	
 
  head = &geninfo->head;
 
  prev_job = head;
 

	
 
  // iterate through linked list of jobs
 
  for(current_job = head->next; 1; current_job = current_job->next)
 
  /**
 
     iterate through linked jobs
 
  */
 
  for(current_job = head->next; current_job; current_job = current_job->next)
 
  {
 
	fprintf(stderr, "enqueue loop iteration\n");
 
    if(current_job == NULL){ // if it has reached the end of the list, add job there
 
    if(job->priority < current_job->priority)
 
      {
 
	/**
 
	   if job's priority is less than current_job's priority, insert job
 
	   keep in mind 1 is the highest priority given to jobs, head has a
 
	   priority of zero so it will always be before other jobs
 
	*/
 
      prev_job->next = job;
 
      fprintf(stderr, "adding job at end of queue\n");
 
      break;
 
    }
 
    else if(job->priority < current_job->priority){ // if job's priority is less than current_job's priority, insert job
 
      prev_job->next = job;                        // keep in mind 1 is the highest priority given to jobs, head has a
 
      job->next = current_job;                     // priority of zero so it will always be before other jobs
 
	job->next = current_job;
 
      fprintf(stderr, "adding job before jobname: %s\n", current_job->name);
 
      break;
 

	
 
	return 0;
 
    }
 

	
 
    prev_job = current_job;
 
  } /* for(current_job) */
 
  }
 

	
 
  /**
 
     if it has reached the end of the list, add job there
 
  */
 
  prev_job->next = job;
 
  fprintf(stderr, "adding job at end of queue\n");
 

	
 
  return 0;
 
}
 

	
 
/** Changes the priority of an existing (and maybe running) job. @arg head I may end up changing the head if job == head */
 
int change_job_priority(struct general_info *geninfo, struct distrenjob *job, int new_priority){
 
  struct distrenjob *current_job;
 
  struct distrenjob *prev_job;
 
  char *serialname;
 

	
 
  distrenjob_remove(geninfo, job);
 
  job->priority = new_priority;
 

	
 
  prev_job = &geninfo->head;
 

	
 
  if(job->frameset[0].status == FRAMESETSTATUS_UNASSIGNED)
 
    /**
 
       if job was not yet started
 
    */
 
    {
 
      distrenjob_enqueue(geninfo, job);
 
      return 0;
 
    }
 

	
 
  /**
 
     iterate through linked list of jobs 
 
  */
 
    for(current_job = &geninfo->head;
 
	current_job != NULL
 
	  && job->priority > current_job->priority;
 
	current_job = current_job->next)
 
      prev_job = current_job;
 

	
 
    prev_job->next = job;
 
    job->next = current_job;
 

	
 

	
 
  update_xml_joblist(geninfo);
 
  /** reserialize after changes */
 
  serialname = job_getserialfilename(geninfo, job->jobnum);
 
  distrenjob_serialize(job, serialname);
 
  free(serialname);
 

	
 
  return 0;
 
}
 

	
 
/**
 
  Frame Finder: matches your computer up with a lovely frame to render, starts looking at oldest job first
 
  @TODO: We must return both jobnum and framenum
 
  @TODO: Add calls in main()
 
  @return 0 success, other: error
 
*/
 
int find_jobframe(struct general_info *geninfo, struct distrenjob **job, struct frameset **frame)
 
{
 
  if(geninfo->hibernate)
 
    return 1;
 

	
 
  unsigned int frame_counter;
 
  unsigned short int found;
 

	
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  found = 0;
 
  /* iterate through jobs from first to last */
 
  for(distrenjob_ptr = geninfo->head.next; distrenjob_ptr && !distrenjob_ptr->hibernate; distrenjob_ptr = distrenjob_ptr->next)
 
  {
 
    for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++)
 
    {
 
      if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found
 
      {
 
    	found = 1;
 
    	distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED;
 
    	distrenjob_ptr->frameset[frame_counter].start_time = clock();
 
    	distrenjob_ptr->assigned_frames++;
 
    	distrenjob_ptr->prev_frame_index = frame_counter;
 
    	updateJobStatsXML(distrenjob_ptr);
 
      }
 

	
 
      if(found)
 
    	break;
 
    }
 

	
 
    if(found)
 
      break;
 
  }
 

	
 
  if(!found)
 
    {
 
      fprintf(stderr, "No more jobs to render\n");
 
      sleep(1); /*< @todo eliminate the need for this line*/
 
      return 1;
 
    }
 

	
 
  *job = distrenjob_ptr;
 
  *frame = &distrenjob_ptr->frameset[frame_counter];
 

	
 
  return 0;
 
}
 

	
 
int find_jobframe_from_job(struct distrenjob *distrenjob_ptr, struct distrenjob **job, struct frameset **frame)
 
{
 
  unsigned int frame_counter;
 
  unsigned short int found;
 

	
 
  found = 0;
 
  for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++)
 
  {
 
    if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found
 
      {
 
    	found = 1;
 
		distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED;
 
		distrenjob_ptr->frameset[frame_counter].start_time = clock();
 
		distrenjob_ptr->assigned_frames++;
 
		distrenjob_ptr->prev_frame_index = frame_counter;
 
		updateJobStatsXML(distrenjob_ptr);
 
      }
 

	
 
    if(found)
 
      break;
 
  }
 

	
 

	
 
  if(!found)
 
    {
 
      fprintf(stderr, "No more frames in this job number %d", distrenjob_ptr->jobnum);
 
      distrenjob_ptr->prev_frame_index = frame_counter;
 
      return 1;
 
    }
 

	
 
  *job = distrenjob_ptr;
 
  *frame = &distrenjob_ptr->frameset[frame_counter];
 

	
 
  return 0;
 
}
 

	
 
// find a frame to render when the job that the last frame was for no longer exists
 
int find_jobframe_new(struct general_info *geninfo, int rend_pwr, struct distrenjob **job, struct frameset **frame)
 
{
 
  if(geninfo->hibernate)
 
    return 1;
 

	
 
  float power_difference;
 
  float greatest_power_difference;
 
  unsigned short int found;
 
  struct distrenjob *job_to_render;
 

	
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  greatest_power_difference = -10000;
 
  found = 0;
 
  /* iterate through jobs from first to last */
 
  for(distrenjob_ptr = geninfo->head.next;
 
      distrenjob_ptr && !distrenjob_ptr->hibernate;
 
      distrenjob_ptr = distrenjob_ptr->next)
 
  {
 
	  if(distrenjob_ptr->prev_frame_index < (distrenjob_ptr->total_frames - 1))
 
	  {
 
	    /**
 
	       Why is the number 11 found here again? --ohnobinki
 
	     */
 
		  power_difference = (((float)geninfo->total_render_power / (float)geninfo->total_priority_pieces) * (NUMBER_ELEVEN - (float)distrenjob_ptr->priority));
 
		  power_difference = power_difference - (float)distrenjob_ptr->assigned_render_power;
 

	
 
		  fprintf(stderr, "job num %d\npower difference: %f\n", distrenjob_ptr->jobnum, power_difference);
 

	
 
		  if(power_difference > greatest_power_difference)
 
		  {
 
			  job_to_render = distrenjob_ptr;
 
			  greatest_power_difference = power_difference;
 
			  found = 1;
 
		  }
 
	  }
 
  }
 

	
 
  if(!found)
 
    {
 
      fprintf(stderr, "No more jobs to render\n");
 
      return 1;
 
    }
 

	
 
  find_jobframe_from_job(job_to_render, job, frame);
 
  job_to_render->assigned_render_power = job_to_render->assigned_render_power + rend_pwr;
 

	
 
  return 0;
 
}
 

	
 
// gets a frame to render from the same job that the previously rendered frame was from
 
int find_jobframe_again(struct general_info *geninfo, int jobnum, int rend_pwr, struct distrenjob **job, struct frameset **frame)
 
{
 
  if(geninfo->hibernate)
 
    return 1;
 

	
0 comments (0 inline, 0 general)