Changeset - 12d457aff611
[Not reviewed]
default
0 3 0
Nathan Brink (binki) - 16 years ago 2010-02-18 22:23:35
ohnobinki@ohnopublishing.net
fix find_jobframe() call for real
3 files changed with 7 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/server/distrend.c
Show inline comments
 
@@ -72,193 +72,193 @@ struct general_info
 
  time_t timestamp;
 
  unsigned long total_render_power;
 
  unsigned long total_priority_pieces;
 
};
 

	
 

	
 
/* *********************************************
 
   Function Prototypes
 
   ********************************************* */
 

	
 
/* ************General Functions************* */
 
int distrend_do();
 
void frame_watchdog(struct distrenjob *distrenjob_head);
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config);
 
int distrend_config_free(struct distrend_config *config);
 

	
 
/* **************XML Functions**************** */
 
void update_general_info(struct general_info *geninfo);
 
int import_general_info(struct general_info *general_info);
 
int update_xml_joblist(struct general_info *geninfo);
 

	
 
/* **************Test Functions**************** */
 
int interactiveTest(int test, struct general_info general_info, distrend_mysql_conn_t conn);
 

	
 
/* **************** Main ********************* */
 
int main(int argc, char *argv[])
 
{
 
  /* Parse arguments */
 
  int counter;
 
  int test = 0; /*< Interactive mode if 1 */
 
  struct general_info general_info;
 
  struct distrend_clientset *clients;
 

	
 
  enum clientstatus
 
  {
 
    CLIENTSTATUS_UNINITIALIZED = 0,
 
    CLIENTSTATUS_BUSY = 1,
 
    CLIENTSTATUS_IDLE = 2
 
  } clientstatus;
 

	
 
  // xmlinit();
 

	
 
  for(counter = 0; counter < argc; counter ++)
 
    {
 
      if(strcmp(argv[counter], "-h") == 0)
 
      {
 
    	  fprintf(stderr, "Usage: distrend [option] \nStarts the distrend server\n\t-h\tshow this help\n\t-t\tlaunches queue testing interface \n");
 
	  return 2;
 
      }
 

	
 
      else if(strcmp(argv[counter], "-t") == 0)
 
      {
 
    	  fprintf(stderr, "Entering into test mode...\n\n");
 
    	  test = 1;
 
      }
 
    }
 

	
 

	
 
  if(distrend_do_config(argc, argv, &general_info.config))
 
    return 1;
 

	
 
  /** preset paths */
 
  _distren_asprintf(&general_info.files.geninfo, "%s/general_info.xml",
 
		    general_info.config->datadir);
 

	
 
  /** MySQL Connection */
 
  fprintf(stderr,"Connecting to mysql...\n");
 
  if(mysqlConnect(&general_info.conn))
 
    {
 
      fprintf(stderr, "%s:%d: mysqlConnect() failed\n", __FILE__, __LINE__);
 
      return 1;
 
    }
 
  fprintf(stderr,"Finished connecting!\n");
 

	
 
  /** Execute test function */
 
  interactiveTest(test, general_info, general_info.conn);
 

	
 
  distrend_listen(general_info.config, &clients);
 

	
 
  int slaveKey = 0; // Remotio should set me on a per-slave basis
 
  /* Main Loop */
 
  while(!general_info.config->die)
 
    {
 
      int clientsays = 0; /*< temporary example variable, will be replaced when we can handle messages */
 

	
 
      distrend_accept(general_info.config, clients);
 

	
 
      /* Make the following code more event-driven */
 
      frame_watchdog(&general_info.head);
 

	
 
      struct frameset *frame;
 
      struct distrenjob *job;
 

	
 
      /* If the client is idle, must be modified for climbing through linked list of clients (client->clientnum) */
 
      if(clientstatus == CLIENTSTATUS_IDLE)
 
	{
 
	  int returnnum = find_jobframe(general_info.conn, slaveKey, job->jobnum, frame->num); // Finds a frame to render @FIXME: Slavenum :D
 
	  int returnnum = find_jobframe(general_info.conn, slaveKey, &job->jobnum, &frame->num); // Finds a frame to render @FIXME: Slavenum :D
 
	  if(returnnum)
 
	    {
 
	      fprintf(stderr,"No frames are available to render at this time. Idling...\n");
 
	      sleep(10);
 
	    }
 
	  else
 
	    remotio_send_to_client(frame->num, job->jobnum); // Pseudo-sends data to client
 
	}
 
      /* If the client states that they finished the frame */
 
      	if(clientsays == DISTREN_REQUEST_DONEFRAME){
 
      	  clientstatus = CLIENTSTATUS_IDLE; // Sets the client back to idle
 
      	  finish_frame(general_info.conn, 0, job->jobnum, frame->num); // @TODO: Make sure this actually works.
 
      	}
 
    }
 

	
 
  distrend_unlisten(general_info.config->listens, clients);
 
  distrend_config_free(general_info.config);
 

	
 
  xmlcleanup();
 

	
 
  /** free() paths */
 
  free(general_info.files.geninfo);
 
  mysqlDisconnect(general_info.conn);
 

	
 
  return 0;
 
}
 

	
 
