Files @ d73a30892ea0
Branch filter:

Location: DistRen/src/client/libdistren.c

binki
merge
/*
  Copyright 2010 Nathan Phillip Brink, Ethan Zonca, Matt Orlando

  This file is a part of DistRen.

  DistRen is free software: you can redistribute it and/or modify
  it under the terms of the GNU Affero General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  DistRen is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License
  along with DistRen.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
  Implementation of distren_* functions from distren.h excluding distren_job_* functions.
 */

#include "common/config.h"
#include "common/options.h"
#include "common/protocol.h"
#include "common/remoteio.h"
#include "common/request.h"

#include "libdistren.h"

#include <errno.h>
#include <libgen.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/**
 * Handle common cleanup actions for distren_init().
 *
 * Closes the handle's server connection if one is open and then
 * releases the handle itself through distren_free(). The handle must
 * not be used after this call.
 *
 * @param distren the partially-initialized handle to tear down; must
 *   not be NULL.
 */
static void distren_init_cleanup(distren_t distren);

/**
 * Allocate and initialize a distren handle and connect it to the server.
 *
 * The function loads the configuration, opens the server connection,
 * sends a DISTREN_REQUEST_VERSION request and then polls until the
 * handle leaves DISTREN_STATE_VERSION or the connection goes away.
 *
 * @param handle where to store the newly created handle. On failure
 *   *handle is set to NULL so that the caller never holds a pointer to
 *   freed memory.
 * @return 0 on success, 1 on any failure (NULL argument, allocation
 *   failure, configuration error, connection failure, or the
 *   connection being lost while waiting for the version exchange).
 */
int distren_init(distren_t *handle)
{
  int tmp;

  distren_t distren;
  struct distren_request *req;
  void *data;

  if(!handle)
    return 1;

  /*
   * Only publish the handle once everything has succeeded; every
   * failure path below frees it.
   */
  *handle = NULL;

  /* calloc() hands back zeroed memory, so all fields start out cleared. */
  distren = calloc(1, sizeof(struct distren));
  if(!distren)
    return 1;

  /* now the environment is ready for general use */
  if(_distren_getoptions(distren))
    {
      fprintf(stderr, "error getting configuration\n");
      distren_init_cleanup(distren);
      return 1;
    }

  tmp = remoteio_open_server(&distren->rem,
                             distren->options->remoteio,
                             (remoteio_read_handle_func_t)&libdistren_remoteio_read_handle,
                             distren,
                             (remoteio_close_handle_func_t)&libdistren_remoteio_close_handle,
                             distren->server);
  if(tmp)
    {
      fprintf(stderr, "error: unable to connect to server\n");

      distren_init_cleanup(distren);
      return 1;
    }

  /* send off a DISTREN_REQUEST_VERSION to the server */
  tmp = distren_request_version(&req, &data, DISTREN_SERVERTYPE_CLIENT, PACKAGE_STRING);
  if(tmp)
    {
      fprintf(stderr, "error: unable to allocate request\n");

      distren_init_cleanup(distren);
      return 1;
    }
  distren_request_send(distren->rem, req, data);
  distren_request_free_with_data(req, data);

  /*
   * Wait for the version exchange to finish. The loop ends when the
   * state changes or when distren->rem is cleared.
   * NOTE(review): presumably the read/close handlers registered above
   * update state and rem from inside multiio_poll(), and 500 is a
   * timeout in milliseconds -- confirm against multiio/remoteio.
   */
  while(distren->rem
        && distren->state == DISTREN_STATE_VERSION)
    multiio_poll(distren->multiio, 500);

  if(!distren->rem)
    {
      distren_init_cleanup(distren);
      return 1;
    }

  *handle = distren;
  return 0;
}

/*
 * Tear down a handle whose initialization failed: drop the server
 * connection if there is one, then release the handle.
 */
static void distren_init_cleanup(distren_t distren)
{
  if(distren->rem)
    {
      remoteio_close(distren->rem);
      /* make sure distren_free() sees no connection left to close */
      distren->rem = NULL;
    }

  distren_free(distren);
}

/*
 * Suffix appended to fopen() mode strings so that files are opened in
 * binary mode: "b" when _WIN32 is defined, nothing otherwise.
 */
#ifdef _WIN32
#define FOPEN_BINARY "b"
#else
#define FOPEN_BINARY ""
#endif
/**
 * Upload a local file to the server over the handle's connection.
 *
 * The file is announced with a file-post-start request (using the
 * basename of filename and the handle's next file_post_id), streamed
 * in chunks of up to DISTREN_REQUEST_FILE_POST_DATA_LEN bytes, and
 * closed with a file-post-finish request. The finish request marks the
 * upload as cancelled if reading the local file failed. The function
 * blocks until the send queue has drained or the connection is lost.
 *
 * Progress dots and status messages are written to stderr.
 *
 * @param handle an initialized handle with an open server connection.
 * @param job not used by this function.
 * @param filename path of the local file to upload.
 * @return 0 if the whole file was read and handed to the connection,
 *   1 on failure (invalid arguments, no connection, file cannot be
 *   opened or read, out of memory, or the connection being lost).
 */
