diff --git a/main.c b/main.c index 7b33ac3..82e0469 100644 --- a/main.c +++ b/main.c @@ -57,7 +57,7 @@ onion_connection_status get(_U_ onion_handler *h, onion_request *req, onion_resp return OCS_CLOSE_CONNECTION; } -static onion *os = NULL;//, *ow = NULL; +static onion *os = NULL, *ow = NULL; static int STOP = 0; void signals(int signo){ signal(signo, SIG_IGN); @@ -65,9 +65,9 @@ void signals(int signo){ if(os){ onion_free(os); } - /*if(ow){ + if(ow){ onion_free(ow); - }*/ + } closeSQLite(); sleep(1); exit(signo); @@ -77,7 +77,7 @@ void signals(int signo){ static void *runPostGet(_U_ void *data){ FNAME(); if(STOP) return NULL; - os = onion_new(O_POOL); + os = onion_new(O_THREADED); if(!(onion_flags(os) & O_SSL_AVAILABLE)){ ONION_ERROR("SSL support is not available"); signals(1); @@ -94,13 +94,37 @@ static void *runPostGet(_U_ void *data){ onion_url_add_with_data(url, "", onion_shortcut_internal_redirect, "static/index.html", NULL); onion_url_add(url, "^auth/", auth); onion_url_add(url, "^get/", get); - onion_url_add(url, "ws", websocket_run); + onion_url_add(url, "^ws/", websocket_run); error = onion_listen(os); if(error) ONION_ERROR("Can't create POST/GET server: %s", strerror(errno)); onion_free(os); return NULL; } - +/* +static void *runWS(_U_ void *data){ + FNAME(); + if(STOP) return NULL; + ow = onion_new(O_THREADED); + DBG("here"); + if(!(onion_flags(ow) & O_SSL_AVAILABLE)){ + ONION_ERROR("SSL support is not available"); + signals(1); + } + int error = onion_set_certificate(ow, O_SSL_CERTIFICATE_KEY, G.certfile, G.keyfile); + if(error){ + ONION_ERROR("Can't set certificate and key files (%s, %s)", G.certfile, G.keyfile); + signals(1); + } + DBG("WS @ port %s", G.wsport); + onion_set_port(ow, G.wsport); + onion_url *url = onion_root_url(ow); + onion_url_add(url, "", websocket_run); + error = onion_listen(ow); + if(error) ONION_ERROR("Can't create WS server: %s", strerror(errno)); + onion_free(ow); + return NULL; +} +*/ static void runServer(){ signal(SIGTERM, signals); signal(SIGINT, signals); @@ -108,10 +132,13 @@ static void runServer(){ signal(SIGTSTP, SIG_IGN); signal(SIGHUP, SIG_IGN); // if(G.logfilename) Cl_createlog(); - pthread_t pg_thread, main_thread; + pthread_t pg_thread, main_thread;//, ws_thread; if(pthread_create(&pg_thread, NULL, runPostGet, NULL)){ ERR("pthread_create()"); } + /*if(pthread_create(&ws_thread, NULL, runWS, NULL)){ + ERR("pthread_create()"); + }*/ if(pthread_create(&main_thread, NULL, runMainProc, NULL)){ ERR("pthread_create()"); } @@ -126,6 +153,15 @@ static void runServer(){ ERR("pthread_create()"); } } + /* if(pthread_kill(ws_thread, 0) == ESRCH){ // server died + WARNX("WS thread died"); + putlog("WS thread died"); + pthread_join(ws_thread, NULL); + if(pthread_create(&ws_thread, NULL, runWS, NULL)){ + putlog("pthread_create() failed"); + ERR("pthread_create()"); + } + }*/ if(pthread_kill(main_thread, 0) == ESRCH){ // server died WARNX("Main thread died"); putlog("Main thread died"); diff --git a/netproto.c b/netproto.c index ffdddb6..285b23d 100644 --- a/netproto.c +++ b/netproto.c @@ -25,16 +25,28 @@ #include #include +const char NormClose[2] = {0x10, 0x00}; // Normal close + typedef struct list_{ - onion_websocket *ws; - struct list_ *next; - struct list_ *prev; - struct list_ *last; + onion_websocket *ws; // websocket + pthread_mutex_t mutex; // writing mutex + struct list_ *next; // next in list } WSlist; +// total amount of connected clients +static int Nconnected = 0; +// list of websockets static WSlist *wslist = NULL; +// mutex for main proc static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -static char message[256]; + +// get the latest WS in list +static WSlist *getlast(){ + if(!wslist) return NULL; + WSlist *l = wslist; + while(l->next) l = l->next; + return l; +} /** * @brief runMainProc - main data process @@ -42,6 +54,7 @@ static char message[256]; * @return */ void *runMainProc(_U_ void *data){ + char message[256]; while(1){ int changed = 0; pthread_mutex_lock(&mutex); @@ -64,31 +77,43 @@ void *runMainProc(_U_ void *data){ * @param signal (i) - command received */ void process_WS_signal(onion_websocket *ws, char *signal){ + char buf[256]; char *eq = strchr(signal, '='); if(eq){ *eq++ = 0; - onion_websocket_printf(ws, "parameter: '%s', its value: '%s'", signal, eq); + snprintf(buf, 256, "parameter: '%s', its value: '%s'", signal, eq); + send_one_WS(ws, buf); }else{ - onion_websocket_printf(ws, "Echo: %s", signal); + snprintf(buf, 256, "Echo: %s", signal); + send_one_WS(ws, buf); } } /** * @brief register_WS - add recently opened websocket to common list * @param ws - websocket + * @return 0 if all OK */ -void register_WS(onion_websocket *ws){ - green("Add NEW websocket\n"); - if(!wslist){ - wslist = MALLOC(WSlist, 1); - wslist->ws = ws; - wslist->last = wslist; - }else{ - wslist->last->next = MALLOC(WSlist, 1); - wslist->last->next->ws = ws; - wslist->last->next->prev = wslist->last; - wslist->last = wslist->last->next; +int register_WS(onion_websocket *ws){ + if(Nconnected >= MAX_WSCLIENTS){ // no more connected + onion_websocket_printf(ws, "Maximum of %d clientsalready reached", MAX_WSCLIENTS); + onion_websocket_close(ws, NormClose); + return 1; } + DBG("Add NEW websocket\n"); + WSlist **curr = NULL; + if(!wslist){ // the first element + curr = &wslist; + }else{ + WSlist *last = getlast(); + curr = &last->next; + } + WSlist *tmp = MALLOC(WSlist, 1); + tmp->ws = ws; + pthread_mutex_init(&tmp->mutex, NULL); + *curr = tmp; + ++Nconnected; + return 0; } /** @@ -98,42 +123,82 @@ void register_WS(onion_websocket *ws){ void send_all_WS(char *data){ if(strlen(data) == 0) return; // zero length WSlist *l = wslist; - if(!l) return; - int cnt = 0; - unregister_WS(); // check for dead ws + if(!l){ + DBG("list is empty"); + return; + } + //int cnt = 0; while(l){ //DBG("try to send"); - if(onion_websocket_printf(l->ws, "%s", data) <= 0){ // dead websocket? remove it from list - DBG("Error printing - check for dead"); - unregister_WS(); - }else ++cnt; + if(0 == pthread_mutex_lock(&l->mutex)){ + if(onion_websocket_printf(l->ws, "%s", data) <= 0){ // dead websocket? remove it from list + DBG("Error printing - check for dead"); + pthread_mutex_unlock(&l->mutex); + unregister_WS(l->ws); + }//else ++cnt; + pthread_mutex_unlock(&l->mutex); + }else DBG("CANT LOCK"); l = l->next; } //DBG("Send message %s to %d clients", data, cnt); } -void unregister_WS(){ +void send_one_WS(onion_websocket *ws, char *data){ + if(strlen(data) == 0) return; // zero length WSlist *l = wslist; + if(!l) return; while(l){ - if(l->ws->req->connection.fd < 0){ - red("Found dead WS, remove it\n"); - if(l->prev){ - //DBG("l->prev->next = l->next"); - l->prev->next = l->next; - } - if(l->next){ - //DBG("l->next->prev = l->prev"); - l->next->prev = l->prev; - }else{ - //DBG("wslist->last = l->prev"); - wslist->last = l->prev; // we should delete last element - } - WSlist *nxt = l->next; - if(l == wslist) wslist = nxt; // delete the first element - FREE(l); - l = nxt; - continue; + if(l->ws == ws){ + if(0 == pthread_mutex_lock(&l->mutex)){ + if(onion_websocket_printf(l->ws, "%s", data) <= 0){ // dead websocket? remove it from list + DBG("Error printing - check for dead"); + pthread_mutex_unlock(&l->mutex); + unregister_WS(ws); + } + pthread_mutex_unlock(&l->mutex); + }else DBG("CANT LOCK"); + break; } l = l->next; } } + +// remove ws from list +void unregister_WS(onion_websocket *ws){ + WSlist *l = wslist, *prev = NULL; + while(l){ + if(l->ws == ws){ + pthread_mutex_lock(&l->mutex); + WSlist *next = l->next; + if(l == wslist) wslist = next; // delete the first element + else if(prev) prev->next = next; + pthread_mutex_destroy(&l->mutex); + FREE(l); + --Nconnected; + return; + } + prev = l; + l = l->next; + } +} + +// find bad websockets and remove them from list +void cleanup_WS(){ + WSlist *l = wslist, *prev = NULL; + while(l){ + if(onion_websocket_get_opcode(l->ws) == OWS_CONNECTION_CLOSE){ // closed + DBG("Found dead WS, remove it\n"); + pthread_mutex_lock(&l->mutex); + WSlist *next = l->next; + if(l == wslist) wslist = next; // delete the first element + else if(prev) prev->next = next; + pthread_mutex_destroy(&l->mutex); + FREE(l); + --Nconnected; + l = next; + continue; + } + prev = l; + l = l->next; + } +} diff --git a/netproto.h b/netproto.h index 9b6bf19..e1a33e2 100644 --- a/netproto.h +++ b/netproto.h @@ -21,12 +21,17 @@ #include "websockets.h" +// not more than 8 (or need to patch libonion) +#define MAX_WSCLIENTS (5) + void *runMainProc(void *data); void process_WS_signal(onion_websocket *ws, char *signal); -void register_WS(onion_websocket *ws); -void unregister_WS(); +int register_WS(onion_websocket *ws); +void unregister_WS(onion_websocket *ws); +void cleanup_WS(); void send_all_WS(char *data); +void send_one_WS(onion_websocket *ws, char *data); #define NETPROTO_H__ #endif // NETPROTO_H__ diff --git a/static/auth.js b/static/auth.js index 794889f..7afdbd6 100644 --- a/static/auth.js +++ b/static/auth.js @@ -68,16 +68,27 @@ auth = function(){ } // websockets var ws; + var reopenTimer; function wsinit(){ delete(ws); - ws = new WebSocket('wss://localhost:8080/ws'); - ws.onopen = function(){ws.send("Akey="+wsKey);}; // send key after init + console.log("Try to connect WS"); + ws = new WebSocket('wss://localhost:8080/ws/'); + clearTimeout(reopenTimer); + reopenTimer = setTimeout(wsinit, 5000); + ws.onopen = function(){ // send key after init + clearTimeout(reopenTimer); + console.log("WS connected"); + ws.send("Akey="+wsKey); + } ws.onclose = function(evt){ - var text = "WebSocket closed: "; - if(evt.wasClean) text += "by remote side"; - else text += "connection lost" - $('wsmsgs').innerHTML = text; - }; + var text = "websocket closed"; + if(evt.wasClean) text += " by remote side"; + else text += ", connection lost" + parseErr(text); + console.log("WS closed"); + clearTimeout(reopenTimer); + reopenTimer = setTimeout(wsinit, 1000); + } ws.onmessage = function(evt){ $('wsmsgs').innerHTML = evt.data; } diff --git a/static/requests.js b/static/requests.js index d42c0ad..5ff4de3 100644 --- a/static/requests.js +++ b/static/requests.js @@ -31,13 +31,15 @@ function sendrequest(req_STR, onOK, postdata){ } } request.send(postdata); - timeout_id = setTimeout(function(){request.onreadystatechange=null; request.abort(); parseErr("request timeout");}, 5000); + timeout_id = setTimeout(function(){request.onreadystatechange = null; request.abort(); parseErr("Request timeout");}, 5000); } +var errtmout; function parseErr(txt){ console.log("Error: " + txt); var msgDiv = $('errmsg'); if(!msgDiv) return; msgDiv.innerHTML = "Error: " + txt; - setTimeout(function(){msgDiv.innerHTML = "";}, 3500); + clearTimeout(errtmout); + errtmout = setTimeout(function(){msgDiv.innerHTML = " ";}, 3500); } diff --git a/websockets.c b/websockets.c index ddeac53..6b00ff5 100644 --- a/websockets.c +++ b/websockets.c @@ -56,10 +56,14 @@ static onion_connection_status websocket_cont(void *data, onion_websocket *ws, s if(dlen > BUFLEN) dlen = BUFLEN; int len = onion_websocket_read(ws, tmp, dlen); + if(OWS_CONNECTION_CLOSE == onion_websocket_get_opcode(ws)){ + unregister_WS(ws); + return OCS_CLOSE_CONNECTION; + } if(!len) return OCS_NEED_MORE_DATA; if(len < 0){ ONION_ERROR("Error reading data: %d: %s (%d)", errno, strerror(errno), dlen); - unregister_WS(); + unregister_WS(ws); return OCS_CLOSE_CONNECTION; } tmp[len] = 0; @@ -72,7 +76,7 @@ static onion_connection_status websocket_cont(void *data, onion_websocket *ws, s session = getSession(key); } if(!session){ - onion_websocket_printf(ws, AUTH_ANS_NEEDAUTH); + send_one_WS(ws, AUTH_ANS_NEEDAUTH); WARNX("Wrong websocket session ID"); return OCS_FORBIDDEN; } @@ -83,14 +87,14 @@ static onion_connection_status websocket_cont(void *data, onion_websocket *ws, s uint64_t UAhash = MurmurOAAT64(onion_dict_get(json, "User-Agent")); uint64_t IPhash = MurmurOAAT64(onion_dict_get(json, "User-IP")); if(wsdata->IPhash != IPhash || wsdata->UAhash != UAhash){ - onion_websocket_printf(ws, AUTH_ANS_WRONGIP); + send_one_WS(ws, AUTH_ANS_WRONGIP); WARNX("Websocket IP/UA are wrong"); return OCS_FORBIDDEN; } - red("WSdata checked!\n"); + DBG("WSdata checked!\n"); onion_dict_free(json); }else{ - onion_websocket_printf(ws, AUTH_ANS_NOUSERDATA); + send_one_WS(ws, AUTH_ANS_NOUSERDATA); WARNX("No user IP and/or UA in database"); return OCS_FORBIDDEN; } @@ -102,22 +106,21 @@ static onion_connection_status websocket_cont(void *data, onion_websocket *ws, s } onion_connection_status websocket_run(_U_ void *data, onion_request *req, onion_response *res){ - FNAME(); onion_websocket *ws = onion_websocket_new(req, res); if (!ws){ - DBG("Processed"); return OCS_PROCESSED; } - DBG("WS ready"); + if(register_WS(ws)){ + return OCS_CLOSE_CONNECTION; + } const char *host = onion_request_get_client_description(req); const char *UA = onion_request_get_header(req, "User-Agent"); - green("Got WS connection from %s (UA: %s)\n", host, UA); + DBG("Got WS connection from %s (UA: %s)\n", host, UA); WSdata *wsdata = calloc(1, sizeof(WSdata)); wsdata->flags = WS_FLAG_NOTAUTHORIZED; wsdata->IPhash = MurmurOAAT64(host); wsdata->UAhash = MurmurOAAT64(UA); onion_websocket_set_userdata(ws, (void*)wsdata, free); onion_websocket_set_callback(ws, websocket_cont); - register_WS(ws); return OCS_WEBSOCKET; }