/*
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando

  This file is a part of DistRen.

  DistRen is free software: you can redistribute it and/or modify
  it under the terms of the GNU Affero General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  DistRen is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
  This file contains the code which both processes (renders) jobs as a slave,
  and the code which distributes frames to slaves after receiving them from
  the client portion of the codebase.
*/

#include "distrenjob.h"
#include "listen.h"
#include "slavefuncs.h"
#include "mysql.h"

#include "common/asprintf.h"
#include "common/execio.h"
#include "common/options.h"
#include "common/protocol.h"

/*
  NOTE(review): the names of the system headers were missing from this copy
  of the file (only bare "#include" tokens remained). The list below is
  reconstructed from the identifiers this file actually uses -- confirm it
  against the project's history and re-add any header that is missing.
*/
#include <confuse.h>

#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <libxml/parser.h>
#include <libxml/tree.h>
#include <libxml/xmlmemory.h>
#include <libxml/xmlwriter.h>

/* ******************* Structs ************************ */

/**
   Server-wide state: the head of the job list, the database connection,
   the parsed configuration and a set of running counters.
*/
struct general_info
{
  struct distrenjob head;
  distrend_mysql_conn_t conn;

  struct distrend_config *config;

  struct
  {
    /** general_info.xml */
    char *geninfo;
  } files;

  int jobs_in_queue;
  unsigned int free_clients;
  unsigned int rendering_clients;
  unsigned int total_finished_jobs;
  unsigned int total_frames_rendered;
  unsigned int highest_jobnum;
  int hibernate;
  time_t timestamp;
  unsigned long total_render_power;
  unsigned long total_priority_pieces;
};

/* *********************************************
   Function Prototypes
   ********************************************* */

/* ************General Functions************* */
// int xml_dump();
int distrend_do();
int start_data(struct general_info *general_info);
int mortition(struct general_info *geninfo, struct distrenjob *job);
void frame_watchdog(struct distrenjob *distrenjob_head);
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
int distrend_config_free(struct distrend_config *config);

/* **************XML Functions**************** */
void update_general_info(struct general_info *geninfo);
int import_general_info(struct general_info *general_info);
int restore_distrenjob(struct general_info *geninfo, struct distrenjob **distrenjob, jobnum_t jobnum);
int updateJobStatsXML(struct distrenjob *job);
int update_xml_joblist(struct general_info *geninfo);
int createQueueFromXML(struct general_info *geninfo);
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current);

/* **************Test Functions**************** */
int printFrameInfo(struct frameset *frame);
int printJob(struct distrenjob *job);
int printJobInfo(struct distrenjob *job);
int printAllJobnums(struct distrenjob *head);
int interactiveTest(int test, struct general_info general_info);

int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir);
int distren_mkdir_recurse(char *dirname);

/* **************** Main ********************* */