/* ********************** Functions ************************* */
 

	
 
/**
 
   Performs command stored in a client's request. @TODO: Fill stub
 
*/
 
int distrend_do()
 
{
 
  return 0;
 
}
 

	
 

	
 
/** Checks for dead, latent, or stale slaves */
 
void frame_watchdog(struct distrenjob *distrenjob_head)
 
{
 
  // Replace with mysqlness, maybe
 
  // iterate through jobs
 
    // if the job has been started, checks by seeing if either to first or second frame has been started
 
    // FRAMESETSTATUS_UNASSIGNED
 
      // iterate through all frames for this job:
 
          //watchdog_forgiveness = seconds of forgiveness before frame is re-assigned:
 
             //   If frame is not completed within the number of seconds specified by watchdog_forgiveness
 
             //   Then change the frame status to unassigned
 
}
 

	
 

	
 
/* Grabs config info from confs */
 
int distrend_do_config(int argc, char *argv[], struct distrend_config **config)
 
{
 
  unsigned int counter;
 

	
 
  cfg_opt_t myopts_listen[] =
 
    {
 
      CFG_SIMPLE_STR("type", NULL),
 
      CFG_SIMPLE_STR("path", NULL),
 
      CFG_SIMPLE_INT("port", NULL),
 
      CFG_END()
 
    };
 
  cfg_opt_t myopts[] =
 
    {
 
      CFG_SEC("listen",  /* this must be imported into struct listens (which must still be declared) */
 
          myopts_listen,
 
          CFGF_MULTI),
 
      CFG_SIMPLE_STR("datadir", NULL),
 
      CFG_STR_LIST("render_types", NULL, CFGF_NONE),
 
      CFG_END()
 
    };
 

	
 
  cfg_t *cfg_listen;
 

	
 
  fprintf(stderr, "%s:%d: running config\n", __FILE__, __LINE__);
 
  *config = malloc(sizeof(struct distrend_config));
 
  myopts[1].simple_value = &(*config)->datadir;
 

	
 
  if(options_init(argc, argv, &(*config)->mycfg, myopts, "daemon", &(*config)->options))
 
    return 1;
 

	
 
  /**
 
     grab listen blocks:
 
   */
 
  (*config)->listens = malloc(sizeof(struct distrend_listen) * (cfg_size((*config)->mycfg, "listen") + 1));
 
  for(counter = 0; counter < cfg_size((*config)->mycfg, "listen"); counter ++)
 
    {
 
      cfg_listen = cfg_getnsec((*config)->mycfg, "listen", counter);
 
      (*config)->listens[counter].port = cfg_getint(cfg_listen, "port");
 
      (*config)->listens[counter].sock = -1;
 
    }
 
  memset(&(*config)->listens[counter], '\0', sizeof(struct distrend_listen));
 

	
 
  fprintf(stderr, "using %s as datadir\n", (*config)->datadir);
 
@@ -329,175 +329,175 @@ int import_general_info(struct general_i
 
  cur = cur->next;
 
  general_info->total_finished_jobs = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->total_frames_rendered = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 
  cur = cur->next;
 
  general_info->highest_jobnum = atoi((char*)xmlNodeListGetString(doc, cur->xmlChildrenNode, 1));
 

	
 
  general_info->hibernate = 0;
 
  general_info->free_clients = 0;
 
  general_info->rendering_clients = 0;
 
  general_info->timestamp = 0;
 
  general_info->total_render_power = 0;
 
  general_info->total_priority_pieces = 0;
 

	
 
  xmlFreeDoc(doc);
 

	
 
  return 0;
 
}
 

	
 

	
 
/**
 
   updates job_list.xml which lists all the jobs in the queue
 
   @return 0 means success
 
*/
 
// @QUERY: Likely obselete (don't remove at request of ohnobinki)
 
int update_xml_joblist(struct general_info *geninfo)
 
{
 
  struct distrenjob *job;
 
  xmlTextWriterPtr writer;
 
  char *tmp;
 
  int counter;
 

	
 
  /**
 
     update timestamp
 
  */
 
  geninfo->timestamp ++;
 
  if(geninfo->timestamp > 65530)
 
    geninfo->timestamp = 0;
 

	
 
  _distren_asprintf(&tmp, "%s/job_list.xml",
 
                    geninfo->config->datadir);
 
  writer = xmlNewTextWriterFilename(tmp, 0);
 
  free(tmp);
 

	
 
  xmlTextWriterStartDocument(writer, NULL, "utf-8", NULL);
 

	
 
  /**
 
     create root element job_list
 
  */
 
  xmlTextWriterStartElement(writer, (xmlChar*)"job_list");
 

	
 
  _distren_asprintf(&tmp, "%d", geninfo->timestamp);
 
  xmlTextWriterWriteAttribute(writer, (xmlChar*)"timestamp", (xmlChar*)tmp);
 
  free(tmp);
 

	
 
  geninfo->total_priority_pieces = 0;
 
  counter = 0;
 
  for(job = geninfo->head.next; job; job = job->next)
 
    {
 
      _distren_asprintf(&tmp, "%d", job->jobnum);
 
      xmlTextWriterWriteElement(writer, (xmlChar*)"jobnum", (xmlChar*)tmp);
 
      free(tmp);
 

	
 
      /**
 
        this is needed for the new frame finder to work
 
      
 
        Why the random constant numeral 11? --ohnobinki
 
      */
 
      geninfo->total_priority_pieces = geninfo->total_priority_pieces + job->priority;
 

	
 
      counter++;
 
    }
 

	
 
  xmlTextWriterEndElement(writer);
 

	
 
  /**
 
     close elements and end document
 
  */
 
  xmlTextWriterEndDocument(writer);
 

	
 
  /**
 
     free writer and save xml file to disk
 
  */
 
  xmlFreeTextWriter(writer);
 
  return 0;
 
}
 

	
 
/* ************************** Test Functions ************************* */
 

	
 

	
 
/** Interactive test for the queuing system */
 
/* @QUEUE: Test uses methods not present in C code using mysql web-based system */
 
int interactiveTest(int test, struct general_info general_info, distrend_mysql_conn_t conn){
 
  int command;
 
  fprintf(stderr,"Hello!\n");
 
  int32_t slaveKey = 1;
 
  int32_t jobKey = 0;
 
  jobnum_t jobKey = 0;
 
  int32_t frameNum = 0;
 
  int32_t newPriority = 0;
 
  int tmp = 0;
 

	
 
  while(test == 1)
 
   {
 
     fprintf(stderr, "Welcome to DistRen Alpha Interactive Test Mode\n\n");
 
     fprintf(stderr, "\t1 \tGet a frame to render\n");
 
     fprintf(stderr, "\t2 \tChange job priority\n");
 
     fprintf(stderr, "\t3 \tSet frame finished\n");
 
     fprintf(stderr, "\t4 \tSet frame started\n");
 
     fprintf(stderr, "\t5 \tQuit\n");
 

	
 
     scanf("%d", &command);
 

	
 
     switch(command)
 
     {
 
     case 1:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &slaveKey);
 
       fprintf(stderr, "Got frame: ");
 
       find_jobframe(conn, slaveKey, &jobKey, &frameNum);
 
       fprintf(stderr, "jobKey: %d, frameNum: %d\n",jobKey,frameNum);
 
       break;
 
     case 2:
 
       fprintf(stderr,"Job key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"New priority: ");
 
       scanf("%d", &tmp);
 
       newPriority = tmp;
 

	
 
       change_job_priority(conn, jobKey, newPriority);
 
       fprintf(stderr,"Changed!");
 
       break;
 
     case 3:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       finish_frame(conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Finished Frame!\n");
 
       break;
 
     case 4:
 
       fprintf(stderr,"Slave Key: ");
 
       scanf("%d", &tmp);
 
       slaveKey = tmp;
 

	
 
       fprintf(stderr,"Job Key: ");
 
       scanf("%d", &tmp);
 
       jobKey = tmp;
 

	
 
       fprintf(stderr,"Frame Number: ");
 
       scanf("%d", &tmp);
 
       frameNum = tmp;
 

	
 
       start_frame(conn, slaveKey, jobKey, frameNum);
 
       fprintf(stderr,"Started Frame!\n");
 
       break;
 
     case 5:
 
       test = 0;
 
       break;
 
     default:
 
       fprintf(stderr, "Invalid input, please try again.\n");
 
       break;
 
     }
 
   }
 
  return 0;
 
}
src/server/mysql.c
Show inline comments
 
@@ -151,126 +151,126 @@ distrend_mysql_result_t mysqlQuery(distr
 
  
 
  if(mysql_ping(conn->mysqlconn))
 
    fprintf(stderr, "MySQL Connection _was_ broken or may be broken, I'm not sure exactly what this return value means\n");
 

	
 

	
 
  fprintf(stderr,"Querying... ");
 
  if (mysql_query(conn->mysqlconn, query))
 
     fprintf(stderr, "%s\n", mysql_error(conn->mysqlconn));
 
  fprintf(stderr,"Queried!\n");
 

	
 
  fprintf(stderr,"Getting results... ");
 
  result = mysql_use_result(conn->mysqlconn);
 
  fprintf(stderr,"Done!\n");
 

	
 
  distrenresult = malloc(sizeof(struct distrend_mysql_result));
 
  if(!distrenresult)
 
    {
 
      mysql_free_result(result);
 
      return NULL;
 
    }
 

	
 
  distrenresult->mysqlresult = result;
 
  distrenresult->pointlesscheck = FORTY_TWO;
 

	
 
  return distrenresult;
 
}
 

	
 
int mysqlResultFree(distrend_mysql_result_t result)
 
{
 
  size_t counter;
 

	
 
  if(!result)
 
    return 0;
 

	
 
  if(result->pointlesscheck != FORTY_TWO)
 
    fprintf(stderr, "%s:%d: I didn't get the type of handle I wanted\n", __FILE__, __LINE__);
 

	
 
  /**
 
     Must flush the resultset buffer.
 
   */
 
  for(counter = 0;
 
      mysql_fetch_row(result->mysqlresult);
 
      counter ++)
 
    ;
 
  if(counter)
 
    fprintf(stderr, "Calling function did not flush all of the rows it should've\n");
 

	
 
  mysql_free_result(result->mysqlresult);
 
  free(result);
 

	
 
  return 0;
 
}
 

	
 
/*
 
   Individual query functions:
 
 */
 

	
 
void finish_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Complete`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  mysqlResultFree(result);
 
}
 

	
 
void start_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Start`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  mysqlResultFree(result);
 
}
 

	
 
int change_job_priority(distrend_mysql_conn_t conn, int32_t jobkey, int32_t newpriority)
 
{
 
  char *query;
 
  distrend_mysql_result_t result;
 

	
 
  _distren_asprintf(&query, "UPDATE `distren`.`Job` SET `Priority`=%d WHERE `Job_Key`=%d",
 
		    newpriority, jobkey);
 
  result = mysqlQuery(conn, query);
 
  mysqlResultFree(result);
 

	
 
  return 0;
 
}
 

	
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slavekey, int32_t *jobkey, int32_t *framenum)
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slavekey, jobnum_t *jobkey, int32_t *framenum)
 
