Changeset - a16eb3847e57
[Not reviewed]
default
0 3 0
Nathan Brink (binki) - 15 years ago 2010-07-31 01:12:43
ohnobinki@ohnopublishing.net
Quieter operation and dots to show upload progress for libdistren.
3 files changed with 18 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/client/libdistren.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink, Ethan Zonca, Matt Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 
*/
 

	
 
/*
 
  Implementation of distren_* functions from distren.h excluding distren_job_* functions.
 
 */
 

	
 
#include "common/config.h"
 
#include "common/options.h"
 
#include "common/protocol.h"
 
#include "common/remoteio.h"
 
#include "common/request.h"
 

	
 
#include "libdistren.h"
 

	
 
#include <errno.h>
 
#include <libgen.h>
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 

	
 
/**
 
 * Handle common cleanup actions for distren_init().
 
 */
 
static void distren_init_cleanup(distren_t distren);
 

	
 
int distren_init(distren_t *handle)
 
{
 
  int tmp;
 

	
 
  struct distren_request *req;
 
  void *data;
 

	
 
  if(!handle)
 
    return 1;
 

	
 
  *handle = malloc(sizeof(struct distren));
 
  if(!*handle)
 
    return 1;
 

	
 
  memset(*handle, 0, sizeof(struct distren));
 

	
 
  /* now the environment is ready for general use */
 
  if(_distren_getoptions(*handle))
 
    {
 
      fprintf(stderr, "error getting configuration\n");
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  tmp = remoteio_open_server(&(*handle)->rem,
 
			     (*handle)->options->remoteio,
 
			     (remoteio_read_handle_func_t)&libdistren_remoteio_read_handle,
 
			     *handle,
 
			     (remoteio_close_handle_func_t)&libdistren_remoteio_close_handle,
 
			     (*handle)->server);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "error: unable to connect to server\n");
 

	
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  /* send off a DISTREN_REQUEST_VERSION to the server */
 
  tmp = distren_request_version(&req, &data, DISTREN_SERVERTYPE_CLIENT, PACKAGE_STRING);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "error: unable to allocate request");
 

	
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 
  distren_request_send((*handle)->rem, req, data);
 
  distren_request_free_with_data(req, data);
 

	
 
  while((*handle)->rem
 
	&& (*handle)->state == DISTREN_STATE_VERSION)
 
      multiio_poll((*handle)->multiio, 500);
 

	
 
  if(!(*handle)->rem)
 
    {
 
      distren_init_cleanup(*handle);
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
static void distren_init_cleanup(distren_t distren)
 
{
 
  if(distren->rem)
 
    remoteio_close(distren->rem);
 
  distren->rem = NULL;
 
  distren_free(distren);
 
}
 

	
 
#ifdef _WIN32
 
#define FOPEN_BINARY "b"
 
#else
 
#define FOPEN_BINARY ""
 
#endif
 
int distren_submit_file(distren_t handle, distren_job_t *job, const char *filename)
 
{
 
  FILE *in;
 
  char buf[DISTREN_REQUEST_FILE_POST_DATA_LEN];
 
  size_t len;
 
  void *data;
 

	
 
  struct distren_request *req;
 
  struct distren_request_file_post data_header;
 
  distren_request_file_post_context_t post_context;
 

	
 
  /* for basename() to play with */
 
  char *my_filename;
 

	
 
  size_t sendq_dots;
 

	
 
  errno = 0;
 
  in = fopen(filename, "r" FOPEN_BINARY);
 
  if(!in)
 
    {
 
      perror("fopen");
 
      return 1;
 
    }
 

	
 
  fprintf(stderr, "info: Starting upload of %s...\n", filename);
 

	
 
  /* prepare the server for a file upload */
 
  my_filename = strdup(filename);
 
  distren_request_file_post_start(&req,
 
				  &data,
 
				  &post_context,
 
				  handle->file_post_id,
 
				  basename(my_filename));
 
  free(my_filename);
 
  distren_request_send(handle->rem, req, data);
 
  distren_request_free_with_data(req, data);
 
  handle->file_post_id ++;
 

	
 
  /* send file body */
 
  while(in
 
	&& !ferror(in)
 
	&& !feof(in)
 
	&& handle->rem)
 
    {
 
      len = fread(buf, 1, DISTREN_REQUEST_FILE_POST_DATA_LEN, in);
 
      if(len == 0)
 
	continue;
 
      distren_request_file_post(&req,
 
				&data_header,
 
				post_context,
 
				buf,
 
				len);
 
      remoteio_write(handle->rem, req, sizeof(struct distren_request));
 
      /* should we check of handle->rem is NULL or not here...? */
 
      remoteio_write(handle->rem, &data_header, sizeof(struct distren_request_file_post));
 
      remoteio_write(handle->rem, buf, len);
 
      distren_request_free(req);
 

	
 
      /* ensure we have no more than a megabyte waiting to be sent. */
 
      while(handle->rem
 
	    && remoteio_sendq_len(handle->rem) / 3 > (1024 * 1024 / DISTREN_REQUEST_FILE_POST_DATA_LEN))
 
	    && (sendq_dots = remoteio_sendq_len(handle->rem) / 3) > (1024 * 1024 / DISTREN_REQUEST_FILE_POST_DATA_LEN))
 
	{
 
	  fprintf(stderr, "info: %d packets waiting to be sent...\n", remoteio_sendq_len(handle->rem) / 3);
 
	  multiio_poll(handle->multiio, -1);
 
	  if(handle->rem)
 
	    while(sendq_dots > remoteio_sendq_len(handle->rem) / 3)
 
	      {
 
		sendq_dots --;
 
		fputc('.', stderr);
 
	      }
 
	}
 
    }
 

	
 
  /*
 
   * declare the upload as finished, marking it as cancelled on I/O
 
   * error.
 
  */
 
  distren_request_file_post_finish(&req, &data, post_context, (!in || ferror(in)) ? 1 : 0);
 
  if(handle->rem)
 
    distren_request_send(handle->rem, req, data);
 
    distren_request_free_with_data(req, data);
 

	
 
  if(in)
 
    fclose(in);
 
  if(!handle->rem)
 
    return 1;
 

	
 
  /* let's block until the file's gone. */
 
  while(handle->rem
 
	&& remoteio_sendq_len(handle->rem))
 
	&& (sendq_dots = remoteio_sendq_len(handle->rem)))
 
    {
 
      multiio_poll(handle->multiio, -1);
 
      if(handle->rem)
 
	while(sendq_dots / 3 > remoteio_sendq_len(handle->rem) / 3)
 
    {
 
      fprintf(stderr, "info: %d packets waiting to be sent...\n", remoteio_sendq_len(handle->rem) / 3);
 
      multiio_poll(handle->multiio, -1);
 
	    sendq_dots -= 3;
 
	    fputc('.', stderr);
 
    }
 
    }
 
  fputc('\n', stderr);
 
  fprintf(stderr, "info: %s successfully uploaded, as far as we know.\n", filename);
 

	
 
  return 0;
 
}
 

	
 
