Changeset - 14cd2541024e
[Not reviewed]
default
0 4 0
Ethan Zonca (ethanzonca) - 16 years ago 2010-02-16 22:34:40
e@ethanzonca.com
Updated test interface, moved/cleaned up various functions
4 files changed with 57 insertions and 192 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
/* This file contains the code which both processes (renders) jobs as a slave, and the code which distributes frames to slaves after receiving them from the client portion of the codebase. */
 

	
 
#include "distrenjob.h"
 
#include "listen.h"
 
#include "slavefuncs.h"
 
#include "mysql.h"
 

	
 
#include "common/asprintf.h"
 
#include "common/execio.h"
 
#include "common/options.h"
 
#include "common/protocol.h"
 

	
 
#include <confuse.h>
 
#include <malloc.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <sys/stat.h>
 
#include <sys/types.h>
 
#include <time.h>
 
#include <unistd.h>
 

	
 
#include <libxml/encoding.h>
 
#include <libxml/parser.h>
 
#include <libxml/tree.h>
 
#include <libxml/xmlmemory.h>
 
#include <libxml/xmlreader.h>
 
#include <libxml/xmlwriter.h>
 

	
 
/* ******************* Structs ************************ */
 
struct general_info
 
{
 
  struct distrenjob head;
 
  distrend_mysql_conn_t conn;
 

	
 
  struct distrend_config *config;
 

	
 
  struct
 
  {
 
    /** general_info.xml */
 
    char *geninfo;
 
    
 
  } files;
 

	
 
  int jobs_in_queue;
 
  unsigned int free_clients;
 
  unsigned int rendering_clients;
 
  unsigned int total_finished_jobs;
 
  unsigned int total_frames_rendered;
 
  unsigned int highest_jobnum;
 
  int hibernate;
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
// int xml_dump();
 
int distrend_do();
 
int start_data(struct general_info *general_info);
 
int mortition(struct general_info *geninfo, struct distrenjob *job);
 
void frame_watchdog(struct distrenjob *distrenjob_head);
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int restore_distrenjob(struct general_info *geninfo, struct distrenjob **distrenjob, jobnum_t jobnum);
 
int updateJobStatsXML(struct distrenjob *job);
 
int update_xml_joblist(struct general_info *geninfo);
 
int createQueueFromXML(struct general_info *geninfo);
 
int reCreateQueueFromXML(struct general_info *geninfo, xmlDocPtr doc, xmlNodePtr current);
 

	
 
/* **************Test Functions**************** */
 
int printFrameInfo(struct frameset *frame);
 
int printJob(struct distrenjob *job);
 
int printJobInfo(struct distrenjob *job);
 
int printAllJobnums(struct distrenjob *head);
 
int interactiveTest(int test, struct general_info general_info);
 
int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir);
 
int distren_mkdir_recurse(char *dirname);
 
int interactiveTest(int test, struct general_info general_info, distrend_mysql_conn_t conn);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  struct general_info general_info;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  xmlinit();
 
  // xmlinit();
 

	
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
	  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
    	  test = 1;
 
      }
 
    }
 

	
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config))
 
    return 1;
 

	
 
  /** preset paths */
 
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
 
		    general_info.config->datadir);
 

	
 
  /** MySQL Connection */
 
  fprintf(stderr,"Connecting to mysql...\n");
 
  if(mysqlConnect(&general_info.conn))
 
    {
 
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
 
      return 1;
 
    }
 
  fprintf(stderr,"Finished connecting!\n");
 

	
 
  /** Execute test function */
 
  // interactiveTest(test, general_info);
 
  interactiveTest(test, general_info, general_info.conn);
 

	
 
  distrend_listen(general_info.config, &clients);
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  while(!general_info.config->die)
 
    {
 
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients);
 

	
 
      /* Make the following code more event-driven */
 
      frame_watchdog(&general_info.head);
 

	
 
      struct frameset *frame;
 
      struct distrenjob *job;
 

	
 
      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
 
      if(clientstatus == CLIENTSTATUS_IDLE)
 
	{
 
	  int returnnum = find_jobframe(general_info.conn, slaveKey, job->jobnum, frame->num); // Finds a frame to render @FIXME: Slavenum :D
 
	  if(returnnum)
 
	    {
 
	      fprintf(stderr,"No frames are available to render at this time. Idling...\n");
 
	      sleep(10);
 
	    }
 
	  else
 
	    remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      	if(clientsays == DISTREN_REQUEST_DONEFRAME){
 
      	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
      	  finish_frame(general_info.conn, 0, job->jobnum, frame->num); // @TODO: Make sure this actually works.
 
      	}
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
 

	
 
/** Checks for dead, latent, or stale slaves */
 
void frame_watchdog(struct distrenjob *distrenjob_head)
 
{
 
  // Replace with mysqlness, maybe
 
  // iterate through jobs
 
    // if the job has been started, checks by seeing if either to first or second frame has been started
 
    // FRAMESETSTATUS_UNASSIGNED
 
      // iterate through all frames for this job:
 
          //watchdog_forgiveness = seconds of forgiveness before frame is re-assigned:
 
             //   If frame is not completed within the number of seconds specified by watchdog_forgiveness
 
             //   Then change the frame status to unassigned
 
}
 

	
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
 
{
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
 
      CFG_END()
 
    };
 

	
 
  cfg_t *cfg_listen;
 

	
 
  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 

	
 
  return 0;
 
}
 
