Changeset - 02159cabda3c
[Not reviewed]
default
0 1 0
Nathan Brink (binki) - 16 years ago 2009-09-13 22:19:17
ohnobinki@ohnopublishing.net
rename add_job_to_queue() to distrenjob_enqueue()
1 file changed with 4 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "execio.h"
 
#include "options.h"
 
#include "distrenjob.h"
 
#include "protocol.h"
 
#include "slavefuncs.h"
 

	
 
#include <confuse.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <malloc.h>
 
#include <unistd.h> /* getopt */
 
#include <time.h>
 
#include <sys/stat.h>
 

	
 
/* ******************* Structs ************************ */
 

	
 
// Gets config info from confs
 
struct distrend_config
 
{
 
  cfg_t *mycfg;
 
  struct options_common *options;
 
  struct distrend_listen **listens; /*< Null terminated array of structs */
 
  char *datadir;
 
};
 

	
 

	
 

	
 
/*
 
 frame[frame] Status Assignments:
 
  "NULL" - don't render me
 
  "0" - canceled
 
  "1" - unassigned
 
  "2" - assigned to slave
 
  "3" - completed by slave and uploaded
 
*/
 

	
 
struct general_info {
 
  short int jobs_in_queue; //
 
  unsigned short int free_clients;
 
  unsigned short int rendering_clients;//
 
  unsigned short int total_finished_jobs; //
 
  unsigned int total_frames_rendered; //
 
  unsigned int highest_jobnum;
 
} general_info;
 

	
 

	
 

	
 
/*
 
  internally defined funcs's prototypes
 
*/
 
void status_report_generator(struct distrenjob **distrenjobs_head);
 
void distrenjob_remove(struct distrenjob **head, struct distrenjob *bj);
 

	
 
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
 
void add_job_to_queue(struct distrenjob *head, struct distrenjob *job);
 
void distrenjob_enqueue(struct distrenjob *head, struct distrenjob *job);
 

	
 
/* Global Vars, try to cut down on these */
 
jobnum_t jobnum = 0; // The next job number to create in the queue
 
int hcfjob; // Highest consecutively finished job
 
int highest_jobnum; // The job number of the most recently created job, this is used when creating new jobs
 

	
 

	
 
/* ********************** Functions ************************* */
 

	
 
/** Dumps all data in RAM to an xml file (such as current jobs, etc) which is parsed by start_data. Remember to invoke this before shutting down! */
 
int xml_dump()
 
{
 
  return 0;
 
}
 
/**
 
   Performs command stored in a client's request.
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 
/**
 
   Accepts a client's connection
 
 */
 
void distrend_accept()
 
{
 

	
 
}
 
/**
 
   Frees the action
 
*/
 
void distrend_action_free()
 
{
 

	
 
}
 
/**
 
   Start listening
 
*/
 
void distrend_listen()
 
{
 

	
 
}
 
/**
 
   Stop listening
 
*/
 
void distrend_unlisten()
 
{
 

	
 
}
 
/**
 
   This is probably just a placeholder for remotio
 
*/
 
void remotio_send_to_client()
 
{
 
	// I am futile!
 
}
 

	
 
/** Fill variables after crash / shutdown from XML dumps */
 
int start_data(){
 
  struct stat buffer;
 
  if(stat(SYSCONFDIR "/data.xml", &buffer) == 0){
 

	
 
    // @TODO: retrieve total_finished_jobs and total_finished_frames from xml file
 

	
 
    fprintf(stderr,"Parsing XML files and restoring previous state...\n");
 
    return 1;
 
  }
 
  else{
 
    general_info.total_finished_jobs = 0;
 
    general_info.total_frames_rendered = 0;
 
    fprintf(stderr,"Can't find XML dump, starting up fresh.\n");
 
    return 2;
 
  }
 
  free(&buffer); // @TODO: Is this pointless?
 
}
 

	
 
/** Finish-Setter: Sets a frame to the "completed" status.*/
 
