Changeset - 4f50bc8e5de6
[Not reviewed]
default
0 4 0
Nathan Brink (binki) - 16 years ago 2010-01-03 12:39:20
ohnobinki@ohnopublishing.net
remove struct distrend_action, improve distrend_accept()
4 files changed with 75 insertions and 38 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "asprintf.h"
 
#include "distrenjob.h"
 
#include "execio.h"
 
#include "listen.h"
 
#include "options.h"
 
#include "protocol.h"
 
#include "slavefuncs.h"
 

	
 
#include <confuse.h>
 
#include <malloc.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* local defs */
 
#define NUMBER_ELEVEN 11
 

	
 
/* ******************* Structs ************************ */
 

	
 
struct general_info
 
{
 
  struct distrenjob head;
 

	
 
  struct distrend_config *config;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 

	
 
/*
 
  internally defined funcs's prototypes @TODO: Make all functions nice and proper 
 
*/
 
void distrenjob_remove(struct general_info *, struct distrenjob *bj);
 

	
 
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
 
int distrenjob_enqueue(struct general_info *, struct distrenjob *job);
 
int mortition(struct general_info *, struct distrenjob *job);
 
int update_xml_joblist(struct general_info *);
 
int createQueueFromXML(struct general_info*);
 
int reCreateQueueFromXML(struct general_info*, xmlDocPtr doc, xmlNodePtr current);
 
void update_general_info(struct general_info*);
 
int import_general_info(struct general_info*);
 
int updateJobStatsXML(struct distrenjob *job);
 

	
 
char *job_getserialfilename(struct general_info *, unsigned int jobnum);
 

	
 
/* ********************** Functions ************************* */
 

	
 
/** Dumps all data in RAM to an xml file (such as current jobs, etc) which is parsed by start_data. Remember to invoke this before shutting down! @TODO: Fill this stub*/
 
int xml_dump()
 
{
 
  return 0;
 
}
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 
/**
 
   Frees the action. @TODO: Fill stub
 
*/
 
void distrend_action_free()
 
{
 

	
 
}
 

	
 
/** 
 
    Fill variables at startup from XML dumps or defaults
 
 */
 
int start_data(struct general_info *general_info, char *datadir)
 
{
 
  struct stat buffer;
 

	
 
  /**
 
     defaults
 
   */
 
  memset(&general_info->head, '\0', sizeof(struct distrenjob));
 
  general_info->head.priority = 0;
 

	
 
  general_info->jobs_in_queue = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->total_finished_jobs = 0;
 
  general_info->total_frames_rendered = 0;
 
  general_info->highest_jobnum = 0;
 
  general_info->hibernate = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  fprintf(stderr, "Initialized default global and queue states\n");
 

	
 
  if(stat("general_info.xml", &buffer) == 0)
 
    {
 
      fprintf(stderr, "previous state file found, loading:\n");
 

	
 
      fprintf(stderr, "Parsing XML files and restoring previous state...\n");
 
      if(import_general_info(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "Restoring queue...\n");
 
      if(createQueueFromXML(general_info))
 
	fprintf(stderr, "FAILURE\n");
 
      
 
      fprintf(stderr, "done\n");
 
    }
 
  
 
  return 0;
 
}
 

	
 
/** Finish-Setter: Sets a frame to the "completed" status.*/
 
void finish_frame(struct general_info *geninfo, struct distrenjob *distrenjob, int frame)
 
{
 
  distrenjob->frameset[frame].status = FRAMESETSTATUS_DONE;
 
  distrenjob->total_render_time = distrenjob->total_render_time + (clock() - distrenjob->frameset[frame].start_time);
 
  distrenjob->completed_frames ++;
 
  distrenjob->assigned_frames --;
 
  geninfo->total_frames_rendered ++; /*< Increase total frames var for stats */
 

	
 
  update_general_info(geninfo);
 
  updateJobStatsXML(distrenjob);
 
}
 

	
 
/**
 
   checks to see if a job is actually done.
 
   - scans the folder of the job to make sure all output files are present
 
*/
 
int mortition(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1;
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "%s/stor/job%d/out/%d.%s",
 
			geninfo->config->datadir,
 
			job->jobnum,
 
			job->frameset[counter].num,
 
			job->output_format);
 
      if(stat(path_and_number, &buffer) == -1)
 
        {
 
	  /**
 
	     missing frame found
 
	   */
 
          job->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 
          job->completed_frames--;
 
          geninfo->total_frames_rendered--;
 
          isJobDone = 0;
 
        }
 
      free(path_and_number);
 
    }
 

	
 
