/*
Copyright 2009 Nathan Phillip Brink
This file is a part of DistRen.
DistRen is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DistRen is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with DistRen. If not, see <http://www.gnu.org/licenses/>.
*/
#include "listen.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* local */
/**
Bookkeeping for the connections handled by this file.
*/
struct distrend_clientset
{
/** list of struct distrend_client pointers, filled in by distrend_accept() */
LIST *clients;
/** set to 1 by distrend_listen() and incremented for every accepted client */
int nfds;
};
/**
One chunk of raw bytes waiting in a client's inmsgs or outmsgs queue.
*/
struct distrend_packet
{
/** number of bytes stored at data */
size_t len;
/** heap buffer allocated with malloc(); released by distrend_packet_free() */
char *data;
};
/* Forward declarations of helpers that are defined further down in this file. */
/** allocates a client wrapping sock; returns 0 on success, 1 on failure */
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state);
/** releases a client's queues and closes its socket; returns 0 */
int distrend_client_free(struct distrend_client *client);
/** frees a packet together with its data buffer */
void distrend_packet_free(struct distrend_packet *packet);
/** sets O_NONBLOCK on sock; returns 0 on success, 1 on failure */
int distrend_makesocknonblock(int sock);
/**
   Opens a non-blocking TCP listening socket on port 4050 of the IPv6
   wildcard address and allocates the (initially empty) set of clients.

   Nothing is leaked when a step fails: the socket is closed again and
   no clientset is handed out.

   @param config configuration whose listens->sock receives the
   listening socket. It is only written on success.
   @param clients receives the newly allocated clientset on success
   and NULL on failure.
   @return 0 on success, 1 on error.
 */
int distrend_listen(struct distrend_config *config, struct distrend_clientset **clients)
{
  const unsigned short listen_port = 4050;
  struct distrend_clientset *set;
  int sock;
  struct sockaddr_in6 sockaddr =
    {
      .sin6_family = AF_INET6,
      .sin6_port = 0,
      .sin6_flowinfo = 0,
      .sin6_addr = IN6ADDR_ANY_INIT,
      .sin6_scope_id = 0
    };

  /* never leave the caller with a dangling pointer on failure */
  *clients = NULL;
  sockaddr.sin6_port = htons(listen_port);

  sock = socket(AF_INET6, SOCK_STREAM, 0);
  if(sock == -1)
    {
      perror("socket");
      return 1;
    }
  if(bind(sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) == -1)
    {
      perror("bind");
      goto fail_sock;
    }
  if(listen(sock, 1) == -1)
    {
      perror("listen");
      goto fail_sock;
    }
  if(distrend_makesocknonblock(sock))
    goto fail_sock;

  /*
    The clientset is created last so that the socket is the only
    resource the earlier failure paths have to release.
  */
  set = malloc(sizeof(*set));
  if(!set)
    {
      fprintf(stderr, "OOM\n");
      goto fail_sock;
    }
  set->clients = list_init();
  /* NOTE(review): assumes list_init() reports failure by returning NULL -- confirm against liblist */
  if(!set->clients)
    {
      fprintf(stderr, "OOM\n");
      free(set);
      goto fail_sock;
    }
  /* the listening socket itself is the first fd */
  set->nfds = 1;

  config->listens->sock = sock;
  *clients = set;
  return 0;

 fail_sock:
  close(sock);
  return 1;
}
/**
   Reads whatever is currently available on a client's socket (at most
   8192 bytes) and appends it as one packet to the client's inmsgs
   queue, from where distrend_client_read() hands it out.

   @param config unused here.
   @param client the client whose socket is readable. Its state is set
   to DISTREND_CLIENT_DEAD on EOF or on a non-transient read error.
   @return 0 if a packet was queued or the read merely has to be
   retried later (EINTR/EAGAIN); 1 on EOF, read error or OOM.
 */
int distrend_handleread(struct distrend_config *config,
                        struct distrend_client *client)
{
  struct distrend_packet *packet;
  ssize_t readlen;
  int read_errno;
  char buf[8192];

  readlen = read(client->sock, buf, sizeof(buf));
  if(readlen == -1)
    {
      /* save errno before any other library call can clobber it */
      read_errno = errno;
      if(read_errno == EINTR || read_errno == EAGAIN)
        /* nothing to read after all; select() will tell us again */
        return 0;

      errno = read_errno;
      perror("read");
      client->state = DISTREND_CLIENT_DEAD;
      return 1;
    }
  if(readlen == 0)
    /* handle EOF */
    {
      client->state = DISTREND_CLIENT_DEAD;
      return 1;
    }