int distren_free(distren_t handle)
 
{
 
  if(handle->rem)
 
    remoteio_close(handle->rem);
 
  free(handle);
 
  return 0;
 
}
src/common/remoteio.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
#include "common/config.h"
 

	
 
#include "common/libremoteio.h"
 
#include "common/execio.h"
 
#include "common/asprintf.h"
 

	
 
#include <list.h>
 

	
 
#include <errno.h>
 
#ifndef _WIN32
 
#include <netdb.h>
 
#endif
 
#include <poll.h>
 
#include <stdlib.h>
 
#include <stdio.h>
 
#include <string.h>
 
#include <sys/types.h>
 
#ifdef _WIN32
 
#include <winsock2.h>
 
#include <ws2tcpip.h>
 
#else
 
#include <sys/socket.h>
 
#endif
 
#include <unistd.h>
 
#include <queue.h>
 

	
 
#ifndef _WIN32
 
#include <sys/un.h>
 
#endif
 

	
 
/* local */
 

	
 
#define REMOTEIO_DEFAULT_PORT "4050"
 

	
 
int _remoteio_handle_write(multiio_context_t multiio,
 
			   int fd,
 
			   short revent,
 
			   struct remoteio_opts *opts,
 
			   struct remoteio *rem);
 
int _remoteio_handle_read(multiio_context_t multiio,
 
			  int fd,
 
			  short revent,
 
			  struct remoteio_opts *opts,
 
			  struct remoteio *rem);
 

	
 
int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread);
 
int _remoteio_ssh_write(struct remoteio *rem, const void *buf, size_t len, size_t *byteswritten);
 
int _remoteio_ssh_close(struct remoteio *rem);
 

	
 
/**
 
   ``named pipes''
 
 */
 
#ifndef _WIN32
 
int _remoteio_sock_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_sock_close(struct remoteio *rem);
 
#endif
 
int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread);
 
int _remoteio_sock_write(struct remoteio *rem, const void *buf, size_t len, size_t *byteswritten);
 

	
 
/**
 
   These borrow from _remoteio_sock_read() and _remoteio_sock_write().
 
 */
 
int _remoteio_tcp_open(struct remoteio *rem, struct remoteio_server *server);
 
int _remoteio_tcp_close(struct remoteio *rem);
 

	
 
/**
 
  lookup table for different methods of remoteio:
 
  the enum remoteio_method is the index of the entry to use for that method. 
 
  Regardless, a NULL terminator is required because the configuration function
 
  searches through this table for the method specified in the config file.
 
*/
 
struct remoteio_method_funcmap funcmap[] = 
 
  {
 
    /* [REMOTEIO_METHOD_SSH] */
 
    {REMOTEIO_METHOD_SSH, &_remoteio_ssh_open, &_remoteio_ssh_read, &_remoteio_ssh_write, &_remoteio_ssh_close, "ssh"},
 
#ifndef _WIN32
 
    {REMOTEIO_METHOD_UNIX, &_remoteio_sock_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close, "unix"},
 
#endif
 
    {REMOTEIO_METHOD_TCP, &_remoteio_tcp_open, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_tcp_close, "tcp"},
 
    {REMOTEIO_METHOD_SOCKET, NULL, &_remoteio_sock_read, &_remoteio_sock_write, &_remoteio_sock_close},
 
    {REMOTEIO_METHOD_MAX, NULL, NULL, NULL, NULL, NULL}
 
  };
 

	
 
struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername);
 

	
 
void remoteio_packet_free(struct remoteio_packet *packet);
 

	
 
int remoteio_config(cfg_t *cfg, struct remoteio_opts *opts)
 