  if(isJobDone)
 
    {
 
      /**
 
	 all frames were accounted for
 
      */
 
      distrenjob_remove(geninfo, job);
 
      distrenjob_free(&job);
 
      geninfo->jobs_in_queue --;
 
      update_xml_joblist(geninfo);
 
    }
 
  else
 
    /**
 
       if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned 
 
    */
 
    job->prev_frame_index = -1;
 

	
 
  update_general_info(geninfo);
 

	
 
  return 0;
 
}
 

	
 
/**
 
   scans the frames of a job to initialize a job after server
 
   @return 0 if the job is completely done and there are no missing frames,
 
           1 if missing frames are found.
 
*/
 
int restoreJobState(struct distrenjob *job)
 
{
 
  short int isJobDone;
 
  int counter;
 
  char *path_and_number;
 
  struct stat buffer;
 

	
 
  isJobDone = 1;
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format); /*< @TODO this path is used in multiple places, construct/build/determine it in a central function */
 
      if(stat(path_and_number, &buffer) == 0)
 
        {
 
          job->frameset[counter].status = FRAMESETSTATUS_ASSIGNED;
 
          job->completed_frames++;
 
        }
 
      else
 
        isJobDone = 0;
 
      free(path_and_number);
 
    }
 

	
 
  return !isJobDone;
 
}
 

	
 
/**
 
   creates a structure from starting data, then calls another
 
   function to actually add that struct to the queue.
 
*/
 
int prepare_distrenjob(struct general_info *geninfo, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame, int width, int height)
 
