/*
Copyright 2010 Nathan Phillip Brink
This file is a part of DistRen.
DistRen is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DistRen is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with DistRen. If not, see <http://www.gnu.org/licenses/>.
*/
#include "listen.h"
#include "common/protocol.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* local */
/**
  A chunk of raw bytes queued on a client's inmsgs or outmsgs queue.
 */
struct distrend_packet
{
/* number of bytes stored at data */
size_t len;
/* malloc()ed buffer holding len bytes; released by distrend_packet_free() */
char *data;
};
/**
  One entry of listens->request_handlers: ties a request type to the
  callback registered for it via distrend_listen_handler_add().
 */
struct distrend_request_handler_info
{
/* the request type this entry was registered for */
enum distren_request_type request_type;
/* callback invoked for every dispatched request whose type matches request_type */
distrend_handle_request_func_t handler;
};
/* Prototypes for helpers that are defined further down in this file. */
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state);
int distrend_client_free(struct distrend_client *client);
void distrend_packet_free(struct distrend_packet *packet);
int distrend_makesocknonblock(int sock);
int distrend_packets_collapse(QUEUE *queue, size_t len);
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata);
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset);
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events);
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock);
/**
  Allocates and initializes a struct distrend_listens with empty lists
  of listening sockets, request handlers and clients.

  @param geninfo stored in the new struct; it is later handed to request handlers.
  @return the new struct, or NULL if any allocation failed.
 */
struct distrend_listens *distrend_listens_new(struct general_info *geninfo)
{
  struct distrend_listens *result;

  result = malloc(sizeof(struct distrend_listens));
  if(result == NULL)
    return NULL;

  result->listen_socks = list_init();
  result->request_handlers = list_init();
  result->clients = list_init();
  /* room for two entries to start with; distrend_listen_poll_newfd() grows the array on demand */
  result->pollfds = malloc(sizeof(struct pollfd) * 2);
  result->pollfds_len = 2;
  result->nfds = 0;
  result->geninfo = geninfo;

  if(result->listen_socks
     && result->request_handlers
     && result->clients
     && result->pollfds)
    return result;

  /* at least one allocation failed: give back whatever was obtained */
  if(result->listen_socks)
    list_free(result->listen_socks, LIST_NODEALLOC);
  if(result->request_handlers)
    list_free(result->request_handlers, LIST_NODEALLOC);
  if(result->clients)
    list_free(result->clients, LIST_NODEALLOC);
  free(result->pollfds);
  free(result);

  return NULL;
}
/**
  Undoes a partially completed distrend_listen_add() call that ended
  in error: closes the socket if its descriptor is non-negative and
  frees the bookkeeping struct.

  @param sockinfo the half-initialized listen socket; freed by this call.
  @return always 0.
 */
int distrend_listen_add_unwind(struct distrend_listen_sock *sockinfo)
{
  if(sockinfo->sock.sock >= 0)
    close(sockinfo->sock.sock);

  free(sockinfo);

  return 0;
}
/**
  Opens an IPv6 TCP socket listening on the given port on any address,
  makes it non-blocking, registers it with the pollfds array and adds
  it to listens->listen_socks.

  @param listens the listening state to extend.
  @param port the TCP port, in host byte order.
  @return 0 on success, 1 on error (nothing is left registered on error).
 */