{
 
  size_t numservers;
 
  size_t counter, counter2;
 
  static int haslisted_methods = 0;
 
  
 
  struct remoteio_server aserver;
 

	
 
  multiio_socket_type_register(opts->multiio, &opts->socket_type);
 

	
 
  multiio_event_handler_register(opts->multiio,
 
				 opts->socket_type,
 
				 POLLOUT,
 
				 (multiio_event_handler_func_t)&_remoteio_handle_write,
 
				 opts);
 
  multiio_event_handler_register(opts->multiio,
 
				 opts->socket_type,
 
				 POLLIN,
 
				 (multiio_event_handler_func_t)&_remoteio_handle_read,
 
				 opts);
 

	
 
  opts->servers = list_init();
 
  if(!opts->servers)
 
    {
 
      fprintf(stderr, "@todo cleanup!\n");
 
      abort();
 
    }
 
  
 
  numservers = cfg_size(cfg, "server");
 
  for(counter = 0; counter < numservers; counter ++)
 
    {
 
      cfg_t *cfg_aserver;
 
      char *method;
 
      
 
      cfg_aserver = cfg_getnsec(cfg, "server", counter);
 
      
 
      aserver.name = strdup(cfg_title(cfg_aserver));
 
      aserver.hostname = strdup(cfg_getstr(cfg_aserver, "hostname"));
 
      aserver.username = strdup(cfg_getstr(cfg_aserver, "username"));
 

	
 
      aserver.method = REMOTEIO_METHOD_MAX;
 
      method = cfg_getstr(cfg_aserver, "method");
 
      for(counter2 = 0; funcmap[counter2].name; counter2 ++)
 
	if(strcmp(method, funcmap[counter2].name) == 0)
 
	  aserver.method = funcmap[counter2].method;
 
      if(aserver.method == REMOTEIO_METHOD_MAX)
 
	{
 
	  fprintf(stderr, "No such method as %s\n", method);
 
	  if(!haslisted_methods)
 
	    {
 
	      fprintf(stderr, "Available methods:\n");
 
	      for(counter2 = 0; funcmap[counter2].name; counter2 ++)
 
		fprintf(stderr, "\t%s\n", funcmap[counter2].name);
 
	      
 
	      haslisted_methods ++;
 
	    }
 
	  abort();
 
	}
 
      list_insert_after(opts->servers, &aserver, sizeof(struct remoteio_server));
 
    }
 
  
 
  return 0;
 
}
 

	
 
int remoteio_generic_data_set(struct remoteio_opts *opts, void *generic_data)
 
{
 
  opts->generic_handler_data = generic_data;
 

	
 
  return 0;
 
}
 

	
 
int remoteio_open_common(struct remoteio **remoteio,
 
			 enum remoteio_method method,
 
			 struct remoteio_opts *opts,
 
			 remoteio_read_handle_func_t read_handler,
 
			 void *read_handler_data,
 
			 remoteio_close_handle_func_t close_handler)
 
{
 
  struct remoteio *rem;
 

	
 
  rem = malloc(sizeof(struct remoteio));
 
  if(!rem)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 2;
 
    }
 

	
 
  *remoteio = rem;
 

	
 
  rem->execio = NULL;
 
  rem->method = method;
 
  rem->opts = opts;
 
  rem->inbuf.data = NULL;
 
  rem->inbuf.len = 0;
 
  rem->outmsgs = q_init();
 
  rem->read_handler = read_handler;
 
  rem->read_handler_data = read_handler_data;
 
  rem->close_handler = close_handler;
 
  /*
 
   * the following initialization is very important... though the
 
   * others are too, I suppose :-p. See remoteio_close()
 
   */
 
  rem->careful_free = 0;
 

	
 
  return 0;
 
}
 

	
 
int remoteio_open_socket(struct remoteio **remoteio,
 
			 struct remoteio_opts *opts,
 
			 remoteio_read_handle_func_t read_handler,
 
			 void *read_handler_data,
 
			 remoteio_close_handle_func_t close_handler,
 
			 int fd)
 
{
 
  struct remoteio *rem;
 
  if(remoteio_open_common(remoteio, REMOTEIO_METHOD_SOCKET, opts, read_handler, read_handler_data, close_handler))
 
    return 1;
 
  rem = *remoteio;
 

	
 
  rem->sock = fd;
 
  multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLIN);
 

	
 
  return 0;
 
}
 

	
 
int remoteio_open_server(struct remoteio **remoteio,
 
			 struct remoteio_opts *opts,
 
			 remoteio_read_handle_func_t read_handler,
 
			 void *read_handler_data,
 
			 remoteio_close_handle_func_t close_handler,
 
			 const char *servername)
 
{
 
  struct remoteio_server *theserver;
 
  struct remoteio *rem;
 

	
 
  int tmp;
 

	
 
  if(!opts)
 
    {
 
      fprintf(stderr, "%s:%d: no null opts!\n\tThis is a bug, please report it (after making sure it isn't already reported)\n", __FILE__, __LINE__);
 
      return 1;
 
    }
 

	
 
  theserver = remoteio_getserver(opts, servername);
 
  if(!theserver)
 
    {
 
      fprintf(stderr, "%s:%d: Could not find server named ``%s''\n", __FILE__, __LINE__, servername);
 
      return 1;
 
    }
 

	
 
  if(theserver->method >= REMOTEIO_METHOD_MAX
 
     || theserver->method < 0)
 
    {
 
      fprintf(stderr, "%s:%d: Unsupported remoteio method %d\n\tThis is a bug, probably indicating memory corruption. This is, of course, probably my fault (not your hardware's) ;-)\n", __FILE__, __LINE__, theserver->method);
 
      return 1;
 
    }
 

	
 
  if(remoteio_open_common(remoteio, theserver->method, opts, read_handler, read_handler_data, close_handler))
 
    return 1;
 
  rem = *remoteio;
 

	
 
  tmp = funcmap[theserver->method].open_func(rem, theserver);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "Error using method %s for server ``%s''", funcmap[theserver->method].name, servername);
 
      free(rem->inbuf.data);
 
      q_free(rem->outmsgs, QUEUE_NODEALLOC);
 
      free(rem);
 
      *remoteio = NULL;
 
      return tmp;
 
    }
 

	
 
  /**
 
   * @todo make this code slightly more generic... able to handle
 
   * execio's multi-sockets by letting execio register itself with
 
   * multiio instead of us registering here perhaps
 
   */
 
  multiio_socket_add(opts->multiio, rem->sock, opts->socket_type, rem, POLLIN);
 
  
 
  return 0;
 
}
 

	
 