{
 
  int counter2;
 
  int counter;
 
  int tmp;
 
  char *path_with_num;
 
  char *serialfile;
 

	
 
  struct distrenjob *distrenjob;
 
  tmp = distrenjob_new(&distrenjob);
 
  if(tmp)
 
    return 1;
 

	
 
  geninfo->highest_jobnum ++;
 
  distrenjob->jobnum = geninfo->highest_jobnum;
 

	
 
  distrenjob->type = 1;
 
  distrenjob->name = name;
 
  distrenjob->submitter = submitter;
 
  distrenjob->email = email;
 
  distrenjob->priority = priority;
 
  distrenjob->width = width;
 
  distrenjob->height = height;
 
  /** sets the total number of frames in animation for status purposes */
 
  distrenjob->total_frames = (end_frame - start_frame + 1);
 
  distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames);
 
  if(!distrenjob->frameset)
 
    {
 
      distrenjob_free(&distrenjob);
 
      return 1;
 
    }
 

	
 
  /** prepares all the frames by setting that status to "unassigned" */
 
  counter2 = start_frame;
 
  for(counter = 0; counter < distrenjob->total_frames; counter++){
 
    distrenjob->frameset[counter].num = counter2;
 
    distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
 

	
 
    counter2++;
 
  }
 

	
 
  _distren_asprintf(&path_with_num, "stor/job%d/out/", distrenjob->jobnum);
 
  fprintf(stderr, ">>NOT<< creating dir ``%s''\n", path_with_num); /*< @TODO recursively create job directory */
 
  serialfile = job_getserialfilename(geninfo, distrenjob->jobnum);
 

	
 
  free(path_with_num);
 

	
 
  /**
 
     add job to queue
 
  */
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_serialize()\n");
 
  distrenjob_serialize(distrenjob, serialfile);
 
  free(serialfile);
 

	
 
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_enqueue()\n");
 
  distrenjob_enqueue(geninfo, distrenjob);
 

	
 
  geninfo->jobs_in_queue ++;
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_xml_joblist()\n");
 
  update_xml_joblist(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting update_general_info()\n");
 
  update_general_info(geninfo);
 
  fprintf(stderr, "\nprepare_distrenjob: attempting updateJobStatsXML()\n");
 
  updateJobStatsXML(distrenjob);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   distrenjob_enqueue: This function adds the job to the queue based on its priority
 
*/
 
int distrenjob_enqueue(struct general_info *geninfo, struct distrenjob *job)
 
{
 
  struct distrenjob *prev_job;
 
  struct distrenjob *current_job;
 
  struct distrenjob *head;
 

	
 
  head = &geninfo->head;
 
  prev_job = head;
 

	
 
  /**
 
     iterate through linked jobs
 
  */
 
  for(current_job = head->next; current_job; current_job = current_job->next)
 
  {
 
    fprintf(stderr, "enqueue loop iteration\n");
 
    if(job->priority < current_job->priority)
 
      {
 
	/**
 
	   if job's priority is less than current_job's priority, insert job
 
	   keep in mind 1 is the highest priority given to jobs, head has a
 
	   priority of zero so it will always be before other jobs
 
	*/
 
	prev_job->next = job;
 
	job->next = current_job;
 
	fprintf(stderr, "adding job before jobname: %s\n", current_job->name);
 

	
 
	return 0;
 
      }
 

	
 
    prev_job = current_job;
 
  }
 

	
 
  /**
 
     if it has reached the end of the list, add job there
 
  */
 
  prev_job->next = job;
 
  fprintf(stderr, "adding job at end of queue\n");
 

	
 
  return 0;
 
}
 

	
 
/** Changes the priority of an existing (and maybe running) job. @arg head I may end up changing the head if job == head */
 
int change_job_priority(struct general_info *geninfo, struct distrenjob *job, int new_priority){
 
  struct distrenjob *current_job;
 
  struct distrenjob *prev_job;
 
  char *serialname;
 

	
 
  distrenjob_remove(geninfo, job);
 
  job->priority = new_priority;
 

	
 
  prev_job = &geninfo->head;
 

	
 
  if(job->frameset[0].status == FRAMESETSTATUS_UNASSIGNED)
 
    /**
 
       if job was not yet started
 
    */
 
    {
 
      distrenjob_enqueue(geninfo, job);
 
      return 0;
 
    }
 

	
 
  /**
 
     iterate through linked list of jobs 
 
  */
 
    for(current_job = &geninfo->head;
 
	current_job != NULL
 
	  && job->priority > current_job->priority;
 
	current_job = current_job->next)
 
      prev_job = current_job;
 

	
 
    prev_job->next = job;
 
    job->next = current_job;
 

	
 

	
 
  update_xml_joblist(geninfo);
 
  /** reserialize after changes */
 
  serialname = job_getserialfilename(geninfo, job->jobnum);
 
  distrenjob_serialize(job, serialname);
 
  free(serialname);
 

	
 
  return 0;
 
}
 

	
 
/**
 
  Frame Finder: matches your computer up with a lovely frame to render, starts looking at oldest job first
 
  @TODO: We must return both jobnum and framenum
 
  @TODO: Add calls in main()
 
  @return 0 success, other: error
 
*/
 
int find_jobframe(struct general_info *geninfo, struct distrenjob **job, struct frameset **frame)
 
{
 
  if(geninfo->hibernate)
 
    return 1;
 

	
 
  unsigned int frame_counter;
 
  unsigned short int found;
 

	
 
  struct distrenjob *distrenjob_ptr;
 

	
 
  found = 0;
 
  /* iterate through jobs from first to last */
 
  for(distrenjob_ptr = geninfo->head.next; distrenjob_ptr && !distrenjob_ptr->hibernate; distrenjob_ptr = distrenjob_ptr->next)
 
  {
 
    for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++)
 
    {
 
      if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found
 
      {
 
    	found = 1;
 
    	distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED;
 
    	distrenjob_ptr->frameset[frame_counter].start_time = clock();
 
    	distrenjob_ptr->assigned_frames++;
 
    	distrenjob_ptr->prev_frame_index = frame_counter;
 
    	updateJobStatsXML(distrenjob_ptr);
 
      }
 

	
 
      if(found)
 
    	break;
 
    }
 

	
 
    if(found)
 
      break;
 
  }
 

	
 
  if(!found)
 
    {
 
      fprintf(stderr, "No more jobs to render\n");
 
      sleep(1); /*< @todo eliminate the need for this line*/
 
      return 1;
 
    }
 

	
 
  *job = distrenjob_ptr;
 
  *frame = &distrenjob_ptr->frameset[frame_counter];
 

	
 
  return 0;
 
}
 

	
 
int find_jobframe_from_job(struct distrenjob *distrenjob_ptr, struct distrenjob **job, struct frameset **frame)
 