/**
   Entry point of the distrend server.

   Parses the command line, loads the configuration, connects to MySQL,
   starts listening and then runs the main loop until config->die is set.

   @return 0 on a clean shutdown, 2 when only the usage text was printed,
   1 on a start-up failure.
*/
int main(int argc, char *argv[])
{
  int counter;
  int test = 0; /*< Interactive mode if 1 */
  int ret = 1;
  int slaveKey = 0; // Remotio should set me on a per-slave basis

  struct general_info general_info;
  struct distrend_clientset *clients = NULL;

  /*
    The main loop reads job->jobnum and frame->num before anything assigns
    a job or frame. Point both at zeroed local storage so that those reads
    are well-defined until the lookup code hands back real objects.
  */
  struct frameset frame_storage;
  struct distrenjob job_storage;
  struct frameset *frame = &frame_storage;
  struct distrenjob *job = &job_storage;

  enum clientstatus
  {
    CLIENTSTATUS_UNINITIALIZED = 0,
    CLIENTSTATUS_BUSY = 1,
    CLIENTSTATUS_IDLE = 2
  } clientstatus = CLIENTSTATUS_UNINITIALIZED;

  memset(&general_info, 0, sizeof(general_info));
  memset(&frame_storage, 0, sizeof(frame_storage));
  memset(&job_storage, 0, sizeof(job_storage));

  xmlinit();

  /* Parse arguments (argv[0] is the program name, so start at 1) */
  for(counter = 1; counter < argc; counter ++)
    {
      if(strcmp(argv[counter], "-h") == 0)
        {
          fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
          ret = 2;
          goto out_xml;
        }
      else if(strcmp(argv[counter], "-t") == 0)
        {
          fprintf(stderr, "Entering into test mode...\n\n");
          test = 1;
        }
    }

  if(distrend_do_config(argc, argv, &general_info.config))
    goto out_xml;

  /** preset paths */
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml", general_info.config->datadir);
  if(!general_info.files.geninfo)
    {
      fprintf(stderr, "%s:%d: unable to allocate the general_info.xml path\n", __FILE__, __LINE__);
      goto out_config;
    }

  /** MySQL Connection */
  if(mysqlConnect(&general_info.conn))
    {
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
      goto out_paths;
    }

  /** Execute test function */
  // interactiveTest(test, general_info);
  (void)test; /* only consumed by the disabled interactive test above */

  distrend_listen(general_info.config, &clients);

  /* Main Loop */
  while(!general_info.config->die)
    {
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */

      distrend_accept(general_info.config, clients);

      /* Make the following code more event-driven */
      frame_watchdog(&general_info.head);

      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
      if(clientstatus == CLIENTSTATUS_IDLE)
        {
          int returnnum = find_jobframe(general_info.conn, slaveKey, job->jobnum, frame->num); // Finds a frame to render @FIXME: Slavenum :D
          if(returnnum)
            {
              fprintf(stderr,"No frames are available to render at this time. Idling...\n");
              sleep(10);
            }
          else
            remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
        }

      /* If the client states that they finished the frame */
      if(clientsays == DISTREN_REQUEST_DONEFRAME)
        {
          clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
          finish_frame(general_info.conn, 0, job->jobnum, frame->num); // @TODO: Make sure this actually works.
        }
    }

  distrend_unlisten(general_info.config->listens, clients);
  mysqlDisconnect(general_info.conn);
  ret = 0;

  /* Release resources in the reverse order of their acquisition. */
 out_paths:
  /** free() paths */
  free(general_info.files.geninfo);
 out_config:
  distrend_config_free(general_info.config);
 out_xml:
  xmlcleanup();

  return ret;
}

/* ********************** Functions ************************* */

/**
   Performs command stored in a client's request.

   @TODO: Fill stub
   @return always 0 for now
*/
int distrend_do()
{
  return 0;
}

/**
   Checks for dead, latent, or stale slaves.

   Currently a stub: it does nothing with the job list it is given.
*/
void frame_watchdog(struct distrenjob *distrenjob_head)
{
  (void)distrenjob_head;

  // Replace with mysqlness, maybe

  // iterate through jobs
  //   if the job has been started, checks by seeing if either to first or second frame has been started
  //   FRAMESETSTATUS_UNASSIGNED
  //     iterate through all frames for this job:
  //       watchdog_forgiveness = seconds of forgiveness before frame is re-assigned:
  //       If frame is not completed within the number of seconds specified by watchdog_forgiveness
  //       Then change the frame status to unassigned
}

/**
   Grabs config info from confs.

   Allocates a struct distrend_config, runs options_init() on it and copies
   the port of every "listen" section into a zero-terminated listens array.

   @param config receives the new configuration on success and is set to
   NULL on failure. Release it with distrend_config_free().
   @return 0 on success, 1 on failure
*/
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
{
  unsigned int counter;
  unsigned int num_listens;
  struct distrend_config *newconfig;
  cfg_t *cfg_listen;

  cfg_opt_t myopts_listen[] =
    {
      CFG_SIMPLE_STR("type", NULL),
      CFG_SIMPLE_STR("path", NULL),
      CFG_SIMPLE_INT("port", NULL),
      CFG_END()
    };
  cfg_opt_t myopts[] =
    {
      CFG_SEC("listen", /* this must be imported into struct listens (which must still be declared) */
              myopts_listen, CFGF_MULTI),
      CFG_SIMPLE_STR("datadir", NULL),
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
      CFG_END()
    };

  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);

  *config = NULL;

  /*
    Zero the structure: main() polls ->die and ->datadir is only written
    when the option appears in the configuration.
  */
  newconfig = calloc(1, sizeof(*newconfig));
  if(!newconfig)
    {
      fprintf(stderr, "%s:%d: out of memory\n", __FILE__, __LINE__);
      return 1;
    }

  /* myopts[1] is the "datadir" option; let the parser store straight into the config */
  myopts[1].simple_value = &newconfig->datadir;

  if(options_init(argc, argv, &newconfig->mycfg, myopts, "daemon", &newconfig->options))
    {
      free(newconfig);
      return 1;
    }

  if(!newconfig->datadir)
    {
      fprintf(stderr, "%s:%d: no datadir configured\n", __FILE__, __LINE__);
      goto fail;
    }

  /** grab listen blocks: */
  num_listens = cfg_size(newconfig->mycfg, "listen");
  /* one extra, all-zero entry terminates the array */
  newconfig->listens = calloc((size_t)num_listens + 1, sizeof(*newconfig->listens));
  if(!newconfig->listens)
    {
      fprintf(stderr, "%s:%d: out of memory\n", __FILE__, __LINE__);
      goto fail;
    }
  for(counter = 0; counter < num_listens; counter ++)
    {
      cfg_listen = cfg_getnsec(newconfig->mycfg, "listen", counter);
      newconfig->listens[counter].port = cfg_getint(cfg_listen, "port");
      newconfig->listens[counter].sock = -1;
    }

  fprintf(stderr, "using %s as datadir\n", newconfig->datadir);

  *config = newconfig;
  return 0;

 fail:
  options_free(newconfig->options);
  free(newconfig);
  return 1;
}