{
 
  distrend_mysql_result_t result;
 
  char *query;
 
  MYSQL_ROW row;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Get`(%d);", slavekey);
 

	
 
  result = mysqlQuery(conn, query);
 
  free(query);
 

	
 
  if(!result)
 
    return 1;
 

	
 
  row = mysql_fetch_row(result->mysqlresult);
 
  if(!row)
 
    {
 
      mysqlResultFree(result);
 
      return 1;
 
    }
 

	
 
  distrend_mysql_getint(row, 0, jobkey);
 
  distrend_mysql_getint(row, 0, (int32_t *)jobkey);
 
  distrend_mysql_getint(row, 1, framenum);
 

	
 

	
 
  
 
  mysqlResultFree(result);
 

	
 
  return 0;
 
}
src/server/mysql.h
Show inline comments
 
/*
 
  Copyright 2009 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 

	
 
#ifndef MYSQL_H_
 
#define MYSQL_H_
 

	
 
#include "distrenjob.h"
 

	
 
#include <stdint.h>
 

	
 
struct distrend_mysql_conn;
 
struct distrend_mysql_result;
 

	
 
typedef struct distrend_mysql_conn *distrend_mysql_conn_t;
 
typedef struct distrend_mysql_result *distrend_mysql_result_t;
 

	
 
/**
 
   initiates a MySQL connection
 
   @param conn, pointer will be set to the struct
 
   @return 0 on success
 
 */
 
int mysqlConnect(distrend_mysql_conn_t *conn);
 

	
 
/**
 
   cleans and disconnects MySQL connection
 
   @param conn connection to clean
 
   @return 0 on success
 
*/
 
int mysqlDisconnect(distrend_mysql_conn_t conn);
 

	
 
/**
 
   Mark a frame as finished in the database and calculate the seconds spent on that frame.
 
 */
 
void finish_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum);
 

	
 
/**
 
   Mark a frame as started in the database and save the time at which it started.
 
 */
 
void start_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum);
 

	
 
/**
 
   Changes the priority of an existing (and maybe running) job. @arg head I may end up changing the head if job == head
 
*/
 
int change_job_priority(distrend_mysql_conn_t conn, int32_t jobkey, int32_t newpriority);
 

	
 
/**
 
  Frame Finder: Finds a frame for a slave to render
 
*/
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slaveKey, int32_t *jobKey, int32_t *frameNum);
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slaveKey, jobnum_t *jobKey, int32_t *frameNum);
 

	
 

	
 

	
 

	
 
#endif /* MYSQL_H_ */
0 comments (0 inline, 0 general)