{
 
  unsigned int frame_counter;
 
  unsigned short int found;
 

	
 
  found = 0;
 
  for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++)
 
  {
 
    if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found
 
      {
 
    	found = 1;
 
		distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED;
 
		distrenjob_ptr->frameset[frame_counter].start_time = clock();
 
		distrenjob_ptr->assigned_frames++;
 
		distrenjob_ptr->prev_frame_index = frame_counter;
 
		updateJobStatsXML(distrenjob_ptr);
 
      }
 

	
 
    if(found)
 
      break;
 
  }
 

	
 

	
 
  if(!found)
 
    {
 
      fprintf(stderr, "No more frames in this job number %d", distrenjob_ptr->jobnum);
 
      distrenjob_ptr->prev_frame_index = frame_counter;
 
      return 1;
 
    }
 

	
 
  *job = distrenjob_ptr;
 
  *frame = &distrenjob_ptr->frameset[frame_counter];
 
@@ -899,445 +892,441 @@ int updateJobStatsXML(struct distrenjob 
 
	  return 1;
 
}
 

	
 

	
 
/**
 
   updates job_list.xml which lists all the jobs in the queue
 
   @return 0 means success
 
*/
 
int update_xml_joblist(struct general_info *geninfo)
 
{
 
  struct distrenjob *job;
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  char *tmp2;
 
  int counter;
 

	
 
  /**
 
     update timestamp
 
  */
 
  geninfo->timestamp ++;
 
  if(geninfo->timestamp > 65530)
 
    geninfo->timestamp = 0;
 

	
 
  writer = xmlNewTextWriterFilename("job_list.xml", 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     create root element job_list
 
  */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job_list");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->timestamp);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  geninfo->total_priority_pieces = 0;
 
  counter = 0;
 
  for(job = geninfo->head.next; job; job = job->next)
 
    {
 
      _distren_asprintf(&tmp, "jobnum%d", counter);
 
      _distren_asprintf(&tmp2, "%d", job->jobnum);
 
      xmlTextWriterWriteElement(writer, (xmlChar*)tmp, (xmlChar*)tmp2);
 
      free(tmp2);
 
      free(tmp);
 

	
 
      /**
 
	this is needed for the new frame finder to work 
 
      
 
	Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + (NUMBER_ELEVEN - job->priority);
 

	
 
      counter++;
 
    }
 

	
 
  /**
 
     close elements and end document
 
  */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk
 
  */
 
  xmlFreeTextWriter(writer);
 
  return 0;
 
}
 

	
 
/**
 
 returns 0 if completed successfully
 
 this reads a list of jobs in the queue before DistRen was shut down
 
 and then adds the jobs to the queue, as if it were never shut down
 
*/
 
int createQueueFromXML(struct general_info *geninfo)
 
{
 
  int tmp;
 

	
 
  xmlDocPtr doc;
 
  xmlNodePtr cur;
 

	
 
  /**
 
     load xml document
 
  */
 
  doc = xmlParseFile("job_list.xml");
 
  if(doc == NULL)
 
    {
 
      fprintf(stderr, "createQueueFromXML: document not found\n");
 
      return 1;
 
    }
 

	
 
  /**
 
     have cur point to the root element of the xml document pointed to by doc
 
  */
 
  cur = xmlDocGetRootElement(doc);
 
  if(cur == NULL)
 
    {
 
      fprintf(stderr, "createQueueFromXML: document empty\n");
 
      return 1;
 
    }
 

	
 
  if(xmlStrcmp(cur->name, (const xmlChar*)"job_list"))
 
    {
 
      fprintf(stderr, "createQueueFromXML: incorrect root element (%s)", (char*)cur->name);
 
    }
 

	
 
  /**
 
     moves into the children elements of job_list
 
  */
 
  cur = cur->xmlChildrenNode;
 

	
 
  tmp = reCreateQueueFromXML(geninfo, doc, cur);
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return tmp;
 
}
 

	
 
/**
 
   inserts jobs at front of queue, starting
 
   with the last job in the job_list xml file
 
   to preserve the order of the queue
 
*/
 
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current)
 
{
 
  struct distrenjob *holder; // holds the job that "was" after head, so that the new struct can be inserted after head
 
  struct distrenjob *job; // job to be added
 
  xmlChar *tmp;
 
  int job_num;
 
  if(current == NULL) // base case, if element doesn't exist then don't do anything
 
    return 0;
 

	
 
  /**
 
     recursively call itself so that the next job in
 
     the queue is added to the front before the current one
 
  */
 
  if(reCreateQueueFromXML(geninfo, doc, current->next))
 
    /**
 
       bail if failure recursevely
 
        ^-- is this the best thing to do in this case?
 
	@TODO bail-out cleanup
 
     */
 
    return 1;
 

	
 
  /** actual work is done now: add the job to the front of the queue */
 
  holder = geninfo->head.next;
 

	
 
  tmp = xmlNodeListGetString(doc, current->xmlChildrenNode, 1); // get job number
 
  job_num = atoi((char*)tmp);
 
  xmlFree(tmp);
 

	
 
  job = createJobFromXML(geninfo, job_num);
 
  if(job == NULL)
 
    return 1;
 

	
 
  /**
 
     insert job at front of the queue
 
  */
 
  geninfo->head.next = job;
 
  job->next = holder;
 

	
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 
int printFrameInfo(struct frameset *frame)
 
{
 
  char *status;
 

	
 
  status = NULL;
 

	
 
  switch(frame->status)
 
  {
 
    case FRAMESETSTATUS_UNASSIGNED:
 
      _distren_asprintf(&status, "%s", "unassigned");
 
      break;
 
    case FRAMESETSTATUS_ASSIGNED:
 
      _distren_asprintf(&status, "%s", "assigned");
 
      break;
 
    case FRAMESETSTATUS_DONE:
 
      _distren_asprintf(&status, "%s", "completed");
 
      break;
 
    case FRAMESETSTATUS_CANCELED:
 
      _distren_asprintf(&status, "%s", "canceled");
 
  }
 

	
 
  printf("frame #: %d --> %s\n", frame->num, status);
 
  free(status);
 

	
 
  return 0;
 
}
 

	
 
int printJob(struct distrenjob *job)
 
{
 
  int counter;
 
  fprintf(stderr, "frame_num: status\n");
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      printFrameInfo(&job->frameset[counter]);
 
    }
 

	
 
  return 1;
 
}
 

	
 
int printJobInfo(struct distrenjob *job)
 
{
 
  fprintf(stderr, "type: %d\n", job->type);
 
  fprintf(stderr, "name: %s\n", job->email);
 
  fprintf(stderr, "submitter: %s\n", job->submitter);
 
  fprintf(stderr, "e-mail: %s\n", job->email);
 
  fprintf(stderr, "priority: %d\n", job->priority);
 
  fprintf(stderr, "completed: %d\n", job->completed_frames);
 
  fprintf(stderr, "assigned: %d\n", job->assigned_frames);
 
  fprintf(stderr, "total: %d\n", job->total_frames);
 
  fprintf(stderr, "watchdog: %d\n", job->watchdog_forgiveness);
 
  fprintf(stderr, "hibernate: %d\n", job->hibernate);
 
  fprintf(stderr, "prev_frame: %d\n", job->prev_frame_index);
 
  fprintf(stderr, "render power: %ld\n", job->assigned_render_power);
 

	
 
  return 1;
 
}
 

	
 
int printAllJobnums(struct distrenjob *head)
 
{
 
  struct distrenjob *current_job;
 
  int total_jobs;
 
  fprintf(stderr, "job numbers in the order they will be processed:\n");
 

	
 
  total_jobs = 0;
 
  for(current_job = head->next; current_job; current_job = current_job->next)
 
    {
 
      fprintf(stderr, "%d: %s\n", current_job->jobnum, current_job->name);
 
      total_jobs++;
 
    }
 

	
 
  fprintf(stderr, "\n%d jobs in queue\n\n", total_jobs);
 

	
 
  return 1;
 
}
 

	
 
/* ************************** Main ************************* */
 

	
 
int main(int argc, char *argv[])
 
{
 

	
 
  /* Argument-parser */
 
  int counter;
 
  int test; // Interactive test mode if 1
 

	
 
  struct general_info general_info;
 

	
 
  int cont;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  int command;
 
  jobnum_t jobnum;
 
  struct distrenjob *tmp_job;
 
  struct frameset *tmp_frame;
 
  int type;
 
  char *name;
 
  char *submitter;
 
  char *email;
 
  int priority;
 
  int width;
 
  int height;
 
  int start_frame;
 
  int end_frame;
 
  size_t read_buf;
 

	
 
  xmlinit();
 

	
 
  test = 0;
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
		  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
		  test=1;
 
      }
 
    }
 
  cont = 1;
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config))
 
    return 1;
 

	
 
  if(start_data(&general_info, general_info.config->datadir))
 
    {
 
      fprintf(stderr, "%s:%d: start_data() failed\n", __FILE__, __LINE__);
 
      return 1;
 
    }
 

	
 
  // pre-loaded jobs for testing
 
  prepare_distrenjob(&general_info, 1, "awesome", "LordOfWar", "onlylordofwar@gmail.com", 8, 1, 100, 640, 480);
 
  prepare_distrenjob(&general_info, 1, "hamburger", "Ohnobinki", "ohnobinki@ohnopublishing.net", 3, 1, 50, 1280, 720);
 

	
 
  while(test == 1)
 
  {
 
    fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
    fprintf(stderr, "\t1 \tPrint all frames in a job\n");
 
    fprintf(stderr, "\t2 \tExamine certain job\n");
 
    fprintf(stderr, "\t3 \tGet a frame to render\n");
 
    fprintf(stderr, "\t4 \tAdd a job\n");
 
    fprintf(stderr, "\t5 \tDelete a job\n");
 
    fprintf(stderr, "\t6 \tPrint jobnums in queue\n");
 
    fprintf(stderr, "\t7 \tPrint general info\n");
 
    fprintf(stderr, "\t8 \tQuit\n");
 

	
 
    scanf("%d", &command);
 

	
 
    switch(command)
 
    {
 
    case 1:
 
      fprintf(stderr, "Job number: ");
 
      scanf("%d", &jobnum);
 
      printJob(distrenjob_get(&general_info.head, jobnum));
 
      break;
 
    case 2:
 
      fprintf(stderr, "Job number: ");
 
      scanf("%d", &jobnum);
 
      printJobInfo(distrenjob_get(&general_info.head, jobnum));
 
      break;
 
    case 3:
 
      general_info.total_render_power ++;
 
      if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame))
 
      {
 
    	  fprintf(stderr, "frame was found, details below\n");
 
          fprintf(stderr, "Job#:%d\n", tmp_job->jobnum);
 
          fprintf(stderr, "Frame#:%d\n", tmp_frame->num);
 
      }
 

	
 
      break;
 
    case 4:
 
      name = NULL;
 
      submitter = NULL;
 
      email = NULL;
 

	
 
      fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n"); scanf("%d", &type);
 
      fprintf(stderr, "\nName: ");        scanf("\n"); getline(&name, &read_buf, stdin);
 
      fprintf(stderr, "\nSubmitter: ");                getline(&submitter, &read_buf, stdin);
 
      fprintf(stderr, "\nEmail: ");                    getline(&email, &read_buf, stdin);
 
      fprintf(stderr, "\nPriority: ");                 scanf("%d", &priority);
 
      fprintf(stderr, "\nStart frame: ");              scanf("%d", &start_frame);
 
      fprintf(stderr, "\nEnd frame: ");                scanf("%d", &end_frame);
 
      fprintf(stderr, "\nWidth: ");                    scanf("%d", &width);
 
      fprintf(stderr, "\nHeight: ");                   scanf("%d", &height);
 
      prepare_distrenjob(&general_info, type, name, submitter, email, priority, start_frame, end_frame, width, height);
 
      break;
 
    case 5:
 
      fprintf(stderr, "\nJob number: ");
 
      scanf("%d", &jobnum);
 
      distrenjob_remove(&general_info, distrenjob_get(&general_info.head, jobnum));
 
      break;
 
    case 6:
 
      printAllJobnums(&general_info.head);
 
      break;
 
    case 7:
 
      fprintf(stderr, "\nHighest job number: %d", general_info.highest_jobnum);
 
      fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue);
 
      fprintf(stderr, "\nTotal frames rendered: %d", general_info.total_frames_rendered);
 
      fprintf(stderr, "\nTimestamp: %lu", (long)general_info.timestamp);
 
      fprintf(stderr, "\nTotal priority pieces: %ld", general_info.total_priority_pieces);
 
      fprintf(stderr, "\nTotal render power: %ld\n", general_info.total_render_power);
 
      break;
 
    case 8:
 
      fprintf(stderr,"Goodbye.\n");
 
      return 0;
 
    default:
 
      fprintf(stderr, "Invalid input, please try again.\n");
 
    }
 
  }
 

	
 
  distrend_listen(general_info.config, &clients);
 
  /* This is called the "main loop" */
 
  while(cont)
 
  while(!general_info.config->die)
 
    {
 
      struct distrend_action *action;
 
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients, &action);
 
      cont = distrend_do(action);
 
      distrend_accept(general_info.config, clients);
 

	
 
      /* Make the following code more event-driven */
 
      frame_watchdog(&general_info.head);
 

	
 

	
 
      struct frameset *frame;
 
      struct distrenjob *job;
 

	
 
      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
 
      if(clientstatus == CLIENTSTATUS_IDLE)
 
	{
 
	  int returnnum = find_jobframe(&general_info, &job, &frame); // Finds a frame to render
 
	  if(returnnum)
 
	    {
 
	      fprintf(stderr,"No frames are available to render at this time. Idling...\n");
 
	      sleep(10);
 
	    }
 
	  else
 
	    remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      	if(clientsays == DISTREN_REQUEST_DONEFRAME){
 
      	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
      	  finish_frame(&general_info, job, frame->num); // @TODO: Make sure this actually works.
 
      	}
 

	
 
      distrend_action_free(action);
 
    }
 
    } /* while(!general_info.config->die) */
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  return 0;
 
}
 

	
 