int distrend_listen_add(struct distrend_listens *listens, int port)
{
  struct distrend_listen_sock *newsock;
  struct sockaddr_in6 sockaddr =
    {
      .sin6_family = AF_INET6,
      .sin6_port = 0,
      .sin6_flowinfo = 0,
      .sin6_addr = IN6ADDR_ANY_INIT,
      .sin6_scope_id = 0
    };

  sockaddr.sin6_port = htons(port);

  newsock = malloc(sizeof(*newsock));
  if(!newsock)
    return 1;
  newsock->port = port;

  newsock->sock.sock = socket(AF_INET6, SOCK_STREAM, 0);
  if(newsock->sock.sock == -1)
    {
      /* report the real failure instead of a misleading bind() error */
      perror("socket");
      distrend_listen_add_unwind(newsock);
      return 1;
    }

  if(bind(newsock->sock.sock, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) == -1)
    {
      perror("bind");
      distrend_listen_add_unwind(newsock);
      return 1;
    }

  if(listen(newsock->sock.sock, 1) == -1)
    {
      perror("listen");
      distrend_listen_add_unwind(newsock);
      return 1;
    }

  if(distrend_makesocknonblock(newsock->sock.sock))
    {
      /* error is already printed to user by distrend_makesocknonblock() */
      distrend_listen_add_unwind(newsock);
      return 1;
    }

  /*
    Without a pollfds slot the socket would never be polled and its
    pollfd_offset would stay uninitialized, so treat this as fatal.
  */
  if(distrend_listen_poll_newfd(listens, &newsock->sock, POLLRDNORM))
    {
      fprintf(stderr, "Unable to add the listening socket to the pollfds\n");
      distrend_listen_add_unwind(newsock);
      return 1;
    }

  list_insert_after(listens->listen_socks, newsock, 0);

  return 0;
}
/**
  Reads whatever is available on a client's socket, appends it to
  client->inmsgs and, once a complete request (header plus payload)
  has accumulated, hands it to distrend_dispatch_request().

  At most one request is dispatched per call; bytes following it stay
  queued for a later call.

  @param listens the listening state, used to look up request handlers.
  @param client the client whose socket is readable.
  @return 0 on success or when there was nothing to read right now;
  1 on error or EOF. The client is marked DISTREND_CLIENT_DEAD on EOF,
  on read errors and on malformed or inconsistent input.
 */
int distrend_handleread(struct distrend_listens *listens,
                        struct distrend_client *client)
{
  struct distrend_packet *packet;
  ssize_t readlen;
  char buf[8192];
  struct distren_request *req;
  void *reqdata;

  readlen = read(client->sock.sock, buf, sizeof(buf));
  if(readlen == -1)
    {
      switch(errno)
        {
        case EINTR:
        case EAGAIN:
          /*
            Nothing to read right now. Return instead of falling
            through: -1 must never be used as a packet length.
          */
          return 0;
        default:
          perror("read");
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }
    }
  if(readlen == 0)
    /* handle EOF */
    {
      client->state = DISTREND_CLIENT_DEAD;
      return 1;
    }

  packet = malloc(sizeof(*packet));
  if(!packet)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }
  packet->len = readlen;
  packet->data = malloc(readlen);
  if(!packet->data)
    {
      fprintf(stderr, "OOM!\n");
      free(packet);
      return 1;
    }
  memcpy(packet->data, buf, readlen);
  q_enqueue(client->inmsgs, packet, 0);
  client->inlen += packet->len;
  packet = NULL;

  /**
    Manage input, etc.
   */
  if(client->expectlen == 0)
    {
      /* search out header from input so far */
      if(distrend_packets_collapse(client->inmsgs, client->inlen))
        {
          /* the queue no longer matches inlen; the stream cannot be trusted */
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }
      packet = q_front(client->inmsgs);
      if(!packet)
        {
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }
      /*
        NOTE(review): '>' means a header with an empty payload is not
        parsed until further bytes arrive; '>=' may be what is wanted,
        depending on what distren_request_new_fromdata() accepts -- confirm.
      */
      if(packet->len > sizeof(struct distren_request))
        {
          if(distren_request_new_fromdata(&req, packet->data, packet->len))
            {
              fprintf(stderr, "Error handling data from client (magic likely did not match), closing connection\n");
              client->state = DISTREND_CLIENT_DEAD;
              return 1;
            }
          client->expectlen = req->len + sizeof(struct distren_request);
          distren_request_free(req);
        }
    }

  if(client->expectlen
     && client->inlen >= client->expectlen)
    {
      /** essentially un-queue-ize the queue ;-) */
      if(distrend_packets_collapse(client->inmsgs, client->inlen))
        {
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }
      packet = q_front(client->inmsgs);
      if(!packet)
        {
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }

      if(distren_request_new_fromdata(&req, packet->data, packet->len))
        {
          fprintf(stderr, "error handling data from client\n");
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }

      /* the first comparison keeps the subtraction from wrapping around */
      if(packet->len < sizeof(struct distren_request)
         || packet->len - sizeof(struct distren_request) < req->len
         || packet->len < client->expectlen)
        {
          fprintf(stderr, "error handling some data from client\n");
          distren_request_free(req);
          client->state = DISTREND_CLIENT_DEAD;
          return 1;
        }

      /* malloc(0) may legitimately return NULL, so never ask for 0 bytes */
      reqdata = malloc(req->len ? req->len : 1);
      if(!reqdata)
        {
          fprintf(stderr, "OOM\n");
          distren_request_free(req);
          return 1;
        }
      memcpy(reqdata, packet->data + sizeof(struct distren_request), req->len);

      /* shift any bytes following this request to the front of the packet */
      memmove(packet->data,
              packet->data + client->expectlen,
              packet->len - client->expectlen);
      packet->len -= client->expectlen;
      client->inlen -= client->expectlen;
      client->expectlen = 0;

      distrend_dispatch_request(listens, client, req, reqdata);
      free(reqdata);
      distren_request_free(req);
    }

  return 0;
}
/**
  Traversal context handed to distrend_accept_client_proc() for every
  client during distrend_accept().
 */
