/*
Copyright 2010 Nathan Phillip Brink
This file is a part of DistRen.
DistRen is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DistRen is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with DistRen. If not, see <http://www.gnu.org/licenses/>.
*/
#include "listen.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* local */
/**
Bookkeeping for the clients connected to this server.
*/
struct distrend_clientset
{
/** list holding one struct distrend_client pointer per accepted connection */
LIST *clients;
/**
set to 1 by distrend_listen() and incremented for every connection
accepted by distrend_accept(); nothing in this file reads it
*/
int nfds;
};
/**
A chunk of raw bytes sitting in a client's input or output queue.
*/
struct distrend_packet
{
/** number of bytes stored at data */
size_t len;
/** heap-allocated buffer; released together with the packet by distrend_packet_free() */
char *data;
};
/**
Forward declarations for helpers defined further down in this file.
*/
/* allocate and initialise a client record wrapping an accepted socket */
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
/* release a client's queues and close its socket */
int distrend_client_free(struct distrend_client *client);
/* release a packet and its data buffer */
void distrend_packet_free(struct distrend_packet *packet);
/* set O_NONBLOCK on a descriptor; returns 0 on success, 1 on failure */
int distrend_makesocknonblock(int sock);
/* merge every packet in queue into a single packet of len bytes */
int distrend_packets_collapse(QUEUE *queue, size_t len);
/**
  Undo the work of a distrend_listen() call that failed part-way.

  Closes the listening sockets stored in config->listens[0..counter]
  (highest index first, skipping negative descriptors), then releases
  the client list and the client set itself.

  @param config  configuration whose listens array holds the sockets
  @param clients client set allocated by distrend_listen(); freed here
  @param counter index of the last listens entry that was touched
  @return always 0
*/
int distrend_listen_unwind(struct distrend_config *config, struct distrend_clientset *clients, size_t counter)
{
  size_t remaining = counter + 1;

  while(remaining != 0)
    {
      int fd = config->listens[remaining - 1].sock;

      remaining --;
      if(fd < 0)
	continue;
      close(fd);
    }

  list_free(clients->clients, NULL);
  free(clients);

  return 0;
}
/**
  Create the client set and open one listening socket per configured port.

  config->listens is walked until an entry whose port is 0. For each
  entry an IPv6 stream socket is created, bound to the wildcard
  address on that port, put into listening state and made
  non-blocking. The descriptor is stored in the entry's sock field.

  @param config  configuration holding the listens array
  @param clients receives the newly allocated client set on success;
                 set to NULL on failure
  @return 0 on success, 1 on error (everything opened so far is
          released through distrend_listen_unwind())
*/
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
{
  int tmp;
  size_t counter;

  struct sockaddr_in6 sockaddr =
    {
      .sin6_family = AF_INET6,
      .sin6_port = 0,
      .sin6_flowinfo = 0,
      .sin6_addr = IN6ADDR_ANY_INIT,
      .sin6_scope_id = 0
    };

  *clients = malloc(sizeof(struct distrend_clientset));
  if(!*clients)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  (*clients)->clients = list_init();
  /* assumes list_init() signals failure by returning NULL -- TODO confirm */
  if(!(*clients)->clients)
    {
      fprintf(stderr, "OOM\n");
      free(*clients);
      *clients = NULL;
      return 1;
    }
  (*clients)->nfds = 1;

  for(counter = 0; config->listens[counter].port; counter ++)
    {
      sockaddr.sin6_port = htons(config->listens[counter].port);

      config->listens[counter].sock = socket(AF_INET6, SOCK_STREAM, 0);
      if(config->listens[counter].sock == -1)
	{
	  perror("socket");
	  goto fail;
	}

      tmp = bind(config->listens[counter].sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
      if(tmp == -1)
	{
	  perror("bind");
	  goto fail;
	}

      /* operate on THIS entry's socket, not always on the first one */
      tmp = listen(config->listens[counter].sock, 1);
      if(tmp == -1)
	{
	  perror("listen");
	  goto fail;
	}

      tmp = distrend_makesocknonblock(config->listens[counter].sock);
      if(tmp)
	{
	  /* error is already printed to user by distrend_makesocknonblock() */
	  goto fail;
	}
    }

  return 0;

 fail:
  /* the unwind frees the client set, so do not leave a dangling pointer behind */
  distrend_listen_unwind(config, *clients, counter);
  *clients = NULL;
  return 1;
}
/**
  Pull pending bytes from a client socket and dispatch at most one
  complete request to the handler.

  Steps:
   1. Poll the socket with a zero timeout and, if readable, read up to
      sizeof(buf) bytes and append them to client->inmsgs, adding the
      byte count to client->inlen.
   2. If no request is in progress (client->expectlen == 0), collapse
      the input queue and try to parse a request header to learn the
      total length expected.
   3. Once at least client->expectlen bytes are buffered, copy out the
      request body, drop the request from the buffer and call handlereq.

  @param config        unused by this function
  @param client        client to service; its state is set to
                       DISTREND_CLIENT_DEAD on EOF, on a fatal read
                       error, on unparseable data or when the input
                       queue cannot be collapsed
  @param handlereq     callback invoked with (client, req, reqdata, handlereqdata)
  @param handlereqdata opaque pointer handed through to handlereq
  @return 0 on success (including "nothing to do yet"), 1 on error
*/
int distrend_handleread(struct distrend_config *config,
			struct distrend_client *client,
			distrend_handle_request_t handlereq,
			void *handlereqdata)
{
  struct distrend_packet *packet;
  ssize_t readlen;
  char buf[8192];
  int saved_errno;

  struct distren_request *req;
  void *reqdata;

  fd_set readfd;
  struct timeval selecttimeout;

  packet = malloc(sizeof(struct distrend_packet));
  if(!packet)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }
  packet->len = 0;
  packet->data = NULL;

  /**
     Zero-timeout poll so that read() is only attempted when data is
     actually pending. If select() itself fails the fd_set contents
     are unspecified, so it is only consulted on a positive return.
  */
  FD_ZERO(&readfd);
  FD_SET(client->sock, &readfd);
  memset(&selecttimeout, '\0', sizeof(selecttimeout));

  if(select(client->sock + 1, &readfd, NULL, NULL, &selecttimeout) > 0
     && FD_ISSET(client->sock, &readfd))
    {
      readlen = read(client->sock, buf, sizeof(buf));
      if(readlen == -1)
	{
	  /* perror() may clobber errno, so keep a copy for the decision below */
	  saved_errno = errno;
	  perror("read");
	  if(saved_errno != EINTR && saved_errno != EAGAIN)
	    {
	      client->state = DISTREND_CLIENT_DEAD;
	      free(packet);
	      return 1;
	    }
	  /* transient condition: nothing was read, carry on with buffered data */
	}
      else if(readlen == 0)
	{
	  /* handle EOF */
	  client->state = DISTREND_CLIENT_DEAD;
	  free(packet);
	  return 1;
	}
      else
	{
	  packet->len = (size_t)readlen;
	  packet->data = malloc(packet->len);
	  if(!packet->data)
	    {
	      fprintf(stderr, "OOM!\n");
	      free(packet);
	      return 1;
	    }
	  memcpy(packet->data, buf, packet->len);

	  q_enqueue(client->inmsgs, packet, 0);
	  client->inlen += packet->len;
	  /* the queue owns the packet now */
	  packet = NULL;
	}
    }
  /* release the packet shell if it was never handed to the queue */
  free(packet);
  packet = NULL;

  /**
     Manage input, etc.
  */
  if(client->expectlen == 0
     && client->inlen > 0)
    {
      /* search out header from input so far */
      if(distrend_packets_collapse(client->inmsgs, client->inlen))
	{
	  /* the buffered byte stream can no longer be trusted */
	  client->state = DISTREND_CLIENT_DEAD;
	  return 1;
	}

      packet = q_front(client->inmsgs);
      /*
	NOTE(review): the strict '>' means a request consisting of a
	header and no body is not recognised until further bytes
	arrive -- confirm whether '>=' is intended.
      */
      if(packet
	 && packet->len > sizeof(struct distren_request))
	{
	  if(distren_request_new_fromdata(&req, packet->data, packet->len))
	    {
	      fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
	      client->state = DISTREND_CLIENT_DEAD;
	      return 1;
	    }
	  client->expectlen = req->len + sizeof(struct distren_request);
	  distren_request_free(req);
	}
    }

  if(client->expectlen
     && client->inlen >= client->expectlen)
    {
      /** essentially un-queue-ize the queue ;-) */
      if(distrend_packets_collapse(client->inmsgs, client->inlen))
	{
	  client->state = DISTREND_CLIENT_DEAD;
	  return 1;
	}
      packet = q_front(client->inmsgs);

      if(!packet
	 || distren_request_new_fromdata(&req, packet->data, packet->len))
	{
	  fprintf(stderr, "error handling data from client\n");
	  client->state = DISTREND_CLIENT_DEAD;
	  return 1;
	}

      if(packet->len - sizeof(struct distren_request) < req->len)
	{
	  fprintf(stderr, "error handling some data from client\n");
	  distren_request_free(req);
	  client->state = DISTREND_CLIENT_DEAD;
	  return 1;
	}

      /* never request 0 bytes: malloc(0) may return NULL without being out of memory */
      reqdata = malloc(req->len ? req->len : 1);
      if(!reqdata)
	{
	  fprintf(stderr, "OOM\n");
	  distren_request_free(req);
	  return 1;
	}
      memcpy(reqdata, packet->data + sizeof(struct distren_request), req->len);

      /* shift whatever follows this request to the start of the buffer */
      memmove(packet->data,
	      packet->data + client->expectlen,
	      packet->len - client->expectlen);
      packet->len -= client->expectlen;
      client->inlen -= client->expectlen;
      client->expectlen = 0;

      /*
	NOTE(review): reqdata is not freed here; this presumes the
	handler takes ownership of it -- verify against the handlers,
	otherwise this leaks one buffer per request.
      */
      (*handlereq)(client, req, reqdata, handlereqdata);
      distren_request_free(req);
    }

  return 0;
}
/**
Accumulator passed to distrend_accept_fdset_prepare() while the
client list is traversed to build the descriptor sets for select().
*/
struct distrend_accept_fdset_prepare_data
{
/** descriptors to be watched for readability */
fd_set readfds;
/** descriptors to be watched for writability */
fd_set writefds;
/**
stores the highest FD number, which is one less than the first
argument to select(). So do select(maxfd + 1, ...)
*/
int maxfd;
};
/**
  List-traversal callback: register one client's socket in the
  descriptor sets being built for select().

  Dead clients are skipped. Live clients are always added to the read
  set and raise data->maxfd when their descriptor is higher; they are
  added to the write set only while their output queue is non-empty.

  @param data   descriptor sets and running maximum to update
  @param client client being visited
  @return always TRUE so that the traversal continues
*/
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
				  struct distrend_client *client)
{
  if(client->state != DISTREND_CLIENT_DEAD)
    {
      const int fd = client->sock;

      if(data->maxfd < fd)
	data->maxfd = fd;

      FD_SET(fd, &data->readfds);

      /* writability only matters when there is something queued to send */
      if(!q_empty(client->outmsgs))
	FD_SET(fd, &data->writefds);
    }

