version 0.3.2

This commit is contained in:
2024-12-16 11:41:33 +03:00
parent 19c14db40b
commit 9e2bfe4456
9 changed files with 223 additions and 93 deletions

124
socket.c
View File

@@ -32,7 +32,11 @@
// max clients amount
static int maxclients = 32;
// too much clients handler; it is running for client connected with number>maxclients (before closing its fd)
static sl_sock_maxclh_t toomuchclients = NULL;
static sl_sock_maxclh_t toomuch_handler = NULL;
// new client connected handler; it will be run each new connection
static sl_sock_connh_t newconnect_handler = NULL;
// client disconnected handler
static sl_sock_disch_t disconnect_handler = NULL;
/**
* @brief sl_sock_changemaxclients - change amount of max simultaneously connected clients
@@ -44,9 +48,18 @@ void sl_sock_changemaxclients(int val){
}
int sl_sock_getmaxclients(){ return maxclients; }
// setter of "too much clients handler"
// in each next function changed default handlers you can set h to NULL to remove your handler
// setter of "too much clients" handler
void sl_sock_maxclhandler(sl_sock_maxclh_t h){
toomuchclients = h;
toomuch_handler = h;
}
// setter of "new client connected" handler
void sl_sock_connhandler(sl_sock_connh_t h){
newconnect_handler = h;
}
// setter of "client disconnected" handler
void sl_sock_dischandler(sl_sock_disch_t h){
disconnect_handler = h;
}
// text messages for `hresult`
@@ -116,8 +129,9 @@ void sl_sock_delete(sl_sock_t **sock){
* @return NULL
*/
static void *clientrbthread(void *d){
char buf[512];
sl_sock_t *s = (sl_sock_t*) d;
size_t buflen = s->buffer->length;
char *buf = MALLOC(char, buflen);
DBG("Start client read buffer thread");
while(s && s->connected){
pthread_mutex_lock(&s->mutex);
@@ -126,8 +140,9 @@ static void *clientrbthread(void *d){
usleep(1000);
continue;
}
ssize_t n = read(s->fd, buf, 511);
DBG("read %zd from %d, unlock", n, s->fd);
ssize_t n = read(s->fd, buf, buflen);
//DBG("read %zd from fd=%d, unlock", n, s->fd);
//DBG("buf=%s", buf);
pthread_mutex_unlock(&s->mutex);
if(n < 1){
WARNX(_("Server disconnected"));
@@ -233,6 +248,32 @@ static void *serverthread(void _U_ *d){
// ZERO - listening server socket
poll_set[0].fd = sockfd;
poll_set[0].events = POLLIN;
// disconnect client
void disconnect_(sl_sock_t *c, int N){
DBG("client \"%s\" (fd=%d) disconnected", c->IP, c->fd);
if(disconnect_handler) disconnect_handler(c);
pthread_mutex_lock(&c->mutex);
DBG("close fd %d", c->fd);
c->connected = 0;
close(c->fd);
sl_RB_clearbuf(c->buffer);
// now move all data of last client to disconnected
if(nfd > 2 && N != nfd - 1){ // don't move the only or the last
DBG("lock fd=%d", clients[nfd-1]->fd);
pthread_mutex_lock(&clients[nfd-1]->mutex);
clients[N] = clients[nfd-1];
clients[nfd-1] = c;
DBG("unlock fd=%d", clients[N]->fd);
pthread_mutex_unlock(&clients[N]->mutex); // now N is nfd-1
poll_set[N] = poll_set[nfd - 1];
}
DBG("unlock fd=%d", c->fd);
pthread_mutex_unlock(&c->mutex);
--nfd;
}
// allocate buffer with size not less than RB size
size_t bufsize = s->buffer->length; // as RB should be 1 byte less, this is OK
uint8_t *buf = MALLOC(uint8_t, bufsize);
while(s && s->connected){
poll(poll_set, nfd, 1);
if(poll_set[0].revents & POLLIN){ // check main for accept()
@@ -240,10 +281,9 @@ static void *serverthread(void _U_ *d){
socklen_t len = sizeof(struct sockaddr);
int client = accept(sockfd, &a, &len);
DBG("New connection, nfd=%d, len=%d", nfd, len);
LOGMSG("SERVER got connection, fd=%d", client);
if(nfd == maxclients + 1){
WARNX(_("Limit of connections reached"));
if(toomuchclients) toomuchclients(client);
if(toomuch_handler) toomuch_handler(client);
close(client);
}else{
memset(&poll_set[nfd], 0, sizeof(struct pollfd));
@@ -262,6 +302,7 @@ static void *serverthread(void _U_ *d){
*c->IP = 0;
}
DBG("got IP:%s", c->IP);
if(newconnect_handler) newconnect_handler(c);
if(!c->buffer){ // allocate memory for client's ringbuffer
DBG("allocate ringbuffer");
c->buffer = sl_RB_new(s->buffer->length); // the same size as for master
@@ -269,55 +310,47 @@ static void *serverthread(void _U_ *d){
++nfd;
}
}
#define SBUFSZ (SL_KEY_LEN+SL_VAL_LEN+2)
uint8_t buf[SBUFSZ];
// scan connections
for(int fdidx = 1; fdidx < nfd; ++fdidx){
if((poll_set[fdidx].revents & POLLIN) == 0) continue;
int fd = poll_set[fdidx].fd;
sl_sock_t *c = clients[fdidx];
pthread_mutex_lock(&c->mutex);
int nread = sl_RB_freesize(c->buffer);
if(nread > SBUFSZ) nread = SBUFSZ;
else if(nread < 1){
size_t nread = sl_RB_freesize(c->buffer);
if(nread > bufsize) nread = bufsize;
else if(nread < 1){ // no space in ringbuffer
pthread_mutex_unlock(&c->mutex);
// check for RB overflow
if(sl_RB_hasbyte(c->buffer, '\n') < 0){ // -1 - buffer empty (can't be), -2 - buffer overflow
WARNX(_("Server thread: ring buffer overflow for fd=%d"), fd);
disconnect_(c, fdidx);
--fdidx;
}
continue;
}
ssize_t got = read(fd, buf, nread);
DBG("can read %zd bytes, got %zd bytes, buf=_%s_", nread, got, buf);
pthread_mutex_unlock(&c->mutex);
if(got <= 0){ // client disconnected
DBG("client \"%s\" (fd=%d) disconnected", c->IP, fd);
pthread_mutex_lock(&c->mutex);
DBG("close fd %d", fd);
c->connected = 0;
close(fd);
sl_RB_clearbuf(c->buffer);
// now move all data of last client to disconnected
if(nfd > 2 && fdidx != nfd - 1){ // don't move the only or the last
DBG("lock fd=%d", clients[nfd-1]->fd);
pthread_mutex_lock(&clients[nfd-1]->mutex);
clients[fdidx] = clients[nfd-1];
clients[nfd-1] = c;
DBG("unlock fd=%d", clients[fdidx]->fd);
pthread_mutex_unlock(&clients[fdidx]->mutex); // now fdidx is nfd-1
poll_set[fdidx] = poll_set[nfd - 1];
}
DBG("unlock fd=%d", c->fd);
pthread_mutex_unlock(&c->mutex);
--nfd;
disconnect_(c, fdidx);
--fdidx;
}else{
sl_RB_write(c->buffer, buf, got);
if(sl_RB_write(c->buffer, buf, got) < (size_t) got){
WARNX(_("Server thread: can't write data to ringbuffer: overflow from fd=%d"), fd);
disconnect_(c, fdidx);
--fdidx;
}
}
}
// and now check all incoming buffers
for(int fdidx = 1; fdidx < nfd; ++fdidx){
sl_sock_t *c = clients[fdidx];
if(!c->connected) continue;
ssize_t got = sl_RB_readline(c->buffer, (char*)buf, SBUFSZ);
ssize_t got = sl_RB_readline(c->buffer, (char*)buf, bufsize);
if(got < 0){ // buffer overflow
ERRX(_("Server thread: buffer overflow from \"%s\""), c->IP);
sl_RB_clearbuf(c->buffer);
WARNX(_("Server thread: buffer overflow from fd=%d"), c->fd);
disconnect_(c, fdidx);
--fdidx;
continue;
}else if(got == 0) continue;
sl_sock_hresult_e r = msgparser(c, (char*)buf);
@@ -325,6 +358,7 @@ static void *serverthread(void _U_ *d){
}
}
// clear memory
FREE(buf);
FREE(poll_set);
for(int i = maxclients; i > 0; --i){
DBG("Clear %dth client data", i);
@@ -360,6 +394,7 @@ static sl_sock_t *sl_sock_open(sl_socktype_e type, const char *path, sl_sock_hit
struct addrinfo ai = {0}, *res = &ai;
struct sockaddr_un unaddr = {0};
char *str = NULL;
ai.ai_socktype = SOCK_STREAM;
switch(type){
case SOCKT_UNIX:
str = convunsname(path);
@@ -369,7 +404,7 @@ static sl_sock_t *sl_sock_open(sl_socktype_e type, const char *path, sl_sock_hit
ai.ai_addrlen = sizeof(unaddr);
memcpy(unaddr.sun_path, str, 106);
ai.ai_family = AF_UNIX;
ai.ai_socktype = SOCK_SEQPACKET;
//ai.ai_socktype = SOCK_SEQPACKET;
break;
case SOCKT_NET:
case SOCKT_NETLOCAL:
@@ -514,10 +549,18 @@ ssize_t sl_sock_sendbinmessage(sl_sock_t *socket, const uint8_t *msg, size_t l){
DBG("lock");
pthread_mutex_lock(&socket->mutex);
DBG("SEND");
ssize_t r = send(socket->fd, msg, l, MSG_NOSIGNAL);
ssize_t sent = 0;
do{
ssize_t r = send(socket->fd, msg+sent, l, MSG_NOSIGNAL);
if(r < 0){
sent = -1;
break;
}else sent += r;
DBG("sent %zd bytes", r);
} while((size_t)sent != l);
DBG("unlock");
pthread_mutex_unlock(&socket->mutex);
return r;
return sent;
}
ssize_t sl_sock_sendstrmessage(sl_sock_t *socket, const char *msg){
@@ -589,7 +632,7 @@ sl_sock_hresult_e sl_sock_strhandler(sl_sock_t *client, sl_sock_hitem_t *hitem,
char buf[SL_VAL_LEN + SL_KEY_LEN + 3];
sl_sock_string_t *s = (sl_sock_string_t*) hitem->data;
if(!str){ // getter
sprintf(buf, "%s=%s\n", hitem->key, s->val);
snprintf(buf, SL_VAL_LEN + SL_KEY_LEN + 2, "%s=%s\n", hitem->key, s->val);
sl_sock_sendstrmessage(client, buf);
return RESULT_SILENCE;
}
@@ -598,5 +641,6 @@ sl_sock_hresult_e sl_sock_strhandler(sl_sock_t *client, sl_sock_hitem_t *hitem,
s->len = l;
s->timestamp = sl_dtime();
memcpy(s->val, str, l);
s->val[l] = 0;
return RESULT_OK;
}