struct distrend_accept_client_proc_data
{
/* the listens whose pollfds array and clients are being serviced */
struct distrend_listens *listens;
/* time(NULL) sampled once per distrend_accept() call; compared against each client's cleanup_time */
time_t current_time;
};
/**
  List traversal callback run by distrend_accept() for each client:
  enforces timeouts, reads pending input and writes queued output
  according to the revents reported by poll().

  Clients are never freed here; they are only marked
  DISTREND_CLIENT_DEAD and pruned later by distrend_accept().

  @param data the shared traversal context (listens and cached time).
  @param client the client to service.
  @return always TRUE so that the traversal continues with the next client.
 */
int distrend_accept_client_proc(struct distrend_accept_client_proc_data *data,
                                struct distrend_client *client)
{
  struct distrend_packet *packet;
  ssize_t written_amount;
  short revents;

  if(client->state == DISTREND_CLIENT_DEAD)
    return TRUE;

  /**
    Manage timing-out clients.
   */
  if(data->current_time > client->cleanup_time)
    switch(client->state)
      {
      case DISTREND_CLIENT_PREVERSION:
        distrend_send_disconnect(data->listens, client, "You have failed to present version information in a timely manner. Cya :-p");
        break;
      case DISTREND_CLIENT_PREAUTH:
        distrend_send_disconnect(data->listens, client, "You have failed to present authentication information in a timely manner. Cya ;-)");
        break;
      case DISTREND_CLIENT_GOOD:
        distrend_send_disconnect(data->listens, client, "Ping timeout :-p");
        break;
      case DISTREND_CLIENT_BAD:
        /* the disconnect grace period is over */
        client->state = DISTREND_CLIENT_DEAD;
        return TRUE;
      default:
        break;
      }

  revents = data->listens->pollfds[client->sock.pollfd_offset].revents;

  if(revents & POLLRDNORM)
    /* handle reading */
    {
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for reading\n",
              __FILE__, __LINE__, client->sock.sock);
      distrend_handleread(data->listens, client);
    }

  if(revents & POLLWRNORM)
    {
      fprintf(stderr, "%s:%d: My traversal says that sock %d is available for writing\n",
              __FILE__, __LINE__, client->sock.sock);

      if(q_empty(client->outmsgs))
        {
          /* nothing left to send: stop asking poll() about writability */
          data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
          return TRUE;
        }

      packet = q_front(client->outmsgs);
      written_amount = write(client->sock.sock, packet->data, packet->len);
      if(written_amount == -1)
        {
          /* transient conditions: keep the packet queued and retry on the next poll() */
          if(errno == EINTR || errno == EAGAIN)
            return TRUE;

          /**
            Disconnect in case of write error.

            Until liblist has the ability to delete nodes
            during traversal, we'll have to free the client
            during a client-list cleanup/pruning cycle.
           */
          perror("write");
          client->state = DISTREND_CLIENT_DEAD;
          /* written_amount is -1 here and must not reach the length arithmetic below */
          return TRUE;
        }

      if((size_t)written_amount == packet->len)
        {
          q_dequeue(client->outmsgs);
          distrend_packet_free(packet);
          if(q_empty(client->outmsgs))
            data->listens->pollfds[client->sock.pollfd_offset].events &= ~POLLWRNORM;
        }
      else
        {
          /**
            Partial write: shifting the unsent tail to the front
            seems the simplest solution.
           */
          packet->len -= written_amount;
          memmove(packet->data, packet->data + written_amount, packet->len);
        }
    }

  /** continue iteration through the list */
  return TRUE;
}
/**
  List traversal callback run by distrend_accept() for each listening
  socket: accepts one pending connection if poll() reported the socket
  as readable and registers it as a DISTREND_CLIENT_PREVERSION client.

  @param listens the listening state the new client is added to.
  @param listen_sock the listening socket to check.
  @return TRUE so that the remaining listening sockets are checked too;
  1 if the new client could not be registered.
 */
