/*
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando

  This file is a part of DistRen.

  DistRen is free software: you can redistribute it and/or modify
  it under the terms of the GNU Affero General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  DistRen is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
  This file contains the code which both processes (renders) jobs as a
  slave, and the code which distributes frames to slaves after receiving
  them from the client portion of the codebase.
*/

#include "asprintf.h"
#include "distrenjob.h"
#include "execio.h"
#include "listen.h"
#include "options.h"
#include "protocol.h"
#include "slavefuncs.h"

/*
  NOTE(review): this file arrived with fifteen bare `#include' directives
  whose header names had been stripped.  The list below is reconstructed
  from the symbols this file actually uses (cfg_* from libConfuse, xml*
  from libxml2, stat(), sleep(), time(), getline(), malloc(), ...).
  Please confirm it against the project's history.
*/
#include <confuse.h>

#include <libxml/encoding.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
#include <libxml/xmlmemory.h>
#include <libxml/xmlwriter.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* local defs */

/*
  Priorities are subtracted from this constant to obtain a job's number
  of "priority pieces" (see update_xml_joblist() and find_jobframe_new()).
*/
#define NUMBER_ELEVEN 11

/* ******************* Structs ************************ */

/**
   Global state of the distrend daemon.
 */
struct general_info {
  /* Sentinel node of the job queue; real jobs start at head.next. */
  struct distrenjob head;
  /* Configuration filled in by distrend_do_config(). */
  struct distrend_config *config;

  /* Number of jobs currently linked into the queue. */
  int jobs_in_queue;
  /* Not persisted to general_info.xml; reset to 0 on startup. */
  unsigned int free_clients;
  /* Not persisted to general_info.xml; reset to 0 on startup. */
  unsigned int rendering_clients;
  unsigned int total_finished_jobs;
  /* Incremented by finish_frame(). */
  unsigned int total_frames_rendered;
  /* Last job number handed out by prepare_distrenjob(). */
  unsigned int highest_jobnum;
  /* When non-zero the find_jobframe*() functions hand out no frames. */
  int hibernate;
  /* Revision counter of job_list.xml, bumped by update_xml_joblist(). */
  time_t timestamp;
  unsigned long total_render_power;
  /* Sum of (NUMBER_ELEVEN - priority) over all queued jobs. */
  unsigned long total_priority_pieces;
};