  /* only allocate once we know there is data to store */
  packet = malloc(sizeof(*packet));
  if(!packet)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }
  packet->len = (size_t)readlen;
  packet->data = malloc(packet->len);
  if(!packet->data)
    {
      fprintf(stderr, "OOM!\n");
      free(packet);
      return 1;
    }
  memcpy(packet->data, buf, packet->len);

  /* hand the packet over to the queue; without this it would be leaked */
  q_enqueue(client->inmsgs, packet, 0);
  return 0;
}
/**
The fd_sets handed to select() by distrend_accept(); filled in client by
client by distrend_accept_fdset_prepare().
*/
struct distrend_accept_fdset_prepare_data
{
/** sockets to be checked for readability */
fd_set readfds;
/** sockets to be checked for writability */
fd_set writefds;
/**
stores the highest FD number which is one less than the first
argument to select(). So do select(maxfd + 1, ...)
*/
int maxfd;
};
/**
   List-traversal callback which registers one client's socket in the
   fd_sets that distrend_accept() is about to pass to select().

   Dead clients are left out. Every live client is watched for input;
   it is watched for output only while its outmsgs queue is non-empty.

   @param data the fd_sets and the running maximum fd.
   @param client the client being visited.
   @return always TRUE so that the traversal continues.
 */
int distrend_accept_fdset_prepare(struct distrend_accept_fdset_prepare_data *data,
                                  struct distrend_client *client)
{
  int fd;

  if(client->state != DISTREND_CLIENT_DEAD)
    {
      fd = client->sock;

      if(data->maxfd < fd)
        data->maxfd = fd;

      FD_SET(fd, &data->readfds);

      /* asking for writability with nothing to send would make select() spin */
      if(!q_empty(client->outmsgs))
        FD_SET(fd, &data->writefds);
    }

  return TRUE;
}
/**
Context handed to distrend_accept_client_proc() for one traversal of the
client list.
*/
struct distrend_accept_client_proc_data
{
/** the set filled in by select() that corresponds to mode */
fd_set *fdset;
/** passed through to distrend_handleread() */
struct distrend_config *config;
/** whether this traversal services readable or writable sockets */
enum {DISTREND_ACCEPT_CLIENT_READ, DISTREND_ACCEPT_CLIENT_WRITE} mode;
};
/**
   List-traversal callback which services one client whose socket
   select() reported as ready.

   In write mode the packet at the front of the client's outmsgs queue
   is written. A completely written packet is dequeued and freed; after
   a partial write the unsent remainder is moved to the start of the
   packet's buffer so that the next write continues where this one
   stopped. In read mode distrend_handleread() is called.

   @param data the fd_set to test against, the mode and the config.
   @param client the client being visited. Its state is set to
   DISTREND_CLIENT_DEAD on a non-transient write error; the client is
   freed later by the cleanup pass in distrend_accept().
   @return always TRUE so that the traversal continues.
 */
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
                                struct distrend_client *client)
{
  struct distrend_packet *packet;
  ssize_t written_amount;
  int write_errno;

  if(client->state == DISTREND_CLIENT_DEAD)
    return TRUE;

  if(!FD_ISSET(client->sock, data->fdset))
    /** continue iteration through the list */
    return TRUE;

  fprintf(stderr, "%s:%d: My traversal says that sock %d is available for %sing\n",
          __FILE__, __LINE__, client->sock,
          (data->mode == DISTREND_ACCEPT_CLIENT_READ) ? "read" : "write");

  switch(data->mode)
    {
    case DISTREND_ACCEPT_CLIENT_WRITE:
      if(q_empty(client->outmsgs))
        return TRUE;

      packet = q_front(client->outmsgs);
      written_amount = write(client->sock, packet->data, packet->len);
      if(written_amount == -1)
        {
          write_errno = errno;
          perror("write");
          /*
            A transient failure leaves the packet queued for the next
            round. Anything else disconnects the client; it cannot be
            freed here because the list is being traversed.
          */
          if(write_errno != EINTR && write_errno != EAGAIN)
            client->state = DISTREND_CLIENT_DEAD;
          /* the length bookkeeping below must not see -1 */
          break;
        }

      if((size_t)written_amount >= packet->len)
        {
          q_dequeue(client->outmsgs);
          distrend_packet_free(packet);
        }
      else
        {
          /**
             shifting seems the simplest solution: drop the bytes
             which were sent and keep the rest at the buffer's start.
           */
          packet->len -= (size_t)written_amount;
          memmove(packet->data, packet->data + written_amount, packet->len);
        }
      break;

    case DISTREND_ACCEPT_CLIENT_READ:
      distrend_handleread(data->config, client);
      break;
    }