int distrend_accept_newclient_proc(struct distrend_listens *listens, struct distrend_listen_sock *listen_sock)
{
  short revents;
  int newclientsock;

  revents = listens->pollfds[listen_sock->sock.pollfd_offset].revents;
  if(revents & POLLRDNORM)
    {
      newclientsock = accept(listen_sock->sock.sock, (struct sockaddr *)NULL, (socklen_t *)NULL);
      if(newclientsock == -1)
        {
          /*
            The listening socket is non-blocking, so the connection may
            simply be gone again; never pass -1 on as a client socket.
          */
          if(errno != EINTR && errno != EAGAIN)
            perror("accept");
          return TRUE;
        }

      if(distrend_client_add(listens, newclientsock, DISTREND_CLIENT_PREVERSION))
        {
          fprintf(stderr, "error allocating/adding client struct\n");
          return 1;
        }
      fprintf(stderr, "accepted new connection\n");
    }

  /**
    Check other listen blocks too.
   */
  return TRUE;
}
/**
  Runs one iteration of the event loop: blocks in poll() until at
  least one socket is ready, services every client, accepts new
  connections and finally prunes clients marked DISTREND_CLIENT_DEAD.

  @param listens the listening state to service.
  @return 0 on success (including an interrupted poll()), 1 if poll() failed.
 */
int distrend_accept(struct distrend_listens *listens)
{
  int polled;
  struct distrend_accept_client_proc_data travinfo;
  struct distrend_client *newclient;

  fprintf(stderr, "poll(*, %d, -1)...\n", (int)listens->nfds);

  /* the result has to be stored: it is what the error check below inspects */
  polled = poll(listens->pollfds, listens->nfds, -1);
  if(polled == -1)
    {
      /* an interrupting signal is not a failure; the caller can simply call again */
      if(errno == EINTR)
        return 0;
      perror("poll");
      return 1;
    }

  /**
    Deal first with data waiting to be sent and then
    with input data.
   */
  travinfo.listens = listens;
  travinfo.current_time = time(NULL); /*< cache the time */
  list_traverse(listens->clients, &travinfo, (list_traverse_func_t)&distrend_accept_client_proc, LIST_FRNT | LIST_SAVE);

  /**
    Handle new connections.
   */
  list_traverse(listens->listen_socks, listens, (list_traverse_func_t)&distrend_accept_newclient_proc, LIST_FRNT | LIST_SAVE);

  /**
    Until liblist supports deleting nodes during a traversal,
    we clean dead clients here:
   */
  list_mvfront(listens->clients);
  newclient = list_curr(listens->clients);
  while(newclient)
    {
      struct distrend_client *next;
      int removed = 0;

      if(newclient->state == DISTREND_CLIENT_DEAD)
        {
          distrend_listen_poll_deletefd(listens, &newclient->sock);
          distrend_client_free(newclient);
          list_remove_curr(listens->clients);
          /*
            distrend_client_free() releases the queues and the socket
            but not the struct, which was malloc()ed by
            distrend_client_add() and stored by pointer in the list.
          */
          free(newclient);
          removed = 1;
          fprintf(stderr, "removed dead connection\n");
        }

      list_mvnext(listens->clients);
      next = list_curr(listens->clients);

      /*
        provide for termination of this loop: if the cursor did not
        move we are at the end. A freed client is never compared.
      */
      if(!removed && next == newclient)
        next = NULL;
      newclient = next;
    }

  return 0;
}
/**
  Not implemented yet: only logs that it is a stub.

  @param listens currently unused.
  @return always 1.
 */
