/*
Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
This file is a part of DistRen.
DistRen is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DistRen is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with DistRen. If not, see <http://www.gnu.org/licenses/>.
*/
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
#include "distrenjob.h"
#include "listen.h"
#include "slavefuncs.h"
#include "mysql.h"
#include "common/asprintf.h"
#include "common/execio.h"
#include "common/options.h"
#include "common/protocol.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* ******************* Structs ************************ */
/**
  Server-wide state of the distrend daemon.

  A subset of these fields (jobs_in_queue, total_finished_jobs,
  total_frames_rendered, highest_jobnum) is persisted to general_info.xml
  by update_general_info() and read back by import_general_info(); the
  remaining counters are reset to 0 by import_general_info().
*/
struct general_info
{
/** List head of the job queue; walkers such as update_xml_joblist() start at head.next. */
struct distrenjob head;
/** MySQL connection, opened with mysqlConnect() and closed with mysqlDisconnect() in main(). */
distrend_mysql_conn_t conn;
/** Configuration allocated by distrend_do_config() and released by distrend_config_free(). */
struct distrend_config *config;
struct
{
/** Heap-allocated path "<datadir>/general_info.xml"; built in main() and free()d at shutdown. */
char *geninfo;
} files;
/** Persisted in general_info.xml. */
int jobs_in_queue;
/** Not persisted; reset to 0 by import_general_info(). */
unsigned int free_clients;
/** Not persisted; reset to 0 by import_general_info(). */
unsigned int rendering_clients;
/** Persisted in general_info.xml. */
unsigned int total_finished_jobs;
/** Persisted in general_info.xml. */
unsigned int total_frames_rendered;
/** Persisted in general_info.xml. */
unsigned int highest_jobnum;
/** Reset to 0 by import_general_info(); NOTE(review): exact meaning is not visible in this file. */
int hibernate;
/** Revision counter of job_list.xml: incremented by update_xml_joblist() and wrapped to 0 once it exceeds 65530. */
time_t timestamp;
/** Reset to 0 by import_general_info(); not otherwise touched by the live code in this file. */
unsigned long total_render_power;
/** Sum of the priority of every job in the queue; recomputed by update_xml_joblist(). */
unsigned long total_priority_pieces;
};
/* *********************************************
Function Prototypes
********************************************* */
/* ************General Functions************* */
/*
  Prototypes without a definition in the visible part of this file:
  start_data(), mortition(), restore_distrenjob(), updateJobStatsXML(),
  createQueueFromXML(), reCreateQueueFromXML() and distren_mkdir_recurse().
  NOTE(review): confirm they are defined elsewhere or drop the declarations.
*/
// int xml_dump();
/* Stub; currently always returns 0. */
int distrend_do();
int start_data(struct general_info *general_info);
int mortition(struct general_info *geninfo, struct distrenjob *job);
/* Currently an empty placeholder. */
void frame_watchdog(struct distrenjob *distrenjob_head);
/* Allocates *config and fills it from the configuration; returns 0 on success. */
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
/* Releases what distrend_do_config() allocated; always returns 0. */
int distrend_config_free(struct distrend_config *config);
/* **************XML Functions**************** */
/* Writes the persisted counters of geninfo to general_info.xml. */
void update_general_info(struct general_info *geninfo);
/* Reads general_info.xml back into general_info; returns 0 on success. */
int import_general_info(struct general_info *general_info);
int restore_distrenjob(struct general_info *geninfo, struct distrenjob **distrenjob, jobnum_t jobnum);
int updateJobStatsXML(struct distrenjob *job);
/* Rewrites <datadir>/job_list.xml from the in-memory job list. */
int update_xml_joblist(struct general_info *geninfo);
int createQueueFromXML(struct general_info *geninfo);
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current);
/* **************Test Functions**************** */
int printFrameInfo(struct frameset *frame);
int printJob(struct distrenjob *job);
int printJobInfo(struct distrenjob *job);
int printAllJobnums(struct distrenjob *head);
/* The only definition of interactiveTest() in this file is commented out. */
int interactiveTest(int test, struct general_info general_info);
/* Builds the path of a job's serialized XML file; the caller free()s *filename. */
int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir);
int distren_mkdir_recurse(char *dirname);
/* **************** Main ********************* */
/**
  Entry point of the distrend server.

  Scans the command line for -h / -t, loads the configuration, builds the
  path of general_info.xml, connects to MySQL, starts listening and then
  runs the dispatch loop until config->die becomes non-zero.

  @param argc number of command line arguments
  @param argv command line arguments
  @return 0 after a clean shutdown, 1 when startup fails, 2 after printing
  the usage text for -h
*/
int main(int argc, char *argv[])
{
  int counter;
  int test = 0; /*< Interactive mode if 1 (only consumed by the disabled interactiveTest()) */
  struct general_info general_info;
  struct distrend_clientset *clients = NULL;
  enum clientstatus
  {
    CLIENTSTATUS_UNINITIALIZED = 0,
    CLIENTSTATUS_BUSY = 1,
    CLIENTSTATUS_IDLE = 2
  } clientstatus = CLIENTSTATUS_UNINITIALIZED;
  /*
    Nothing in this function selects a job or frame yet, so these stay
    NULL and every dereference below is guarded.
  */
  struct frameset *frame = NULL;
  struct distrenjob *job = NULL;
  int slaveKey = 0; // Remotio should set me on a per-slave basis

  /* Start from a known state: the job list head and all counters are read later. */
  memset(&general_info, 0, sizeof(general_info));

  xmlinit();

  /* Parse arguments; argv[0] is the program name and is not an option. */
  for(counter = 1; counter < argc; counter ++)
    {
      if(strcmp(argv[counter], "-h") == 0)
        {
          fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
          xmlcleanup();
          return 2;
        }
      else if(strcmp(argv[counter], "-t") == 0)
        {
          fprintf(stderr, "Entering into test mode...\n\n");
          test = 1;
        }
    }

  if(distrend_do_config(argc, argv, &general_info.config))
    {
      xmlcleanup();
      return 1;
    }

  /** preset paths */
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
                    general_info.config->datadir);
  if(!general_info.files.geninfo)
    {
      fprintf(stderr, "%s:%d: unable to allocate the general_info.xml path\n", __FILE__, __LINE__);
      distrend_config_free(general_info.config);
      xmlcleanup();
      return 1;
    }

  /** MySQL Connection */
  if(mysqlConnect(&general_info.conn))
    {
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
      free(general_info.files.geninfo);
      distrend_config_free(general_info.config);
      xmlcleanup();
      return 1;
    }

  /** Execute test function */
  // interactiveTest(test, general_info);

  distrend_listen(general_info.config, &clients);

  /* Main Loop */
  while(!general_info.config->die)
    {
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */

      distrend_accept(general_info.config, clients);

      /* Make the following code more event-driven */
      frame_watchdog(&general_info.head);

      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
      if(clientstatus == CLIENTSTATUS_IDLE && job && frame)
        {
          int returnnum = find_jobframe(general_info.conn, slaveKey, job->jobnum, frame->num); // Finds a frame to render @FIXME: Slavenum :D
          if(returnnum)
            {
              fprintf(stderr,"No frames are available to render at this time. Idling...\n");
              sleep(10);
            }
          else
            remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
        }

      /* If the client states that they finished the frame */
      if(clientsays == DISTREN_REQUEST_DONEFRAME)
        {
          clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
          if(job && frame)
            finish_frame(general_info.conn, 0, job->jobnum, frame->num); // @TODO: Make sure this actually works.
        }
    }

  distrend_unlisten(general_info.config->listens, clients);
  distrend_config_free(general_info.config);

  xmlcleanup();

  /** free() paths */
  free(general_info.files.geninfo);
  mysqlDisconnect(general_info.conn);

  return 0;
}
/* ********************** Functions ************************* */
/**
  Performs the command stored in a client's request.

  @TODO: Fill stub. At present no request is processed.

  @return always 0
*/
int distrend_do(void)
{
  return 0;
}
/**
  Checks for dead, latent, or stale slaves.

  Currently an empty placeholder: the body contains only the plan below
  and distrenjob_head is not used.

  @param distrenjob_head head of the job list to inspect
*/
void frame_watchdog(struct distrenjob *distrenjob_head)
{
// Planned behaviour (possibly to be replaced by MySQL queries):
//  - iterate through the jobs
//  - treat a job as started when its first or second frame is no longer
//    FRAMESETSTATUS_UNASSIGNED
//  - iterate through all frames of a started job
//  - watchdog_forgiveness = seconds of forgiveness before a frame is re-assigned:
//    if a frame is not completed within that many seconds,
//    change its status back to unassigned
}
/**
  Grabs config info from confs.

  Allocates a struct distrend_config, lets options_init() parse the
  "daemon" configuration into it and builds the array of listen entries
  from the "listen" sections. The array is terminated by one extra,
  all-zero entry.

  @param argc number of command line arguments, forwarded to options_init()
  @param argv command line arguments, forwarded to options_init()
  @param config receives the newly allocated configuration on success and
  is set to NULL on failure; release it with distrend_config_free()
  @return 0 on success, 1 on failure
*/
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
{
  unsigned int counter;
  unsigned int num_listens;

  cfg_opt_t myopts_listen[] =
    {
      CFG_SIMPLE_STR("type", NULL),
      CFG_SIMPLE_STR("path", NULL),
      CFG_SIMPLE_INT("port", NULL),
      CFG_END()
    };
  cfg_opt_t myopts[] =
    {
      CFG_SEC("listen", /* this must be imported into struct listens (which must still be declared) */
              myopts_listen,
              CFGF_MULTI),
      CFG_SIMPLE_STR("datadir", NULL),
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
      CFG_END()
    };
  cfg_t *cfg_listen;

  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);

  /*
    Zero-filled so that datadir starts out as NULL: the parser stores into
    it through simple_value, and it is printed below.
    NOTE(review): libConfuse is expected to free the previous value of a
    simple string when it stores a new one, so it must not be garbage.
  */
  *config = calloc(1, sizeof(struct distrend_config));
  if(!*config)
    {
      fprintf(stderr, "%s:%d: out of memory\n", __FILE__, __LINE__);
      return 1;
    }

  /* myopts[1] is the "datadir" option. */
  myopts[1].simple_value = &(*config)->datadir;

  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
    {
      free(*config);
      *config = NULL;
      return 1;
    }

  /* Every consumer in this file formats datadir with %s, so it is mandatory. */
  if(!(*config)->datadir)
    {
      fprintf(stderr, "%s:%d: no datadir configured\n", __FILE__, __LINE__);
      options_free((*config)->options);
      free(*config);
      *config = NULL;
      return 1;
    }

  /**
     grab listen blocks:
  */
  num_listens = cfg_size((*config)->mycfg, "listen");
  /* One extra element: calloc() leaves it all-zero, which terminates the array. */
  (*config)->listens = calloc((size_t)num_listens + 1, sizeof(struct distrend_listen));
  if(!(*config)->listens)
    {
      fprintf(stderr, "%s:%d: out of memory\n", __FILE__, __LINE__);
      options_free((*config)->options);
      free(*config);
      *config = NULL;
      return 1;
    }
  for(counter = 0; counter < num_listens; counter ++)
    {
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
      (*config)->listens[counter].sock = -1;
    }

  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);

  return 0;
}
/**
  Releases a configuration obtained from distrend_do_config().

  Frees the options handle and the structure itself. The listens array
  is not released here.
  NOTE(review): confirm that distrend_unlisten() (or another owner) frees
  config->listens; otherwise it leaks.

  @param config configuration to release; NULL is accepted and ignored
  @return always 0
*/
int distrend_config_free(struct distrend_config *config)
{
  if(!config)
    return 0;

  options_free(config->options);
  free(config);

  return 0;
}
/* ************************** XML Functions ************************* */
/**
  Writes one <name>text</name> element and releases text.

  @param writer open XML writer
  @param name element name
  @param text heap-allocated element content or NULL when formatting
  failed; always free()d here
*/
static void geninfo_write_owned_element(xmlTextWriterPtr writer, const char *name, char *text)
{
  if(text)
    xmlTextWriterWriteElement(writer, (const xmlChar *)name, (const xmlChar *)text);
  else
    fprintf(stderr, "%s:%d: unable to format <%s>\n", __FILE__, __LINE__, name);
  free(text);
}

