/*
Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
This file is a part of DistRen.
DistRen is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DistRen is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with DistRen. If not, see <http://www.gnu.org/licenses/>.
*/
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
#include "asprintf.h"
#include "distrenjob.h"
#include "execio.h"
#include "listen.h"
#include "options.h"
#include "protocol.h"
#include "slavefuncs.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* local defs */
/*
  Priority weighting constant: a job contributes (NUMBER_ELEVEN - priority)
  "priority pieces" in update_xml_joblist(), and find_jobframe_new() scales
  a job's share of render power by the same expression.
  NOTE(review): presumably chosen so that priorities 1..10 map to weights
  10..1 -- confirm the valid priority range.
*/
#define NUMBER_ELEVEN 11
/* ******************* Structs ************************ */
/**
  Global daemon state: the job queue plus bookkeeping counters.
  jobs_in_queue, total_finished_jobs, total_frames_rendered and
  highest_jobnum are persisted to general_info.xml by update_general_info();
  the remaining fields are reset on startup.
*/
struct general_info
{
struct distrenjob head; /* sentinel list node (priority 0); head.next is the first queued job */
struct distrend_config *config; /* filled in by distrend_do_config() */
int jobs_in_queue; /* incremented by prepare_distrenjob(), decremented by distrenjob_remove() */
unsigned int free_clients; /* not persisted; only ever zeroed in this file */
unsigned int rendering_clients; /* not persisted; only ever zeroed in this file */
unsigned int total_finished_jobs; /* persisted; never modified elsewhere in this file */
unsigned int total_frames_rendered; /* bumped by finish_frame() */
unsigned int highest_jobnum; /* last job number handed out by prepare_distrenjob() */
int hibernate; /* nonzero makes the frame finders return 1 immediately */
time_t timestamp; /* counter bumped by update_xml_joblist(); wraps to 0 above 65530 */
unsigned long total_render_power; /* read by find_jobframe_new() to compute each job's share */
unsigned long total_priority_pieces; /* sum of (NUMBER_ELEVEN - priority) over queued jobs, recomputed by update_xml_joblist() */
};
/*
  Prototypes of functions defined later in this file.
  @TODO: Make all functions nice and proper
*/
/* Unlinks bj from the queue (does not free it) and decrements jobs_in_queue. */
void distrenjob_remove(struct general_info *, struct distrenjob *bj);
/* Looks a job up by number, starting at head itself; NULL when not found. */
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
/* Inserts job into the queue ordered by priority. */
int distrenjob_enqueue(struct general_info *, struct distrenjob *job);
/* Verifies on disk that every frame of job has been written. */
int mortition(struct general_info *, struct distrenjob *job);
/* Rewrites job_list.xml from the current queue. */
int update_xml_joblist(struct general_info *);
/* Rebuilds the queue from job_list.xml. */
int createQueueFromXML(struct general_info*);
/* Recursive helper of createQueueFromXML(). */
int reCreateQueueFromXML(struct general_info*, xmlDocPtr doc, xmlNodePtr current);
/* Writes general_info.xml. */
void update_general_info(struct general_info*);
/* Reads general_info.xml back into the structure. */
int import_general_info(struct general_info*);
/* Writes a job's stats.xml. */
int updateJobStatsXML(struct distrenjob *job);
/* Builds the path of a job's serialized XML; the caller frees the result. */
char *job_getserialfilename(struct general_info *, unsigned int jobnum);
/* ********************** Functions ************************* */
/**
  Placeholder for dumping all data held in RAM (current jobs, etc.) to an
  XML file that start_data() parses. Remember to invoke this before
  shutting down! The function currently performs no work.
  @return always 0
*/
int xml_dump()
{
  int status = 0;

  return status;
}
/**
  Placeholder for performing the command stored in a client's request.
  main() stores the return value in its loop-continue flag.
  @return always 0
*/
int distrend_do()
{
  int keep_running = 0;

  return keep_running;
}
/**
  Placeholder for releasing an action; currently frees nothing.
*/
void distrend_action_free()
{
  return;
}
/**
  Fills the global state at startup: first with defaults, then, if a
  general_info.xml file exists in the working directory, from the XML dumps.
  @param general_info structure to initialise
  @param datadir currently unused
  @return always 0; restore failures are only reported on stderr
*/
int start_data(struct general_info *general_info, char *datadir)
{
  struct stat state_file;

  /* defaults: an empty queue behind a zeroed sentinel head */
  memset(&general_info->head, '\0', sizeof(struct distrenjob));
  general_info->head.priority = 0;

  general_info->jobs_in_queue = 0;
  general_info->free_clients = 0;
  general_info->rendering_clients = 0;
  general_info->total_finished_jobs = 0;
  general_info->total_frames_rendered = 0;
  general_info->highest_jobnum = 0;
  general_info->hibernate = 0;
  general_info->timestamp = 0;
  general_info->total_render_power = 0;
  general_info->total_priority_pieces = 0;
  fprintf(stderr, "Initialized default global and queue states\n");

  /* nothing to restore when no previous state file is present */
  if(stat("general_info.xml", &state_file) != 0)
    return 0;

  fprintf(stderr, "previous state file found, loading:\n");
  fprintf(stderr, "Parsing XML files and restoring previous state...\n");
  if(import_general_info(general_info) != 0)
    fprintf(stderr, "FAILURE\n");

  fprintf(stderr, "Restoring queue...\n");
  if(createQueueFromXML(general_info) != 0)
    fprintf(stderr, "FAILURE\n");

  fprintf(stderr, "done\n");
  return 0;
}
/** Finish-Setter: Sets a frame to the "completed" status.*/
void finish_frame(struct general_info *geninfo, struct distrenjob *distrenjob, int frame)
{
distrenjob->frameset[frame].status = FRAMESETSTATUS_DONE;
distrenjob->total_render_time = distrenjob->total_render_time + (clock() - distrenjob->frameset[frame].start_time);
distrenjob->completed_frames ++;
distrenjob->assigned_frames --;
geninfo->total_frames_rendered ++; /*< Increase total frames var for stats */
update_general_info(geninfo);
updateJobStatsXML(distrenjob);
}
/**
checks to see if a job is actually done.
- scans the folder of the job to make sure all output files are present
*/
int mortition(struct general_info *geninfo, struct distrenjob *job)
{
short int isJobDone;
int counter;
char *path_and_number;
struct stat buffer;
isJobDone = 1;
for(counter = 0; counter < job->total_frames; counter++)
{
_distren_asprintf(&path_and_number, "%s/stor/job%d/out/%d.%s",
geninfo->config->datadir,
job->jobnum,
job->frameset[counter].num,
job->output_format);
if(stat(path_and_number, &buffer) == -1)
{
/**
missing frame found
*/
job->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
job->completed_frames--;
geninfo->total_frames_rendered--;
isJobDone = 0;
}
free(path_and_number);
}
if(isJobDone)
{
/**
all frames were accounted for
*/
distrenjob_remove(geninfo, job);
distrenjob_free(&job);
geninfo->jobs_in_queue --;
update_xml_joblist(geninfo);
}
else
/**
if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned
*/
job->prev_frame_index = -1;
update_general_info(geninfo);
return 0;
}
/**
scans the frames of a job to initialize a job after server
@return 0 if the job is completely done and there are no missing frames,
1 if missing frames are found.
*/
int restoreJobState(struct distrenjob *job)
{
short int isJobDone;
int counter;
char *path_and_number;
struct stat buffer;
isJobDone = 1;
for(counter = 0; counter < job->total_frames; counter++)
{
_distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format); /*< @TODO this path is used in multiple places, construct/build/determine it in a central function */
if(stat(path_and_number, &buffer) == 0)
{
job->frameset[counter].status = FRAMESETSTATUS_ASSIGNED;
job->completed_frames++;
}
else
isJobDone = 0;
free(path_and_number);
}
return !isJobDone;
}
/**
  Creates a job structure from starting data, serializes it, adds it to the
  queue and refreshes the persisted XML state.

  NOTE(review): name, submitter and email are stored as passed, without
  copying -- confirm whether distrenjob_free() releases them, since main()
  passes string literals.

  @param geninfo global state; highest_jobnum and jobs_in_queue are updated
  @param type job type as chosen by the submitter
  @param start_frame first frame number (inclusive)
  @param end_frame last frame number (inclusive); must be >= start_frame
  @return 0 on success, 1 if the frame range is invalid or allocation fails
*/
int prepare_distrenjob(struct general_info *geninfo, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame, int width, int height)
{
  int counter2;
  int counter;
  int total_frames;
  char *path_with_num;
  char *serialfile;
  struct distrenjob *distrenjob;

  /* reject empty/inverted ranges before they turn into a bogus allocation size */
  if(end_frame < start_frame)
    return 1;
  total_frames = end_frame - start_frame + 1;
  if(total_frames <= 0)
    return 1;

  if(distrenjob_new(&distrenjob))
    return 1;

  distrenjob->type = type;
  distrenjob->name = name;
  distrenjob->submitter = submitter;
  distrenjob->email = email;
  distrenjob->priority = priority;
  distrenjob->width = width;
  distrenjob->height = height;
  /** sets the total number of frames in animation for status purposes */
  distrenjob->total_frames = total_frames;
  /* calloc: zeroed start_time fields and an overflow-checked size */
  distrenjob->frameset = calloc((size_t)total_frames, sizeof(struct frameset));
  if(!distrenjob->frameset)
    {
      distrenjob_free(&distrenjob);
      return 1;
    }

  /* a job number is only consumed once nothing can fail any more */
  geninfo->highest_jobnum ++;
  distrenjob->jobnum = geninfo->highest_jobnum;

  /** prepares all the frames by setting that status to "unassigned" */
  counter2 = start_frame;
  for(counter = 0; counter < total_frames; counter++)
    {
      distrenjob->frameset[counter].num = counter2;
      distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
      counter2++;
    }

  path_with_num = NULL;
  _distren_asprintf(&path_with_num, "stor/job%d/out/", distrenjob->jobnum);
  if(path_with_num)
    fprintf(stderr, ">>NOT<< creating dir ``%s''\n", path_with_num); /*< @TODO recursively create job directory */
  free(path_with_num);

  /**
     add job to queue
  */
  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_serialize()\n");
  serialfile = job_getserialfilename(geninfo, distrenjob->jobnum);
  if(serialfile)
    distrenjob_serialize(distrenjob, serialfile);
  free(serialfile);

  fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_enqueue()\n");
  distrenjob_enqueue(geninfo, distrenjob);
  geninfo->jobs_in_queue ++;

  fprintf(stderr, "\nprepare_distrenjob: attempting update_xml_joblist()\n");
  update_xml_joblist(geninfo);
  fprintf(stderr, "\nprepare_distrenjob: attempting update_general_info()\n");
  update_general_info(geninfo);
  fprintf(stderr, "\nprepare_distrenjob: attempting updateJobStatsXML()\n");
  updateJobStatsXML(distrenjob);
  return 0;
}
/**
  distrenjob_enqueue: adds the job to the queue based on its priority.
  The job is placed in front of the first queued job whose priority value
  is greater than its own (1 is the highest priority given to jobs; the
  sentinel head has priority zero so it always stays in front). Jobs with
  an equal priority keep their order, the new job going after them.
  jobs_in_queue is NOT modified; the caller accounts for it.

  @return always 0
*/
int distrenjob_enqueue(struct general_info *geninfo, struct distrenjob *job)
{
  struct distrenjob *prev_job;
  struct distrenjob *current_job;

  prev_job = &geninfo->head;
  /**
     iterate through linked jobs
  */
  for(current_job = prev_job->next; current_job; current_job = current_job->next)
    {
      fprintf(stderr, "enqueue loop iteration\n");
      if(job->priority < current_job->priority)
        {
          prev_job->next = job;
          job->next = current_job;
          fprintf(stderr, "adding job before jobname: %s\n", current_job->name);
          return 0;
        }
      prev_job = current_job;
    }

  /**
     if it has reached the end of the list, add job there.
     The link must be cleared: a job that was previously unlinked (see
     change_job_priority()) may still carry a stale next pointer, which
     would otherwise corrupt the list or create a cycle.
  */
  prev_job->next = job;
  job->next = NULL;
  fprintf(stderr, "adding job at end of queue\n");
  return 0;
}
/**
  Changes the priority of an existing (and maybe running) job and moves it
  to its new position in the queue.
  - a job whose first frame is still unassigned is re-queued through
    distrenjob_enqueue()
  - a job that has already started is placed in front of the first job
    whose priority value is greater than or equal to the new one
  The job list XML and the job's serialized XML are rewritten afterwards.

  @param job a job that is currently in the queue
  @return always 0
*/
int change_job_priority(struct general_info *geninfo, struct distrenjob *job, int new_priority){
  struct distrenjob *current_job;
  struct distrenjob *prev_job;
  char *serialname;
  int queued;

  /*
    distrenjob_remove() decrements jobs_in_queue, but the job is only being
    moved, not dropped: remember the count and restore it after re-insertion.
  */
  queued = geninfo->jobs_in_queue;
  distrenjob_remove(geninfo, job);
  job->next = NULL;
  job->priority = new_priority;

  if(job->frameset[0].status == FRAMESETSTATUS_UNASSIGNED)
    {
      /**
         if job was not yet started
      */
      distrenjob_enqueue(geninfo, job);
    }
  else
    {
      /**
         iterate through linked list of jobs, starting behind the sentinel
         head so the job can never be linked in front of the head itself
      */
      prev_job = &geninfo->head;
      for(current_job = prev_job->next;
          current_job != NULL
            && job->priority > current_job->priority;
          current_job = current_job->next)
        prev_job = current_job;
      prev_job->next = job;
      job->next = current_job;
    }
  geninfo->jobs_in_queue = queued;

  update_xml_joblist(geninfo);
  /** reserialize after changes */
  serialname = job_getserialfilename(geninfo, job->jobnum);
  if(serialname)
    distrenjob_serialize(job, serialname);
  free(serialname);
  return 0;
}
/**
  Frame Finder: matches your computer up with a lovely frame to render,
  starts looking at the oldest job first. The scan walks the queue until it
  reaches the end or a hibernating job, and within each job starts just
  after prev_frame_index. The first unassigned frame is marked assigned,
  time-stamped with clock() and returned through the out-parameters.
  @TODO: We must return both jobnum and framenum
  @TODO: Add calls in main()
  @param job receives the job that owns the chosen frame
  @param frame receives the chosen frame
  @return 0 success, other: error (hibernating, or nothing left to render)
*/
int find_jobframe(struct general_info *geninfo, struct distrenjob **job, struct frameset **frame)
{
  struct distrenjob *candidate;
  unsigned int idx;

  if(geninfo->hibernate)
    return 1;

  /* iterate through jobs from first to last */
  for(candidate = geninfo->head.next; candidate && !candidate->hibernate; candidate = candidate->next)
    for(idx = (candidate->prev_frame_index + 1); idx < candidate->total_frames; idx ++)
      {
        if(candidate->frameset[idx].status != FRAMESETSTATUS_UNASSIGNED)
          continue;

        /* jobframe found: claim it and hand it back */
        candidate->frameset[idx].status = FRAMESETSTATUS_ASSIGNED;
        candidate->frameset[idx].start_time = clock();
        candidate->assigned_frames++;
        candidate->prev_frame_index = idx;
        updateJobStatsXML(candidate);

        *job = candidate;
        *frame = &candidate->frameset[idx];
        return 0;
      }

  fprintf(stderr, "No more jobs to render\n");
  sleep(1); /*< @todo eliminate the need for this line*/
  return 1;
}
int find_jobframe_from_job(struct distrenjob *distrenjob_ptr, struct distrenjob **job, struct frameset **frame)
{
unsigned int frame_counter;
unsigned short int found;
found = 0;
for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++)
{
if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found
{
found = 1;
distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED;
distrenjob_ptr->frameset[frame_counter].start_time = clock();
distrenjob_ptr->assigned_frames++;
distrenjob_ptr->prev_frame_index = frame_counter;
updateJobStatsXML(distrenjob_ptr);
}
if(found)
break;
}
if(!found)
{
fprintf(stderr, "No more frames in this job number %d", distrenjob_ptr->jobnum);
distrenjob_ptr->prev_frame_index = frame_counter;
return 1;
}
*job = distrenjob_ptr;
*frame = &distrenjob_ptr->frameset[frame_counter];
return 0;
}
/**
  Finds a frame to render when the job that the last frame was for no
  longer exists (or has no frames left). Among the jobs scanned (up to the
  first hibernating one) that still have frames beyond prev_frame_index,
  the job whose fair share of render power exceeds its currently assigned
  render power by the largest margin is chosen. A job's fair share is
  total_render_power / total_priority_pieces * (NUMBER_ELEVEN - priority).

  @param rend_pwr render power of the requesting slave; added to the chosen
  job's assigned_render_power on success
  @param job receives the chosen job
  @param frame receives the chosen frame
  @return 0 when *job and *frame were set, nonzero otherwise
*/
int find_jobframe_new(struct general_info *geninfo, int rend_pwr, struct distrenjob **job, struct frameset **frame)
{
  float power_per_piece;
  float power_difference;
  float greatest_power_difference;
  struct distrenjob *job_to_render;
  struct distrenjob *distrenjob_ptr;

  if(geninfo->hibernate)
    return 1;

  greatest_power_difference = -10000;
  job_to_render = NULL;

  /* avoid dividing by zero before update_xml_joblist() has counted the pieces */
  if(geninfo->total_priority_pieces)
    power_per_piece = (float)geninfo->total_render_power / (float)geninfo->total_priority_pieces;
  else
    power_per_piece = 0;

  /* iterate through jobs from first to last */
  for(distrenjob_ptr = geninfo->head.next;
      distrenjob_ptr && !distrenjob_ptr->hibernate;
      distrenjob_ptr = distrenjob_ptr->next)
    {
      if(distrenjob_ptr->prev_frame_index < (distrenjob_ptr->total_frames - 1))
        {
          /**
             Why is the number 11 found here again? --ohnobinki
          */
          power_difference = power_per_piece * (NUMBER_ELEVEN - (float)distrenjob_ptr->priority);
          power_difference = power_difference - (float)distrenjob_ptr->assigned_render_power;
          fprintf(stderr, "job num %d\npower difference: %f\n", distrenjob_ptr->jobnum, power_difference);
          if(power_difference > greatest_power_difference)
            {
              job_to_render = distrenjob_ptr;
              greatest_power_difference = power_difference;
            }
        }
    }

  if(!job_to_render)
    {
      fprintf(stderr, "No more jobs to render\n");
      return 1;
    }

  /*
    The chosen job may still turn out to have no unassigned frame (all the
    remaining ones already assigned or done). In that case *job and *frame
    were not written, so success must not be reported and no render power
    may be booked on the job.
  */
  if(find_jobframe_from_job(job_to_render, job, frame))
    return 1;

  job_to_render->assigned_render_power = job_to_render->assigned_render_power + rend_pwr;
  return 0;
}
/**
  Gets a frame to render from the same job that the previously rendered
  frame was from; falls back to find_jobframe_new() when that job is gone
  or has no frame left.

  @param jobnum number of the job the slave worked on last
  @param rend_pwr render power of the requesting slave
  @param job receives the chosen job
  @param frame receives the chosen frame
  @return 0 when *job and *frame were set, nonzero otherwise
*/
int find_jobframe_again(struct general_info *geninfo, int jobnum, int rend_pwr, struct distrenjob **job, struct frameset **frame)
{
  struct distrenjob *distrenjob_ptr;

  if(geninfo->hibernate)
    return 1;

  distrenjob_ptr = distrenjob_get(&geninfo->head, jobnum);
  // if the job was not found or there are no frames left in the job...
  if(!distrenjob_ptr || distrenjob_ptr->prev_frame_index >= (distrenjob_ptr->total_frames - 1))
    {
      fprintf(stderr, "Job number %d has been finished, finding new job\n", jobnum);
      // if previous job isn't yet finished the render power of the slave is removed from it
      if(distrenjob_ptr)
        distrenjob_ptr->assigned_render_power = distrenjob_ptr->assigned_render_power - rend_pwr;
      return find_jobframe_new(geninfo, rend_pwr, job, frame);
    }

  if(find_jobframe_from_job(distrenjob_ptr, job, frame) == 0)
    return 0;

  /*
    The previous job had nothing left after all: the outcome of the
    fallback search is what the caller must see, otherwise it would use
    *job and *frame although they were never set.
  */
  return find_jobframe_new(geninfo, rend_pwr, job, frame);
}
/** Checks for dead, latent, or stale slaves */
void frame_watchdog(struct distrenjob *distrenjob_head)
{
struct distrenjob *distrenjob_ptr;
unsigned int counter;
for(distrenjob_ptr = distrenjob_head->next; distrenjob_ptr; distrenjob_ptr = distrenjob_ptr->next)
/* iterate through jobs */
{
/* if the job has been started, checks by seeing if either to first or second frame has been started */
if(distrenjob_ptr->frameset[0].status != FRAMESETSTATUS_UNASSIGNED || distrenjob_ptr->frameset[1].status != FRAMESETSTATUS_UNASSIGNED)
/* iterate through all frames for this job: */
for(counter = 0; counter < distrenjob_ptr->total_frames; counter ++)
/*watchdog_forgiveness = seconds of forgiveness before frame is re-assigned: */
if((distrenjob_ptr->frameset[counter].start_time + distrenjob_ptr->watchdog_forgiveness) < clock())
{
/*
If frame is not completed within the number of seconds specified by watchdog_forgiveness
Then change the frame status to unassigned
*/
distrenjob_ptr->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
distrenjob_ptr->assigned_frames--;
}
updateJobStatsXML(distrenjob_ptr);
}
}
/**
  Finds a distrenjob struct based on the jobnum. The search begins at head
  itself and follows the next links.
  @arg head node at which the search starts
  @arg jobnum job number to search for
  @return the matching job, NULL on job doesn't exist
*/
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum)
{
  struct distrenjob *cursor;

  cursor = head;
  while(cursor)
    {
      /* stop on the first node carrying the requested number */
      if(cursor->jobnum == jobnum)
        break;
      cursor = cursor->next;
    }
  /* NULL here means the end of the list was reached without a match */
  return cursor;
}
/**
  Removes a distrenjob from the distrenjob linked list and decrements
  jobs_in_queue. It does not free the distrenjob, however. You should do
  that with distrenjob_free() from distrenjob.h.
  Nothing happens when bj is NULL or is not part of the list.
  @arg geninfo global state holding the head of the linked list of distrenjobs
  @arg bj job to unlink
*/
void distrenjob_remove(struct general_info *geninfo, struct distrenjob *bj)
{
  struct distrenjob *previous_distrenjob;

  /* callers pass the result of distrenjob_get(), which may be NULL */
  if(!bj)
    return;

  for(previous_distrenjob = &geninfo->head;
      previous_distrenjob
        && previous_distrenjob->next != bj; /*< stop on the distrenjob that comes before bj */
      previous_distrenjob = previous_distrenjob->next)
    ;

  /* ran off the end of the list: bj is not queued, there is nothing to unlink */
  if(!previous_distrenjob)
    return;

  /*
    This removes references to bj from the linked list. I.E., we now skip bj when iterating through the list
  */
  previous_distrenjob->next = bj->next;
  geninfo->jobs_in_queue --;
}
/**
  Grabs config info from confs: allocates *config, lets options_init() parse
  the "daemon" configuration, and imports every "listen" section into
  (*config)->listens. The listens array is terminated by one zeroed entry.
  As a smoke test, distrenjob_unserialize() is first run against
  "distrenjob.xml.example" and the outcome is reported on stderr.

  @param config receives the newly allocated configuration; set to NULL on
  failure
  @return 0 on success, 1 on allocation or option-parsing failure
*/
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
{
  unsigned int counter;
  unsigned int num_listens;

  cfg_opt_t myopts_listen[] =
    {
      CFG_SIMPLE_STR("type", NULL),
      CFG_SIMPLE_STR("path", NULL),
      CFG_SIMPLE_INT("port", NULL),
      CFG_END()
    };
  cfg_opt_t myopts[] =
    {
      CFG_SEC("listen", /* this must be imported into struct listens (which must still be declared) */
              myopts_listen,
              CFGF_MULTI),
      CFG_SIMPLE_STR("datadir", NULL),
      CFG_END()
    };

  struct distrenjob *distrenjob;
  cfg_t *cfg_listen;
  int tmp;

  /*
   * test distrenjob_unserialize()
   */
  tmp = distrenjob_unserialize(&distrenjob, "distrenjob.xml.example");
  if(tmp)
    fprintf(stderr, "distrenjob_unserialize() returned %d. Try to cd to distren/doc if you want to test out the distrenjob_unserialize() function. (This will only fix this error if the error is due to an inability of the xml library to access distrenjob.xml.example)\n\n", tmp);
  else
    {
      fprintf(stderr, "using email ``%s'' for user ``%s'' -- reading in XML files and pulling data from them using libxml2+XPath works!!!\n", distrenjob->email, distrenjob->submitter);
      /* the test job is not needed any further */
      distrenjob_free(&distrenjob);
    }

  fprintf(stderr, "%s:%d running config\n", __FILE__, __LINE__);

  *config = malloc(sizeof(struct distrend_config));
  if(!*config)
    return 1;
  /*
    Start from a zeroed structure so that no field (datadir in particular,
    which the option parser writes through a pointer) holds garbage.
  */
  memset(*config, '\0', sizeof(struct distrend_config));

  myopts[1].simple_value = &(*config)->datadir;
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
    {
      free(*config);
      *config = NULL;
      return 1;
    }

  /**
     grab listen blocks:
  */
  num_listens = cfg_size((*config)->mycfg, "listen");
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (num_listens + 1));
  if(!(*config)->listens)
    {
      options_free((*config)->options);
      free(*config);
      *config = NULL;
      return 1;
    }
  for(counter = 0; counter < num_listens; counter ++)
    {
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
      (*config)->listens[counter].sock = -1;
    }
  /* zeroed terminator entry */
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));

  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
  return 0;
}
/**
  Releases a configuration created by distrend_do_config(): the parsed
  options and the structure itself. Passing NULL is allowed.

  NOTE(review): config->listens is not freed here; confirm whether
  distrend_unlisten() releases it, otherwise it is leaked.

  @return always 0
*/
int distrend_config_free(struct distrend_config *config)
{
  if(!config)
    return 0;

  options_free(config->options);
  free(config);
  return 0;
}
/* ************************** XML Functions ************************* */
/* Writes one <name>text</name> child and releases text; a NULL text is skipped. */
static void update_general_info_element(xmlTextWriterPtr writer, const char *name, char *text)
{
  if(text)
    xmlTextWriterWriteElement(writer, (xmlChar*)name, (xmlChar*)text);
  free(text);
}