/**
   Releases a configuration obtained from distrend_do_config().

   NOTE(review): config->listens is not released here; main() hands it to
   distrend_unlisten() first -- confirm which side owns that array.

   @param config may be NULL, in which case nothing happens
   @return always 0
*/
int distrend_config_free(struct distrend_config *config)
{
  if(!config)
    return 0;

  options_free(config->options);
  free(config);

  return 0;
}

/* ************************** XML Functions ************************* */

/** Writes <name>value</name> with value formatted as an unsigned decimal. */
static void geninfo_write_uint(xmlTextWriterPtr writer, const char *name, unsigned int value)
{
  char text[32];

  snprintf(text, sizeof(text), "%u", value);
  xmlTextWriterWriteElement(writer, (const xmlChar *)name, (const xmlChar *)text);
}

/**
   Writes the general_info.xml file which is a copy of the general_info
   structure except that it doesn't hold free_clients and rendering_clients.

   Only jobs_in_queue, total_finished_jobs, total_frames_rendered and
   highest_jobnum are written. Nothing is written if the file cannot be
   opened for writing.
*/
void update_general_info(struct general_info *geninfo)
{
  xmlTextWriterPtr writer;
  char text[32];

  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
  if(!writer)
    {
      fprintf(stderr, "%s:%d: unable to open %s for writing\n", __FILE__, __LINE__, geninfo->files.geninfo);
      return;
    }

  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
  xmlTextWriterStartElement(writer, (const xmlChar *)"general_info");

  /* jobs_in_queue is the only signed counter */
  snprintf(text, sizeof(text), "%d", geninfo->jobs_in_queue);
  xmlTextWriterWriteElement(writer, (const xmlChar *)"jobs_in_queue", (const xmlChar *)text);

  geninfo_write_uint(writer, "total_finished_jobs", geninfo->total_finished_jobs);
  geninfo_write_uint(writer, "total_frames_rendered", geninfo->total_frames_rendered);
  geninfo_write_uint(writer, "highest_jobnum", geninfo->highest_jobnum);

  /* also closes the still-open general_info element */
  xmlTextWriterEndDocument(writer);
  xmlFreeTextWriter(writer);
}

/**
   Parses a whole decimal number (trailing whitespace allowed) and checks
   that it lies in [min, max].

   @return 0 on success with *value set, 1 otherwise
*/
static int geninfo_parse_number(const xmlChar *text, long long min, long long max, long long *value)
{
  const char *start = (const char *)text;
  char *end;
  long long parsed;

  if(!start)
    return 1;

  errno = 0;
  parsed = strtoll(start, &end, 10);
  if(end == start || errno == ERANGE)
    return 1;

  while(isspace((unsigned char)*end))
    end ++;
  if(*end != '\0')
    return 1;

  if(parsed < min || parsed > max)
    return 1;

  *value = parsed;
  return 0;
}