int distrend_config_free(struct distrend_config *config)
 
{
 
  options_free(config->options);
 
  free(config);
 

	
 
  return 0;
 
}
 
/* ************************** XML Functions ************************* */
 

	
 
// writes the general_info.xml file which is a copy of the general_info structure
 
// except that it doesn't hold free_clients and rendering_clients
 
void update_general_info(struct general_info *geninfo)
 
{
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 

	
 
  writer = xmlNewTextWriterFilename(geninfo->files.geninfo, 0);
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  xmlTextWriterStartElement(writer, (xmlChar*)"general_info");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->jobs_in_queue);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"jobs_in_queue", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_finished_jobs);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_finished_jobs", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->total_frames_rendered);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"total_frames_rendered", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->highest_jobnum);
 
  xmlTextWriterWriteElement(writer, (xmlChar*)"highest_jobnum", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  xmlTextWriterEndDocument(writer);
 
  xmlFreeTextWriter(writer);
 
}
 

	
 
/**
 
   Reads general state information from general_info.xml
 
   into the general_info structure.
 
*/
 
int import_general_info(struct general_info *general_info)
 
{
 
  xmlDocPtr doc;
 
  xmlNodePtr cur;
 

	
 
  doc = xmlParseFile(general_info->files.geninfo);
 
  cur = xmlDocGetRootElement(doc);
 
  if (xmlStrcmp(cur->name, (xmlChar*)"general_info"))
 
    {
 
      fprintf(stderr, "xml document is wrong type");
 
      xmlFreeDoc(doc);
 
      return 1;
 
    }
 

	
 
  cur = cur->xmlChildrenNode;
 
  general_info->jobs_in_queue = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 

	
 
  general_info->hibernate = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   updates job_list.xml which lists all the jobs in the queue
 
   @return 0 means success
 
*/
 
// @QUERY: Likely obselete (don't remove at request of ohnobinki)
 
int update_xml_joblist(struct general_info *geninfo)
 
{
 
  struct distrenjob *job;
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  int counter;
 

	
 
  /**
 
     update timestamp
 
  */
 
  geninfo->timestamp ++;
 
  if(geninfo->timestamp > 65530)
 
    geninfo->timestamp = 0;
 

	
 
  _distren_asprintf(&tmp, "%s/job_list.xml",
 
                    geninfo->config->datadir);
 
  writer = xmlNewTextWriterFilename(tmp, 0);
 
  free(tmp);
 

	
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     create root element job_list
 
  */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job_list");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->timestamp);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  geninfo->total_priority_pieces = 0;
 
  counter = 0;
 
  for(job = geninfo->head.next; job; job = job->next)
 
    {
 
      _distren_asprintf(&tmp, "%d", job->jobnum);
 
      xmlTextWriterWriteElement(writer, (xmlChar*)"jobnum", (xmlChar*)tmp);
 
      free(tmp);
 

	
 
      /**
 
        this is needed for the new frame finder to work
 
      
 
        Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + job->priority;
 

	
 
      counter++;
 
    }
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     close elements and end document
 
  */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk
 
  */
 
  xmlFreeTextWriter(writer);
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 

	
 
/** Prints info about frames in a frameset */
 
int printFrameInfo(struct frameset *frame)
 
{
 
  char *status;
 

	
 
  status = NULL;
 

	
 
  switch(frame->status)
 
  {
 
    case FRAMESETSTATUS_UNASSIGNED:
 
      _distren_asprintf(&status, "%s", "unassigned");
 
      break;
 
    case FRAMESETSTATUS_ASSIGNED:
 
      _distren_asprintf(&status, "%s", "assigned");
 
      break;
 
    case FRAMESETSTATUS_DONE:
 
      _distren_asprintf(&status, "%s", "completed");
 
      break;
 
    case FRAMESETSTATUS_CANCELED:
 
      _distren_asprintf(&status, "%s", "canceled");
 
  }
 

	
 
  printf("frame #: %d --> %s\n", frame->num, status);
 
  free(status);
 

	
 
  return 0;
 
}
 

	
 
/** Prints information about all frames in a job */
 
int printJob(struct distrenjob *job)
 
{
 
  int counter;
 
  fprintf(stderr, "frame_num: status\n");
 
  for(counter = 0; counter < job->total_frames; counter++)
 
    {
 
      printFrameInfo(&job->frameset[counter]);
 
    }
 

	
 
  return 1;
 
}
 

	
 
/** Prints information about a job */
 
int printJobInfo(struct distrenjob *job)
 
{
 
  fprintf(stderr, "type: %d\n", job->type);
 
  fprintf(stderr, "name: %s\n", job->name);
 
  fprintf(stderr, "submitter: %s\n", job->submitter);
 
  fprintf(stderr, "priority: %d\n", job->priority);
 
  fprintf(stderr, "completed: %d\n", job->completed_frames);
 
  fprintf(stderr, "assigned: %d\n", job->assigned_frames);
 
  fprintf(stderr, "total: %d\n", job->total_frames);
 
  fprintf(stderr, "watchdog: %d\n", job->watchdog_forgiveness);
 
  fprintf(stderr, "hibernate: %d\n", job->hibernate);
 
  fprintf(stderr, "prev_frame: %d\n", job->prev_frame_index);
 
  fprintf(stderr, "render power: %ld\n", job->assigned_render_power);
 

	
 
  return 1;
 
}
 

	
 
/** Print all jobnums in the queue */
 
int printAllJobnums(struct distrenjob *head)
 
{
 
  struct distrenjob *current_job;
 
  int total_jobs;
 
  fprintf(stderr, "job numbers in the order they will be processed:\n");
 

	
 
  total_jobs = 0;
 
  for(current_job = head->next; current_job; current_job = current_job->next)
 
    {
 
      fprintf(stderr, "%d: %s\n", current_job->jobnum, current_job->name);
 
      total_jobs++;
 
    }
 

	
 
  fprintf(stderr, "\n%d jobs in queue\n\n", total_jobs);
 

	
 
  return 1;
 
}
 

	
 
/** Interactive test for the queuing system */
 
/* @QUEUE: Test uses methods not present in C code using mysql web-based system
 
int interactiveTest(int test, struct general_info general_info){
 
/* @QUEUE: Test uses methods not present in C code using mysql web-based system */
 
int interactiveTest(int test, struct general_info general_info, distrend_mysql_conn_t conn){
 
  int command;
 
  jobnum_t jobnum;
 
  struct distrenjob *tmp_job;
 
  struct frameset *tmp_frame;
 
  int type;
 
  char *name;
 
  char *submitter;
 
  char *email;
 
  int priority;
 
  int width;
 
  int height;
 
  int start_frame;
 
  int end_frame;
 
  size_t read_buf;
 
  fprintf(stderr,"Hello!\n");
 
  int32_t slaveKey = 0;
 
  int32_t jobKey = 0;
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 

	
 
  while(test == 1)
 
   {
 
     fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
     fprintf(stderr, "\t1 \tPrint all frames in a job\n");
 
     fprintf(stderr, "\t2 \tExamine certain job\n");
 
     fprintf(stderr, "\t3 \tGet a frame to render\n");
 
     fprintf(stderr, "\t4 \tAdd a job\n");
 
     fprintf(stderr, "\t5 \tDelete a job\n");
 
     fprintf(stderr, "\t6 \tPrint jobnums in queue\n");
 
     fprintf(stderr, "\t7 \tPrint general info\n");
 
     fprintf(stderr, "\t8 \tQuit\n");
 
     fprintf(stderr, "\t1 \tGet a frame to render\n");
 
     fprintf(stderr, "\t2 \tChange job priority\n");
 
     fprintf(stderr, "\t3 \tSet frame finished\n");
 
     fprintf(stderr, "\t4 \tQuit\n");
 

	
 
     scanf("%d", &command);
 

	
 
     switch(command)
 
     {
 
     case 1:
 
       fprintf(stderr, "Job number: ");
 
       scanf("%d", &jobnum);
 
       printJob(distrenjob_get(&general_info.head, jobnum));
 
       fprintf(stderr, "Got frame: ");
 
       find_jobframe(conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr, "jobKey: %d, frameNum: %d\n",jobKey,frameNum);
 
       break;
 
     case 2:
 
       fprintf(stderr, "Job number: ");
 
       scanf("%d", &jobnum);
 
       printJobInfo(distrenjob_get(&general_info.head, jobnum));
 
       fprintf(stderr,"Job key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"New priority: ");
 
       scanf("%d", &tmp);
 
       newPriority = tmp;
 

	
 
       change_job_priority(conn, jobKey, newPriority);
 
       fprintf(stderr,"Changed!");
 
       break;
 
     case 3:
 
       general_info.total_render_power ++;
 
       if(!find_jobframe_again(&general_info, -1, 1, &tmp_job, &tmp_frame))
 
       {
 
           fprintf(stderr, "frame was found, details below\n");
 
           fprintf(stderr, "Job#:%d\n", tmp_job->jobnum);
 
           fprintf(stderr, "Frame#:%d\n", tmp_frame->num);
 
       }
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       finish_frame(conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Finished Frame!\n");
 
       break;
 
     case 4:
 
       name = NULL;
 
       submitter = NULL;
 
       email = NULL;
 

	
 
       fprintf(stderr, "\nType: \n\t 1 \t blender\n\t 2 \t povray\n"); scanf("%d", &type);
 
       fprintf(stderr, "\nName: ");        scanf("\n"); getline(&name, &read_buf, stdin);
 
       fprintf(stderr, "\nSubmitter: ");                getline(&submitter, &read_buf, stdin);
 
       fprintf(stderr, "\nEmail: ");                    getline(&email, &read_buf, stdin);
 
       fprintf(stderr, "\nPriority: ");                 scanf("%d", &priority);
 
       fprintf(stderr, "\nStart frame: ");              scanf("%d", &start_frame);
 
       fprintf(stderr, "\nEnd frame: ");                scanf("%d", &end_frame);
 
       fprintf(stderr, "\nWidth: ");                    scanf("%d", &width);
 
       fprintf(stderr, "\nHeight: ");                   scanf("%d", &height);
 
       prepare_distrenjob(&general_info, type, name, submitter, priority, start_frame, end_frame, width, height);
 
       return 0;
 
       break;
 
     case 5:
 
       fprintf(stderr, "\nJob number: ");
 
       scanf("%d", &jobnum);
 
       distrenjob_remove(&general_info, distrenjob_get(&general_info.head, jobnum));
 
       break;
 
     case 6:
 
       printAllJobnums(&general_info.head);
 
       break;
 
     case 7:
 
       fprintf(stderr, "\nHighest job number: %d", general_info.highest_jobnum);
 
       fprintf(stderr, "\nJobs in queue: %d", general_info.jobs_in_queue);
 
       fprintf(stderr, "\nTotal frames rendered: %d", general_info.total_frames_rendered);
 
       fprintf(stderr, "\nTimestamp: %lu", (long)general_info.timestamp);
 
       fprintf(stderr, "\nTotal priority pieces: %ld", general_info.total_priority_pieces);
 
       fprintf(stderr, "\nTotal render power: %ld\n", general_info.total_render_power);
 
       break;
 
     case 8:
 
       fprintf(stderr,"Goodbye.\n");
 
       test = 0;
 
     default:
 
       fprintf(stderr, "Invalid input, please try again.\n");
 
     }
 
  return 0;
 
   }
 
  return 0;
 
}
 
*/
 

	
 

	
 
/**
 
   @TODO: reuse for constructing data paths?
 
   constructs the filename for a distrenjob's serialized XML
 
   to be stored/retrieved from/in.
 
   @arg filename pointer to filename allocated using something
 
           malloc() and free() compatible. Must be free()ed by the
 
	   caller
 
   @return 0 on success
 
 */
 
int job_getserialfilename(char **filename, struct general_info *geninfo, unsigned int jobnum, int createdir)
 
{
 
  _distren_asprintf(filename, "%s/stor/job/%d/distrenjob.xml",
 
		    geninfo->config->datadir,
 
		    jobnum);
 
  if(!*filename)
 
    return 1;
 
  
 
  if(createdir)
 
    return distren_mkdir_recurse(*filename);
 
  
 
  return 0;
 
}
 

	
src/server/mysql.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
#include "mysql.h"
 
#include <mysql/mysql.h>
 

	
 
#include "common/asprintf.h"
 
#include "common/protocol.h"
 

	
 
#include <stdio.h>
 
#include <string.h>
 
#include <unistd.h>
 
#include <stdlib.h>
 
#include <sys/stat.h>
 
#include <fcntl.h>
 

	
 
/**
 
   local types
 
 */
 

	
 
#define SEVENTY_FIVE 75
 
#define FORTY_TWO 42
 

	
 
/**
 
   performs mysql query.
 
   errors will be logged to the user by this function.
 
   @return pointer to query handle on success, NULL on failure
 
 */
 
distrend_mysql_result_t mysqlQuery(distrend_mysql_conn_t conn, char *query);
 

	
 
/**
 
   frees mysql query result. Accepts a NULL pointer and ignores it to
 
   help deal with one-shot calls to mysqlQuery so that you don't have to
 
   check if it returned NULL or not.
 
   @return 0 on success
 
 */
 
int mysqlResultFree(distrend_mysql_result_t result);
 

	
 
/**
 
   reads an integer mysql field value
 
   @return 0 on success
 
*/
 
int distrend_mysql_getint(MYSQL_ROW row, MYSQL_FIELD_OFFSET column, int32_t *theint)
 
{
 
  *theint = atol(row[column]);
 

	
 
  return 0;
 
}
 

	
 

	
 
struct distrend_mysql_conn
 
{
 
  MYSQL *mysqlconn;
 
  short pointlesscheck;
 
};
 

	
 
struct distrend_mysql_result
 
{
 
  MYSQL_RES *mysqlresult;
 
  short pointlesscheck;
 
};
 

	
 
/**
 
    funcs
 
 */
 

	
 
int mysqlConnect(distrend_mysql_conn_t *conn)
 
{
 
  MYSQL *mysqlconn;
 

	
 
  char *server = "zserver1";
 
  char *server = "zserver2";
 
  char *user = "distren";
 
  char *password = "secretpassword";
 
  char *password = "HwLxuBqTvJ6J7FWj";
 
  char *database = "distren";
 

	
 
  mysqlconn = mysql_init(NULL);
 
  mysql_options(mysqlconn, MYSQL_OPT_RECONNECT,"true");
 

	
 
  if(!mysql_real_connect(mysqlconn, server, user, password, database, 0, NULL, 0))
 
    {
 
      fprintf(stderr, "%s\n", mysql_error(mysqlconn));
 
      return 1;
 
    }
 

	
 
  *conn = malloc(sizeof(struct distrend_mysql_conn));
 
  if(!*conn)
 
    {
 
      mysql_close(mysqlconn);
 
      mysql_server_end();
 
      return 2;
 
    }
 
  (*conn)->mysqlconn = mysqlconn;
 
  (*conn)->pointlesscheck = SEVENTY_FIVE;
 

	
 
  return 0;
 
}
 

	
 
int mysqlDisconnect(distrend_mysql_conn_t conn)
 
{
 
  /**
 
     check if this handle is valid
 
   */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /**
 
     invalidate handle :-D
 
   */
 
  conn->pointlesscheck ++;
 

	
 
  mysql_close(conn->mysqlconn);
 
  mysql_server_end();
 
  free(conn);
 

	
 
  return 0;
 
}
 

	
 

	
 
distrend_mysql_result_t mysqlQuery(distrend_mysql_conn_t conn, char *query)
 
{
 
  MYSQL_RES *result;
 
  distrend_mysql_result_t distrenresult;
 

	
 
  /**
 
     pointless sanity check
 
  */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /** make sure that connection is still alive
 
   */
 
  if(mysql_ping(conn->mysqlconn))
 
    fprintf(stderr, "MySQL Connection _was_ broken or may be broken, I'm not sure exactly what this return value means\n");
 

	
 
  if (mysql_query(conn->mysqlconn, query))
 
     fprintf(stderr, "%s\n", mysql_error(conn->mysqlconn));
 

	
 
  result = mysql_use_result(conn->mysqlconn);
 

	
 
  distrenresult = malloc(sizeof(struct distrend_mysql_result));
 
  if(!distrenresult)
 
    {
 
      mysql_free_result(result);
 
      return NULL;
 
    }
 

	
 
  distrenresult->mysqlresult = result;
 
  distrenresult->pointlesscheck = FORTY_TWO;
 

	
 
  return distrenresult;
 
}
 

	
 
int mysqlResultFree(distrend_mysql_result_t result)
 
{
 
  if(!result)
 
    return 0;
 

	
 
  if(result->pointlesscheck != FORTY_TWO)
 
    fprintf(stderr, "%s:%d: I didn't get the type of handle I wanted\n", __FILE__, __LINE__);
 

	
 
  mysql_free_result(result->mysqlresult);
 
  free(result);
 

	
 
  return 0;
 
}
 

	
 
/*
 
   Individual query functions:
 
 */
 

	
 
void finish_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Complete`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  mysqlResultFree(result);
 
}
 

	
 
void start_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Start`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  mysqlResultFree(result);
 
}
 

	
 
int change_job_priority(distrend_mysql_conn_t conn, int32_t jobkey, int32_t newpriority)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "UPDATE `distren`.`Job` SET `Priority`=%d WHERE `Job_Key`=%d",
 
		    newpriority, jobkey);
 
  result = mysqlQuery(conn, query);
 
  mysqlResultFree(result);
 

	
 
  return 0;
 
}
 

	
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  distrend_mysql_result_t result;
 
  char *query;
 
  MYSQL_ROW row;
 

	
 
  _distren_asprintf(&query, "`distren`.`Frame_Get`( %d, @Job_Key, @Frame_Key); SELECT @Job_Key, @Frame_Key",
 
		    slavekey);
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  if(!result)
 
    return 1;
 

	
 
  row = mysql_fetch_row(result->mysqlresult);
 
  if(!row)
 
    return 1;
 

	
 
  distrend_mysql_getint(row, 1, &jobkey); 
 
  distrend_mysql_getint(row, 2, &framenum);
 
  
 
  mysqlResultFree(result);
 

	
 
  return 0;
 
}
src/server/slavefuncs.c
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#include "distrenjob.h"
 
#include "slavefuncs.h"
 

	
 
#include "common/asprintf.h"
 
#include "common/execio.h"
 
#include "common/protocol.h"
 
#include "common/remoteio.h"
 

	
 
#include <curl/curl.h>
 
#include <curl/easy.h>
 
#include <curl/types.h>
 

	
 
#include <stdio.h>
 
#include <string.h>
 
#include <unistd.h>
 
#include <stdlib.h>
 
#include <sys/stat.h>
 
#include <fcntl.h>
 

	
 

	
 
/**
 
   Sends the server a single request (see protocol.h)
 
*/
 
int sendSignal(struct remoteio *rem, char signal)
 
{
 
  size_t written;
 
  size_t towrite;
 
  char *ssignal;
 

	
 
  _distren_asprintf(&ssignal, "%c", signal);
 
  towrite = strlen(ssignal);
 
  while( towrite
 
	 && !remoteio_write(rem, ssignal, towrite, &written))
 
    {
 
      fprintf(stderr, "Sending request...\n");
 
      towrite -= written;
 
    }
 
  if(written)
 
    return 0;
 

	
 
  /**
 
     if remoteio_write returned 1, the connection
 
     is probably dead or there was a real error
 
   */
 
  return 1;
 
}
 

	
 
/**
 
   Sends the server an extended signal (request + data)
 
*/
 
int sendExtSignal(struct remoteio *rem, char signal, char *data){
 
  size_t written;
 
  size_t towrite;
 
  char *ssignal;
 
  _distren_asprintf(&ssignal, "%c%s", signal, data); // Just append the data FIXME: We should do this differently
 
  towrite = strlen(ssignal);
 
  while( towrite
 
          && !remoteio_write(rem, ssignal, towrite, &written))
 
     {
 
       fprintf(stderr, "Sending request...\n");
 
       towrite -= written;
 
     }
 
   if(written)
 
     return 0;
 

	
 
   /**
 
      if remoteio_write returned 1, the connection
 
      is probably dead or there was a real error
 
    */
 
   return 1;
 
}
 

	
 

	
 

	
 
/**
 
   Grabs the xml DOM node reached by an XPath.
 

	
 
   @param path an XPath that lead to DOM node
 
   @return the first node associated with the path or NULL if there is no match
 
 */
 
xmlNodePtr xml_quickxpath(xmlXPathContextPtr xpathctxt, xmlChar *path)
 
{
 
  xmlNodePtr toreturn;
 

	
 
  xmlXPathObjectPtr xmlxpathobjptr;
 
  xmlxpathobjptr = xmlXPathEval(path, xpathctxt);
 
  if(!xmlxpathobjptr
 
     || !xmlxpathobjptr->nodesetval->nodeNr)
 
    {
 
      fprintf(stderr, "XPath resolution failed for ``%s'' in ``%s'' (``%s'')\n", path, xpathctxt->doc->name, xpathctxt->doc->URL);
 
      return (xmlNodePtr)NULL;
 
    }
 

	
 
  toreturn = *(xmlxpathobjptr->nodesetval->nodeTab);
 

	
 
  xmlXPathFreeObject(xmlxpathobjptr);
 

	
 
  return toreturn;
 
}
 

	
 

	
 
/** Stub for deleting job data from the disk. @TODO: unstubify me! */
 
int delete_jobdata(int jobnum, char *datadir)
 
{
 
  char *jobpath;
 
  _distren_asprintf(&jobpath, "%s/%d", datadir, jobnum);
 
  // rmdir(jobpath);
 
  fprintf(stderr, "Please manually remove %s. Automatic removal is currently not implemented.\n", jobpath);
 
  return 0;
 
}
 

	
 
/** Function referenced by curlget() to write data to disk. */
 
size_t curl_writetodisk(void *ptr, size_t size, size_t nmemb, FILE *stream)
 
 {
 
    return fwrite(ptr, size, nmemb, stream);
 
  }
 

	
 
/** Helper function for cURL's progress display */
 
int curl_progress( char *Bar,double t,double d,double ultotal,double ulnow)
 
{
 
fprintf(stderr,"Downloading: %f%% complete\r",d/t*100);
 
return 0;
 
}
 

	
 
/** Retrieves a URL with cURL and saves it to disk */
 
int curlget(char *url, char *out){
 
  fprintf(stderr,"Preparing to download %s",url);
 
  double *Bar; // Stores cURL progress display info
 
  CURL *curl;
 
  CURLcode res;
 
  FILE *outfile;
 

	
 
  curl = curl_easy_init();
 
  if(curl) {
 
	outfile = fopen(out, "w"); // Open where we're writing to
 

	
 
  curl_easy_setopt(curl, CURLOPT_URL, url);
 
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, outfile);
 
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_writetodisk); // this MUST be set for win32 compat.
 
  curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0);
 
  curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, curl_progress);
 
  curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, &Bar);
 
  res = curl_easy_perform(curl);
 
  curl_easy_cleanup(curl);
 
  }
 
  fprintf(stderr,"\n"); // Clears out he progressbar's carriage return
 
  return res; // 0 is OK, 1 is 404 or other error
 
}
 

	
 