int distrend_listen_free(struct distrend_listens *listens)
{
  (void)listens;

  fprintf(stderr,
          "%s:%d: I am a stub that needn't be implemented 'til later\n",
          __FILE__,
          __LINE__);

  return 1;
}
/**
  Stub: only logs a message to stderr; none of the arguments are used
  and nothing is queued or sent.
 */
void remotio_send_to_client(struct distrend_client *client, const char *msg, size_t len)
{
  fprintf(stderr,
          "%s:%d: STUB I should queue data for writing to a client.... or should I? :-p\n",
          __FILE__,
          __LINE__);
}
/**
Adds a newly connected distrend_client to the listens->clients list
*/
int distrend_client_add(struct distrend_listens *listens, int sock, enum distrend_client_state state)
{
int tmp;
struct distrend_client *client;
tmp = distrend_makesocknonblock(sock);
if(tmp)
{
fprintf(stderr, "Not accepting this connection because I cannot make the sock non-blocking\n");
close(sock);
return 1;
}
client = malloc(sizeof(struct distrend_client));
if(!client)
{
fprintf(stderr, "OOM\n");
close(sock);
return 1;
}
client->sock.sock = sock;
distrend_listen_poll_newfd(listens, &client->sock, POLLRDNORM); /*< error checking? bah! */
client->state = state;
client->cleanup_time = time(NULL) + DISTREND_LISTEN_AUTHTIME;
client->inlen = 0;
client->expectlen = 0;
client->inmsgs = q_init();
client->outmsgs = q_init();
list_insert_after(listens->clients, client, 0);
/**
For those using netcat/telnet to debug their internets.
*/
#ifndef PACKAGE_URL
#define PACKAGE_URL "http://ohnopub.net/distren/"
#endif
#define DISTREN_GREETING PACKAGE_STRING " " PACKAGE_URL " : Nathan Phillip Brink && Ethan Michael Zonca\n"
/* using sizeof() - 1 because the sizeof() includes a NULL byte we want to ignore. */
distrend_client_write(listens, client, DISTREN_GREETING, sizeof(DISTREN_GREETING) - 1);
return 0;
}
/**
This function shall only be called after the appropriate
distrend_listen_poll_deletefd() has been called
*/
int distrend_client_free(struct distrend_client *client)
{
q_free(client->inmsgs, (list_dealloc_func_t)distrend_packet_free);
q_free(client->outmsgs, (list_dealloc_func_t)distrend_packet_free);
close(client->sock.sock);
return 0;
}
/**
  Frees a packet together with its data buffer.

  @param packet the packet to release; must not be NULL.
 */
void distrend_packet_free(struct distrend_packet *packet)
{
  char *payload = packet->data;

  free(payload);
  free(packet);
}
/**
  Switches a file descriptor to non-blocking mode by adding O_NONBLOCK
  to its file status flags.

  @param sock the descriptor to modify.
  @return 0 on success; 1 if the flags could not be read or written,
  in which case the error has already been reported with perror().
 */
int distrend_makesocknonblock(int sock)
{
  int fdstatus;

  fdstatus = fcntl(sock, F_GETFL);
  if(fdstatus == -1)
    {
      /* do not OR O_NONBLOCK into an error value and write that back */
      perror("fcntl");
      return 1;
    }

  if(fcntl(sock, F_SETFL, fdstatus | O_NONBLOCK) == -1)
    {
      perror("fcntl");
      return 1;
    }

  return 0;
}
/**
  Queues a copy of the given bytes for writing to a client and asks
  poll() to report when the client's socket becomes writable.

  @param listens the listening state owning the pollfds array.
  @param client the client to write to.
  @param towrite the bytes to send; they are copied, so the caller keeps ownership.
  @param msglen the number of bytes at towrite.
  @return 0 on success, 1 if memory could not be allocated.
 */