void finish_frame(struct distrenjob *distrenjob, int frame){
 
  distrenjob->frameset[frame].status = 2;
 
  distrenjob->frameset[frame].time_to_render = (clock() - distrenjob[jobnum].frameset[frame].start_time); // Consider changing time-to-render to time-for-frame or something?
 
  general_info.total_frames_rendered++; // Increase total frames var for stats
 
}
 

	
 

	
 
/**
 
   creates a structure from starting data, then calls another function to actually add that struct to the queue
 
*/
 
void prepare_distrenjob(struct frameset *head, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame)
 
{
 
  int counter2;
 
  int counter;
 

	
 
  struct distrenjob *distrenjob;
 

	
 
  distrenjob->type = 1; /*@TODO: add type to blendjob struct*/
 
  distrenjob->name = name;
 
  distrenjob->submitter = submitter;
 
  distrenjob->email = email;
 
  distrenjob->priority = priority;
 
  distrenjob->frameset = malloc(end_frame - start_frame + 1);
 
  distrenjob->total_frames = (end_frame - start_frame + 1); // sets the total number of frames in animation for status purposes
 

	
 
  /* @TODO: what does this do? frameset array is declared above */
 
  /*
 
  distrenjob->frameset = malloc(sizeof(struct frameset) * numframes);
 
  if(!job->frameset)
 
    fprintf(stderr, "Error allocating memory\n");
 
  */
 

	
 
  /* prepares all the frames by setting that status to "unassigned" */
 
  counter2 = start_frame;
 
  for(counter = 0; counter <= (end_frame- start_frame + 1); counter++){
 
    distrenjob->frameset[counter].num = counter2;
 
    distrenjob->frameset[counter].status = 0;
 

	
 
    counter2++;
 
  }
 

	
 
  /* @TODO: change frame_num_struct_builder to add_blendjob_to_queue */
 
  add_job_to_queue(head, distrenjob);
 
  distrenjob_enqueue(head, distrenjob);
 

	
 
  general_info.jobs_in_queue++;
 
}
 

	
 
/** -- OLD -- Queuer: Adds files to the queue
 
int queue(struct distrenjob *distrenjob, int type, char *name, char *submitter, char *email, int priority, int mode, int spp, struct frameset *frameset) {
 
  // Type: 1 = blender, add more types later
 
  // jobnum is the next available job number
 
  if(type == 1){
 
    distrenjob->name = name;
 
    distrenjob->submitter = submitter;
 
    distrenjob->email = email;
 
    distrenjob->priority = priority;
 
    distrenjob->frameset = frameset;
 
  }
 
  else{
 
    // Throw error.
 
    fprintf(stderr,"You tried to queue a job type that isn't supported by this version of DistRen.\nI have no idea how this would happen, but it just did.\n");
 
    return 0; // fail
 
  }
 
jobnum++; // Advance the jobnumber for the next queued job
 
return 0; // OK
 
}*/
 

	
 

	
 
/**
 
  Status Report Generator:
 
  -figures out how much of the job is done, where jobnum corresponds to the job number
 
  -removes finished jobs
 

	
 
  @param distrenjobs_head a pointer to a pointer because the head of the distrenjobs linked list may need to be changed by distrenjob_remove
 
*/
 
void status_report_generator(struct distrenjob **distrenjobs_head)
 