  /** continue iteration through the list */
  return TRUE;
}
/**
   Runs one round of the server's I/O loop:

   1. blocks in select() on the listening socket and on every live
      client,
   2. flushes pending output, then reads pending input,
   3. accepts at most one new connection and queues a greeting for it,
   4. frees the clients which were marked dead.

   @param config configuration holding the listening socket in
   listens->sock.
   @param clients the set of clients created by distrend_listen().
   @return 0 on success; 1 if select() fails, accept() fails with a
   non-transient error or a client could not be set up.
 */
int distrend_accept(struct distrend_config *config, struct distrend_clientset *clients)
{
  int tmp;
  int newclientsock;
  int accept_errno;
  struct distrend_accept_fdset_prepare_data fdsets;
  struct distrend_accept_client_proc_data travinfo;
  struct distrend_client *newclient;

  FD_ZERO(&fdsets.readfds);
  FD_ZERO(&fdsets.writefds);
  FD_SET(config->listens->sock, &fdsets.readfds);
  fdsets.maxfd = config->listens->sock;
  list_traverse(clients->clients, &fdsets, (list_traverse_func_t)&distrend_accept_fdset_prepare, LIST_FRNT | LIST_ALTR);

  fprintf(stderr, "select()...");
  tmp = select(fdsets.maxfd + 1, &fdsets.readfds, &fdsets.writefds, NULL, (struct timeval *)NULL);
  if(tmp == -1)
    {
      perror("select");
      return 1;
    }

  /**
     Deal first with data waiting to be sent and then
     with input data.
   */
  travinfo.config = config;
  travinfo.fdset = &fdsets.writefds;
  travinfo.mode = DISTREND_ACCEPT_CLIENT_WRITE;
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);

  travinfo.fdset = &fdsets.readfds;
  travinfo.mode = DISTREND_ACCEPT_CLIENT_READ;
  list_traverse(clients->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_ALTR);

  /**
     Handle new connections.
   */
  if(FD_ISSET(config->listens->sock, &fdsets.readfds))
    {
      newclientsock = accept(config->listens->sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
      if(newclientsock == -1)
        {
          accept_errno = errno;
          perror("accept");
          /*
            The peer may already be gone again or a signal may have
            interrupted us; neither is a reason to give up. Dead
            clients are still cleaned up below.
          */
          if(accept_errno != EINTR
             && accept_errno != EAGAIN
             && accept_errno != EWOULDBLOCK
             && accept_errno != ECONNABORTED)
            return 1;
        }
      else
        {
          if(distrend_client_new(&newclient, newclientsock, DISTREND_CLIENT_PREAUTH))
            {
              fprintf(stderr, "error allocating client struct\n");
              /* no client took ownership of the socket, so close it here */
              close(newclientsock);
              return 1;
            }

          clients->nfds ++;
          list_insert_after(clients->clients, newclient, 0);

          fprintf(stderr, "accepted new connection\n");
          distrend_client_write(newclient, "hello\n", 6);
        }
    }

  /**
     Until liblist supports deleting nodes during a traversal,
     we clean dead clients here:
   */
  list_mvfront(clients->clients);
  newclient = list_curr(clients->clients);
  while(newclient)
    {
      if(newclient->state == DISTREND_CLIENT_DEAD)
        {
          distrend_client_free(newclient);
          list_remove_curr(clients->clients);
          /* the client's socket was closed, so it no longer counts */
          clients->nfds --;
          fprintf(stderr, "removed dead connection\n");
        }
      /*
        NOTE(review): depending on where list_remove_curr() leaves the
        cursor, this step may skip the element following a removed
        one; such a client would be collected on a later call.
      */
      list_mvnext(clients->clients);
      /*
        provide for termination of this loop: only the pointer values
        are compared, newclient is not dereferenced here.
      */
      if(newclient == list_curr(clients->clients))
        newclient = NULL;
      else
        newclient = list_curr(clients->clients);
    }

  return 0;
}
/**
   Not implemented yet: only reports on stderr that it is a stub.

   @param listens ignored.
   @param clients ignored.
   @return always 1.
 */