  return TRUE;
}
/**
Context passed to distrend_accept_client_proc() while the client
list is traversed after select() has returned.
*/
struct distrend_accept_client_proc_data
{
/** the descriptor set (read or write) that matches mode */
fd_set *fdset;
/** server configuration, forwarded to distrend_handleread() */
struct distrend_config *config;
/** request callback, forwarded to distrend_handleread() */
distrend_handle_request_t handlereq;
/** opaque pointer forwarded alongside handlereq */
void *handlereqdata;
/** whether this traversal services readable or writable sockets */
enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
};
/**
  List-traversal callback: service one client whose socket select()
  reported as ready.

  In write mode the packet at the front of client->outmsgs is written
  to the socket. A fully written packet is dequeued and freed; after a
  partial write the unsent tail is moved to the start of the buffer so
  that the next attempt resumes where this one stopped. In read mode
  the work is delegated to distrend_handleread().

  Dead clients and clients whose descriptor is not set in data->fdset
  are skipped.

  @param data   traversal context (mode, descriptor set, handler)
  @param client client being visited; marked DISTREND_CLIENT_DEAD on a
                non-transient write error
  @return always TRUE so that the traversal continues
*/
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
				struct distrend_client *client)
{
  struct distrend_packet *packet;
  ssize_t written_amount;
  int saved_errno;