{
 
  struct distrenjob *distrenjob_ptr;
 
  unsigned short workers_working; /*< used to count the total number of workers working */
 
  unsigned int numjobs; /*< used to track number of jobs */
 

	
 
  distrenjob_ptr = *distrenjobs_head;
 
  workers_working = 0;
 
  numjobs = 0;
 

	
 
  while(distrenjob_ptr)
 
    {
 
      if(distrenjob_ptr->priority != 0)
 
	/* If the job is not done, scan it */
 
	{
 
	  unsigned int framecounter;  /*< to scan through frames */
 
	  unsigned long finished_frames; /*< variable that counts the completed frames */
 
	  unsigned int pending_frames; /*< variable that counts the assigned frames */
 
	  float percent_frames_finished;  /*< variable that stores the percent done of the distrenjob */
 
	  unsigned int total_time;  /*< total time taken to render all the completed frames for a job */
 

	
 
	  framecounter = 0;
 
	  finished_frames = 0;
 
	  pending_frames = 0;
 
	  percent_frames_finished = 0;
 
	  total_time = 0;
 

	
 
	  while(framecounter < distrenjob_ptr->total_frames)
 
	    /* scans through frames, based on their status it runs a statement(s) */
 
	    {
 
	      if(distrenjob_ptr->frameset[framecounter].status == 2)
 
		/* If the frame is done */
 
		{
 
		  finished_frames ++;
 
		  total_time += distrenjob_ptr->frameset[framecounter].time_to_render;
 
		}
 

	
 
	      if(distrenjob_ptr->frameset[framecounter].status == 1)
 
		/* If the frame is assigned */
 
		{
 
		  pending_frames ++;
 
		  workers_working ++;
 
		}
 

	
 
	      framecounter ++;
 
	    } /* while(framecounter < distrenjob_ptr->total_frames) */
 

	
 
	  // find the percent of completed frames
 
	  percent_frames_finished = (finished_frames / distrenjob_ptr->total_frames) * 100; /*< @LordofWar: extraneous parentheses! */
 

	
 
	  /* updates values in the distrenjob struct */
 
	  distrenjob_ptr->completed_frames = finished_frames;
 
	  distrenjob_ptr->assigned_frames = pending_frames;
 
	  distrenjob_ptr->percent_done = percent_frames_finished;
 
	  distrenjob_ptr->avg_render_time = (total_time / finished_frames); /*< extraneous parentheses! */
 
	  distrenjob_ptr->time_remaining = (distrenjob_ptr->avg_render_time * (distrenjob_ptr->total_frames - finished_frames)); /*< extraneous parentheses! */
 

	
 
	  if(finished_frames == distrenjob_ptr->total_frames)
 
	    /* If all frames are complete */
 
	    {
 
	      distrenjob_ptr->priority = 0; /*< set priority to zero to indicate job is complete */
 
	      distrenjob_remove(distrenjobs_head, distrenjob_ptr); /*< remove this job from the linkedlist */
 
	      distrenjob_free(distrenjob_ptr);
 
	      general_info.total_finished_jobs++; /*< add one to the total finished jobs */
 

	
 
	    }
 
	  else if (finished_frames > distrenjob_ptr->total_frames)
 
	    /* just in case ;-) */
 
	    {
 
	      fprintf(stderr, "%s:%d: finished_frames (%lu) > distrenjob_ptr->total_frames (%d)",
 
		      __FILE__, __LINE__,
 
		      finished_frames,
 
		      distrenjob_ptr->total_frames);
 
	      abort();
 
	    }
 
	}
 

	
 
      general_info.rendering_clients = workers_working; /*< should this be a +=? */
 

	
 
      distrenjob_ptr = distrenjob_ptr->next; /*< This is the essence of linked lists and iterating through them */
 
      numjobs ++;
 
    } /* while(distrenjob_ptr) */
 

	
 
  general_info.jobs_in_queue = (highest_jobnum - general_info.total_finished_jobs); /*< extraneous parentheses! */
 
}
 

	
 

	
 
/**
 
   Structure Builder: This function creates frame array based on the total number of frames to be rendered, which will then be parsed by function frame_farmer.
 
*/
 
/** Structure Builder: This function creates frame array based on the total number of frames to be rendered, which will then be parsed by function frame_farmer. */
 
