Changeset - a16eb3847e57
[Not reviewed]
default
0 3 0
Nathan Brink (binki) - 15 years ago 2010-07-31 01:12:43
ohnobinki@ohnopublishing.net
Quieter operation and dots to show upload progress for libdistren.
3 files changed with 17 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/client/libdistren.c
Show inline comments
 
@@ -37,180 +37,193 @@
 

	
 
/**
 
 * Handle common cleanup actions for distren_init().
 
 */
 
static void distren_init_cleanup(distren_t distren);
 

	
 
int distren_init(distren_t *handle)
 
{
 
  int tmp;
 

	
 
  struct distren_request *req;
 
  void *data;
 

	
 
  if(!handle)
 
    return 1;
 

	
 
  *handle = malloc(sizeof(struct distren));
 
  if(!*handle)
 
    return 1;
 

	
 
  memset(*handle, 0, sizeof(struct distren));
 

	
 
  /* now the environment is ready for general use */
 
  if(_distren_getoptions(*handle))
 
    {
 
      fprintf(stderr, "error getting configuration\n");
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  tmp = remoteio_open_server(&(*handle)->rem,
 
			     (*handle)->options->remoteio,
 
			     (remoteio_read_handle_func_t)&libdistren_remoteio_read_handle,
 
			     *handle,
 
			     (remoteio_close_handle_func_t)&libdistren_remoteio_close_handle,
 
			     (*handle)->server);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "error: unable to connect to server\n");
 

	
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  /* send off a DISTREN_REQUEST_VERSION to the server */
 
  tmp = distren_request_version(&req, &data, DISTREN_SERVERTYPE_CLIENT, PACKAGE_STRING);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "error: unable to allocate request");
 

	
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 
  distren_request_send((*handle)->rem, req, data);
 
  distren_request_free_with_data(req, data);
 

	
 
  while((*handle)->rem
 
	&& (*handle)->state == DISTREN_STATE_VERSION)
 
      multiio_poll((*handle)->multiio, 500);
 

	
 
  if(!(*handle)->rem)
 
    {
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
static void distren_init_cleanup(distren_t distren)
 
{
 
  if(distren->rem)
 
    remoteio_close(distren->rem);
 
  distren->rem = NULL;
 
  distren_free(distren);
 
}
 

	
 
#ifdef _WIN32
 
#define FOPEN_BINARY "b"
 
#else
 
#define FOPEN_BINARY ""
 
#endif
 
int distren_submit_file(distren_t handle, distren_job_t *job, const char *filename)
 
{
 
  FILE *in;
 
  char buf[DISTREN_REQUEST_FILE_POST_DATA_LEN];
 
  size_t len;
 
  void *data;
 

	
 
  struct distren_request *req;
 
  struct distren_request_file_post data_header;
 
  distren_request_file_post_context_t post_context;
 

	
 
  /* for basename() to play with */
 
  char *my_filename;
 

	
 
  size_t sendq_dots;
 

	
 
  errno = 0;
 
  in = fopen(filename, "r" FOPEN_BINARY);
 
  if(!in)
 
    {
 
      perror("fopen");
 
      return 1;
 
    }
 

	
 
  fprintf(stderr, "info: Starting upload of %s...\n", filename);
 

	
 
  /* prepare the server for a file upload */
 
  my_filename = strdup(filename);
 
  distren_request_file_post_start(&req,
 
				  &data,
 
				  &post_context,
 
				  handle->file_post_id,
 
				  basename(my_filename));
 
  free(my_filename);
 
  distren_request_send(handle->rem, req, data);
 
  distren_request_free_with_data(req, data);
 
  handle->file_post_id ++;
 

	
 
  /* send file body */
 
  while(in
 
	&& !ferror(in)
 
	&& !feof(in)
 
	&& handle->rem)
 
    {
 
      len = fread(buf, 1, DISTREN_REQUEST_FILE_POST_DATA_LEN, in);
 
      if(len == 0)
 
	continue;
 
      distren_request_file_post(&req,
 
				&data_header,
 
				post_context,
 
				buf,
 
				len);
 
      remoteio_write(handle->rem, req, sizeof(struct distren_request));
 
      /* should we check of handle->rem is NULL or not here...? */
 
      remoteio_write(handle->rem, &data_header, sizeof(struct distren_request_file_post));
 
      remoteio_write(handle->rem, buf, len);
 
      distren_request_free(req);
 

	
 
      /* ensure we have no more than a megabyte waiting to be sent. */
 
      while(handle->rem
 
	    && remoteio_sendq_len(handle->rem) / 3 > (1024 * 1024 / DISTREN_REQUEST_FILE_POST_DATA_LEN))
 
	    && (sendq_dots = remoteio_sendq_len(handle->rem) / 3) > (1024 * 1024 / DISTREN_REQUEST_FILE_POST_DATA_LEN))
 
	{
 
	  fprintf(stderr, "info: %d packets waiting to be sent...\n", remoteio_sendq_len(handle->rem) / 3);
 
	  multiio_poll(handle->multiio, -1);
 
	  if(handle->rem)
 
	    while(sendq_dots > remoteio_sendq_len(handle->rem) / 3)
 
	      {
 
		sendq_dots --;
 
		fputc('.', stderr);
 
	      }
 
	}
 
    }
 

	
 
  /*
 
   * declare the upload as finished, marking it as cancelled on I/O
 
   * error.
 
  */
 
  distren_request_file_post_finish(&req, &data, post_context, (!in || ferror(in)) ? 1 : 0);
 
  if(handle->rem)
 
    distren_request_send(handle->rem, req, data);
 
    distren_request_free_with_data(req, data);
 

	
 
  if(in)
 
    fclose(in);
 
  if(!handle->rem)
 
    return 1;
 

	
 
  /* let's block until the file's gone. */
 
  while(handle->rem
 
	&& remoteio_sendq_len(handle->rem))
 
	&& (sendq_dots = remoteio_sendq_len(handle->rem)))
 
    {
 
      fprintf(stderr, "info: %d packets waiting to be sent...\n", remoteio_sendq_len(handle->rem) / 3);
 
      multiio_poll(handle->multiio, -1);
 
      if(handle->rem)
 
	while(sendq_dots / 3 > remoteio_sendq_len(handle->rem) / 3)
 
	  {
 
	    sendq_dots -= 3;
 
	    fputc('.', stderr);
 
	  }
 
    }
 
  fputc('\n', stderr);
 
  fprintf(stderr, "info: %s successfully uploaded, as far as we know.\n", filename);
 

	
 
  return 0;
 
}
 

	
 