/**
 
 * Implementation of multiio_event_handler_func_t
 
 */
 
int _remoteio_handle_read(multiio_context_t multiio,
 
			  int fd,
 
			  short revent,
 
			  struct remoteio_opts *opts,
 
			  struct remoteio *rem)
 
{
 
  struct remoteio_packet packet;
 
  size_t readlen;
 
  char buf[8192];
 

	
 
  int tmp;
 

	
 
  packet.len = 0;
 
  packet.data = NULL;
 

	
 
  if(rem->sock != fd)
 
    fprintf(stderr, "%d != %d\n", rem->sock, fd);
 

	
 
  tmp = funcmap[rem->method].read_func(rem, buf, sizeof(buf), &readlen);
 
  if(tmp)
 
    {
 
      remoteio_close(rem);
 
      return 1;
 
    }
 

	
 
  /* expand the input buffer */
 
  packet.len = rem->inbuf.len + readlen;
 
  packet.data = malloc(rem->inbuf.len + readlen);
 
  if(!packet.data)
 
    {
 
      fprintf(stderr, "OOM!\n");
 

	
 
      return 1;
 
    }
 
  if(rem->inbuf.data)
 
    memcpy(packet.data, rem->inbuf.data, rem->inbuf.len);
 
  memcpy(packet.data + rem->inbuf.len, buf, readlen);
 
  free(rem->inbuf.data);
 
  memcpy(&rem->inbuf, &packet, sizeof(struct remoteio_packet));
 

	
 
  /*
 
   * readlen wil now keeps track of how many bytes the handler
 
   * function has read.
 
   *
 
   * Call the read_handler. Set careful_free, see remoteio_close(), so
 
   * that rem->read_handler() may call remoteio_close() without
 
   * segfaulting us ;-).
 
   */
 
  rem->careful_free = 1;
 
  readlen = (*rem->read_handler)(rem, rem->opts->generic_handler_data, rem->inbuf.data, rem->inbuf.len, rem->read_handler_data);
 
  if(rem->careful_free == 2)
 
    {
 
      rem->careful_free = 0;
 
      remoteio_close(rem);
 

	
 
      return 0;
 
    }
 
  rem->careful_free = 0;
 

	
 
  memmove(rem->inbuf.data, rem->inbuf.data + readlen, rem->inbuf.len - readlen);
 
  rem->inbuf.len -= readlen;
 

	
 
  return 0;
 
}
 

	
 

	
 
int remoteio_write(struct remoteio *rem, const void *buf, size_t len)
 