/**
  Writes the general_info.xml file which is a copy of the general_info
  structure, except that only jobs_in_queue, total_finished_jobs,
  total_frames_rendered and highest_jobnum are stored (in that order, which
  import_general_info() relies on). Nothing is written when the file cannot
  be opened for writing.
*/
void update_general_info(struct general_info *geninfo)
{
  xmlTextWriterPtr writer;
  char *tmp;

  writer = xmlNewTextWriterFilename("general_info.xml", 0);
  if(!writer)
    {
      fprintf(stderr, "update_general_info: unable to write general_info.xml\n");
      return;
    }
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");

  tmp = NULL;
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
  update_general_info_element(writer, "jobs_in_queue", tmp);

  /* the remaining counters are unsigned: print them as such */
  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->total_finished_jobs);
  update_general_info_element(writer, "total_finished_jobs", tmp);

  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->total_frames_rendered);
  update_general_info_element(writer, "total_frames_rendered", tmp);

  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->highest_jobnum);
  update_general_info_element(writer, "highest_jobnum", tmp);

  /* closes the open root element as well */
  xmlTextWriterEndDocument(writer);
  xmlFreeTextWriter(writer);
}
/*
  Reads the text content of node as an integer.
  Returns 0 on success, 1 when node is NULL or has no text content.
*/
static int import_general_info_int(xmlDocPtr doc, xmlNodePtr node, int *value)
{
  xmlChar *text;

  if(!node)
    return 1;
  text = xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
  if(!text)
    return 1;
  *value = atoi((char*)text);
  xmlFree(text);
  return 0;
}