int distren_free(distren_t handle)
 
{
 
  if(handle->rem)
 
    remoteio_close(handle->rem);
 
  free(handle);
 
  return 0;
 
}
src/common/remoteio.c
Show inline comments
 
@@ -337,195 +337,192 @@ int _remoteio_handle_read(multiio_contex
 
   *
 
   * Call the read_handler. Set careful_free, see remoteio_close(), so
 
   * that rem->read_handler() may call remoteio_close() without
 
   * segfaulting us ;-).
 
   */
 
  rem->careful_free = 1;
 
  readlen = (*rem->read_handler)(rem, rem->opts->generic_handler_data, rem->inbuf.data, rem->inbuf.len, rem->read_handler_data);
 
  if(rem->careful_free == 2)
 
    {
 
      rem->careful_free = 0;
 
      remoteio_close(rem);
 

	
 
      return 0;
 
    }
 
  rem->careful_free = 0;
 

	
 
  memmove(rem->inbuf.data, rem->inbuf.data + readlen, rem->inbuf.len - readlen);
 
  rem->inbuf.len -= readlen;
 

	
 
  return 0;
 
}
 

	
 

	
 
int remoteio_write(struct remoteio *rem, const void *buf, size_t len)
 
{
 
  struct remoteio_packet *packet;
 
  struct pollfd pollfd;
 
  size_t bytes_written;
 
  int err;
 

	
 
  /**
 
   * This is probably about the only optimization that exists in
 
   * distren.... :-D
 
   *
 
   * Write to the client immediately if there are no other messages
 
   * waiting and if the client will accept it.
 
   */
 
  if(q_empty(rem->outmsgs))
 
    {
 
      pollfd.fd = rem->sock;
 
      pollfd.revents = POLLOUT;
 
      pollfd.events = 0;
 
      poll(&pollfd, 1, 0);
 
      if(pollfd.events & POLLOUT)
 
	{
 
	  err = funcmap[rem->method].write_func(rem, buf, len, &bytes_written);
 
	  if(bytes_written > 0)
 
	    {
 
	      len -= bytes_written;
 
	      buf += bytes_written;
 
	    }
 
	}
 
    }
 

	
 
  /**
 
   * zero length is easy... and might be possible if the above
 
   * optimization works ;-)
 
   */
 
  if(!len)
 
    return 0;
 

	
 
  packet = malloc(sizeof(struct remoteio_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = len;
 
  packet->data = malloc(len);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, buf, len);
 

	
 
  q_enqueue(rem->outmsgs, packet, 0);
 
  multiio_socket_event_enable(rem->opts->multiio, rem->sock, POLLOUT);
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_handle_write(multiio_context_t multiio,
 
			   int fd,
 
			   short revent,
 
			   struct remoteio_opts *opts,
 
			   struct remoteio *rem)
 
{
 
  struct remoteio_packet *packet;
 
  size_t written_amount;
 

	
 
  int tmp;
 

	
 
  fprintf(stderr, "%s:%d: <del>My</del><ins>Someone else's</ins> traversal says that sock %d is available for writing\n",
 
	  __FILE__, __LINE__, fd);
 

	
 
  /*
 
   * check if we're out of stuff to write.
 
   */
 
  if(q_empty(rem->outmsgs))
 
    {
 
      multiio_socket_event_disable(multiio, fd, POLLOUT);
 
      return 0;
 
    }
 

	
 
  packet = q_front(rem->outmsgs);
 
  tmp = funcmap[rem->method].write_func(rem, packet->data, packet->len, &written_amount);
 

	
 
  /**
 
     Disconnect in case of write error.
 
  */
 
  if(tmp)
 
    {
 
      fprintf(stderr, __FILE__ ":%d: error handling for write() needs to be inserted into remoteio.... perhaps.. ;-)\n", __LINE__);
 
    }
 
  if(packet->len == written_amount)
 
    {
 
      q_dequeue(rem->outmsgs);
 
      remoteio_packet_free(packet);
 

	
 
      if(q_empty(rem->outmsgs))
 
	multiio_socket_event_disable(multiio, fd, POLLOUT);
 
    }
 
  else
 
    {
 
      /**
 
       * shifting seems the simplest solution.
 
       */
 
      packet->len -= written_amount;
 
      memmove(packet->data, packet->data + written_amount, packet->len);
 
    }
 

	
 
  return 0;
 
}
 

	
 

	
 
int remoteio_close(struct remoteio *rem)
 
{
 
  int rtn;
 
  
 
  /**
 
   * See careful_free's and _remoteio_handle_read()'s docs.  If
 
   * careful_free is nonzero, then we shouldn't free it here because
 
   * such a free would cause a segfault. However, whoever set
 
   * rem->careful_free to nonzero will handle resetting
 
   * rem->careful_free to zero and calling remoteio_close() if
 
   * necessary.
 
   */
 
  if(rem->careful_free)
 
    {
 
      rem->careful_free = 2;
 
      return 0;
 
    }
 

	
 
  /* call close handler */
 
  if(rem->close_handler)
 
    (*rem->close_handler)(rem->opts->generic_handler_data, rem->read_handler_data);
 

	
 
  /* cleanup multiiio stuff */
 
  multiio_socket_del(rem->opts->multiio, rem->sock);
 

	
 
  /* backend-specific cleanup */
 
  rtn = funcmap[rem->method].close_func(rem);
 

	
 
  /* this part is normal ;-) */
 
  free(rem->inbuf.data);
 
  q_free(rem->outmsgs, (list_dealloc_func_t)remoteio_packet_free);
 

	
 
  free(rem);
 

	
 
  return rtn;
 
}
 

	
 
/**
 
 * Frees an entire packet, including the passed pointer. If you just
 
 * want the contents of the packet free()ed, just do
 
 * free(packet.data);
 
 */
 
void remoteio_packet_free(struct remoteio_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 

	
 
int _remoteio_getserver_traverse(char *servername, struct remoteio_server *aserver)
 
{
 
  if(!strcmp(aserver->name, servername))
 
    return FALSE; /* stop traversal */
 

	
 
  return TRUE;
 
}
src/server/mysql.c
Show inline comments
 
@@ -69,201 +69,198 @@ int distrend_mysql_getint(MYSQL_ROW row,
 
  return 0;
 
}
 

	
 

	
 
struct distrend_mysql_conn
 
{
 
  MYSQL *mysqlconn;
 
  short pointlesscheck;
 
};
 

	
 
struct distrend_mysql_result
 
{
 
  MYSQL_RES *mysqlresult;
 
  short pointlesscheck;
 
};
 

	
 
/**
 
    funcs
 
 */
 

	
 
int mysqlConnect(distrend_mysql_conn_t *conn, const char *user, const char *host, const char *pass, const char *database)
 
{
 
  MYSQL *mysqlconn;
 

	
 
  my_bool mybool;
 

	
 
  mysqlconn = mysql_init(NULL);
 

	
 
  if(!mysql_real_connect(mysqlconn, host, user, pass, database, 0, NULL, CLIENT_MULTI_STATEMENTS))
 
    {
 
      fprintf(stderr, "%s\n", mysql_error(mysqlconn));
 
      return 1;
 
    }
 

	
 
  /**
 
     Called after mysql_real_connect() due to bug fixed in MySQL 5.1.6 and later
 
  */
 
  mybool = 1;
 
  mysql_options(mysqlconn, MYSQL_OPT_RECONNECT, (char *)&mybool);
 

	
 

	
 
  *conn = malloc(sizeof(struct distrend_mysql_conn));
 
  if(!*conn)
 
    {
 
      mysql_close(mysqlconn);
 
      mysql_server_end();
 
      return 2;
 
    }
 
  (*conn)->mysqlconn = mysqlconn;
 
  (*conn)->pointlesscheck = SEVENTY_FIVE;
 

	
 
  return 0;
 
}
 

	
 
int mysqlDisconnect(distrend_mysql_conn_t conn)
 
{
 
  /**
 
     check if this handle is valid
 
   */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /**
 
     invalidate handle :-D
 
   */
 
  conn->pointlesscheck ++;
 

	
 
  mysql_close(conn->mysqlconn);
 
  mysql_server_end();
 
  free(conn);
 

	
 
  return 0;
 
}
 

	
 

	
 
distrend_mysql_result_t mysqlQuery(distrend_mysql_conn_t conn, char *query, MYSQL_FIELD_OFFSET expected_columns)
 
{
 
  MYSQL_RES *result;
 
  distrend_mysql_result_t distrenresult;
 
  MYSQL_FIELD_OFFSET num_columns;
 

	
 
  /**
 
     pointless sanity check
 
  */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /** make sure that connection is still alive
 
   */
 
  
 
  if(mysql_ping(conn->mysqlconn))
 
    {
 
      fprintf(stderr, "mysql_ping() failed: %s\n", mysql_error(conn->mysqlconn));
 
      return NULL;
 
    }
 

	
 
  fprintf(stderr,"Querying... ");
 
  if (mysql_query(conn->mysqlconn, query))
 
    {
 
      fprintf(stderr, "calling mysql_query() resulted in: %s\n", mysql_error(conn->mysqlconn));
 
      return NULL;
 
    }
 
  fprintf(stderr,"Queried!\n");
 

	
 
  fprintf(stderr,"Getting results... ");
 
  result = mysql_use_result(conn->mysqlconn);
 
  
 
  /**
 
     Flush stuff out when an empty set is expected.
 
   */
 
  if(!expected_columns)
 
    {
 
      if(result)
 
	{
 
	  while(mysql_fetch_row(result))
 
	    ;
 
	  mysql_free_result(result);
 
	}
 
      return NULL;
 
    }
 

	
 
  if(!result)
 
    {
 
      fprintf(stderr, "expected response/result for query ``%s'', got nothing\n", query);
 
      return NULL;
 
    }
 

	
 
  /**
 
     Sanity check
 
   */
 
  num_columns = mysql_num_fields(result);
 
  if(num_columns != expected_columns)
 
    {
 
      fprintf(stderr, "Expected %d columns, got %d for ``%s''\n", expected_columns, num_columns, query);
 
      
 
      mysql_free_result(result);
 
      return NULL;
 
    }
 
  fprintf(stderr, "Done!\n");
 

	
 
  /**
 
     Prepare data for return.
 
   */
 
  distrenresult = malloc(sizeof(struct distrend_mysql_result));
 
  if(!distrenresult)
 
    {
 
      while(mysql_fetch_row(result))
 
	;
 
        mysql_free_result(result);
 
      return NULL;
 
    }
 

	
 
  distrenresult->mysqlresult = result;
 
  distrenresult->pointlesscheck = FORTY_TWO;
 

	
 
  return distrenresult;
 
}
 

	
 
int mysqlResultFree(distrend_mysql_conn_t conn, distrend_mysql_result_t result)
 
{
 
  size_t counter;
 
  MYSQL_RES *mysqlresult;
 

	
 
  if(!result)
 
    {
 
      fprintf(stderr, "mysqlResultFree(): warning, passed NULL parameter\n");
 
      return 0;
 
    }
 
  
 
  if(result->pointlesscheck != FORTY_TWO)
 
    fprintf(stderr, "%s:%d: I didn't get the type of handle I wanted\n", __FILE__, __LINE__);
 

	
 
  mysqlresult = result->mysqlresult;
 
  free(result);
 

	
 
  /**
 
     Must flush the resultset buffer.
 
   */
 
  for(counter = 0;
 
      mysql_fetch_row(mysqlresult);
 
      counter ++)
 
    ;
 
  if(counter)
 
    fprintf(stderr, "Calling function did not flush %d rows\n", (int)counter);
 

	
 
  while(mysql_more_results(conn->mysqlconn))
 
    {
 
      fprintf(stderr, "flushing an extraneous result set\n");
 
      mysql_next_result(conn->mysqlconn);
 
      mysqlresult = mysql_use_result(conn->mysqlconn);
 
      if(mysqlresult)
 
	{
 
	  while(mysql_fetch_row(mysqlresult))
 
	    ;
 
	  mysql_free_result(mysqlresult);
 
	}
 
    }
 

	
 
  return 0;
 
}
 

	
0 comments (0 inline, 0 general)