/** Posts a file to a url with cUrl */
 
int curlpost(char *filename, char *url){
 
  char *targetname = "uploadedfile"; // Name of the target in the php file on the server (Don't change me unless you have different PHP code)
 
  CURL *curl;
 
  CURLcode res;
 
  struct curl_httppost *formpost=NULL;
 
  struct curl_httppost *lastptr=NULL;
 
  struct curl_slist *headerlist=NULL;
 
  static const char buf[] = "Expect:";
 

	
 
  curl_global_init(CURL_GLOBAL_ALL);
 

	
 
  /* upload field... */
 
  curl_formadd(&formpost,
 
               &lastptr,
 
               CURLFORM_COPYNAME, targetname,
 
               CURLFORM_FILE, filename,
 
               CURLFORM_END);
 
  /* filename field... */
 
  curl_formadd(&formpost,
 
               &lastptr,
 
               CURLFORM_COPYNAME, "filename",
 
               CURLFORM_COPYCONTENTS, filename,
 
               CURLFORM_END);
 
  /* submit field, not usually needed, just in case... */
 
  curl_formadd(&formpost,
 
               &lastptr,
 
               CURLFORM_COPYNAME, "submit",
 
               CURLFORM_COPYCONTENTS, "send",
 
               CURLFORM_END);
 

	
 
  curl = curl_easy_init();
 
  headerlist = curl_slist_append(headerlist, buf);
 
  if(curl) {
 
    /* Setting the URL to get the post, and the contents of the post */
 
    curl_easy_setopt(curl, CURLOPT_URL, url);
 
    curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost);
 
    res = curl_easy_perform(curl);
 

	
 
    curl_easy_cleanup(curl);
 
    /* cleanup the formpost junk */
 
    curl_formfree(formpost);
 
    curl_slist_free_all (headerlist);
 
  }
 
  return res;
 
}
 

	
 

	
 