/**
 
   constructs the filename for a distrenjob's serialized XML
 
   to be stored/retrieved from/in.
 
   @return pointer to filename allocated using something
 
           malloc() and free() compatible. Must be free()ed by the
 
	   caller
 
 */
 
char *job_getserialfilename(struct general_info *geninfo, unsigned int jobnum)
 
{
 
  char *filename;
 

	
 
  _distren_asprintf(&filename, "%s/stor/job/%d/distrenjob.xml",
 
		    geninfo->config->datadir,
 
		    jobnum);
 

	
 
  return filename;
 
}
src/server/distrend.h
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
struct distrend_config;
 

	
 
#ifndef _DISTREN_DISTREN_H_
 
#define _DISTREN_DISTREN_H_
 

	
 
#include <confuse.h>
 

	
 
#include "listen.h"
 

	
 
struct distrend_config
 
{
 
  cfg_t *mycfg;
 
  struct options_common *options;
 
  struct distrend_listen *listens; /*< Null terminated array of structs */
 
  char *datadir;
 
};
 

	
 
struct distrend_action
 
{
 
  struct distrend_client *client;
 
  int die;
 
};
 

	
 
#endif
src/server/listen.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "listen.h"
 

	
 
#include <list.h>
 
#include <malloc.h>
 
#include <netinet/in.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#include <sys/select.h>
 