/*
  internally defined funcs's prototypes
  @TODO: Make all functions nice and proper
*/
void distrenjob_remove(struct general_info *, struct distrenjob *bj);
struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum);
int distrenjob_enqueue(struct general_info *, struct distrenjob *job);
int mortition(struct general_info *, struct distrenjob *job);
int update_xml_joblist(struct general_info *); int createQueueFromXML(struct general_info*); int reCreateQueueFromXML(struct general_info*, xmlDocPtr doc, xmlNodePtr current); void update_general_info(struct general_info*); int import_general_info(struct general_info*); int updateJobStatsXML(struct distrenjob *job); char *job_getserialfilename(struct general_info *, unsigned int jobnum); /* ********************** Functions ************************* */ /** Dumps all data in RAM to an xml file (such as current jobs, etc) which is parsed by start_data. Remember to invoke this before shutting down! */ int xml_dump() { return 0; } /** Performs command stored in a client's request. */ int distrend_do() { return 0; } /** Frees the action */ void distrend_action_free() { } /** Fill variables at startup from XML dumps or defaults */ int start_data(struct general_info *general_info, char *datadir) { struct stat buffer; /** defaults */ memset(&general_info->head, '\0', sizeof(struct distrenjob)); general_info->head.priority = 0; general_info->jobs_in_queue = 0; general_info->free_clients = 0; general_info->rendering_clients = 0; general_info->total_finished_jobs = 0; general_info->total_frames_rendered = 0; general_info->highest_jobnum = 0; general_info->hibernate = 0; general_info->timestamp = 0; general_info->total_render_power = 0; general_info->total_priority_pieces = 0; fprintf(stderr, "Initialized default global and queue states\n"); if(stat("general_info.xml", &buffer) == 0) { fprintf(stderr, "previous state file found, loading:\n"); fprintf(stderr, "Parsing XML files and restoring previous state...\n"); if(import_general_info(general_info)) fprintf(stderr, "FAILURE\n"); fprintf(stderr, "Restoring queue...\n"); if(createQueueFromXML(general_info)) fprintf(stderr, "FAILURE\n"); fprintf(stderr, "done\n"); } return 0; } /** Finish-Setter: Sets a frame to the "completed" status.*/ void finish_frame(struct general_info *geninfo, struct distrenjob *distrenjob, int frame) { 
distrenjob->frameset[frame].status = FRAMESETSTATUS_DONE; distrenjob->total_render_time = distrenjob->total_render_time + (clock() - distrenjob->frameset[frame].start_time); distrenjob->completed_frames ++; distrenjob->assigned_frames --; geninfo->total_frames_rendered ++; /*< Increase total frames var for stats */ update_general_info(geninfo); updateJobStatsXML(distrenjob); } /** checks to see if a job is actually done. - scans the folder of the job to make sure all output files are present */ int mortition(struct general_info *geninfo, struct distrenjob *job) { short int isJobDone; int counter; char *path_and_number; struct stat buffer; isJobDone = 1; for(counter = 0; counter < job->total_frames; counter++) { _distren_asprintf(&path_and_number, "%s/stor/job%d/out/%d.%s", geninfo->config->datadir, job->jobnum, job->frameset[counter].num, job->output_format); if(stat(path_and_number, &buffer) == -1) { /** missing frame found */ job->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED; job->completed_frames--; geninfo->total_frames_rendered--; isJobDone = 0; } free(path_and_number); } if(isJobDone) { /** all frames were accounted for */ distrenjob_remove(geninfo, job); distrenjob_free(&job); geninfo->jobs_in_queue --; update_xml_joblist(geninfo); } else /** if the job isn't done, have frame_finder() start from the first frame, allowing it to see the frames that are now unassigned */ job->prev_frame_index = -1; update_general_info(geninfo); return 0; } /** scans the frames of a job to initialize a job after server @return 0 if the job is completely done and there are no missing frames, 1 if missing frames are found. 
*/ int restoreJobState(struct distrenjob *job) { short int isJobDone; int counter; char *path_and_number; struct stat buffer; isJobDone = 1; for(counter = 0; counter < job->total_frames; counter++) { _distren_asprintf(&path_and_number, "stor/job%d/out/%d.%s", job->jobnum, job->frameset[counter].num, job->output_format); /*< @TODO this path is used in multiple places, construct/build/determine it in a central function */ if(stat(path_and_number, &buffer) == 0) { job->frameset[counter].status = FRAMESETSTATUS_ASSIGNED; job->completed_frames++; } else isJobDone = 0; free(path_and_number); } return !isJobDone; } /** creates a structure from starting data, then calls another function to actually add that struct to the queue. */ int prepare_distrenjob(struct general_info *geninfo, int type, char *name, char *submitter, char *email, int priority, int start_frame, int end_frame, int width, int height) { int counter2; int counter; int tmp; char *path_with_num; char *serialfile; struct distrenjob *distrenjob; tmp = distrenjob_new(&distrenjob); if(tmp) return 1; geninfo->highest_jobnum ++; distrenjob->jobnum = geninfo->highest_jobnum; distrenjob->type = 1; distrenjob->name = name; distrenjob->submitter = submitter; distrenjob->email = email; distrenjob->priority = priority; distrenjob->width = width; distrenjob->height = height; /** sets the total number of frames in animation for status purposes */ distrenjob->total_frames = (end_frame - start_frame + 1); distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames); if(!distrenjob->frameset) { distrenjob_free(&distrenjob); return 1; } /** prepares all the frames by setting that status to "unassigned" */ counter2 = start_frame; for(counter = 0; counter < distrenjob->total_frames; counter++){ distrenjob->frameset[counter].num = counter2; distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED; counter2++; } _distren_asprintf(&path_with_num, "stor/job%d/out/", distrenjob->jobnum); fprintf(stderr, 
">>NOT<< creating dir ``%s''\n", path_with_num); /*< @TODO recursively create job directory */ serialfile = job_getserialfilename(geninfo, distrenjob->jobnum); free(path_with_num); /** add job to queue */ fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_serialize()\n"); distrenjob_serialize(distrenjob, serialfile); free(serialfile); fprintf(stderr, "\nprepare_distrenjob: attempting distrenjob_enqueue()\n"); distrenjob_enqueue(geninfo, distrenjob); geninfo->jobs_in_queue ++; fprintf(stderr, "\nprepare_distrenjob: attempting update_xml_joblist()\n"); update_xml_joblist(geninfo); fprintf(stderr, "\nprepare_distrenjob: attempting update_general_info()\n"); update_general_info(geninfo); fprintf(stderr, "\nprepare_distrenjob: attempting updateJobStatsXML()\n"); updateJobStatsXML(distrenjob); return 0; } /** distrenjob_enqueue: This function adds the job to the queue based on its priority */ int distrenjob_enqueue(struct general_info *geninfo, struct distrenjob *job) { struct distrenjob *prev_job; struct distrenjob *current_job; struct distrenjob *head; head = &geninfo->head; prev_job = head; /** iterate through linked jobs */ for(current_job = head->next; current_job; current_job = current_job->next) { fprintf(stderr, "enqueue loop iteration\n"); if(job->priority < current_job->priority) { /** if job's priority is less than current_job's priority, insert job keep in mind 1 is the highest priority given to jobs, head has a priority of zero so it will always be before other jobs */ prev_job->next = job; job->next = current_job; fprintf(stderr, "adding job before jobname: %s\n", current_job->name); return 0; } prev_job = current_job; } /** if it has reached the end of the list, add job there */ prev_job->next = job; fprintf(stderr, "adding job at end of queue\n"); return 0; } /** Changes the priority of an existing (and maybe running) job. 
@arg head I may end up changing the head if job == head */ int change_job_priority(struct general_info *geninfo, struct distrenjob *job, int new_priority){ struct distrenjob *current_job; struct distrenjob *prev_job; char *serialname; distrenjob_remove(geninfo, job); job->priority = new_priority; prev_job = &geninfo->head; if(job->frameset[0].status == FRAMESETSTATUS_UNASSIGNED) /** if job was not yet started */ { distrenjob_enqueue(geninfo, job); return 0; } /** iterate through linked list of jobs */ for(current_job = &geninfo->head; current_job != NULL && job->priority > current_job->priority; current_job = current_job->next) prev_job = current_job; prev_job->next = job; job->next = current_job; update_xml_joblist(geninfo); /** reserialize after changes */ serialname = job_getserialfilename(geninfo, job->jobnum); distrenjob_serialize(job, serialname); free(serialname); return 0; } /** Frame Finder: matches your computer up with a lovely frame to render, starts looking at oldest job first @TODO: We must return both jobnum and framenum @TODO: Add calls in main() @return 0 success, other: error */ int find_jobframe(struct general_info *geninfo, struct distrenjob **job, struct frameset **frame) { if(geninfo->hibernate) return 1; unsigned int frame_counter; unsigned short int found; struct distrenjob *distrenjob_ptr; found = 0; /* iterate through jobs from first to last */ for(distrenjob_ptr = geninfo->head.next; distrenjob_ptr && !distrenjob_ptr->hibernate; distrenjob_ptr = distrenjob_ptr->next) { for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++) { if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found { found = 1; distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED; distrenjob_ptr->frameset[frame_counter].start_time = clock(); distrenjob_ptr->assigned_frames++; distrenjob_ptr->prev_frame_index = frame_counter; 
updateJobStatsXML(distrenjob_ptr); } if(found) break; } if(found) break; } if(!found) { fprintf(stderr, "No more jobs to render\n"); sleep(1); /*< @todo eliminate the need for this line*/ return 1; } *job = distrenjob_ptr; *frame = &distrenjob_ptr->frameset[frame_counter]; return 0; } int find_jobframe_from_job(struct distrenjob *distrenjob_ptr, struct distrenjob **job, struct frameset **frame) { unsigned int frame_counter; unsigned short int found; found = 0; for(frame_counter = (distrenjob_ptr->prev_frame_index + 1); frame_counter < distrenjob_ptr->total_frames; frame_counter ++) { if(distrenjob_ptr->frameset[frame_counter].status == FRAMESETSTATUS_UNASSIGNED) // jobframe found { found = 1; distrenjob_ptr->frameset[frame_counter].status = FRAMESETSTATUS_ASSIGNED; distrenjob_ptr->frameset[frame_counter].start_time = clock(); distrenjob_ptr->assigned_frames++; distrenjob_ptr->prev_frame_index = frame_counter; updateJobStatsXML(distrenjob_ptr); } if(found) break; } if(!found) { fprintf(stderr, "No more frames in this job number %d", distrenjob_ptr->jobnum); distrenjob_ptr->prev_frame_index = frame_counter; return 1; } *job = distrenjob_ptr; *frame = &distrenjob_ptr->frameset[frame_counter]; return 0; } // find a frame to render when the job that the last frame was for no longer exists int find_jobframe_new(struct general_info *geninfo, int rend_pwr, struct distrenjob **job, struct frameset **frame) { if(geninfo->hibernate) return 1; float power_difference; float greatest_power_difference; unsigned short int found; struct distrenjob *job_to_render; struct distrenjob *distrenjob_ptr; greatest_power_difference = -10000; found = 0; /* iterate through jobs from first to last */ for(distrenjob_ptr = geninfo->head.next; distrenjob_ptr && !distrenjob_ptr->hibernate; distrenjob_ptr = distrenjob_ptr->next) { if(distrenjob_ptr->prev_frame_index < (distrenjob_ptr->total_frames - 1)) { /** Why is the number 11 found here again? 
--ohnobinki */ power_difference = (((float)geninfo->total_render_power / (float)geninfo->total_priority_pieces) * (NUMBER_ELEVEN - (float)distrenjob_ptr->priority)); power_difference = power_difference - (float)distrenjob_ptr->assigned_render_power; fprintf(stderr, "job num %d\npower difference: %f\n", distrenjob_ptr->jobnum, power_difference); if(power_difference > greatest_power_difference) { job_to_render = distrenjob_ptr; greatest_power_difference = power_difference; found = 1; } } } if(!found) { fprintf(stderr, "No more jobs to render\n"); return 1; } find_jobframe_from_job(job_to_render, job, frame); job_to_render->assigned_render_power = job_to_render->assigned_render_power + rend_pwr; return 0; } // gets a frame to render from the same job that the previously rendered frame was from int find_jobframe_again(struct general_info *geninfo, int jobnum, int rend_pwr, struct distrenjob **job, struct frameset **frame) { if(geninfo->hibernate) return 1; short int found; struct distrenjob *distrenjob_ptr; distrenjob_ptr = distrenjob_get(&geninfo->head, jobnum); // if the job was not found or there are no frames left in the job... 
if(!distrenjob_ptr || distrenjob_ptr->prev_frame_index >= (distrenjob_ptr->total_frames - 1)) { fprintf(stderr, "Job number %d has been finished, finding new job\n", jobnum); // if previous job isn't yet finished the render power of the slave is removed from it if(distrenjob_ptr) distrenjob_ptr->assigned_render_power = distrenjob_ptr->assigned_render_power - rend_pwr; return find_jobframe_new(geninfo, rend_pwr, job, frame); } found = 0; found = find_jobframe_from_job(distrenjob_ptr, job, frame); if(found) find_jobframe_new(geninfo, rend_pwr, job, frame); return 0; } /** Checks for dead, latent, or stale slaves */ void frame_watchdog(struct distrenjob *distrenjob_head) { struct distrenjob *distrenjob_ptr; unsigned int counter; for(distrenjob_ptr = distrenjob_head->next; distrenjob_ptr; distrenjob_ptr = distrenjob_ptr->next) /* iterate through jobs */ { /* if the job has been started, checks by seeing if either to first or second frame has been started */ if(distrenjob_ptr->frameset[0].status != FRAMESETSTATUS_UNASSIGNED || distrenjob_ptr->frameset[1].status != FRAMESETSTATUS_UNASSIGNED) /* iterate through all frames for this job: */ for(counter = 0; counter < distrenjob_ptr->total_frames; counter ++) /*watchdog_forgiveness = seconds of forgiveness before frame is re-assigned: */ if((distrenjob_ptr->frameset[counter].start_time + distrenjob_ptr->watchdog_forgiveness) < clock()) { /* If frame is not completed within the number of seconds specified by watchdog_forgiveness Then change the frame status to unassigned */ distrenjob_ptr->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED; distrenjob_ptr->assigned_frames--; } updateJobStatsXML(distrenjob_ptr); } } /** Finds a distrenjob struct based on the jobnum @arg jobnum job number to search for @return NULL on job doesn't exist */ struct distrenjob *distrenjob_get(struct distrenjob *head, jobnum_t jobnum) { struct distrenjob *distrenjob_ptr; /* The conditions of the for loop will leave distrenjob_ptr at NULL if the 
end of the list is reached. It will leave it pointing to the correct job if it is found. */ for(distrenjob_ptr = head; distrenjob_ptr && distrenjob_ptr->jobnum != jobnum; distrenjob_ptr = distrenjob_ptr->next); return distrenjob_ptr; } /** Removes a distrenjob from the distrenjob linked list. It does not free the distrenjob, however. You should do that with distrenjob_free() from distrenjob.h @arg head pointer to the head of the linkedlist of distrenjobs */ void distrenjob_remove(struct general_info *geninfo, struct distrenjob *bj) { struct distrenjob *previous_distrenjob; for(previous_distrenjob = &geninfo->head; previous_distrenjob && previous_distrenjob->next != bj; /*< stop on the distrenjob that comes before bj */ previous_distrenjob = previous_distrenjob->next) ; /* This removes references to bj from the linked list. I.E., we now skip bj when iterating through the list */ previous_distrenjob->next = bj->next; geninfo->jobs_in_queue --; } /* Grabs config info from confs */ int distrend_do_config(int argc, char *argv[], struct distrend_config **config) { unsigned int counter; cfg_opt_t myopts_listen[] = { CFG_SIMPLE_STR("type", NULL), CFG_SIMPLE_STR("path", NULL), CFG_SIMPLE_INT("port", NULL), CFG_END() }; cfg_opt_t myopts[] = { CFG_SEC("listen", /* this must be imported into struct listens (which must still be declared) */ myopts_listen, CFGF_MULTI), CFG_SIMPLE_STR("datadir", NULL), CFG_END() }; struct distrenjob *distrenjob; cfg_t *cfg_listen; int tmp; /* * test distrenjob_unserialize() */ tmp = distrenjob_unserialize(&distrenjob, "distrenjob.xml.example"); if(tmp) fprintf(stderr, "distrenjob_unserialize() returned %d. Try to cd to distren/doc if you want to test out the distrenjob_unserialize() function. 
(This will only fix this error if the error is due to an inability of the xml library to access distrenjob.xml.example)\n\n", tmp); else fprintf(stderr, "using email ``%s'' for user ``%s'' -- reading in XML files and pulling data from them using libxml2+XPath works!!!\n", distrenjob->email, distrenjob->submitter); fprintf(stderr, "%s:%d running config\n", __FILE__, __LINE__); *config = malloc(sizeof(struct distrend_config)); myopts[1].simple_value = &(*config)->datadir; if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options)) return 1; /** grab listen blocks: */ (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1)); for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++) { cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter); (*config)->listens[counter].port = cfg_getint(cfg_listen, "port"); (*config)->listens[counter].sock = -1; } memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen)); fprintf(stderr, "using %s as datadir\n", (*config)->datadir); return 0; } int distrend_config_free(struct distrend_config *config) { options_free(config->options); free(config); return 0; } /* ************************** XML Functions ************************* */ // writes the general_info.xml file which is a copy of the general_info structure // except that it doesn't hold free_clients and rendering_clients void update_general_info(struct general_info *geninfo) { xmlTextWriterPtr writer; char *tmp; writer = xmlNewTextWriterFilename("general_info.xml", 0); xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL); xmlTextWriterStartElement(writer, (xmlChar*)"general_info"); _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue); xmlTextWriterWriteElement(writer, (xmlChar*)"jobs_in_queue", (xmlChar*)tmp); free(tmp); _distren_asprintf(&tmp, "%d", geninfo->total_finished_jobs); xmlTextWriterWriteElement(writer, (xmlChar*)"total_finished_jobs", (xmlChar*)tmp); 
free(tmp); _distren_asprintf(&tmp, "%d", geninfo->total_frames_rendered); xmlTextWriterWriteElement(writer, (xmlChar*)"total_frames_rendered", (xmlChar*)tmp); free(tmp); _distren_asprintf(&tmp, "%d", geninfo->highest_jobnum); xmlTextWriterWriteElement(writer, (xmlChar*)"highest_jobnum", (xmlChar*)tmp); free(tmp); xmlTextWriterEndDocument(writer); xmlFreeTextWriter(writer); } // this reads the information from general_info.xml to the general_info structure int import_general_info(struct general_info *general_info) { xmlDocPtr doc; xmlNodePtr cur; doc = xmlParseFile("general_info.xml"); cur = xmlDocGetRootElement(doc); if (xmlStrcmp(cur->name, (xmlChar*)"general_info")) { fprintf(stderr, "xml document is wrong type"); xmlFreeDoc(doc); return 1; } cur = cur->xmlChildrenNode; general_info->jobs_in_queue = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1)); cur = cur->next; general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1)); cur = cur->next; general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1)); cur = cur->next; general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1)); general_info->hibernate = 0; general_info->free_clients = 0; general_info->rendering_clients = 0; general_info->timestamp = 0; general_info->total_render_power = 0; general_info->total_priority_pieces = 0; xmlFreeDoc(doc); return 0; } // extracts data from the xml created by above function and creates a job from it // it returns a pointer to the created job struct distrenjob *createJobFromXML(struct general_info *geninfo, unsigned int jobnum) { xmlDocPtr doc; xmlNodePtr cur; char *file_name; struct distrenjob *distrenjob; int start_frame; int counter; int counter2; file_name = job_getserialfilename(geninfo, jobnum); doc = xmlParseFile(file_name); if(!doc) return NULL; distrenjob_new(&distrenjob); cur = xmlDocGetRootElement(doc); distrenjob->name = 
(char*)xmlGetProp(cur, (xmlChar*)"name"); distrenjob->submitter = (char*)xmlGetProp(cur, (xmlChar*)"submitter"); distrenjob->priority = atoi((char*)xmlGetProp(cur, (xmlChar*)"priority")); cur = cur->xmlChildrenNode; distrenjob->width = atoi((char*)xmlGetProp(cur, (xmlChar*)"width")); distrenjob->height = atoi((char*)xmlGetProp(cur, (xmlChar*)"number")); cur = cur->next; start_frame = atoi((char*)xmlGetProp(cur, (xmlChar*)"start_frame")); distrenjob->total_frames = atoi((char*)xmlGetProp(cur, (xmlChar*)"end_frame")) - start_frame + 1; distrenjob->output_format = (char*)xmlGetProp(cur, (xmlChar*)"output_format"); cur = cur->next; distrenjob->watchdog_forgiveness = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1)); xmlFreeDoc(doc); /** load up stats.xml file to retrieve data */ _distren_asprintf(&file_name, "stor/job%d/stats.xml", jobnum); doc = xmlParseFile(file_name); free(file_name); cur = xmlDocGetRootElement(doc); distrenjob->total_render_time = (time_t)atol((char*)xmlGetProp(cur, (xmlChar*)"total_render_time")); xmlFreeDoc(doc); distrenjob->frameset = malloc(sizeof(struct frameset) * distrenjob->total_frames); // change the status of all frames to "unassigned" counter2 = start_frame; for(counter = 0; counter <= distrenjob->total_frames; counter++){ distrenjob->frameset[counter].num = counter2; distrenjob->frameset[counter].status = FRAMESETSTATUS_UNASSIGNED; counter2++; } restoreJobState(distrenjob); return distrenjob; } int updateJobStatsXML(struct distrenjob *job) { xmlTextWriterPtr writer; char *tmp; _distren_asprintf(&tmp, "stor/job%d/stats.xml", job->jobnum); /** create xml document at the location tmp with no compression */ writer = xmlNewTextWriterFilename(tmp, 0); free(tmp); xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL); xmlTextWriterStartElement(writer, (xmlChar*)"stats"); _distren_asprintf(&tmp, "%d", job->assigned_frames); xmlTextWriterWriteAttribute(writer, (xmlChar*)"assigned_frames", (xmlChar*)tmp); free(tmp); 
_distren_asprintf(&tmp, "%d", job->completed_frames); xmlTextWriterWriteAttribute(writer, (xmlChar*)"completed_frames", (xmlChar*)tmp); free(tmp); _distren_asprintf(&tmp, "%d", job->total_render_time); xmlTextWriterWriteAttribute(writer, (xmlChar*)"total_render_time", (xmlChar*)tmp); free(tmp); /** end document */ xmlTextWriterEndDocument(writer); /** free writer and save xml file to disk @TODO confirma that calling xmlFreeTextWriter calls write() and close(), respectively. (close() causes asynchronous writes to be completed). */ xmlFreeTextWriter(writer); return 1; } /** updates job_list.xml which lists all the jobs in the queue @return 0 means success */ int update_xml_joblist(struct general_info *geninfo) { struct distrenjob *job; xmlTextWriterPtr writer; char *tmp; char *tmp2; int counter; /** update timestamp */ geninfo->timestamp ++; if(geninfo->timestamp > 65530) geninfo->timestamp = 0; writer = xmlNewTextWriterFilename("job_list.xml", 0); xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL); /** create root element job_list */ xmlTextWriterStartElement(writer, (xmlChar*)"job_list"); _distren_asprintf(&tmp, "%d", geninfo->timestamp); xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp); free(tmp); geninfo->total_priority_pieces = 0; counter = 0; for(job = geninfo->head.next; job; job = job->next) { _distren_asprintf(&tmp, "jobnum%d", counter); _distren_asprintf(&tmp2, "%d", job->jobnum); xmlTextWriterWriteElement(writer, (xmlChar*)tmp, (xmlChar*)tmp2); free(tmp2); free(tmp); /** this is needed for the new frame finder to work Why the random constant numeral 11? 
--ohnobinki */ geninfo->total_priority_pieces = geninfo->total_priority_pieces + (NUMBER_ELEVEN - job->priority); counter++; } /** close elements and end document */ xmlTextWriterEndDocument(writer); /** free writer and save xml file to disk */ xmlFreeTextWriter(writer); return 0; } /** returns 0 if completed successfully this reads a list of jobs in the queue before DistRen was shut down and then adds the jobs to the queue, as if it were never shut down */ int createQueueFromXML(struct general_info *geninfo) { int tmp; xmlDocPtr doc; xmlNodePtr cur; /** load xml document */ doc = xmlParseFile("job_list.xml"); if(doc == NULL) { fprintf(stderr, "createQueueFromXML: document not found\n"); return 1; } /** have cur point to the root element of the xml document pointed to by doc */ cur = xmlDocGetRootElement(doc); if(cur == NULL) { fprintf(stderr, "createQueueFromXML: document empty\n"); return 1; } if(xmlStrcmp(cur->name, (const xmlChar*)"job_list")) { fprintf(stderr, "createQueueFromXML: incorrect root element (%s)", (char*)cur->name); } /** moves into the children elements of job_list */ cur = cur->xmlChildrenNode; tmp = reCreateQueueFromXML(geninfo, doc, cur); xmlFreeDoc(doc); return tmp; } /** inserts jobs at front of queue, starting with the last job in the job_list xml file to preserve the order of the queue */ int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current) { struct distrenjob *holder; // holds the job that "was" after head, so that the new struct can be inserted after head struct distrenjob *job; // job to be added xmlChar *tmp; int job_num; if(current == NULL) // base case, if element doesn't exist then don't do anything return 0; /** recursively call itself so that the next job in the queue is added to the front before the current one */ if(reCreateQueueFromXML(geninfo, doc, current->next)) /** bail if failure recursevely ^-- is this the best thing to do in this case? 
@TODO bail-out cleanup */ return 1; /** actual work is done now: add the job to the front of the queue */ holder = geninfo->head.next; tmp = xmlNodeListGetString(doc, current->xmlChildrenNode, 1); // get job number job_num = atoi((char*)tmp); xmlFree(tmp); job = createJobFromXML(geninfo, job_num); if(job == NULL) return 1; /** insert job at front of the queue */ geninfo->head.next = job; job->next = holder; return 0; } /* ************************** Test Functions ************************* */ int printFrameInfo(struct frameset *frame) { char *status; status = NULL; switch(frame->status) { case FRAMESETSTATUS_UNASSIGNED: _distren_asprintf(&status, "%s", "unassigned"); break; case FRAMESETSTATUS_ASSIGNED: _distren_asprintf(&status, "%s", "assigned"); break; case FRAMESETSTATUS_DONE: _distren_asprintf(&status, "%s", "completed"); break; case FRAMESETSTATUS_CANCELED: _distren_asprintf(&status, "%s", "canceled"); } printf("frame #: %d --> %s\n", frame->num, status); free(status); return 0; } int printJob(struct distrenjob *job) { int counter; fprintf(stderr, "frame_num: status\n"); for(counter = 0; counter < job->total_frames; counter++) { printFrameInfo(&job->frameset[counter]); } return 1; } int printJobInfo(struct distrenjob *job) { fprintf(stderr, "type: %d\n", job->type); fprintf(stderr, "name: %s\n", job->email); fprintf(stderr, "submitter: %s\n", job->submitter); fprintf(stderr, "e-mail: %s\n", job->email); fprintf(stderr, "priority: %d\n", job->priority); fprintf(stderr, "completed: %d\n", job->completed_frames); fprintf(stderr, "assigned: %d\n", job->assigned_frames); fprintf(stderr, "total: %d\n", job->total_frames); fprintf(stderr, "watchdog: %d\n", job->watchdog_forgiveness); fprintf(stderr, "hibernate: %d\n", job->hibernate); fprintf(stderr, "prev_frame: %d\n", job->prev_frame_index); fprintf(stderr, "render power: %ld\n", job->assigned_render_power); return 1; } int printAllJobnums(struct distrenjob *head) { struct distrenjob *current_job; int total_jobs; 
fprintf(stderr, "job numbers in the order they will be processed:\n"); total_jobs = 0; for(current_job = head->next; current_job; current_job = current_job->next) { fprintf(stderr, "%d: %s\n", current_job->jobnum, current_job->name); total_jobs++; } fprintf(stderr, "\n%d jobs in queue\n\n", total_jobs); return 1; } /* ************************** Main ************************* */ int main(int argc, char *argv[]) { /* Argument-parser */ int counter; int test; // Interactive test mode if 1 struct general_info general_info; int cont; struct distrend_clientset *clients; enum clientstatus { CLIENTSTATUS_UNINITIALIZED = 0, CLIENTSTATUS_BUSY = 1, CLIENTSTATUS_IDLE = 2 } clientstatus; int command; jobnum_t jobnum; struct distrenjob *tmp_job; struct frameset *tmp_frame; int type; char *name; char *submitter; char *email; int priority; int width; int height; int start_frame; int end_frame; size_t read_buf; xmlinit(); test = 0; for(counter = 0; counter < argc; counter ++) { if(strcmp(argv[counter], "-h") == 0) { fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n"); return 2; } else if(strcmp(argv[counter], "-t") == 0) { fprintf(stderr, "Entering into test mode...\n\n"); test=1; } } cont = 1; if(distrend_do_config(argc, argv, &general_info.config)) return 1; if(start_data(&general_info, general_info.config->datadir)) { fprintf(stderr, "%s:%d: start_data() failed\n", __FILE__, __LINE__); return 1; } // pre-loaded jobs for testing prepare_distrenjob(&general_info, 1, "awesome", "LordOfWar", "onlylordofwar@gmail.com", 8, 1, 100, 640, 480); prepare_distrenjob(&general_info, 1, "hamburger", "Ohnobinki", "ohnobinki@ohnopublishing.net", 3, 1, 50, 1280, 720); while(test == 1) { fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n"); fprintf(stderr, "\t1 \tPrint all frames in a job\n"); fprintf(stderr, "\t2 \tExamine certain job\n"); fprintf(stderr, "\t3 \tGet a frame to render\n"); 
fprintf(stderr, "\t4 \tAdd a job\n"); fprintf(stderr, "\t5 \tDelete a job\n"); fprintf(stderr, "\t6 \tPrint jobnums in queue\n"); fprintf(stderr, "\t7 \tPrint general info\n"); fprintf(stderr, "\t8 \tQuit\n"); scanf("%d", &command); switch(command) { case 1: fprintf(stderr, "Job number: "); scanf("%d", &jobnum); printJob(distrenjob_get(&general_info.head, jobnum)); break; case 2: fprintf(stderr, "Job number: "); scanf("%d", &jobnum); printJobInfo(distrenjob_get(&general_info.head, jobnum)); break; case 3: general_info.total_render_power ++; if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame)) { fprintf(stderr, "frame was found, details below\n"); fprintf(stderr, "Job#:%d\n", tmp_job->jobnum); fprintf(stderr, "Frame#:%d\n", tmp_frame->num); } break; case 4: name = NULL; submitter = NULL; email = NULL; fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n"); scanf("%d", &type); fprintf(stderr, "\nName: "); scanf("\n"); getline(&name, &read_buf, stdin); fprintf(stderr, "\nSubmitter: "); getline(&submitter, &read_buf, stdin); fprintf(stderr, "\nEmail: "); getline(&email, &read_buf, stdin); fprintf(stderr, "\nPriority: "); scanf("%d", &priority); fprintf(stderr, "\nStart frame: "); scanf("%d", &start_frame); fprintf(stderr, "\nEnd frame: "); scanf("%d", &end_frame); fprintf(stderr, "\nWidth: "); scanf("%d", &width); fprintf(stderr, "\nHeight: "); scanf("%d", &height); prepare_distrenjob(&general_info, type, name, submitter, email, priority, start_frame, end_frame, width, height); break; case 5: fprintf(stderr, "\nJob number: "); scanf("%d", &jobnum); distrenjob_remove(&general_info, distrenjob_get(&general_info.head, jobnum)); break; case 6: printAllJobnums(&general_info.head); break; case 7: fprintf(stderr, "\nHighest job number: %d", general_info.highest_jobnum); fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue); fprintf(stderr, "\nTotal frames rendered: %d", general_info.total_frames_rendered); fprintf(stderr, "\nTimestamp: 
%lu", (long)general_info.timestamp); fprintf(stderr, "\nTotal priority pieces: %ld", general_info.total_priority_pieces); fprintf(stderr, "\nTotal render power: %ld\n", general_info.total_render_power); break; case 8: fprintf(stderr,"Goodbye.\n"); return 0; default: fprintf(stderr, "Invalid input, please try again.\n"); } } distrend_listen(general_info.config, &clients); /* This is called the "main loop" */ while(cont) { struct distrend_action *action; int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */ distrend_accept(general_info.config, clients, &action); cont = distrend_do(action); /* Make the following code more event-driven */ frame_watchdog(&general_info.head); struct frameset *frame; struct distrenjob *job; /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */ if(clientstatus == CLIENTSTATUS_IDLE) { int returnnum = find_jobframe(&general_info, &job, &frame); // Finds a frame to render if(returnnum) { fprintf(stderr,"No frames are available to render at this time. Idling...\n"); sleep(10); } else remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client } /* If the client states that they finished the frame */ if(clientsays == DISTREN_REQUEST_DONEFRAME){ clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle finish_frame(&general_info, job, frame->num); // @TODO: Make sure this actually works. } distrend_action_free(action); } distrend_unlisten(general_info.config->listens, clients); distrend_config_free(general_info.config); xmlcleanup(); return 0; } /** constructs the filename for a distrenjob's serialized XML to be stored/retrieved from/in. @return pointer to filename allocated using something malloc() and free() compatible. 
Must be free()ed by the caller */ char *job_getserialfilename(struct general_info *geninfo, unsigned int jobnum) { char *filename; _distren_asprintf(&filename, "%s/stor/job/%d/distrenjob.xml", geninfo->config->datadir, jobnum); return filename; }