int distrend_client_write(struct distrend_listens *listens, struct distrend_client *client, char *towrite, size_t msglen)
{
  struct distrend_packet *outpkt;
  char *copy;

  outpkt = malloc(sizeof(struct distrend_packet));
  if(outpkt == NULL)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  copy = malloc(msglen);
  if(copy == NULL)
    {
      free(outpkt);
      fprintf(stderr, "OOM\n");
      return 1;
    }
  memcpy(copy, towrite, msglen);

  outpkt->len = msglen;
  outpkt->data = copy;
  q_enqueue(client->outmsgs, outpkt, 0);

  /* there is something to send now, so watch for writability */
  listens->pollfds[client->sock.pollfd_offset].events |= POLLWRNORM;

  return 0;
}
/**
  Serializes a request header followed by its payload into one
  buffer and queues it for the client with distrend_client_write().

  @param listens the listening state owning the pollfds array.
  @param client the client to send the request to.
  @param req the request header; req->len gives the payload size.
  @param data the payload, req->len bytes long.
  @return 1 if the temporary buffer could not be allocated, otherwise
  whatever distrend_client_write() returned.
 */
int distrend_client_write_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *data)
{
  const size_t header_len = sizeof(struct distren_request);
  size_t total_len = header_len + req->len;
  char *wire;
  int status;

  wire = malloc(total_len);
  if(wire == NULL)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }

  /* header first, payload right behind it */
  memcpy(wire, req, header_len);
  memcpy(wire + header_len, data, req->len);

  status = distrend_client_write(listens, client, wire, total_len);

  /* distrend_client_write() made its own copy */
  free(wire);

  return status;
}
/**
  Removes the oldest packet from a client's input queue and hands its
  buffer to the caller.

  @param client the client to read from.
  @param toread set to the packet's data buffer, or to NULL if the
  queue is empty. The packet struct is freed here but the buffer is
  not, so the caller receives the buffer as-is.
  @param lenread set to the number of bytes at *toread, or to 0 if the
  queue is empty.
  @return 0 if a packet was returned, 1 if the input queue is empty.
 */
int distrend_client_read(struct distrend_client *client, char **toread, size_t *lenread)
{
  struct distrend_packet *head;

  /* defaults reported when there is nothing queued */
  *toread = NULL;
  *lenread = 0;

  if(!q_empty(client->inmsgs))
    {
      head = q_dequeue(client->inmsgs);
      *toread = head->data;
      *lenread = head->len;
      free(head);
      return 0;
    }

  return 1;
}
/**
  Collapses a QUEUE of struct distrend_packet with a collective length
  of len into a single struct distrend_packet holding all the bytes
  in order.

  @param queue the queue to collapse; on success it holds exactly one packet.
  @param len the expected total number of bytes queued.
  @return 0 on success. 1 if memory could not be allocated (the queue
  is untouched) or if the queued bytes do not add up to exactly len
  (packets dequeued up to that point are discarded).
 */
int distrend_packets_collapse(QUEUE *queue, size_t len)
{
  struct distrend_packet *newpacket;
  struct distrend_packet *oldpacket;
  size_t copiedlen;

  newpacket = malloc(sizeof(*newpacket));
  if(!newpacket)
    {
      fprintf(stderr, "OOM\n");
      return 1;
    }
  newpacket->data = malloc(len);
  if(!newpacket->data)
    {
      fprintf(stderr, "OOM\n");
      perror("malloc");
      free(newpacket);
      return 1;
    }

  copiedlen = 0;
  while(copiedlen < len && !q_empty(queue))
    {
      oldpacket = q_dequeue(queue);

      /* never copy past the end of the len-byte destination buffer */
      if(oldpacket->len > len - copiedlen)
        {
          fprintf(stderr, "inconsistency!\n");
          distrend_packet_free(oldpacket);
          distrend_packet_free(newpacket);
          return 1;
        }

      memcpy(newpacket->data + copiedlen, oldpacket->data, oldpacket->len);
      copiedlen += oldpacket->len;
      distrend_packet_free(oldpacket);
    }
  newpacket->len = copiedlen;

  if(copiedlen < len || !q_empty(queue))
    {
      fprintf(stderr, "inconsistency!\n");
      /* the merged packet is not going into the queue, so release it */
      distrend_packet_free(newpacket);
      return 1;
    }

  q_enqueue(queue, newpacket, 0);

  return 0;
}
/**
  Queues a DISTREN_REQUEST_DISCONNECT request carrying quit_msg for
  the client, marks the client DISTREND_CLIENT_BAD and gives it
  DISTREND_LISTEN_DISCONNECT_GRACE more seconds before cleanup.

  @param listens the listening state owning the pollfds array.
  @param client the client to disconnect.
  @param quit_msg NUL-terminated message; sent without the terminating NUL.
  @return always 0.
 */