/**
  Reads the information from general_info.xml into the general_info
  structure. The four stored values are expected as children of the
  <general_info> root in the order jobs_in_queue, total_finished_jobs,
  total_frames_rendered, highest_jobnum; nodes without text content (such
  as whitespace between elements) are skipped. All fields that are not
  stored in the file are reset to zero. The structure is left untouched
  when the file cannot be used.

  @return 0 on success, 1 if the file is missing or malformed
*/
int import_general_info(struct general_info *general_info)
{
  xmlDocPtr doc;
  xmlNodePtr cur;
  int values[4];
  unsigned int idx;

  doc = xmlParseFile("general_info.xml");
  if(!doc)
    return 1;

  cur = xmlDocGetRootElement(doc);
  if(!cur || xmlStrcmp(cur->name, (xmlChar*)"general_info"))
    {
      fprintf(stderr, "xml document is wrong type");
      xmlFreeDoc(doc);
      return 1;
    }

  cur = cur->xmlChildrenNode;
  for(idx = 0; idx < 4; idx ++)
    {
      /* advance to the next node that actually carries a value */
      while(cur && import_general_info_int(doc, cur, &values[idx]))
        cur = cur->next;
      if(!cur)
        {
          xmlFreeDoc(doc);
          return 1;
        }
      cur = cur->next;
    }
  xmlFreeDoc(doc);

  general_info->jobs_in_queue = values[0];
  general_info->total_finished_jobs = values[1];
  general_info->total_frames_rendered = values[2];
  general_info->highest_jobnum = values[3];

  general_info->hibernate = 0;
  general_info->free_clients = 0;
  general_info->rendering_clients = 0;
  general_info->timestamp = 0;
  general_info->total_render_power = 0;
  general_info->total_priority_pieces = 0;
  return 0;
}
// extracts data from the xml created by above function and creates a job from it
// it returns a pointer to the created job
struct distrenjob *createJobFromXML(struct general_info *geninfo, unsigned int jobnum)
{
xmlDocPtr doc;
xmlNodePtr cur;
char *file_name;
struct distrenjob *distrenjob;
int start_frame;
int counter;
int counter2;
file_name = job_getserialfilename(geninfo, jobnum);
doc = xmlParseFile(file_name);
if(!doc)
return NULL;
distrenjob_new(&distrenjob);
cur = xmlDocGetRootElement(doc);
distrenjob->name = (char*)xmlGetProp(cur, (xmlChar*)"name");
distrenjob->submitter = (char*)xmlGetProp(cur, (xmlChar*)"submitter");
distrenjob->priority = atoi((char*)xmlGetProp(cur, (xmlChar*)"priority"));
cur = cur->xmlChildrenNode;
distrenjob->width = atoi((char*)xmlGetProp(cur, (xmlChar*)"width"));
distrenjob->height = atoi((char*)xmlGetProp(cur, (xmlChar*)"number"));
cur = cur->next;
start_frame = atoi((char*)xmlGetProp(cur, (xmlChar*)"start_frame"));
distrenjob->total_frames = atoi((char*)xmlGetProp(cur, (xmlChar*)"end_frame")) - start_frame + 1;
distrenjob->output_format = (char*)xmlGetProp(cur, (xmlChar*)"output_format");
cur = cur->next;
distrenjob->watchdog_forgiveness = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
xmlFreeDoc(doc);
/** load up stats.xml file to retrieve data */
_distren_asprintf(&file_name, "stor/job%d/stats.xml", jobnum);
doc = xmlParseFile(file_name);
free(file_name);
cur = xmlDocGetRootElement(doc);
distrenjob->total_render_time = (time_t)atol((char*)xmlGetProp(cur, (xmlChar*)"total_render_time"));
xmlFreeDoc(doc);
distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames);
// change the status of all frames to "unassigned"
counter2 = start_frame;
for(counter = 0; counter <= distrenjob->total_frames; counter++){
distrenjob->frameset[counter].num = counter2;
distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED;
counter2++;
}
restoreJobState(distrenjob);
return distrenjob;
}
/* Writes one attribute and releases text; a NULL text is skipped. */
static void updateJobStatsXML_attr(xmlTextWriterPtr writer, const char *name, char *text)
{
  if(text)
    xmlTextWriterWriteAttribute(writer, (xmlChar*)name, (xmlChar*)text);
  free(text);
}