#include <sys/socket.h>
 

	
 
/* local */
 

	
 
struct distrend_clientset
 
{
 
  LIST *clients;
 

	
 
  /*
 
    for select()
 
   */
 
  fd_set readfds;
 
  fd_set writefds;
 
  int nfds;
 
};
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
 
int distrend_client_free(struct distrend_client *client);
 

	
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
 
{
 
  int tmp;
 

	
 
  struct sockaddr_in6 sockaddr =
 
    {
 
      .sin6_family = AF_INET6,
 
      .sin6_port = 0,
 
      .sin6_flowinfo = 0,
 
      .sin6_addr = IN6ADDR_ANY_INIT,
 
      .sin6_scope_id = 0
 
    };
 

	
 
  *clients = malloc(sizeof(struct distrend_clientset));
 

	
 
  (*clients)->clients = list_init();
 

	
 
  sockaddr.sin6_port = htonl(4050);
 

	
 
  config->listens->sock = socket(AF_INET6, SOCK_STREAM, 0);
 
  tmp = bind(config->listens->sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
 
  if(tmp == -1)
 
    {
 
      perror("bind");
 
      free(*clients);
 

	
 
      return 1;
 
    }
 

	
 
  tmp = listen(config->listens->sock, 1);
 

	
 
  FD_ZERO(&(*clients)->readfds);
 
  FD_ZERO(&(*clients)->writefds);
 

	
 
  return 0;
 
}
 

	
 
int distrend_accept_find_client(fd_set *fdset, struct distrend_client *client)
 
int distrend_handleread(struct distrend_config *config,
 
		    struct distrend_client *client)
 
{
 
  fprintf(stderr, "%s:%d: STUB: I'm supposed to read data from the client\n",
 
	  __FILE__, __LINE__);
 

	
 
  return 0;
 
}
 

	
 
struct distrend_accept_client_proc_data
 
{
 
  if(FD_ISSET(client->sock, fdset))
 
    return FALSE;
 
  fd_set *fdset;
 
  struct distrend_config *config;
 
};
 
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
 
				struct distrend_client *client)
 