int distrend_send_disconnect(struct distrend_listens *listens, struct distrend_client *client, char *quit_msg)
{
  struct distren_request *bye;
  size_t quit_len = strlen(quit_msg);

  distren_request_new(&bye, quit_len, DISTREN_REQUEST_DISCONNECT);
  distrend_client_write_request(listens, client, bye, quit_msg);
  distren_request_free(bye);

  client->state = DISTREND_CLIENT_BAD;
  client->cleanup_time = time(NULL) + DISTREND_LISTEN_DISCONNECT_GRACE;

  return 0;
}
/**
  Registers a handler to be called for every dispatched request of
  the given type.

  @param listens the listening state whose handler list is extended.
  @param type the request type to handle.
  @param handler the callback to invoke.
  @return 0 on success, 1 if memory could not be allocated.
 */
int distrend_listen_handler_add(struct distrend_listens *listens, enum distren_request_type type, distrend_handle_request_func_t handler)
{
  struct distrend_request_handler_info *entry;

  entry = malloc(sizeof(struct distrend_request_handler_info));
  if(entry == NULL)
    return 1;

  entry->handler = handler;
  entry->request_type = type;

  list_insert_after(listens->request_handlers, entry, 0);

  return 0;
}
/**
  Traversal context filled in by distrend_dispatch_request() and read
  by _distrend_dispatch_request_trav().
 */
struct distrend_dispatch_request_data
{
/* copied from listens->geninfo; passed as the first argument to each handler */
struct general_info *geninfo;
/* the client the request belongs to */
struct distrend_client *client;
/* the request header; its type selects which handlers run */
struct distren_request *req;
/* the request payload passed to the handler along with req->len */
void *req_data;
};
/**
  Traversal function for distrend_dispatch_request(): calls the
  handler of one registered entry if its type matches the request.

  @param data the request being dispatched.
  @param handler_info the registered handler entry being visited.
  @return always TRUE, so every registered handler gets visited.
 */
int _distrend_dispatch_request_trav(struct distrend_dispatch_request_data *data, struct distrend_request_handler_info *handler_info)
{
  /* not the type this entry was registered for: move on */
  if(handler_info->request_type != data->req->type)
    return TRUE;

  handler_info->handler(data->geninfo, data->client, data->req->len, data->req_data);

  return TRUE;
}
/**
helper for distrend_handleread() which looks up the correct
request handler and handles handing the the request to the
handler. :-p
*/
int distrend_dispatch_request(struct distrend_listens *listens, struct distrend_client *client, struct distren_request *req, void *reqdata)
{
struct distrend_dispatch_request_data data;
data.geninfo = listens->geninfo;
data.client = client;
data.req = req;
data.req_data = reqdata;
list_traverse(listens->request_handlers, &data, (list_traverse_func_t)&_distrend_dispatch_request_trav, LIST_FRNT | LIST_SAVE);
return 0;
}
/**
  Helper for distrend_listen_add() and functions that call accept()
  to maintain the struct pollfd array in listens. Appends an entry for
  the socket, growing the array if necessary.

  @param listens The info related to listening to sockets.
  @param sock The socket to register. sock->sock is the descriptor to
  poll; sock->pollfd_offset is set to the index of the struct pollfd
  that is created. An index is stored rather than a pointer to the
  struct pollfd because the array may be moved when it grows.
  @param poll_events The poll() events to register for the socket.
  @return 0 on success, 1 if the array could not be grown (in which
  case nothing is modified).
 */