/**
   Reads general state information from general_info.xml into the
   general_info structure.

   The four counters written by update_general_info() are looked up by
   element name. The remaining counters (hibernate, free_clients,
   rendering_clients, timestamp, total_render_power, total_priority_pieces)
   are reset to 0. The structure is left untouched on failure.

   @return 0 on success, 1 if the file cannot be parsed, has the wrong root
   element, or lacks/garbles one of the counters
*/
int import_general_info(struct general_info *general_info)
{
  enum
  {
    FIELD_JOBS_IN_QUEUE,
    FIELD_TOTAL_FINISHED_JOBS,
    FIELD_TOTAL_FRAMES_RENDERED,
    FIELD_HIGHEST_JOBNUM,
    FIELD_COUNT
  };
  /*
    The unsigned counters accept negative text too: earlier versions wrote
    them with "%d", so values above INT_MAX were stored as negative numbers
    and converted back on load.
  */
  static const struct
  {
    const char *name;
    long long min;
    long long max;
  } fields[FIELD_COUNT] =
    {
      { "jobs_in_queue", INT_MIN, INT_MAX },
      { "total_finished_jobs", INT_MIN, UINT_MAX },
      { "total_frames_rendered", INT_MIN, UINT_MAX },
      { "highest_jobnum", INT_MIN, UINT_MAX }
    };

  long long values[FIELD_COUNT] = { 0 };
  int seen[FIELD_COUNT] = { 0 };
  int field;
  int ret = 1;

  xmlDocPtr doc;
  xmlNodePtr root;
  xmlNodePtr cur;

  doc = xmlParseFile(general_info->files.geninfo);
  if(!doc)
    {
      fprintf(stderr, "%s:%d: unable to parse %s\n", __FILE__, __LINE__, general_info->files.geninfo);
      return 1;
    }

  root = xmlDocGetRootElement(doc);
  if(!root || xmlStrcmp(root->name, (const xmlChar *)"general_info"))
    {
      fprintf(stderr, "xml document is wrong type");
      goto out;
    }

  for(cur = root->xmlChildrenNode; cur; cur = cur->next)
    {
      xmlChar *text;
      int failed;

      /* skip whitespace text nodes, comments, ... */
      if(cur->type != XML_ELEMENT_NODE)
        continue;

      for(field = 0; field < FIELD_COUNT; field ++)
        if(!xmlStrcmp(cur->name, (const xmlChar *)fields[field].name))
          break;
      if(field == FIELD_COUNT)
        continue; /* unknown element: ignore */

      /* xmlNodeListGetString() returns a copy which we have to release */
      text = xmlNodeListGetString(doc, cur->xmlChildrenNode, 1);
      failed = geninfo_parse_number(text, fields[field].min, fields[field].max, &values[field]);
      if(text)
        xmlFree(text);
      if(failed)
        {
          fprintf(stderr, "%s:%d: bad value for %s\n", __FILE__, __LINE__, fields[field].name);
          goto out;
        }
      seen[field] = 1;
    }

  for(field = 0; field < FIELD_COUNT; field ++)
    if(!seen[field])
      {
        fprintf(stderr, "%s:%d: %s is missing\n", __FILE__, __LINE__, fields[field].name);
        goto out;
      }

  general_info->jobs_in_queue = (int)values[FIELD_JOBS_IN_QUEUE];
  general_info->total_finished_jobs = (unsigned int)values[FIELD_TOTAL_FINISHED_JOBS];
  general_info->total_frames_rendered = (unsigned int)values[FIELD_TOTAL_FRAMES_RENDERED];
  general_info->highest_jobnum = (unsigned int)values[FIELD_HIGHEST_JOBNUM];

  general_info->hibernate = 0;
  general_info->free_clients = 0;
  general_info->rendering_clients = 0;
  general_info->timestamp = 0;
  general_info->total_render_power = 0;
  general_info->total_priority_pieces = 0;

  ret = 0;

 out:
  xmlFreeDoc(doc);
  return ret;
}