{
 
  if(!FD_ISSET(client->sock, data->fdset))
 
    /** continue iteration through the list */
 
    return TRUE;
 

	
 
  fprintf(stderr, "%s:%d: My traversal says that sock %d has data waiting\n",
 
	  __FILE__, __LINE__, client->sock);
 
  distrend_handleread(data->config, client);
 

	
 
  /** continue iteration through the list */
 
  return TRUE;
 
}
 

	
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, struct distrend_action **action)
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients)
 
{
 
  int tmp;
 
  fd_set readfds;
 
  fd_set writefds;
 

	
 
  struct distrend_client *cli;
 

	
 
  memcpy(&readfds, &clients->readfds, sizeof(fd_set));
 
  memcpy(&writefds, &clients->writefds, sizeof(fd_set));
 

	
 
  tmp = select(clients->nfds, &readfds, &writefds, NULL, (struct timeval *)NULL);
 
  if(tmp == -1)
 
    {
 
      perror("select");
 

	
 
      return 1;
 
    }
 

	
 
  /**
 
     check if our traversal function found a bit that was set
 
     deal with all sockets that have data waiting
 
   */
 
  if(list_traverse(clients->clients, &readfds, &distrend_accept_find_client, LIST_FRNT | LIST_ALTR) == LIST_OK)
 
    {
 
      cli = (struct distrend_client *)list_curr(clients->clients);
 
      fprintf(stderr, "%s:%d: My traversal says that sock %d has data waiting, which is %s\n",
 
	     __FILE__, __LINE__,
 
	     cli->sock, FD_ISSET(cli->sock, &readfds) ? "true" : "false");
 
      fprintf(stderr, "stub\n");
 
      return 1;
 
    }
 
  list_traverse(clients->clients, &readfds, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);
 

	
 

	
 
  return 0;
 
}
 

	
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
 