/**
  Writes stor/job<jobnum>/stats.xml for a job: a single <stats> element
  carrying assigned_frames, completed_frames and total_render_time as
  attributes.

  @param job job whose statistics are written
  @return 1 when the document was written (the value this function has
  always returned), 0 when the file could not be opened for writing
*/
int updateJobStatsXML(struct distrenjob *job)
{
  xmlTextWriterPtr writer;
  char *tmp;

  tmp = NULL;
  _distren_asprintf(&tmp, "stor/job%d/stats.xml", job->jobnum);
  if(!tmp)
    return 0;

  /**
     create xml document at the location tmp with no compression
  */
  writer = xmlNewTextWriterFilename(tmp, 0);
  free(tmp);
  if(!writer)
    return 0;

  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
  xmlTextWriterStartElement(writer, (xmlChar*)"stats");

  tmp = NULL;
  _distren_asprintf(&tmp, "%d", job->assigned_frames);
  updateJobStatsXML_attr(writer, "assigned_frames", tmp);

  tmp = NULL;
  _distren_asprintf(&tmp, "%d", job->completed_frames);
  updateJobStatsXML_attr(writer, "completed_frames", tmp);

  /* createJobFromXML() reads this value back with atol(): write it as a long */
  tmp = NULL;
  _distren_asprintf(&tmp, "%ld", (long)job->total_render_time);
  updateJobStatsXML_attr(writer, "total_render_time", tmp);

  /**
     end document
  */
  xmlTextWriterEndDocument(writer);

  /**
     free writer and save xml file to disk
     @TODO confirm that calling xmlFreeTextWriter calls
     write() and close(), respectively.
     (close() causes asynchronous writes to be
     completed).
  */
  xmlFreeTextWriter(writer);
  return 1;
}
/**
  Updates job_list.xml which lists all the jobs in the queue, in queue
  order, as <jobnumN>jobnum</jobnumN> children of a <job_list> root that
  carries the current timestamp. As a side effect the timestamp counter is
  advanced and total_priority_pieces is recomputed from the queue; both
  happen even when the file cannot be written.

  @return 0 means success, 1 means the file could not be opened for writing
*/
int update_xml_joblist(struct general_info *geninfo)
{
  struct distrenjob *job;
  xmlTextWriterPtr writer;
  char *tmp;
  char *tmp2;
  int counter;

  /**
     update timestamp
  */
  geninfo->timestamp ++;
  if(geninfo->timestamp > 65530)
    geninfo->timestamp = 0;

  writer = xmlNewTextWriterFilename("job_list.xml", 0);
  if(writer)
    {
      xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);

      /**
         create root element job_list
      */
      xmlTextWriterStartElement(writer, (xmlChar*)"job_list");

      /* timestamp is a time_t: convert explicitly for printing */
      tmp = NULL;
      _distren_asprintf(&tmp, "%ld", (long)geninfo->timestamp);
      if(tmp)
        xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
      free(tmp);
    }
  else
    fprintf(stderr, "update_xml_joblist: unable to write job_list.xml\n");

  geninfo->total_priority_pieces = 0;
  counter = 0;
  for(job = geninfo->head.next; job; job = job->next)
    {
      if(writer)
        {
          tmp = NULL;
          tmp2 = NULL;
          _distren_asprintf(&tmp, "jobnum%d", counter);
          _distren_asprintf(&tmp2, "%d", job->jobnum);
          if(tmp && tmp2)
            xmlTextWriterWriteElement(writer, (xmlChar*)tmp, (xmlChar*)tmp2);
          free(tmp2);
          free(tmp);
        }

      /**
         this is needed for the new frame finder to work
         Why the random constant numeral 11? --ohnobinki
      */
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + (NUMBER_ELEVEN - job->priority);
      counter++;
    }

  if(!writer)
    return 1;

  /**
     close elements and end document
  */
  xmlTextWriterEndDocument(writer);

  /**
     free writer and save xml file to disk
  */
  xmlFreeTextWriter(writer);
  return 0;
}
/**
  Reads job_list.xml, the list of jobs that were in the queue before
  DistRen was shut down, and adds those jobs to the queue again, as if it
  were never shut down.

  @return 0 if completed successfully, nonzero if the document is missing,
  empty, has the wrong root element, or a job could not be restored
*/
int createQueueFromXML(struct general_info *geninfo)
{
  int tmp;
  xmlDocPtr doc;
  xmlNodePtr cur;

  /**
     load xml document
  */
  doc = xmlParseFile("job_list.xml");
  if(doc == NULL)
    {
      fprintf(stderr, "createQueueFromXML: document not found\n");
      return 1;
    }

  /**
     have cur point to the root element of the xml document pointed to by doc
  */
  cur = xmlDocGetRootElement(doc);
  if(cur == NULL)
    {
      fprintf(stderr, "createQueueFromXML: document empty\n");
      xmlFreeDoc(doc);
      return 1;
    }
  if(xmlStrcmp(cur->name, (const xmlChar*)"job_list"))
    {
      /* not a job list: do not try to interpret its children as jobs */
      fprintf(stderr, "createQueueFromXML: incorrect root element (%s)", (char*)cur->name);
      xmlFreeDoc(doc);
      return 1;
    }

  /**
     moves into the children elements of job_list
  */
  tmp = reCreateQueueFromXML(geninfo, doc, cur->xmlChildrenNode);
  xmlFreeDoc(doc);
  return tmp;
}
/**
  Inserts jobs at the front of the queue, starting with the last job in
  the job_list xml file, to preserve the order of the queue. Nodes without
  text content (for instance whitespace between elements) are skipped.

  @param doc the parsed job_list.xml
  @param current node to process; its following siblings are processed
  first, recursively
  @return 0 on success, 1 as soon as one job cannot be recreated
*/
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current)
{
  struct distrenjob *holder; // holds the job that "was" after head, so that the new struct can be inserted after head
  struct distrenjob *job; // job to be added
  xmlChar *tmp;
  int job_num;

  if(current == NULL) // base case, if element doesn't exist then don't do anything
    return 0;

  /**
     recursively call itself so that the next job in
     the queue is added to the front before the current one
  */
  if(reCreateQueueFromXML(geninfo, doc, current->next))
    /**
       bail if failure recursively
       ^-- is this the best thing to do in this case?
       @TODO bail-out cleanup
    */
    return 1;

  /** actual work is done now: add the job to the front of the queue */
  tmp = xmlNodeListGetString(doc, current->xmlChildrenNode, 1); // get job number
  if(tmp == NULL)
    /* nothing to convert: this node does not describe a job */
    return 0;
  job_num = atoi((char*)tmp);
  xmlFree(tmp);

  job = createJobFromXML(geninfo, job_num);
  if(job == NULL)
    return 1;

  /**
     insert job at front of the queue
  */
  holder = geninfo->head.next;
  geninfo->head.next = job;
  job->next = holder;
  return 0;
}
/* ************************** Test Functions ************************* */
/**
  Prints one line with the number and the status of a frame to stdout.
  A status value outside the known set is reported as "unknown" instead of
  handing a NULL pointer to printf().

  @return always 0
*/
int printFrameInfo(struct frameset *frame)
{
  const char *status;

  switch(frame->status)
    {
    case FRAMESETSTATUS_UNASSIGNED:
      status = "unassigned";
      break;
    case FRAMESETSTATUS_ASSIGNED:
      status = "assigned";
      break;
    case FRAMESETSTATUS_DONE:
      status = "completed";
      break;
    case FRAMESETSTATUS_CANCELED:
      status = "canceled";
      break;
    default:
      status = "unknown";
      break;
    }
  printf("frame #: %d --> %s\n", frame->num, status);
  return 0;
}
/**
  Prints the status of every frame of a job (see printFrameInfo()).
  A NULL job, as returned by distrenjob_get() for an unknown job number,
  is reported and otherwise ignored.

  @return always 1
*/
int printJob(struct distrenjob *job)
{
  int counter;

  if(!job)
    {
      fprintf(stderr, "no such job\n");
      return 1;
    }

  fprintf(stderr, "frame_num: status\n");
  for(counter = 0; counter < job->total_frames; counter++)
    printFrameInfo(&job->frameset[counter]);
  return 1;
}
/**
  Prints the bookkeeping fields of a job to stderr.
  A NULL job, as returned by distrenjob_get() for an unknown job number,
  is reported and otherwise ignored.

  @return always 1
*/
int printJobInfo(struct distrenjob *job)
{
  if(!job)
    {
      fprintf(stderr, "no such job\n");
      return 1;
    }

  fprintf(stderr, "type: %d\n", job->type);
  /* the name line used to print the e-mail address by mistake */
  fprintf(stderr, "name: %s\n", job->name);
  fprintf(stderr, "submitter: %s\n", job->submitter);
  fprintf(stderr, "e-mail: %s\n", job->email);
  fprintf(stderr, "priority: %d\n", job->priority);
  fprintf(stderr, "completed: %d\n", job->completed_frames);
  fprintf(stderr, "assigned: %d\n", job->assigned_frames);
  fprintf(stderr, "total: %d\n", job->total_frames);
  fprintf(stderr, "watchdog: %d\n", job->watchdog_forgiveness);
  fprintf(stderr, "hibernate: %d\n", job->hibernate);
  fprintf(stderr, "prev_frame: %d\n", job->prev_frame_index);
  fprintf(stderr, "render power: %ld\n", job->assigned_render_power);
  return 1;
}
/**
  Lists the number and name of every queued job on stderr, in queue order,
  followed by the number of jobs listed.
  @param head sentinel head of the job list; the listing starts at head->next
  @return always 1
*/
int printAllJobnums(struct distrenjob *head)
{
  struct distrenjob *cursor;
  int listed = 0;

  fprintf(stderr, "job numbers in the order they will be processed:\n");

  cursor = head->next;
  while(cursor)
    {
      fprintf(stderr, "%d: %s\n", cursor->jobnum, cursor->name);
      ++listed;
      cursor = cursor->next;
    }

  fprintf(stderr, "\n%d jobs in queue\n\n", listed);
  return 1;
}
/* ************************** Main ************************* */
/*
  Reads one decimal integer from stdin for the interactive test mode.
  Returns 1 on success, 0 when the input was not a number (the rest of the
  offending line is discarded) and -1 on end of input.
*/
static int main_read_int(int *value)
{
  int c;

  if(scanf("%d", value) == 1)
    return 1;
  /* drop the bad line so that the next read does not trip over it again */
  do
    c = getchar();
  while(c != '\n' && c != EOF);
  return c == EOF ? -1 : 0;
}