/** Logs the user into the server after ensuring that keys exist */
 
int login_user(char *username)
 
{
 
  // @TODO: Put some telnet-style auth code here unless this is obselete
 
  return 1; // success
 
}
 

	
 
/** Replaces wordtoreplace with replacewith in conffile (relative to SYSCONFDIR) */
 
int conf_replace(char *conffile, char *wordtoreplace, char *replacewith){
 
  int maxlinelen = 120;
 
  char *fileOrig;
 
  char *fileRepl;
 
  _distren_asprintf(&fileOrig, "%s/%s", SYSCONFDIR, conffile);
 
  _distren_asprintf(&fileRepl, "%s%s.edited", SYSCONFDIR, conffile);
 
  char buffer[maxlinelen+2];
 
  char *buff_ptr, *find_ptr;
 
  FILE *fp1, *fp2;
 
  size_t find_len = strlen(wordtoreplace);
 
  fp1 = fopen(fileOrig,"r");
 
  fp2 = fopen(fileRepl,"w");
 
  if (fp1 ==NULL){
 
    fprintf(stderr, "%s doesn't exist\n",fileOrig);
 
    return 0;
 
  }
 
  else if(fp2 ==NULL){
 
    fprintf(stderr, "Can't write a file to disk! Check permissions.\n");
 
    return 0;
 
  }
 
  else{
 
    while(fgets(buffer,maxlinelen+2,fp1))
 
      {
 
        buff_ptr = buffer;
 
        while ((find_ptr = strstr(buff_ptr,wordtoreplace)))
 
        {
 
           while(buff_ptr < find_ptr)
 
             fputc((int)*buff_ptr++,fp2);
 
           fputs(replacewith,fp2);
 
           buff_ptr += find_len;
 
         }
 
         fputs(buff_ptr,fp2);
 
      }
 
    rename(fileRepl, fileOrig);
 
  }
 
  fclose(fp2);
 
  fclose(fp1);
 
  fprintf(stderr,"Wrote conf file...\n");
 
return 1; // Success
 
}
 

	
 

	
 