/**
   updates job_list.xml which lists all the jobs in the queue

   Also bumps geninfo->timestamp (wrapping to 0 once it exceeds 65530) and
   recomputes geninfo->total_priority_pieces as the sum of the priorities
   of all queued jobs.

   @return 0 means success, 1 means that the file could not be opened
*/
// @QUERY: Likely obselete (don't remove at request of ohnobinki)
int update_xml_joblist(struct general_info *geninfo)
{
  struct distrenjob *job;
  xmlTextWriterPtr writer;
  char *path;
  char text[32];

  /** update timestamp */
  geninfo->timestamp ++;
  if(geninfo->timestamp > 65530)
    geninfo->timestamp = 0;

  path = NULL;
  _distren_asprintf(&path, "%s/job_list.xml", geninfo->config->datadir);
  if(!path)
    return 1;
  writer = xmlNewTextWriterFilename(path, 0);
  free(path);
  if(!writer)
    return 1;

  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);

  /** create root element job_list */
  xmlTextWriterStartElement(writer, (const xmlChar *)"job_list");

  /* time_t has no printf conversion of its own, so go through long */
  snprintf(text, sizeof(text), "%ld", (long)geninfo->timestamp);
  xmlTextWriterWriteAttribute(writer, (const xmlChar *)"timestamp", (const xmlChar *)text);

  geninfo->total_priority_pieces = 0;
  for(job = geninfo->head.next; job; job = job->next)
    {
      snprintf(text, sizeof(text), "%d", job->jobnum);
      xmlTextWriterWriteElement(writer, (const xmlChar *)"jobnum", (const xmlChar *)text);

      /**
         this is needed for the new frame finder to work

         (An older note here asked about a constant 11, which is no
         longer part of this computation.)
      */
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + job->priority;
    }

  xmlTextWriterEndElement(writer);

  /** close elements and end document */
  xmlTextWriterEndDocument(writer);

  /** free writer and save xml file to disk */
  xmlFreeTextWriter(writer);

  return 0;
}

/* ************************** Test Functions ************************* */

/**
   Prints info about frames in a frameset: one line on stdout with the
   frame number and its status.

   @return always 0
*/
int printFrameInfo(struct frameset *frame)
{
  const char *status;

  switch(frame->status)
    {
    case FRAMESETSTATUS_UNASSIGNED:
      status = "unassigned";
      break;
    case FRAMESETSTATUS_ASSIGNED:
      status = "assigned";
      break;
    case FRAMESETSTATUS_DONE:
      status = "completed";
      break;
    case FRAMESETSTATUS_CANCELED:
      status = "canceled";
      break;
    default:
      /* never hand a NULL pointer to %s */
      status = "unknown";
      break;
    }

  printf("frame #: %d --> %s\n", frame->num, status);

  return 0;
}

/**
   Prints information about all frames in a job.

   @return always 1
*/
int printJob(struct distrenjob *job)
{
  int counter;

  fprintf(stderr, "frame_num: status\n");

  for(counter = 0; counter < job->total_frames; counter ++)
    printFrameInfo(&job->frameset[counter]);

  return 1;
}

/**
   Prints information about a job to stderr.

   @return always 1
*/
int printJobInfo(struct distrenjob *job)
{
  fprintf(stderr, "type: %d\n", job->type);
  fprintf(stderr, "name: %s\n", job->name);
  fprintf(stderr, "submitter: %s\n", job->submitter);
  fprintf(stderr, "priority: %d\n", job->priority);
  fprintf(stderr, "completed: %d\n", job->completed_frames);
  fprintf(stderr, "assigned: %d\n", job->assigned_frames);
  fprintf(stderr, "total: %d\n", job->total_frames);
  fprintf(stderr, "watchdog: %d\n", job->watchdog_forgiveness);
  fprintf(stderr, "hibernate: %d\n", job->hibernate);
  fprintf(stderr, "prev_frame: %d\n", job->prev_frame_index);
  fprintf(stderr, "render power: %ld\n", job->assigned_render_power);

  return 1;
}

/**
   Print all jobnums in the queue, followed by the number of jobs found.

   @param head list head; the first job printed is head->next
   @return always 1
*/
int printAllJobnums(struct distrenjob *head)
{
  struct distrenjob *current_job;
  int total_jobs = 0;

  fprintf(stderr, "job numbers in the order they will be processed:\n");

  for(current_job = head->next; current_job; current_job = current_job->next)
    {
      fprintf(stderr, "%d: %s\n", current_job->jobnum, current_job->name);
      total_jobs ++;
    }

  fprintf(stderr, "\n%d jobs in queue\n\n", total_jobs);

  return 1;
}