void add_job_to_queue(struct distrenjob *head, struct distrenjob *job) {
 
void distrenjob_enqueue(struct distrenjob *head, struct distrenjob *job) {
 
  struct distrenjob *prev_job = head; // create pointer to previous job
 
  struct distrenjob *current_job;     // create pointer to current_job (which job is being compared to)
 

	
 
  // iterate through linked list of jobs
 
  for(current_job = head; current_job != NULL; current_job = current_job->next){
 
    if(current_job == NULL){ // if it has reached the end of the list, add job there
 
      current_job = job;
 
      break;
 
    }
 
    else if(job->priority < current_job->priority){ // if job's priority is less than current_job's priority, insert job
 
      prev_job->next = job;                        // keep in mind 1 is the highest priority given to jobs, head has a
 
      job->next = current_job;                     // priority of zero so it will always be before other jobs
 
      break;
 
    }
 

	
 
    prev_job = current_job;
 
  } /* for(current_job) */
 
}
 

	
 
/**
 
   @arg head I may end up changing the head if job == head
 
 */
 
void change_job_priority(struct distrenjob *head, struct distrenjob *job, int new_priority){
 
  distrenjob_remove(head, job);
 
  job->priority = new_priority;
 
  struct distrenjob *current_job;
 
  struct distrenjob *prev_job = head;
 

	
 
  if(job->frameset[0].status == 0){ // if job was not yet started
 
    add_job_to_queue(head, job);
 
    distrenjob_enqueue(head, job);
 
  }
 
  else{ // if job has already been started then place it before the jobs with the same priority
 
    // iterate through linked list of jobs
 
    for(current_job = head; current_job != NULL; current_job = current_job->next){
 
      if(current_job == NULL){ // if it has reached the end of the list, add job there
 
        current_job = job;
 
        break;
 
      }
 
      else if(job->priority <= current_job->priority){ // if job's priority is less than or equal to current_job's priority, insert job
 
        prev_job->next = job;                        // keep in mind 1 is the highest priority given to jobs, head has a
 
        job->next = current_job;                     // priority of zero so it will always be before other jobs
 
        break;
 
      }
 

	
 
      prev_job = current_job;
 
    }
 
  }
 
}
 

	
 
/**
 
  Frame Finder: matches your computer up with a lovely frame to render
 
  TODO: Major issue here, the client needs to know the frame number, AND the job number!
 
  Notice that this function starts by looking at the oldest job first
 

	
 
	TODO: Link this up with the main() function to check if there are frames available or not and provide jobnum/framenum to the client
 

	
 
  @return 0 success, other: error
 
*/
 
int frame_finder(struct distrenjob *head, struct distrenjob **job, struct frameset **frame)
 
{
 
  int your_frame;  // your_frame is an integer value that will be given to the client as the frame number to render
 
  int your_job;	   // @TODO: Fixme: do we need this var? you_job is an integer value that must ALSO be given to the client
 

	
 
  unsigned short int found;
 
  unsigned short int priority;
 

	
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  found = 0;
 
  while(!found)
 
    {
 
      /* enumerate through priority levels */
 
      for(priority = 10;
 
        priority > 0
 
            && !found;
 
           priority --)
 
	/* Find the job with the highest priority */
 
        for(distrenjob_ptr = head;
 
            distrenjob_ptr != NULL
 
            && !found;
 
            distrenjob_ptr = distrenjob_ptr->next)
 
          if(distrenjob_ptr->priority == priority)
 
	    found = 1;
 

	
 
      if(!found)
 
        {
 
          fprintf(stderr, "No more jobs to render\n");
 
          return 1;
 
        }
 

	
 
      found = 0;
 
      for(your_frame = 0;
 
        your_frame < distrenjob_ptr->total_frames;
 
        your_frame ++)
 
        if(distrenjob_ptr->frameset[your_frame].status == 0)
 
          found = 1;
 

	
 
      if(!found)
 
        {
 
          /* there are no frames left in this job */
 
          distrenjob_ptr->priority --;
 

	
 
	  /* If that job had no open frames for some reason, run the status report generator so that */
 
          status_report_generator(&head);
 

	
 
          /* should the job be removed now? */
 
          fprintf(stderr, "Job %d is finished, this is probably the place to call the job-removal function\n", distrenjob_ptr->jobnum);
 
          /* @TODO: At this point, all slaves should be instructed to delete the source data for this job. */
 
        }
 
    }
 

	
 
  /* Sets the value of the frame to 1, which means it is taken */
 
  distrenjob_ptr->frameset[your_frame].status++;
 

	
 
  distrenjob_ptr->frameset[your_frame].start_time = clock();
 

	
 
  *job = distrenjob_ptr;
 
  *frame =  &distrenjob_ptr->frameset[your_frame];
 

	
 
  return 0;
 
}
 

	
 
/** Checks for dead, laggy, or stale slaves */
 
void blend_frame_watchdog(struct distrenjob *distrenjob_head)
 
{
 
  unsigned short int watchdog_forgiveness; /*< seconds to wait on a frame before re-assigning it */
 
  struct distrenjob *distrenjob_ptr;
 
  unsigned int counter;
 

	
 
  watchdog_forgiveness = 1; /*< hours of forgiveness before frame is re-assigned @TODO: Make this more user-configurable (maybe per-job), 3 hours is a LONG time */
 
  distrenjob_ptr = distrenjob_head;
 

	
 
  for(distrenjob_ptr = distrenjob_head; distrenjob_ptr; distrenjob_ptr = distrenjob_ptr->next)
 
    /* iterate through jobs */
 

	
 
    for(counter = 0; counter < distrenjob_ptr->total_frames; counter ++)
 
      /* iterate through all frames for this job*/
 
      {
 
        if((distrenjob_ptr->frameset[counter].start_time + (watchdog_forgiveness * 3600)) < clock())
 
          /*
 
            If frame is not completed within the number of hours specified by watchdog_forgiveness
 
            Then change the frame status to unassigned
 
           */
 
          distrenjob_ptr->frameset[counter].status = 0;
 
      }
 

	
 
}
 

	
 
/**
 
   Finds a distrenjob struct based on the jobnum
 
   @arg jobnum job number to search for
 
   @return NULL on job doesn't exist
 
 */
 
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum)
 