int distrend_listen_poll_newfd(struct distrend_listens *listens, struct distrend_polled_sock *sock, short poll_events)
{
  size_t new_len;
  struct pollfd *new_pollfds;

  if(listens->nfds + 1 > listens->pollfds_len)
    {
      /* doubling a zero length would never make room */
      new_len = listens->pollfds_len ? listens->pollfds_len * 2 : 2;

      /* keep the old pointer until realloc() is known to have succeeded */
      new_pollfds = realloc(listens->pollfds, sizeof(struct pollfd) * new_len);
      if(!new_pollfds)
        return 1;

      listens->pollfds_len = new_len;
      listens->pollfds = new_pollfds;
    }

  sock->pollfd_offset = listens->nfds;
  listens->pollfds[sock->pollfd_offset].fd = sock->sock;
  listens->pollfds[sock->pollfd_offset].events = poll_events;
  /* no events have been reported for this entry until the next poll() */
  listens->pollfds[sock->pollfd_offset].revents = 0;
  listens->nfds ++;

  fprintf(stderr, "Added sock=%d events=%d to the pollfds\n", sock->sock, poll_events);

  return 0;
}
/**
  Removes a particular struct pollfd from the listens->pollfds array.
  Unless the entry is the last one, the last entry is moved into the
  vacated position and the pollfd_offset of the socket owning that
  entry is updated to match.

  @param listens the struct distrend_listens
  @param sock the relevant struct containing the offset of the pollfd that must be removed.
  @return 0 on success, 1 if the owner of the moved entry could not be
  found (the entry count is decremented either way).
 */
int distrend_listen_poll_deletefd(struct distrend_listens *listens, struct distrend_polled_sock *sock)
{
  /*
    the socket whose entry is moved from the end of the array
    into the hole left by removing sock
   */
  struct distrend_polled_sock *moved;

  /* only an entry that is not already the last one leaves a hole to fill */
  if(sock->pollfd_offset != listens->nfds - 1)
    {
      listens->pollfds[sock->pollfd_offset] = listens->pollfds[listens->nfds - 1];

      moved = distrend_polled_sock_get_by_offset(listens, listens->nfds - 1);
      if(moved == NULL)
        {
          fprintf(stderr, "Inconsistent state! Cannot find client or listen() port with pollfds offset of %d, nfds=%d\n",
                  (int)listens->nfds - 1, (int)listens->nfds);
          listens->nfds --;
          return 1;
        }

      moved->pollfd_offset = sock->pollfd_offset;
    }

  listens->nfds --;

  return 0;
}
/**
  Search state shared between distrend_polled_sock_get_by_offset()
  and _distrend_polled_sock_get_by_offset_traverse().
 */
struct distrend_polled_sock_get_by_offset_data
{
/* result: set to the matching socket by the traversal helper, NULL until then */
struct distrend_polled_sock *sock;
/* input: the index into listens->pollfds that is being looked for */
size_t pollfd_offset;
};
/**
  List traversal helper function for distrend_polled_sock_get_by_offset().

  @param data the search state; data->sock is set when a match is found.
  @param sock The list element being visited. It is really either a
  struct distrend_client or a struct distrend_listen_sock cast to a
  struct distrend_polled_sock, which relies on each of those two
  structs having a struct distrend_polled_sock as its first member.
  @return FALSE once the matching socket has been recorded (presumably
  ending the traversal), TRUE to keep looking.
 */
int _distrend_polled_sock_get_by_offset_traverse(struct distrend_polled_sock_get_by_offset_data *data, struct distrend_polled_sock *sock)
{
  if(data->pollfd_offset != sock->pollfd_offset)
    return TRUE;

  data->sock = sock;
  return FALSE;
}
/**
Returns a struct distrend_client based on its offset in the listens->pollfds
array.
@param listens the listens.
@param pollfds_offset the index in the listens->pollfds array of the client we should find
@return NULL if the client is not found or a pointer to the struct distrend_client for the specified client. Generally, you would not free() this but delete the client by first removing it from the listen->client list.
*/
struct distrend_polled_sock *distrend_polled_sock_get_by_offset(struct distrend_listens *listens, size_t pollfds_offset)
{
struct distrend_polled_sock_get_by_offset_data data;
data.sock = NULL;
data.pollfd_offset = pollfds_offset;
/**
These traversals both depend on struct distrend_polled_sock being
the first entry of struct distrend_listens and struct distrend_client.
*/
list_traverse(listens->clients, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
if(data.sock == NULL)
list_traverse(listens->listen_socks, &data, (list_traverse_func_t)&_distrend_polled_sock_get_by_offset_traverse, LIST_FRNT | LIST_SAVE);
return data.sock;
}