/* Executors */
 

	
 
/** Executor function for Blender operations */
 
int exec_blender(char *input, char *output, int frame)
 
{
 
  fprintf(stderr,"Preparing to execute...\n");
 
  int ret;
 
  char *frame_str;
 

	
 
  char *command = "blender"; // @TODO: We currently expect this to be in PATH
 
  char *cmd[] = { command, "-b", "-o", output, input, "-f", frame_str, (char *)NULL }; // arguments for blender
 

	
 
  char buf[10];
 
  struct execio *testrem;
 
  size_t readlen;
 

	
 
  _distren_asprintf(&frame_str, "%i", frame);
 

	
 
  fprintf(stderr,"Executing: %s\n", frame_str);
 
  ret = execio_open(&testrem, command, cmd);
 
  buf[9] = '\0';
 
  while(!execio_read(testrem, buf, 9, &readlen))
 
    {
 
      if(readlen > 9) {
 
        fprintf(stderr, "Something is terribly wrong!\n");
 
       }
 
       buf[readlen] = '\0';
 
       fprintf(stderr, "read \"%s\"\n", buf);
 
    }
 
  execio_close(testrem);
 

	
 
  return ret;
 
}
 

	
 
void xmlinit()
 
{
 
  xmlInitParser();
 
  xmlXPathInit();
 
}
 

	
 