/*
  Reads one line from stdin without its trailing newline.
  Returns a malloc()ed string, or NULL on end of input or error.
*/
static char *main_read_line(void)
{
  char *line;
  size_t capacity;
  long length;

  line = NULL;
  capacity = 0;
  length = (long)getline(&line, &capacity, stdin);
  if(length < 0)
    {
      free(line);
      return NULL;
    }
  if(length > 0 && line[length - 1] == '\n')
    line[length - 1] = '\0';
  return line;
}

/**
  Entry point of the distrend server.
  -h prints the usage text and exits with status 2; -t starts the
  interactive queue test interface, which is left through its Quit entry
  or at end of input. Without -t the daemon starts listening and enters
  the main loop.
*/
int main(int argc, char *argv[])
{
  /* Argument-parser */
  int counter;
  int test; // Interactive test mode if 1

  struct general_info general_info;

  int cont;
  struct distrend_clientset *clients;

  enum clientstatus
  {
    CLIENTSTATUS_UNINITIALIZED = 0,
    CLIENTSTATUS_BUSY = 1,
    CLIENTSTATUS_IDLE = 2
  } clientstatus;

  int command;
  int number;
  jobnum_t jobnum;
  struct distrenjob *tmp_job;
  struct frameset *tmp_frame;
  /* frame currently handed out by the main loop, if any */
  struct frameset *frame;
  struct distrenjob *job;

  int type;
  char *name;
  char *submitter;
  char *email;
  int priority;
  int width;
  int height;
  int start_frame;
  int end_frame;

  xmlinit();

  test = 0;
  clientstatus = CLIENTSTATUS_UNINITIALIZED;
  frame = NULL;
  job = NULL;
  clients = NULL;

  for(counter = 0; counter < argc; counter ++)
    {
      if(strcmp(argv[counter], "-h") == 0)
        {
          fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
          return 2;
        }
      else if(strcmp(argv[counter], "-t") == 0)
        {
          fprintf(stderr, "Entering into test mode...\n\n");
          test=1;
        }
    }

  cont = 1;
  if(distrend_do_config(argc, argv, &general_info.config))
    return 1;

  if(start_data(&general_info, general_info.config->datadir))
    {
      fprintf(stderr, "%s:%d: start_data() failed\n", __FILE__, __LINE__);
      return 1;
    }

  // pre-loaded jobs for testing
  prepare_distrenjob(&general_info, 1, "awesome", "LordOfWar", "onlylordofwar@gmail.com", 8, 1, 100, 640, 480);
  prepare_distrenjob(&general_info, 1, "hamburger", "Ohnobinki", "ohnobinki@ohnopublishing.net", 3, 1, 50, 1280, 720);

  while(test == 1)
    {
      int got;

      fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
      fprintf(stderr, "\t1 \tPrint all frames in a job\n");
      fprintf(stderr, "\t2 \tExamine certain job\n");
      fprintf(stderr, "\t3 \tGet a frame to render\n");
      fprintf(stderr, "\t4 \tAdd a job\n");
      fprintf(stderr, "\t5 \tDelete a job\n");
      fprintf(stderr, "\t6 \tPrint jobnums in queue\n");
      fprintf(stderr, "\t7 \tPrint general info\n");
      fprintf(stderr, "\t8 \tQuit\n");

      got = main_read_int(&command);
      if(got < 0)
        {
          /* end of input: leave instead of redrawing the menu forever */
          fprintf(stderr,"Goodbye.\n");
          return 0;
        }
      if(got == 0)
        command = -1; /* falls through to the "Invalid input" message */

      switch(command)
        {
        case 1:
        case 2:
        case 5:
          fprintf(stderr, command == 5 ? "\nJob number: " : "Job number: ");
          if(main_read_int(&number) != 1)
            {
              fprintf(stderr, "Invalid input, please try again.\n");
              break;
            }
          jobnum = number;
          tmp_job = distrenjob_get(&general_info.head, jobnum);
          /* the lookup starts at the sentinel head, which is not a real job */
          if(!tmp_job || tmp_job == &general_info.head)
            {
              fprintf(stderr, "No such job.\n");
              break;
            }
          if(command == 1)
            printJob(tmp_job);
          else if(command == 2)
            printJobInfo(tmp_job);
          else
            distrenjob_remove(&general_info, tmp_job);
          break;
        case 3:
          general_info.total_render_power ++;
          if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame))
            {
              fprintf(stderr, "frame was found, details below\n");
              fprintf(stderr, "Job#:%d\n", tmp_job->jobnum);
              fprintf(stderr, "Frame#:%d\n", tmp_frame->num);
            }
          break;
        case 4:
          name = NULL;
          submitter = NULL;
          email = NULL;
          fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n");
          if(main_read_int(&type) != 1)
            {
              fprintf(stderr, "Invalid input, please try again.\n");
              break;
            }
          /* scanf("\n") swallows the newline left behind by the number */
          fprintf(stderr, "\nName: "); scanf("\n"); name = main_read_line();
          fprintf(stderr, "\nSubmitter: "); submitter = main_read_line();
          fprintf(stderr, "\nEmail: "); email = main_read_line();
          if(!name || !submitter || !email)
            {
              free(name);
              free(submitter);
              free(email);
              fprintf(stderr, "Invalid input, please try again.\n");
              break;
            }
          fprintf(stderr, "\nPriority: "); got = main_read_int(&priority);
          if(got == 1)
            { fprintf(stderr, "\nStart frame: "); got = main_read_int(&start_frame); }
          if(got == 1)
            { fprintf(stderr, "\nEnd frame: "); got = main_read_int(&end_frame); }
          if(got == 1)
            { fprintf(stderr, "\nWidth: "); got = main_read_int(&width); }
          if(got == 1)
            { fprintf(stderr, "\nHeight: "); got = main_read_int(&height); }
          if(got != 1)
            {
              free(name);
              free(submitter);
              free(email);
              fprintf(stderr, "Invalid input, please try again.\n");
              break;
            }
          /* the strings are handed over to the job, as before */
          prepare_distrenjob(&general_info, type, name, submitter, email, priority, start_frame, end_frame, width, height);
          break;
        case 6:
          printAllJobnums(&general_info.head);
          break;
        case 7:
          fprintf(stderr, "\nHighest job number: %u", general_info.highest_jobnum);
          fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue);
          fprintf(stderr, "\nTotal frames rendered: %u", general_info.total_frames_rendered);
          fprintf(stderr, "\nTimestamp: %lu", (unsigned long)general_info.timestamp);
          fprintf(stderr, "\nTotal priority pieces: %lu", general_info.total_priority_pieces);
          fprintf(stderr, "\nTotal render power: %lu\n", general_info.total_render_power);
          break;
        case 8:
          fprintf(stderr,"Goodbye.\n");
          return 0;
        default:
          fprintf(stderr, "Invalid input, please try again.\n");
        }
    }

  distrend_listen(general_info.config, &clients);
  /* This is called the "main loop" */
  while(cont)
    {
      struct distrend_action *action;
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */

      action = NULL;
      distrend_accept(general_info.config, clients, &action);
      cont = distrend_do(action);

      /* Make the following code more event-driven */
      frame_watchdog(&general_info.head);

      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
      if(clientstatus == CLIENTSTATUS_IDLE)
        {
          int returnnum = find_jobframe(&general_info, &job, &frame); // Finds a frame to render
          if(returnnum)
            {
              fprintf(stderr,"No frames are available to render at this time. Idling...\n");
              sleep(10);
            }
          else
            remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
        }

      /* If the client states that they finished the frame */
      if(clientsays == DISTREN_REQUEST_DONEFRAME){
        clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
        if(job && frame)
          {
            /*
              finish_frame() expects the index into the job's frameset,
              which differs from the frame number whenever the job does not
              start at frame 0.
            */
            finish_frame(&general_info, job, (int)(frame - job->frameset)); // @TODO: Make sure this actually works.
            job = NULL;
            frame = NULL;
          }
      }

      distrend_action_free(action);
    }

  distrend_unlisten(general_info.config->listens, clients);
  distrend_config_free(general_info.config);

  xmlcleanup();

  return 0;
}
/**
  Constructs the filename for a distrenjob's serialized XML
  to be stored/retrieved from/in:
  <datadir>/stor/job/<jobnum>/distrenjob.xml

  NOTE(review): the other per-job paths in this file use "stor/job<N>/"
  (no slash between "job" and the number) -- confirm which layout is
  intended before changing either.

  @return pointer to filename allocated using something
  malloc() and free() compatible. Must be free()ed by the
  caller. NULL if the name could not be built and the formatter left the
  result untouched.
*/
char *job_getserialfilename(struct general_info *geninfo, unsigned int jobnum)
{
  char *filename;

  /* start from NULL so that a failed formatting call does not hand back garbage */
  filename = NULL;
  _distren_asprintf(&filename, "%s/stor/job/%u/distrenjob.xml",
                    geninfo->config->datadir,
                    jobnum);
  return filename;
}