{
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  /*
 
    The conditions of the for loop will leave distrenjob_ptr at NULL if the end of the list is reached. It will leave it pointing to the correct job if it is found.
 
   */
 
  for(distrenjob_ptr = head;
 
      distrenjob_ptr
 
        && distrenjob_ptr->jobnum != jobnum;
 
      distrenjob_ptr = distrenjob_ptr->next);
 

	
 
  return distrenjob_ptr;
 
}
 

	
 

	
 
/**
 
   Removes a distrenjob from the distrenjob linked list.
 

	
 
   @arg head a double pointer. the head pointer will have to be changed if distrenjob == *head. Thus, make sure that the pointer points to the pointer to the head that all functions use. (I'm going to come back to this and misunderstand myself ;-))
 
 */
 
void distrenjob_remove(struct distrenjob **head, struct distrenjob *bj)
 
{
 
  struct distrenjob *previous_distrenjob;
 

	
 
  if(bj == *head)
 
    *head = bj->next;
 
  else
 
    {
 

	
 
      for(previous_distrenjob = *head;
 
        previous_distrenjob
 
          && previous_distrenjob->next != bj; /*< stop on the distrenjob that comes before bj */
 
          previous_distrenjob = previous_distrenjob->next)
 
	/* all of the action is in the definition of the for loop itself */;
 

	
 
      /*
 
	This removes references to bj from the linked list. I.E., we now skip bj when iterating through the list
 
       */
 
      previous_distrenjob->next = bj->next;
 
    }
 
}
 

	
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
 
{
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_END()
 
    };
 

	
 
  struct distrenjob *distrenjob;
 

	
 
  int tmp;
 

	
 
  xmlinit();
 
  /*
0 comments (0 inline, 0 general)