{
 
  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);
 

	
 
  return 1;
 
}
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client()
 
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
 
{
 
    fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
 
}
 

	
 
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
 
{
 
  fprintf(stderr, "%s:%d: I am futile! And I'm happy because of it :-D\n", __FILE__, __LINE__);
 
  *client = malloc(sizeof(struct distrend_client));
 
  if(!*client)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 
  (*client)->sock = sock;
 
  (*client)->state = state;
 
  (*client)->inmsgs = list_init();
 
  (*client)->outmsgs = list_init();
 

	
 
  return 0;
 
}
 

	
 
int distrend_client_free(struct distrend_client *client)
 
{
 
  list_free(client->inmsgs, (void *)LIST_DEALLOC);
 
  list_free(client->outmsgs, (void *)LIST_DEALLOC);
 

	
 
  fprintf(stderr, "%s:%d: stub!", __FILE__, __LINE__);
 
  return 1;
 
}
src/server/listen.h
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
struct distrend_clientset;
 
struct distrend_listen;
 
struct distrend_client;
 

	
 
#ifndef _DISTREN_LISTEN_H
 
#define _DISTREN_LISTEN_H
 

	
 
#include "distrend.h"
 

	
 
#include <list.h>
 

	
 
enum distrend_client_state
 
  {
 
    DISTREND_CLIENT_PREAUTH,
 
    DISTREND_CLIENT_GOODDOGGY,
 
    DISTREND_CLIENT_BADBOY
 
  };
 

	
 
struct distrend_listen
 
{
 
  int port;
 
  int sock;
 
};
 

	
 
struct distrend_client
 
{
 
  int sock;
 
  int state;
 
  enum distrend_client_state state;
 
  LIST *inmsgs;
 
  LIST *outmsgs;
 
};
 

	
 

	
 

	
 
/**
 
   initializes the listens and clientset
 
   @param config the configuration from distrend
 
   @param clients a pointer to a struct distrend_clientset pointer which will be set to memory allocated for the clientset
 
 */
 
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients);
 

	
 
/**
 
   checks states of the sockets I'm managing. If there are any new connections,
 
   or activity on any sockets, I'll call the appropriate function.
 
   I will block until there is some sort of activity, including
 
   signals. If you want to cleanly shut down, it's best to register
 
   signal handlers somewhere
 
 */
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, struct distrend_action **action);
 
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients);
 

	
 
/**
 
   cleans listening socket. Unnecessary for a working server, currently a stub.
 
 */
 
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients);
 

	
 
/**
 
   This is probably just NOT a placeholder for remotio
 
*/
 
void remotio_send_to_client();
 

	
 
#endif
0 comments (0 inline, 0 general)