{
 
  struct remoteio_packet *packet;
 
  struct pollfd pollfd;
 
  size_t bytes_written;
 
  int err;
 

	
 
  /**
 
   * This is probably about the only optimization that exists in
 
   * distren.... :-D
 
   *
 
   * Write to the client immediately if there are no other messages
 
   * waiting and if the client will accept it.
 
   */
 
  if(q_empty(rem->outmsgs))
 
    {
 
      pollfd.fd = rem->sock;
 
      pollfd.revents = POLLOUT;
 
      pollfd.events = 0;
 
      poll(&pollfd, 1, 0);
 
      if(pollfd.events & POLLOUT)
 
	{
 
	  err = funcmap[rem->method].write_func(rem, buf, len, &bytes_written);
 
	  if(bytes_written > 0)
 
	    {
 
	      len -= bytes_written;
 
	      buf += bytes_written;
 
	    }
 
	}
 
    }
 

	
 
  /**
 
   * zero length is easy... and might be possible if the above
 
   * optimization works ;-)
 
   */
 
  if(!len)
 
    return 0;
 

	
 
  packet = malloc(sizeof(struct remoteio_packet));
 
  if(!packet)
 
    {
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  packet->len = len;
 
  packet->data = malloc(len);
 
  if(!packet->data)
 
    {
 
      free(packet);
 
      fprintf(stderr, "OOM\n");
 
      return 1;
 
    }
 

	
 
  memcpy(packet->data, buf, len);
 

	
 
  q_enqueue(rem->outmsgs, packet, 0);
 
  multiio_socket_event_enable(rem->opts->multiio, rem->sock, POLLOUT);
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_handle_write(multiio_context_t multiio,
 
			   int fd,
 
			   short revent,
 
			   struct remoteio_opts *opts,
 
			   struct remoteio *rem)
 
{
 
  struct remoteio_packet *packet;
 
  size_t written_amount;
 

	
 
  int tmp;
 

	
 
  fprintf(stderr, "%s:%d: <del>My</del><ins>Someone else's</ins> traversal says that sock %d is available for writing\n",
 
	  __FILE__, __LINE__, fd);
 

	
 
  /*
 
   * check if we're out of stuff to write.
 
   */
 
  if(q_empty(rem->outmsgs))
 
    {
 
      multiio_socket_event_disable(multiio, fd, POLLOUT);
 
      return 0;
 
    }
 

	
 
  packet = q_front(rem->outmsgs);
 
  tmp = funcmap[rem->method].write_func(rem, packet->data, packet->len, &written_amount);
 

	
 
  /**
 
     Disconnect in case of write error.
 
  */
 
  if(tmp)
 
    {
 
      fprintf(stderr, __FILE__ ":%d: error handling for write() needs to be inserted into remoteio.... perhaps.. ;-)\n", __LINE__);
 
    }
 
  if(packet->len == written_amount)
 
    {
 
      q_dequeue(rem->outmsgs);
 
      remoteio_packet_free(packet);
 

	
 
      if(q_empty(rem->outmsgs))
 
	multiio_socket_event_disable(multiio, fd, POLLOUT);
 
    }
 
  else
 
    {
 
      /**
 
       * shifting seems the simplest solution.
 
       */
 
      packet->len -= written_amount;
 
      memmove(packet->data, packet->data + written_amount, packet->len);
 
    }
 

	
 
  return 0;
 
}
 

	
 

	
 
int remoteio_close(struct remoteio *rem)
 
{
 
  int rtn;
 
  
 
  /**
 
   * See careful_free's and _remoteio_handle_read()'s docs.  If
 
   * careful_free is nonzero, then we shouldn't free it here because
 
   * such a free would cause a segfault. However, whoever set
 
   * rem->careful_free to nonzero will handle resetting
 
   * rem->careful_free to zero and calling remoteio_close() if
 
   * necessary.
 
   */
 
  if(rem->careful_free)
 
    {
 
      rem->careful_free = 2;
 
      return 0;
 
    }
 

	
 
  /* call close handler */
 
  if(rem->close_handler)
 
    (*rem->close_handler)(rem->opts->generic_handler_data, rem->read_handler_data);
 

	
 
  /* cleanup multiiio stuff */
 
  multiio_socket_del(rem->opts->multiio, rem->sock);
 

	
 
  /* backend-specific cleanup */
 
  rtn = funcmap[rem->method].close_func(rem);
 

	
 
  /* this part is normal ;-) */
 
  free(rem->inbuf.data);
 
  q_free(rem->outmsgs, (list_dealloc_func_t)remoteio_packet_free);
 

	
 
  free(rem);
 

	
 
  return rtn;
 
}
 

	
 
/**
 
 * Frees an entire packet, including the passed pointer. If you just
 
 * want the contents of the packet free()ed, just do
 
 * free(packet.data);
 
 */
 
void remoteio_packet_free(struct remoteio_packet *packet)
 
{
 
  free(packet->data);
 
  free(packet); 
 
}
 

	
 

	
 
int _remoteio_getserver_traverse(char *servername, struct remoteio_server *aserver)
 
{
 
  if(!strcmp(aserver->name, servername))
 
    return FALSE; /* stop traversal */
 

	
 
  return TRUE;
 
}
 

	
 
struct remoteio_server *remoteio_getserver(const struct remoteio_opts *opts, const char *servername)
 
{
 
  int traversal_result;
 
  char *dispensible_servername;
 

	
 
  dispensible_servername = strdup(servername); /* for the sake of constness... */
 
  traversal_result = list_traverse(opts->servers, dispensible_servername, (list_traverse_func_t)&_remoteio_getserver_traverse, LIST_FRNT | LIST_ALTR);
 
  free(dispensible_servername);
 

	
 
  if(traversal_result == LIST_OK)
 
    return (struct remoteio_server *)list_curr(opts->servers);
 

	
 
  return (struct remoteio_server *)NULL;
 
}
 

	
 
size_t remoteio_sendq_len(const struct remoteio *rem)
 
{
 
  return (size_t)q_size(rem->outmsgs);
 
}
 

	
 
/**
 
   different remoteio methods' implementations:
 
 */
 

	
 
/*
 
  SSH, via execio
 
*/
 

	
 
int _remoteio_ssh_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  char *userhost;
 
  char *sshargs[] = {rem->opts->ssh_command, NULL /* userhost */, "distrend", "-d", (char *)NULL};
 

	
 
  int rtn;
 

	
 
  if(server->username)
 
    _distren_asprintf(&userhost, "%s@%s", server->username, server->hostname);
 
  else
 
    userhost = strdup(server->hostname);
 
  sshargs[1] = userhost;
 

	
 
  rtn = execio_open( &rem->execio, "ssh", sshargs);
 
  if(rtn)
 
    {
 
      fprintf(stderr, "error opening remoteio channel to ssh userhost ``%s''\n" , userhost);
 
      free(userhost);
 
      return 1;
 
    }
 
  free(userhost);
 
  
 
  return 0;
 
}
 

	
 
int _remoteio_ssh_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread)
 
{
 
  return execio_read(rem->execio, buf, len, bytesread);
 
}
 

	
 
int _remoteio_ssh_write(struct remoteio *rem, const void *buf, size_t len, size_t *byteswritten)
 
{
 
  return execio_write(rem->execio, buf, len, byteswritten);
 
}
 

	
 
int _remoteio_ssh_close(struct remoteio *rem)
 
{
 
  int rtn;
 
  
 
  rtn = execio_close(rem->execio);
 
  if(rtn)
 
    fprintf(stderr, "%s:%d: error closing execio\n", __FILE__, __LINE__);
 
  
 
  return rtn;
 
}
 

	
 
#ifndef _WIN32
 
/*
 
  local sockets implementation (``named pipes''), unix-only
 
 */
 
int _remoteio_sock_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  int sock;
 
  struct sockaddr_un sockaddr;
 

	
 
  /*
 
    The POSIX docs pretty much say that I can't depend on sockpath being able to be longer than 
 
    some proprietary length. So, if the compiler specifies a long path for RUNSTATEDIR, it could
 
    cause a buffer overflow.
 
   */
 
  char *sockpath = RUNSTATEDIR "/distrend.sock";
 
  unsigned int sockaddr_len;
 

	
 
  sock = socket(AF_UNIX, SOCK_STREAM, 0);
 
  if(sock == -1)
 
    {
 
      perror("socket");
 
      return 1;
 
    }
 

	
 
  sockaddr.sun_family = AF_UNIX;
 
  /*
 
    The terminating NULL should not be included in what's copied to sun_path,
 
    although it won't hurt as long as strlen(sockpath) < max socket length
 
   */
 
  for(sockaddr_len = 0; sockpath[sockaddr_len]; sockaddr_len ++)
 
    sockaddr.sun_path[sockaddr_len] = sockpath[sockaddr_len];
 

	
 
  if(connect(sock, (struct sockaddr *)&sockaddr, sockaddr_len) == -1)
 
    {
 
      perror("connect");
 
      close(sock);
 
      return 1;
 
    }
 

	
 
  rem->sock = sock;
 

	
 
  return 0;
 
}
 

	
 