int distren_submit_file(distren_t handle, distren_job_t *job, const char *filename)
{
  FILE *in;
  char buf[DISTREN_REQUEST_FILE_POST_DATA_LEN];
  size_t len;
  void *data;

  struct distren_request *req;
  struct distren_request_file_post data_header;
  distren_request_file_post_context_t post_context;

  /* for basename() to play with */
  char *my_filename;

  size_t sendq_dots;

  /* set when reading the local file failed part-way through */
  int read_failed;

  (void)job;

  if(!handle || !handle->rem || !filename)
    return 1;

  errno = 0;
  in = fopen(filename, "r" FOPEN_BINARY);
  if(!in)
    {
      perror("fopen");
      return 1;
    }

  /* basename() may modify its argument, so give it a private copy. */
  my_filename = strdup(filename);
  if(!my_filename)
    {
      perror("strdup");
      fclose(in);
      return 1;
    }

  fprintf(stderr, "info: Starting upload of %s...\n", filename);

  /* prepare the server for a file upload */
  distren_request_file_post_start(&req,
                                  &data,
                                  &post_context,
                                  handle->file_post_id,
                                  basename(my_filename));
  free(my_filename);
  distren_request_send(handle->rem, req, data);
  distren_request_free_with_data(req, data);
  handle->file_post_id ++;

  /* send file body */
  while(!ferror(in)
        && !feof(in)
        && handle->rem)
    {
      len = fread(buf, 1, DISTREN_REQUEST_FILE_POST_DATA_LEN, in);
      if(len == 0)
        continue;
      distren_request_file_post(&req,
                                &data_header,
                                post_context,
                                buf,
                                len);
      remoteio_write(handle->rem, req, sizeof(struct distren_request));
      /*
       * NOTE(review): handle->rem is re-checked before each further
       * write in case a failed write clears it (presumably through
       * the close handler registered in distren_init()) -- confirm
       * against remoteio.
       */
      if(handle->rem)
        remoteio_write(handle->rem, &data_header, sizeof(struct distren_request_file_post));
      if(handle->rem)
        remoteio_write(handle->rem, buf, len);
      distren_request_free(req);

      /*
       * ensure we have no more than a megabyte waiting to be sent.
       * Each chunk is queued with three writes, hence the division by
       * three (NOTE(review): assumes remoteio_sendq_len() counts
       * queued writes -- confirm). One dot is printed per chunk that
       * leaves the queue.
       */
      while(handle->rem
            && (sendq_dots = remoteio_sendq_len(handle->rem) / 3) > (1024 * 1024 / DISTREN_REQUEST_FILE_POST_DATA_LEN))
        {
          multiio_poll(handle->multiio, -1);
          if(handle->rem)
            while(sendq_dots > remoteio_sendq_len(handle->rem) / 3)
              {
                sendq_dots --;
                fputc('.', stderr);
              }
        }
    }

  /* the stream's error flag is gone after fclose(), so record it now */
  read_failed = ferror(in) ? 1 : 0;
  fclose(in);

  /*
   * declare the upload as finished, marking it as cancelled on I/O
   * error.
   */
  distren_request_file_post_finish(&req, &data, post_context, read_failed);
  if(handle->rem)
    distren_request_send(handle->rem, req, data);
  /* the request is released whether or not it could be sent */
  distren_request_free_with_data(req, data);

  if(!handle->rem)
    return 1;

  /* let's block until the file's gone. */
  while(handle->rem
        && (sendq_dots = remoteio_sendq_len(handle->rem)))
    {
      multiio_poll(handle->multiio, -1);
      if(handle->rem)
        while(sendq_dots / 3 > remoteio_sendq_len(handle->rem) / 3)
          {
            sendq_dots -= 3;
            fputc('.', stderr);
          }
    }
  fputc('\n', stderr);

  /* the loop above also ends when the connection disappears */
  if(!handle->rem)
    {
      fprintf(stderr, "error: connection lost while uploading %s\n", filename);
      return 1;
    }

  if(read_failed)
    {
      fprintf(stderr, "error: unable to read %s, upload cancelled\n", filename);
      return 1;
    }

  fprintf(stderr, "info: %s successfully uploaded, as far as we know.\n", filename);

  return 0;
}

/**
 * Release a distren handle.
 *
 * Closes the server connection if one is still open and frees the
 * handle's memory. The handle must not be used afterwards.
 *
 * @param handle the handle to release; NULL is accepted and ignored,
 *   mirroring free(NULL).
 * @return always 0.
 */
int distren_free(distren_t handle)
{
  if(!handle)
    return 0;

  if(handle->rem)
    remoteio_close(handle->rem);
  free(handle);
  return 0;
}