/** Interactive test for the queuing system */
/* @QUEUE: Test uses methods not present in C code using mysql web-based system
int interactiveTest(int test, struct general_info general_info){
  int command;
  jobnum_t jobnum;
  struct distrenjob *tmp_job;
  struct frameset *tmp_frame;
  int type;
  char *name;
  char *submitter;
  char *email;
  int priority;
  int width;
  int height;
  int start_frame;
  int end_frame;
  size_t read_buf;

  while(test == 1)
    {
      fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
      fprintf(stderr, "\t1 \tPrint all frames in a job\n");
      fprintf(stderr, "\t2 \tExamine certain job\n");
      fprintf(stderr, "\t3 \tGet a frame to render\n");
      fprintf(stderr, "\t4 \tAdd a job\n");
      fprintf(stderr, "\t5 \tDelete a job\n");
      fprintf(stderr, "\t6 \tPrint jobnums in queue\n");
      fprintf(stderr, "\t7 \tPrint general info\n");
      fprintf(stderr, "\t8 \tQuit\n");

      scanf("%d", &command);

      switch(command)
      {
      case 1:
        fprintf(stderr, "Job number: ");
        scanf("%d", &jobnum);
        printJob(distrenjob_get(&general_info.head, jobnum));
        break;
      case 2:
        fprintf(stderr, "Job number: ");
        scanf("%d", &jobnum);
        printJobInfo(distrenjob_get(&general_info.head, jobnum));
        break;
      case 3:
        general_info.total_render_power ++;
        if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame))
          {
            fprintf(stderr, "frame was found, details below\n");
            fprintf(stderr, "Job#:%d\n", tmp_job->jobnum);
            fprintf(stderr, "Frame#:%d\n", tmp_frame->num);
          }
        break;
      case 4:
        name = NULL;
        submitter = NULL;
        email = NULL;

        fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n");
        scanf("%d", &type);
        fprintf(stderr, "\nName: ");
        scanf("\n");
        getline(&name, &read_buf, stdin);
        fprintf(stderr, "\nSubmitter: ");
        getline(&submitter, &read_buf, stdin);
        fprintf(stderr, "\nEmail: ");
        getline(&email, &read_buf, stdin);
        fprintf(stderr, "\nPriority: ");
        scanf("%d", &priority);
        fprintf(stderr, "\nStart frame: ");
        scanf("%d", &start_frame);
        fprintf(stderr, "\nEnd frame: ");
        scanf("%d", &end_frame);
        fprintf(stderr, "\nWidth: ");
        scanf("%d", &width);
        fprintf(stderr, "\nHeight: ");
        scanf("%d", &height);

        prepare_distrenjob(&general_info, type, name, submitter, priority, start_frame, end_frame, width, height);
        break;
      case 5:
        fprintf(stderr, "\nJob number: ");
        scanf("%d", &jobnum);

        distrenjob_remove(&general_info, distrenjob_get(&general_info.head, jobnum));
        break;
      case 6:
        printAllJobnums(&general_info.head);
        break;
      case 7:
        fprintf(stderr, "\nHighest job number: %d", general_info.highest_jobnum);
        fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue);
        fprintf(stderr, "\nTotal frames rendered: %d", general_info.total_frames_rendered);
        fprintf(stderr, "\nTimestamp: %lu", (long)general_info.timestamp);
        fprintf(stderr, "\nTotal priority pieces: %ld", general_info.total_priority_pieces);
        fprintf(stderr, "\nTotal render power: %ld\n", general_info.total_render_power);
        break;
      case 8:
        fprintf(stderr,"Goodbye.\n");
        test = 0;
      default:
        fprintf(stderr, "Invalid input, please try again.\n");
      }
    }
  return 0;
}
*/

/**
   @TODO: reuse for constructing data paths?

   constructs the filename for a distrenjob's serialized XML to be stored/retrieved from/in.

   @arg filename pointer to filename allocated using something malloc() and
   free() compatible. Must be free()ed by the caller. It is NULL whenever
   this function fails, so calling free() on it is always safe.
   @arg createdir if nonzero, the filename is passed to
   distren_mkdir_recurse() and that call's result is returned
   @return 0 on success
*/
int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir)
{
  int ret;

  /* jobnum is unsigned, so it needs %u */
  _distren_asprintf(filename, "%s/stor/job/%u/distrenjob.xml",
                    geninfo->config->datadir, jobnum);
  if(!*filename)
    return 1;

  if(!createdir)
    return 0;

  ret = distren_mkdir_recurse(*filename);
  if(ret)
    {
      /* do not hand a filename back alongside an error */
      free(*filename);
      *filename = NULL;
    }

  return ret;
}