  if(client->state == DISTREND_CLIENT_DEAD)
    return TRUE;

  if(!FD_ISSET(client->sock, data->fdset))
    /** continue iteration through the list */
    return TRUE;

  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
	  __FILE__, __LINE__, client->sock,
	  (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "writ");

  switch(data->mode)
    {
    case DISTREND_ACCEPT_CLIENT_WRITE:
      if(q_empty(client->outmsgs))
	return TRUE;

      packet = q_front(client->outmsgs);
      written_amount = write(client->sock, packet->data, packet->len);
      if(written_amount == -1)
	{
	  /* perror() may clobber errno, so keep a copy */
	  saved_errno = errno;
	  perror("write");
	  /*
	    Disconnect in case of a real write error; EINTR/EAGAIN
	    only mean "try again on the next cycle".

	    Until liblist has the ability to delete nodes
	    during traversal, we'll have to free the client
	    during a client-list cleanup/pruning cycle.
	  */
	  if(saved_errno != EINTR && saved_errno != EAGAIN)
	    client->state = DISTREND_CLIENT_DEAD;
	  /* nothing was written: the packet must stay untouched */
	  break;
	}

      if((size_t)written_amount == packet->len)
	{
	  q_dequeue(client->outmsgs);
	  distrend_packet_free(packet);
	}
      else
	{
	  /**
	     shifting seems the simplest solution: move the unsent
	     tail to the front so the next write() starts there.
	  */
	  memmove(packet->data,
		  packet->data + written_amount,
		  packet->len - (size_t)written_amount);
	  packet->len -= (size_t)written_amount;
	}
      break;

    case DISTREND_ACCEPT_CLIENT_READ:
      distrend_handleread(data->config, client, data->handlereq, data->handlereqdata);
      break;
    }