/**
  Writes the general_info.xml file, which is a copy of the general_info
  structure restricted to jobs_in_queue, total_finished_jobs,
  total_frames_rendered and highest_jobnum (in that order). In particular
  free_clients and rendering_clients are not stored.

  Failures are reported on stderr only, because the function returns void.

  @param geninfo state to save; files.geninfo names the output file
*/
void update_general_info(struct general_info *geninfo)
{
  xmlTextWriterPtr writer;
  char *tmp;

  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
  if(!writer)
    {
      fprintf(stderr, "%s:%d: unable to open %s for writing\n", __FILE__, __LINE__,
              geninfo->files.geninfo ? geninfo->files.geninfo : "(null)");
      return;
    }

  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");

  /* tmp is preset to NULL before each call so a failed format is detectable. */
  tmp = NULL;
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
  geninfo_write_owned_element(writer, "jobs_in_queue", tmp);

  /* The remaining counters are unsigned int, hence %u. */
  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->total_finished_jobs);
  geninfo_write_owned_element(writer, "total_finished_jobs", tmp);

  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->total_frames_rendered);
  geninfo_write_owned_element(writer, "total_frames_rendered", tmp);

  tmp = NULL;
  _distren_asprintf(&tmp, "%u", geninfo->highest_jobnum);
  geninfo_write_owned_element(writer, "highest_jobnum", tmp);

  /* Closes the still open general_info element as well. */
  xmlTextWriterEndDocument(writer);
  xmlFreeTextWriter(writer);
}
/**
Reads general state information from general_info.xml
into the general_info structure.
*/
int import_general_info(struct general_info *general_info)
{
xmlDocPtr doc;
xmlNodePtr cur;
doc = xmlParseFile(general_info->files.geninfo);
cur = xmlDocGetRootElement(doc);
if (xmlStrcmp(cur->name, (xmlChar*)"general_info"))
{
fprintf(stderr, "xml document is wrong type");
xmlFreeDoc(doc);
return 1;
}
cur = cur->xmlChildrenNode;
general_info->jobs_in_queue = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
cur = cur->next;
general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
cur = cur->next;
general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
cur = cur->next;
general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
general_info->hibernate = 0;
general_info->free_clients = 0;
general_info->rendering_clients = 0;
general_info->timestamp = 0;
general_info->total_render_power = 0;
general_info->total_priority_pieces = 0;
xmlFreeDoc(doc);
return 0;
}
/**
  Updates job_list.xml, which lists all the jobs in the queue.

  Side effects on geninfo, performed even when the file cannot be
  written: timestamp is incremented (and wrapped to 0 once it exceeds
  65530) and total_priority_pieces is recomputed as the sum of the
  priority of every job in the list.

  The file <datadir>/job_list.xml gets a job_list root element carrying a
  timestamp attribute and one jobnum element per job.

  @param geninfo server state holding the job list and the configuration
  @return 0 means success, 1 means the file could not be written completely
*/
// @QUERY: Likely obsolete (don't remove at request of ohnobinki)
int update_xml_joblist(struct general_info *geninfo)
{
  struct distrenjob *job;
  xmlTextWriterPtr writer;
  char *tmp;
  int failed = 0;

  /**
     update timestamp
  */
  geninfo->timestamp ++;
  if(geninfo->timestamp > 65530)
    geninfo->timestamp = 0;

  /**
     this is needed for the new frame finder to work
  */
  geninfo->total_priority_pieces = 0;
  for(job = geninfo->head.next; job; job = job->next)
    geninfo->total_priority_pieces += job->priority;

  /* tmp is preset to NULL before each format so a failure is detectable. */
  tmp = NULL;
  _distren_asprintf(&tmp, "%s/job_list.xml",
                    geninfo->config->datadir);
  if(!tmp)
    return 1;
  writer = xmlNewTextWriterFilename(tmp, 0);
  free(tmp);
  if(!writer)
    {
      fprintf(stderr, "%s:%d: unable to open job_list.xml for writing\n", __FILE__, __LINE__);
      return 1;
    }

  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);

  /**
     create root element job_list
  */
  xmlTextWriterStartElement(writer, (xmlChar*)"job_list");

  /* time_t has no printf conversion of its own; the value is at most 65530 here. */
  tmp = NULL;
  _distren_asprintf(&tmp, "%ld", (long)geninfo->timestamp);
  if(tmp)
    xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
  else
    failed = 1;
  free(tmp);

  for(job = geninfo->head.next; job; job = job->next)
    {
      tmp = NULL;
      _distren_asprintf(&tmp, "%d", job->jobnum);
      if(tmp)
        xmlTextWriterWriteElement(writer, (xmlChar*)"jobnum", (xmlChar*)tmp);
      else
        failed = 1;
      free(tmp);
    }

  xmlTextWriterEndElement(writer);

  /**
     close elements and end document
  */
  if(xmlTextWriterEndDocument(writer) < 0)
    failed = 1;

  /**
     free writer and save xml file to disk
  */
  xmlFreeTextWriter(writer);

  return failed;
}
/* ************************** Test Functions ************************* */
/**
  Prints the number and the status of one frame to stdout.

  @param frame frame to describe; its num and status members are read
  @return always 0
*/
int printFrameInfo(struct frameset *frame)
{
  /* String literals suffice; nothing has to be allocated or freed. */
  const char *status;

  switch(frame->status)
    {
    case FRAMESETSTATUS_UNASSIGNED:
      status = "unassigned";
      break;
    case FRAMESETSTATUS_ASSIGNED:
      status = "assigned";
      break;
    case FRAMESETSTATUS_DONE:
      status = "completed";
      break;
    case FRAMESETSTATUS_CANCELED:
      status = "canceled";
      break;
    default:
      /* Never hand a NULL pointer to %s for an unexpected status value. */
      status = "unknown";
      break;
    }

  printf("frame #: %d --> %s\n", frame->num, status);

  return 0;
}
/**
  Prints a header line to stderr, then describes each of the job's
  total_frames frames through printFrameInfo().

  @param job job whose frameset is listed
  @return always 1
*/
int printJob(struct distrenjob *job)
{
  int idx = 0;

  fprintf(stderr, "frame_num: status\n");

  while(idx < job->total_frames)
    {
      printFrameInfo(job->frameset + idx);
      idx++;
    }

  return 1;
}
/**
  Dumps the summary fields of a job to stderr, one "label: value" line
  per field.

  @param job job to describe
  @return always 1
*/
int printJobInfo(struct distrenjob *job)
{
  FILE *out = stderr;

  /* what the job is */
  fprintf(out, "type: %d\n", job->type);
  fprintf(out, "name: %s\n", job->name);
  fprintf(out, "submitter: %s\n", job->submitter);
  fprintf(out, "priority: %d\n", job->priority);

  /* frame counters */
  fprintf(out, "completed: %d\n", job->completed_frames);
  fprintf(out, "assigned: %d\n", job->assigned_frames);
  fprintf(out, "total: %d\n", job->total_frames);

  /* scheduling details */
  fprintf(out, "watchdog: %d\n", job->watchdog_forgiveness);
  fprintf(out, "hibernate: %d\n", job->hibernate);
  fprintf(out, "prev_frame: %d\n", job->prev_frame_index);
  fprintf(out, "render power: %ld\n", job->assigned_render_power);

  return 1;
}
/**
  Lists number and name of every job behind the list head on stderr,
  followed by the number of jobs that were listed.

  @param head list head; the first job printed is head->next
  @return always 1
*/
int printAllJobnums(struct distrenjob *head)
{
  struct distrenjob *node = head->next;
  int seen = 0;

  fprintf(stderr, "job numbers in the order they will be processed:\n");

  while(node)
    {
      fprintf(stderr, "%d: %s\n", node->jobnum, node->name);
      seen++;
      node = node->next;
    }

  fprintf(stderr, "\n%d jobs in queue\n\n", seen);

  return 1;
}
/** Interactive test for the queuing system */
/* @QUEUE: Test uses methods not present in C code using mysql web-based system
int interactiveTest(int test, struct general_info general_info){
int command;
jobnum_t jobnum;
struct distrenjob *tmp_job;
struct frameset *tmp_frame;
int type;
char *name;
char *submitter;
char *email;
int priority;
int width;
int height;
int start_frame;
int end_frame;
size_t read_buf;
while(test == 1)
{
fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
fprintf(stderr, "\t1 \tPrint all frames in a job\n");
fprintf(stderr, "\t2 \tExamine certain job\n");
fprintf(stderr, "\t3 \tGet a frame to render\n");
fprintf(stderr, "\t4 \tAdd a job\n");
fprintf(stderr, "\t5 \tDelete a job\n");
fprintf(stderr, "\t6 \tPrint jobnums in queue\n");
fprintf(stderr, "\t7 \tPrint general info\n");
fprintf(stderr, "\t8 \tQuit\n");
scanf("%d", &command);
switch(command)
{
case 1:
fprintf(stderr, "Job number: ");
scanf("%d", &jobnum);
printJob(distrenjob_get(&general_info.head, jobnum));
break;
case 2:
fprintf(stderr, "Job number: ");
scanf("%d", &jobnum);
printJobInfo(distrenjob_get(&general_info.head, jobnum));
break;
case 3:
general_info.total_render_power ++;
if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame))
{
fprintf(stderr, "frame was found, details below\n");
fprintf(stderr, "Job#:%d\n", tmp_job->jobnum);
fprintf(stderr, "Frame#:%d\n", tmp_frame->num);
}
break;
case 4:
name = NULL;
submitter = NULL;
email = NULL;
fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n"); scanf("%d", &type);
fprintf(stderr, "\nName: "); scanf("\n"); getline(&name, &read_buf, stdin);
fprintf(stderr, "\nSubmitter: "); getline(&submitter, &read_buf, stdin);
fprintf(stderr, "\nEmail: "); getline(&email, &read_buf, stdin);
fprintf(stderr, "\nPriority: "); scanf("%d", &priority);
fprintf(stderr, "\nStart frame: "); scanf("%d", &start_frame);
fprintf(stderr, "\nEnd frame: "); scanf("%d", &end_frame);
fprintf(stderr, "\nWidth: "); scanf("%d", &width);
fprintf(stderr, "\nHeight: "); scanf("%d", &height);
prepare_distrenjob(&general_info, type, name, submitter, priority, start_frame, end_frame, width, height);
break;
case 5:
fprintf(stderr, "\nJob number: ");
scanf("%d", &jobnum);
distrenjob_remove(&general_info, distrenjob_get(&general_info.head, jobnum));
break;
case 6:
printAllJobnums(&general_info.head);
break;
case 7:
fprintf(stderr, "\nHighest job number: %d", general_info.highest_jobnum);
fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue);
fprintf(stderr, "\nTotal frames rendered: %d", general_info.total_frames_rendered);
fprintf(stderr, "\nTimestamp: %lu", (long)general_info.timestamp);
fprintf(stderr, "\nTotal priority pieces: %ld", general_info.total_priority_pieces);
fprintf(stderr, "\nTotal render power: %ld\n", general_info.total_render_power);
break;
case 8:
fprintf(stderr,"Goodbye.\n");
test = 0;
default:
fprintf(stderr, "Invalid input, please try again.\n");
}
}
return 0;
}
*/
/**
  @TODO: reuse for constructing data paths?

  Constructs the filename for a distrenjob's serialized XML
  to be stored/retrieved from/in:
  "<datadir>/stor/job/<jobnum>/distrenjob.xml".

  @arg filename receives a pointer to the filename, allocated using
  something malloc() and free() compatible. Must be free()ed by the
  caller. It is NULL when the allocation failed.
  @arg geninfo server state; config->datadir is the root of the path
  @arg jobnum number of the job
  @arg createdir when non-zero, the complete filename is handed to
  distren_mkdir_recurse() and its result is returned.
  NOTE(review): the path passed on ends in "distrenjob.xml"; confirm that
  distren_mkdir_recurse() only creates the leading directories.
  @return 0 on success
*/
int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir)
{
  /* Preset so the NULL check below is meaningful even if the formatter leaves the pointer untouched. */
  *filename = NULL;

  /* jobnum is unsigned int, hence %u. */
  _distren_asprintf(filename, "%s/stor/job/%u/distrenjob.xml",
                    geninfo->config->datadir,
                    jobnum);
  if(!*filename)
    return 1;

  if(createdir)
    return distren_mkdir_recurse(*filename);

  return 0;
}