int distrend_unlisten(struct distrend_listen *listens, struct distrend_clientset *clients)
{
  const int not_implemented = 1;

  (void)listens;
  (void)clients;

  fprintf(stderr, "%s:%d: I am a stub that needn't be implemented 'til later\n", __FILE__, __LINE__);

  return not_implemented;
}
/**
   Stub: sends nothing and only reports on stderr that it was called.

   NOTE(review): the previous comment said this is "probably just NOT a
   placeholder for remotio"; how it is meant to relate to
   distrend_client_write() is not visible here -- confirm.

   @param client ignored.
   @param msg ignored.
   @param len ignored.
 */
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
{
  (void)client;
  (void)msg;
  (void)len;

  fprintf(stderr, "%s:%d: STUB I should queue data for writing to a client\n", __FILE__, __LINE__);
}
int distrend_client_new(struct distrend_client **client, int sock, enum distrend_client_state state)
{
int tmp;
tmp = distrend_makesocknonblock(sock);
if(tmp)
{
fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
return 1;
}
*client = malloc(sizeof(struct distrend_client));
if(!*client)
{
fprintf(stderr, "OOM\n");
return 1;
}
(*client)->sock = sock;
(*client)->state = state;
(*client)->inmsgs = q_init();
(*client)->outmsgs = q_init();
return 0;
}
/**
   Destroys a client created by distrend_client_new(): frees both
   message queues together with the packets still in them, closes the
   socket and releases the client structure itself.

   @param client the client to destroy; NULL is accepted and ignored.
   The pointer must not be dereferenced after this call.
   @return always 0.
 */
int distrend_client_free(struct distrend_client *client)
{
  if(!client)
    return 0;

  q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
  q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
  close(client->sock);

  /* the struct was malloc()ed by distrend_client_new(); without this it is leaked */
  free(client);
  return 0;
}
/**
   Frees a packet and the data buffer it owns.

   @param packet the packet to free; like free(), NULL is accepted and
   ignored.
 */
void distrend_packet_free(struct distrend_packet *packet)
{
  if(!packet)
    return;

  free(packet->data);
  free(packet);
}
/**
   Switches a file descriptor to non-blocking mode by adding
   O_NONBLOCK to its file status flags.

   @param sock the descriptor to modify.
   @return 0 on success; 1 if the flags could not be read or written
   (the reason is reported with perror()).
 */
int distrend_makesocknonblock(int sock)
{
  int fdstatus;

  fdstatus = fcntl(sock, F_GETFL);
  if(fdstatus == -1)
    {
      /* do not feed the -1 error value back in as a set of flags */
      perror("fcntl");
      return 1;
    }

  if(fcntl(sock, F_SETFL, fdstatus | O_NONBLOCK) == -1)
    {
      perror("fcntl");
      return 1;
    }

  return 0;
}
/**
   Queues a copy of a message for sending to a client. The bytes are
   written to the socket later, when select() reports it writable.

   @param client the client to send to.
   @param towrite the bytes to send; they are copied, so the caller
   keeps ownership of this buffer.
   @param msglen the number of bytes at towrite. A length of 0 queues
   an empty packet.
   @return 0 on success, 1 if memory could not be allocated.
 */
int distrend_client_write(struct distrend_client *client, char *towrite, size_t msglen)
{
  struct distrend_packet *packet;

  packet = malloc(sizeof(*packet));
  if(!packet)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  packet->len = msglen;
  /*
    malloc(0) is allowed to return NULL, which would be mistaken for
    running out of memory; always ask for at least one byte.
  */
  packet->data = malloc(msglen ? msglen : 1);
  if(!packet->data)
    {
      free(packet);
      fprintf(stderr, "OOM\n");
      return 1;
    }

  /* memcpy() must not be given a NULL source, even for zero bytes */
  if(msglen)
    memcpy(packet->data, towrite, msglen);

  q_enqueue(client->outmsgs, packet, 0);
  return 0;
}
/**
   Hands out the oldest packet received from a client.

   @param client the client whose inmsgs queue is consulted.
   @param toread receives the packet's data buffer, or NULL if nothing
   is queued. The buffer is no longer owned by the queue once it has
   been handed out.
   @param lenread receives the number of bytes at *toread, or 0 if
   nothing is queued.
   @return 0 if a packet was handed out, 1 if the queue was empty.
 */
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
{
  struct distrend_packet *head;

  /* what the caller sees when there is nothing to hand out */
  *toread = NULL;
  *lenread = 0;

  if(!q_empty(client->inmsgs))
    {
      head = q_dequeue(client->inmsgs);
      *toread = head->data;
      *lenread = head->len;

      /* only the wrapper goes away; the data buffer was passed on */
      free(head);
      return 0;
    }

  return 1;
}