void xmlcleanup()
 
{
 
  xmlCleanupParser();
 
}
 

	
 

	
 
/** Creates directories recursively */
 
int distren_mkdir_recurse(char *dirname)
 
{
 
  size_t counter;
 
  char *nextdir;
 

	
 
  nextdir = strdup(dirname);
 
  for(counter = 0; nextdir[counter]; counter ++)
 
    {
 
      /** @TODO OS-portabalize the path-separators */
 
      if(nextdir[counter] == '/')
 
        {
 
          nextdir[counter] = '\0';
 
          mkdir(nextdir, S_IRWXU | S_IRGRP | S_IROTH);
 
          nextdir[counter] = '/';
 
        }
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
 @TODO: Use for constructing path to job data locally and/or on data.distren.org
 
 */
 
int job_build_path(char *filename, unsigned int jobnum)
 
{
 
  return 0;
 
}
src/server/slavefuncs.h
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
#ifndef _DISTREN_SLAVEFUNCS_H
 
#define _DISTREN_SLAVEFUNCS_H
 

	
 
#include "distrenjob.h"
 

	
 
#include "common/remoteio.h"
 

	
 
#include <libxml/xpath.h>
 
#include <stdio.h>
 

	
 

	
 
struct msg;
 
int sendSignal(struct remoteio *rem, char signal);
 
int sendExtSignal(struct remoteio *rem, char signal, char *data);
 
xmlNodePtr xml_quickxpath(xmlXPathContextPtr xpathctxt, xmlChar *path);
 
int software_updatecheck();
 
int delete_jobdata(int jobnum, char *datadir);
 
size_t curl_writetodisk(void *ptr, size_t size, size_t nmemb, FILE *stream);
 
int curlget(char *url, char *out);
 
int curlpost(char *filename, char *url);
 
int ssh_keygen();
 
int register_user(char *username, char *email);
 
int login_user(char *username);
 
int conf_replace(char *conffile, char *wordtoreplace, char *replacewith);
 
int exec_blender(char *input, char *output, int frame);
 
void xmlinit();
 
void xmlcleanup();
 
int distren_mkdir_recurse(char *dirname);
 
int job_build_path(char *filename, unsigned int jobnum);
 

	
 
#endif
0 comments (0 inline, 0 general)