  /** continue iteration through the list */
  return TRUE;
}
/**
  Run one iteration of the server's event loop.

  Blocks in select() on the first listening socket and on every live
  client, then
   - flushes queued output to clients whose sockets are writable,
   - reads and dispatches input from clients whose sockets are readable,
   - accepts one pending connection, if any, and queues a greeting,
   - removes clients marked DISTREND_CLIENT_DEAD from the list.

  NOTE(review): only config->listens->sock (the first entry of the
  listens array) is watched and accepted on here.

  @param config        server configuration holding the listening socket
  @param clients       client set to service and to add new clients to
  @param handlereq     callback for complete requests
  @param handlereqdata opaque pointer handed through to handlereq
  @return 0 on success, 1 if select() failed, accept() failed with a
          non-transient error or a new client could not be set up
*/
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients, distrend_handle_request_t handlereq, void *handlereqdata)
{
  int tmp;
  int ret;
  int newclientsock;
  int saved_errno;

  struct distrend_accept_fdset_prepare_data fdsets;
  struct distrend_accept_client_proc_data travinfo;
  struct distrend_client *newclient;

  ret = 0;

  FD_ZERO(&fdsets.readfds);
  FD_ZERO(&fdsets.writefds);
  FD_SET(config->listens->sock, &fdsets.readfds);
  fdsets.maxfd = config->listens->sock;
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);

  fprintf(stderr, "select()...");
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
  if(tmp == -1)
    {
      perror("select");
      return 1;
    }

  /**
     Deal first with data waiting to be sent and then
     with input data.
  */
  travinfo.config = config;
  travinfo.fdset = &fdsets.writefds;
  travinfo.handlereq = handlereq;
  travinfo.handlereqdata = handlereqdata;
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);

  travinfo.fdset = &fdsets.readfds;
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);

  /**
     Handle new connections.
  */
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
    {
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
      if(newclientsock == -1)
	{
	  /* perror() may clobber errno, so keep a copy */
	  saved_errno = errno;
	  perror("accept");
	  /*
	    The listening socket is non-blocking, so the pending
	    connection may already be gone; that is not an error.
	  */
	  if(saved_errno != EINTR
	     && saved_errno != EAGAIN
	     && saved_errno != ECONNABORTED)
	    ret = 1;
	}
      else if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
	{
	  fprintf(stderr, "error allocating client struct\n");
	  /* no client record owns the socket, so close it here */
	  close(newclientsock);
	  ret = 1;
	}
      else
	{
	  clients->nfds ++;
	  list_insert_after(clients->clients, newclient, 0);

	  fprintf(stderr, "accepted new connection\n");
	  distrend_client_write(newclient, "hello\n", 6);
	}
    }

  /**
     Until liblist supports deleting nodes during a traversal,
     we clean dead clients here. This runs even when accepting
     failed so that dead clients never outlive a cycle needlessly.
  */
  list_mvfront(clients->clients);
  newclient = list_curr(clients->clients);
  while(newclient)
    {
      if(newclient->state == DISTREND_CLIENT_DEAD)
	{
	  distrend_client_free(newclient);
	  list_remove_curr(clients->clients);
	  fprintf(stderr, "removed dead connection\n");
	}
      list_mvnext(clients->clients);
      /* provide for termination of this loop */
      if(newclient == list_curr(clients->clients))
	newclient = NULL;
      else
	newclient = list_curr(clients->clients);
    }

  return ret;
}
/**
  Stub: shutting down the listening sockets is not implemented yet.

  Prints a reminder to stderr and reports failure.

  @param listens ignored
  @param clients ignored
  @return always 1
*/
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
{
  const int not_implemented = 1;

  (void)listens;
  (void)clients;

  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);

  return not_implemented;
}
/**
  This is probably just NOT a placeholder for remotio

  Stub: prints a reminder to stderr and queues nothing.

  @param client ignored
  @param msg    ignored
  @param len    ignored
*/
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
{
  (void)client;
  (void)msg;
  (void)len;

  fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
}
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
{
int tmp;
tmp = distrend_makesocknonblock(sock);
if(tmp)
{
fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
return 1;
}
*client = malloc(sizeof(struct distrend_client));
if(!*client)
{
fprintf(stderr, "OOM\n");
return 1;
}
(*client)->sock = sock;
(*client)->state = state;
(*client)->inlen = 0;
(*client)->expectlen = 0;
(*client)->inmsgs = q_init();
(*client)->outmsgs = q_init();
return 0;
}
/**
  Destroy a client record created by distrend_client_new().

  Frees both message queues together with any packets still in them,
  closes the socket and releases the record itself. The pointer must
  not be dereferenced afterwards.

  @param client record to destroy; NULL is accepted and ignored
  @return always 0
*/
int distrend_client_free(struct distrend_client *client)
{
  if(!client)
    return 0;

  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
  close(client->sock);

  /* the record was obtained from malloc() in distrend_client_new() */
  free(client);

  return 0;
}
/**
  Release a packet and the data buffer it owns.

  @param packet packet to free; NULL is accepted and ignored, just
                like free() itself
*/
void distrend_packet_free(struct distrend_packet *packet)
{
  if(!packet)
    return;

  free(packet->data);
  free(packet);
}
/**
  Put a descriptor into non-blocking mode.

  Reads the current file status flags, adds O_NONBLOCK and writes
  them back. A diagnostic is printed with perror() when either
  fcntl() call fails.

  @param sock descriptor to modify
  @return 0 on success, 1 on failure
*/
int distrend_makesocknonblock(int sock)
{
  int fdstatus;

  fdstatus = fcntl(sock, F_GETFL);
  if(fdstatus == -1)
    {
      /* do not OR flags into an error return and write that back */
      perror("fcntl");
      return 1;
    }

  if(fcntl(sock, F_SETFL, fdstatus | O_NONBLOCK) == -1)
    {
      perror("fcntl");
      return 1;
    }

  return 0;
}
/**
  Queue a copy of a byte buffer for later transmission to a client.

  Nothing is written to the socket here; the bytes are copied into a
  new packet which is appended to client->outmsgs.

  @param client  client whose output queue receives the packet
  @param towrite bytes to copy; the caller keeps ownership
  @param msglen  number of bytes at towrite
  @return 0 on success, 1 if memory could not be allocated
*/
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
{
  struct distrend_packet *outgoing;
  char *payload;

  outgoing = malloc(sizeof(struct distrend_packet));
  if(outgoing == NULL)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  payload = malloc(msglen);
  if(payload == NULL)
    {
      free(outgoing);
      fprintf(stderr, "OOM\n");
      return 1;
    }
  memcpy(payload, towrite, msglen);

  outgoing->len = msglen;
  outgoing->data = payload;
  q_enqueue(client->outmsgs, outgoing, 0);

  return 0;
}
/**
  Queue a request header followed by its body for a client.

  The header struct and req->len bytes of data are laid out back to
  back in a temporary buffer which is handed to
  distrend_client_write() and then released.

  @param client client to send to
  @param req    request header; req->len gives the body size
  @param data   body bytes, req->len of them
  @return 0 on success, 1 on allocation failure, otherwise whatever
          distrend_client_write() returned
*/
int distrend_client_write_request(struct distrend_client *client, struct distren_request *req, void *data)
{
  const size_t headerlen = sizeof(struct distren_request);
  const size_t total = headerlen + req->len;
  char *wire;
  int status;

  wire = malloc(total);
  if(wire == NULL)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  memcpy(wire, req, headerlen);
  memcpy(wire + headerlen, data, req->len);

  status = distrend_client_write(client, wire, total);
  free(wire);

  return status;
}
/**
  Take the oldest packet off a client's input queue.

  @param client  client to read from
  @param toread  receives the packet's data buffer, or NULL when the
                 queue is empty; the caller owns the buffer and must
                 free() it
  @param lenread receives the number of bytes at *toread, or 0
  @return 0 if a packet was returned, 1 if the queue was empty
*/
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
{
  struct distrend_packet *packet;

  *lenread = 0;
  *toread = NULL;

  if(q_empty(client->inmsgs))
    return 1;

  packet = q_dequeue(client->inmsgs);
  *lenread = packet->len;
  *toread = packet->data;

  /*
    client->inlen counts the bytes held in inmsgs; keep it in step so
    that a later distrend_packets_collapse() sees a consistent length.
  */
  if(client->inlen >= packet->len)
    client->inlen -= packet->len;
  else
    client->inlen = 0;

  /* only the packet shell is released; the data now belongs to the caller */
  free(packet);

  return 0;
}
/**
  collapse a QUEUE of struct distrend_packet with a collective length
  of len into a single struct packet.

  Packets are taken from the front of the queue and concatenated, in
  order, into one newly allocated packet which is then enqueued.

  @param queue queue of struct distrend_packet to merge
  @param len   total number of bytes the queue is expected to hold
  @return 0 on success; 1 on allocation failure (queue untouched) or
          when the queue's content does not add up to len:
           - fewer bytes than len: the queue ends up holding the single
             merged packet with the bytes that were actually present;
           - more bytes than len: the bytes merged so far are
             discarded and the surplus packets stay in the queue.
*/
int distrend_packets_collapse(QUEUE *queue, size_t len)
{
  struct distrend_packet *newpacket;
  struct distrend_packet *oldpacket;

  size_t copiedlen;

  newpacket = malloc(sizeof(struct distrend_packet));
  if(!newpacket)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }
  /* never request 0 bytes: malloc(0) may return NULL without being out of memory */
  newpacket->data = malloc(len ? len : 1);
  if(!newpacket->data)
    {
      fprintf(stderr, "OOM\n");
      perror("malloc");
      free(newpacket);
      return 1;
    }

  copiedlen = 0;
  while(!q_empty(queue))
    {
      oldpacket = q_front(queue);
      /* stop before a packet that would write past the end of the buffer */
      if(oldpacket->len > len - copiedlen)
	break;

      q_dequeue(queue);
      if(oldpacket->len)
	memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len);
      copiedlen += oldpacket->len;

      distrend_packet_free(oldpacket);
    }
  newpacket->len = copiedlen;

  if(!q_empty(queue))
    {
      /* the queue holds more than len bytes */
      fprintf(stderr, "inconsistency!\n");
      distrend_packet_free(newpacket);
      return 1;
    }

  q_enqueue(queue, newpacket, 0);

  if(copiedlen < len)
    {
      /* the queue held fewer than len bytes */
      fprintf(stderr, "inconsistency!\n");
      return 1;
    }

  return 0;
}