#endif /* _WIN32 */
 

	
 
int _remoteio_sock_close(struct remoteio *rem)
 
{
 
  close(rem->sock);
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_sock_read(struct remoteio *rem, void *buf, size_t len, size_t *bytesread)
 
{
 
  ssize_t readrtn;
 

	
 
  *bytesread = 0;
 
  readrtn = read(rem->sock, buf, len);
 
  /*
 
    The following is valid for blocking sockets:
 
   */
 
  if(readrtn == -1)
 
    {
 
      /*
 
	in this case, we may have been interrupted by a signal and errno == EINTR
 
	or the connection was reset and errno = ECONNRESET
 

	
 
	Some of these are not error conditions:
 
       */
 
      perror("read");
 

	
 
      if(errno != EINTR)
 
	{
 
	  fprintf(stderr, "error reading socket in remoteio_sock_read\n");
 
	  return 1;
 
	}
 

	
 
      return 0;
 
    }
 

	
 
  *bytesread = readrtn;
 
  if(!readrtn)
 
    {
 
      /*
 
	means EOF except when FD is in ``message-nondiscard'' or ``message-discard''
 
	modes.
 
       */
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_sock_write(struct remoteio *rem, const void *buf, size_t len, size_t *byteswritten)
 
{
 
  ssize_t writertn;
 

	
 
  writertn = write(rem->sock, buf, len);
 

	
 
  if(writertn == -1)
 
    {
 
      perror("write");
 
      if(errno != EINTR)
 
	{
 
	  fprintf(stderr, "error writing to socket in remoteio_sock_write()\n");
 
	  return 1;
 
	}
 
    }
 

	
 
  *byteswritten = writertn;
 
  if(!writertn)
 
    {
 
      /*
 
	should we consider this an error? I'm pretty
 
	sure we should :-/
 
       */
 
      fprintf(stderr, "write() returned 0 in remoteio_sock_write()\n");
 
      return 1;
 
    }
 

	
 
  return 0;
 
}
 

	
 
/**
 
   TCP, taking advantage of the two generic socks read/write functions:
 
 */
 
int _remoteio_tcp_open(struct remoteio *rem, struct remoteio_server *server)
 
{
 
  int tmp;
 
  int tmp2;
 

	
 
  int sock;
 

	
 
  char *hostname;
 
  char *port;
 

	
 
  struct addrinfo addrinfo_hints;
 
  struct addrinfo *addrinfo_res;
 

	
 
  static char *default_port = REMOTEIO_DEFAULT_PORT;
 

	
 
  /**
 
     only hostname should be free()-ed, not port,
 
     because both are from the same block of malloc()-ed
 
     memory
 
   */
 
  hostname = strdup(server->hostname);
 
  for(port = hostname;
 
      *port && *port != ':';
 
      port ++)
 
    ;
 
  if(*port)
 
    {
 
      *port = '\0';
 
      port ++;
 
    }
 
  else
 
    port = default_port;
 

	
 
  memset(&addrinfo_hints, '\0', sizeof(struct addrinfo));
 
  addrinfo_hints.ai_family = AF_UNSPEC;
 
#ifdef _WIN32
 
  /* windows lacks stuff documented in POSIX, I guess :-( */
 
  addrinfo_hints.ai_flags = 0;
 
#else
 
  addrinfo_hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
 
#endif
 
  addrinfo_hints.ai_socktype = SOCK_STREAM;
 

	
 
  tmp = getaddrinfo(hostname, port, &addrinfo_hints, &addrinfo_res);
 
  if(tmp)
 
    {
 
      fprintf(stderr, "error resolving %s:%s: %s\n", server->hostname, port, gai_strerror(tmp));
 
      free(hostname);
 
      return 1;
 
    }
 
  fprintf(stderr, "connecting to %s[%s]:%s\n", server->hostname, addrinfo_res->ai_canonname, port);
 

	
 
  free(hostname);
 

	
 
  sock = socket(addrinfo_res->ai_family, SOCK_STREAM, addrinfo_res->ai_protocol);
 
  if(sock == -1)
 
    {
 
      perror("socket");
 
      freeaddrinfo(addrinfo_res);
 
  
 
      return 1;
 
    }
 

	
 
  tmp = connect(sock, addrinfo_res->ai_addr, addrinfo_res->ai_addrlen);
 
  tmp2 = errno;
 
  freeaddrinfo(addrinfo_res);
 
  errno = tmp2;
 
  if(tmp == -1)
 
    {
 
      perror("connect");
 
      close(sock);
 

	
 
      return 1;
 
    }
 

	
 
  rem->sock = sock;
 

	
 
  return 0;
 
}
 

	
 
int _remoteio_tcp_close(struct remoteio *rem)
 
{
 
  close(rem->sock);
 

	
 
  return 0;
 
}
src/server/mysql.c
Show inline comments
 
/*
 
  Copyright 2010 Nathan Phillip Brink, Ethan Zonca, Matthew Orlando
 

	
 
  This file is a part of DistRen.
 

	
 
  DistRen is free software: you can redistribute it and/or modify
 
  it under the terms of the GNU Affero General Public License as published by
 
  the Free Software Foundation, either version 3 of the License, or
 
  (at your option) any later version.
 

	
 
  DistRen is distributed in the hope that it will be useful,
 
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  GNU Affero General Public License for more details.
 

	
 
  You should have received a copy of the GNU Affero General Public License
 
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
*/
 

	
 
#include "common/config.h"
 

	
 
#include "mysql.h"
 
#include <mysql/mysql.h>
 

	
 
#include "common/asprintf.h"
 
#include "common/protocol.h"
 

	
 
#include <stdio.h>
 
#include <string.h>
 
#include <unistd.h>
 
#include <stdlib.h>
 
#include <sys/stat.h>
 
#include <fcntl.h>
 

	
 
/**
 
   local types
 
 */
 

	
 
#define SEVENTY_FIVE 75
 
#define FORTY_TWO 42
 

	
 
/**
 
   performs mysql query.
 
   errors will be logged to the user by this function.
 
   @return pointer to query handle on success, NULL on failure or if expected_columns=0 (no result set expected)
 
 */
 
distrend_mysql_result_t mysqlQuery(distrend_mysql_conn_t conn, char *query, MYSQL_FIELD_OFFSET expected_columns);
 

	
 
/**
 
   frees mysql query result. Accepts a NULL pointer and ignores it to
 
   help deal with one-shot calls to mysqlQuery so that you don't have to
 
   check if it returned NULL or not.
 
   @return 0 on success
 
 */
 
int mysqlResultFree(distrend_mysql_conn_t conn, distrend_mysql_result_t result);
 

	
 
/**
 
   reads an integer mysql field value
 
   @return 0 on success
 
*/
 
int distrend_mysql_getint(MYSQL_ROW row, MYSQL_FIELD_OFFSET column, int32_t *theint)
 
{
 
  if(!row[column])
 
    return 1;
 

	
 
  *theint = atol(row[column]);
 

	
 
  return 0;
 
}
 

	
 

	
 
struct distrend_mysql_conn
 
{
 
  MYSQL *mysqlconn;
 
  short pointlesscheck;
 
};
 

	
 
struct distrend_mysql_result
 
{
 
  MYSQL_RES *mysqlresult;
 
  short pointlesscheck;
 
};
 

	
 
/**
 
    funcs
 
 */
 

	
 
int mysqlConnect(distrend_mysql_conn_t *conn, const char *user, const char *host, const char *pass, const char *database)
 
{
 
  MYSQL *mysqlconn;
 

	
 
  my_bool mybool;
 

	
 
  mysqlconn = mysql_init(NULL);
 

	
 
  if(!mysql_real_connect(mysqlconn, host, user, pass, database, 0, NULL, CLIENT_MULTI_STATEMENTS))
 
    {
 
      fprintf(stderr, "%s\n", mysql_error(mysqlconn));
 
      return 1;
 
    }
 

	
 
  /**
 
     Called after mysql_real_connect() due to bug fixed in MySQL 5.1.6 and later
 
  */
 
  mybool = 1;
 
  mysql_options(mysqlconn, MYSQL_OPT_RECONNECT, (char *)&mybool);
 

	
 

	
 
  *conn = malloc(sizeof(struct distrend_mysql_conn));
 
  if(!*conn)
 
    {
 
      mysql_close(mysqlconn);
 
      mysql_server_end();
 
      return 2;
 
    }
 
  (*conn)->mysqlconn = mysqlconn;
 
  (*conn)->pointlesscheck = SEVENTY_FIVE;
 

	
 
  return 0;
 
}
 

	
 
int mysqlDisconnect(distrend_mysql_conn_t conn)
 
{
 
  /**
 
     check if this handle is valid
 
   */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /**
 
     invalidate handle :-D
 
   */
 
  conn->pointlesscheck ++;
 

	
 
  mysql_close(conn->mysqlconn);
 
  mysql_server_end();
 
  free(conn);
 

	
 
  return 0;
 
}
 

	
 

	
 
distrend_mysql_result_t mysqlQuery(distrend_mysql_conn_t conn, char *query, MYSQL_FIELD_OFFSET expected_columns)
 
{
 
  MYSQL_RES *result;
 
  distrend_mysql_result_t distrenresult;
 
  MYSQL_FIELD_OFFSET num_columns;
 

	
 
  /**
 
     pointless sanity check
 
  */
 
  if(conn->pointlesscheck != SEVENTY_FIVE)
 
    fprintf(stderr, "warning, I was passed a bad struct distrend_mysql_conn...\n");
 

	
 
  /** make sure that connection is still alive
 
   */
 
  
 
  if(mysql_ping(conn->mysqlconn))
 
    {
 
      fprintf(stderr, "mysql_ping() failed: %s\n", mysql_error(conn->mysqlconn));
 
      return NULL;
 
    }
 

	
 
  fprintf(stderr,"Querying... ");
 
  if (mysql_query(conn->mysqlconn, query))
 
    {
 
      fprintf(stderr, "calling mysql_query() resulted in: %s\n", mysql_error(conn->mysqlconn));
 
      return NULL;
 
    }
 
  fprintf(stderr,"Queried!\n");
 

	
 
  fprintf(stderr,"Getting results... ");
 
  result = mysql_use_result(conn->mysqlconn);
 
  
 
  /**
 
     Flush stuff out when an empty set is expected.
 
   */
 
  if(!expected_columns)
 
    {
 
      if(result)
 
	{
 
	  while(mysql_fetch_row(result))
 
	    ;
 
	  mysql_free_result(result);
 
	}
 
      return NULL;
 
    }
 

	
 
  if(!result)
 
    {
 
      fprintf(stderr, "expected response/result for query ``%s'', got nothing\n", query);
 
      return NULL;
 
    }
 

	
 
  /**
 
     Sanity check
 
   */
 
  num_columns = mysql_num_fields(result);
 
  if(num_columns != expected_columns)
 
    {
 
      fprintf(stderr, "Expected %d columns, got %d for ``%s''\n", expected_columns, num_columns, query);
 
      
 
      mysql_free_result(result);
 
      return NULL;
 
    }
 
  fprintf(stderr, "Done!\n");
 

	
 
  /**
 
     Prepare data for return.
 
   */
 
  distrenresult = malloc(sizeof(struct distrend_mysql_result));
 
  if(!distrenresult)
 
    {
 
      while(mysql_fetch_row(result))
 
	;
 
        mysql_free_result(result);
 
      return NULL;
 
    }
 

	
 
  distrenresult->mysqlresult = result;
 
  distrenresult->pointlesscheck = FORTY_TWO;
 

	
 
  return distrenresult;
 
}
 

	
 
int mysqlResultFree(distrend_mysql_conn_t conn, distrend_mysql_result_t result)
 
{
 
  size_t counter;
 
  MYSQL_RES *mysqlresult;
 

	
 
  if(!result)
 
    {
 
      fprintf(stderr, "mysqlResultFree(): warning, passed NULL parameter\n");
 
      return 0;
 
    }
 
  
 
  if(result->pointlesscheck != FORTY_TWO)
 
    fprintf(stderr, "%s:%d: I didn't get the type of handle I wanted\n", __FILE__, __LINE__);
 

	
 
  mysqlresult = result->mysqlresult;
 
  free(result);
 

	
 
  /**
 
     Must flush the resultset buffer.
 
   */
 
  for(counter = 0;
 
      mysql_fetch_row(mysqlresult);
 
      counter ++)
 
    ;
 
  if(counter)
 
    fprintf(stderr, "Calling function did not flush %d rows\n", (int)counter);
 

	
 
  while(mysql_more_results(conn->mysqlconn))
 
    {
 
      fprintf(stderr, "flushing an extraneous result set\n");
 
      mysql_next_result(conn->mysqlconn);
 
      mysqlresult = mysql_use_result(conn->mysqlconn);
 
      if(mysqlresult)
 
	{
 
	  while(mysql_fetch_row(mysqlresult))
 
	    ;
 
	  mysql_free_result(mysqlresult);
 
	}
 
    }
 

	
 
  return 0;
 
}
 

	
 
/*
 
   Individual query functions:
 
 */
 

	
 
void finish_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Complete`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  mysqlQuery(conn, query, 0);
 
  free(query);
 
}
 

	
 
void start_frame(distrend_mysql_conn_t conn, int32_t slavekey, int32_t jobkey, int32_t framenum)
 
{
 
  char *query;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Start`(%d,%d,%d);", slavekey, jobkey, framenum);
 

	
 
  mysqlQuery(conn, query, 0);
 
  free(query);
 
}
 

	
 
void set_renderpower(distrend_mysql_conn_t conn, int32_t slavekey, int32_t renderpower)
 
{
 
  char *query;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Slave_Render_Power_Update`(%d,%d);", slavekey, renderpower);
 

	
 
  mysqlQuery(conn, query, 0);
 
  free(query);
 
}
 

	
 
int change_job_priority(distrend_mysql_conn_t conn, int32_t jobkey, int32_t newpriority)
 
{
 
  char *query;
 

	
 
  _distren_asprintf(&query, "UPDATE `distren`.`Job` SET `Priority`=%d WHERE `Job_Key`=%d",
 
		    newpriority, jobkey);
 
  mysqlQuery(conn, query, 0);
 

	
 
  return 0;
 
}
 

	
 
int find_jobframe(distrend_mysql_conn_t conn, int32_t slavekey, jobnum_t *jobkey, int32_t *framenum)
 
{
 
  distrend_mysql_result_t result;
 
  char *query;
 
  MYSQL_ROW row;
 

	
 
  int rtn;
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Frame_Get`(%d);", slavekey);
 

	
 
  result = mysqlQuery(conn, query, 2);
 
  free(query);
 

	
 
  if(!result)
 
    return 1;
 

	
 
  row = mysql_fetch_row(result->mysqlresult);
 
  if(!row)
 
    {
 
      mysqlResultFree(conn, result);
 
      return 1;
 
    }
 

	
 
  rtn = distrend_mysql_getint(row, 0, (int32_t *)jobkey);
 
  rtn += distrend_mysql_getint(row, 1, framenum);
 
  
 
  mysqlResultFree(conn, result);
 

	
 
  return rtn;
 
}
 

	
 
int auth_slave(distrend_mysql_conn_t conn, int32_t slavekey, char *slavepass)
 
{
 
  // Return values:
 
  // -- 2-user doesn't exist
 
  // -- 1-pass is wrong
 
  // -- 0-parameters correct
 

	
 
  distrend_mysql_result_t result;
 
  char *query;
 
  MYSQL_ROW row;
 
  int32_t *isAuth = 0;
 

	
 
  int rtn;
 
  mysql_real_escape_string(conn->mysqlconn,slavepass,slavepass,(unsigned int)strlen(slavepass));
 

	
 
  _distren_asprintf(&query, "CALL `distren`.`Slave_Check`(%d,'%s');", slavekey, slavepass);
 

	
 
  result = mysqlQuery(conn, query, 2);
 
  free(query);
 

	
 
  if(!result)
 
    return 1;
 

	
 
  row = mysql_fetch_row(result->mysqlresult);
 
  if(!row)
 
    {
 
      mysqlResultFree(conn, result);
 
      return 1;
 
    }
 

	
 
  rtn = distrend_mysql_getint(row, 0, (int32_t *)isAuth);
 
  
 
  mysqlResultFree(conn, result);
 

	
 
  return rtn;
 
}
 

	
 
/** Runs frame watchdog to reassign stale frames */
 
void frame_watchdog(distrend_mysql_conn_t conn)
 
{
 
  char *query;
 
  _distren_asprintf(&query, "CALL `distren`.`Watchdog_Delete`()");
 
  mysqlQuery(conn, query, 0);
 
  free(query);
 
}
0 comments (0 inline, 0 general)