From b28832fe41a4246d341368b75e5b6bd06ead1d14 Mon Sep 17 00:00:00 2001 From: Edward Emelianov Date: Thu, 21 May 2026 16:31:59 +0300 Subject: [PATCH] seems like it works --- Daemons/weather_logger/server.c | 305 ++++++++++++++++++++++++++++---- 1 file changed, 272 insertions(+), 33 deletions(-) diff --git a/Daemons/weather_logger/server.c b/Daemons/weather_logger/server.c index b57c553..423baae 100644 --- a/Daemons/weather_logger/server.c +++ b/Daemons/weather_logger/server.c @@ -27,9 +27,14 @@ #include "server.h" +// max available delta for timestamp if it's in future to current time +// USE NTP everywhere!!11111 +#define TIME_IN_FUTURE_MAX (60) + // some "standard" keys from server static const char *key_pluginname = "PLUGIN"; static const char *key_nvalues = "NVALUES"; +static const char *key_timestamp = "TWEATH"; // sensor's db filename mask: DBpath/weatherXX.log static const char *dbfilename_mask = "%s/weather%02d.log"; @@ -46,10 +51,15 @@ typedef struct{ int fd; // log file descriptor int idx; // index of station for requests int nvalues; // maximal amount of values + bool initialized; // `sensor` is fully initialized char path[PATH_MAX];// path to file char *sensname; // sensor's name char **keys; // keyword names for header and index in string int *levels; // array of `weather level` for each keyword + time_t lasttimestamp;// timestamp of last received data + char buf[BUFSIZ]; // string buffer for file writing operations + int buflen; // current length of `buf` + int lastvalidx; // index of last value stored in `buf` } senslog_t; static int Nsensors = 0; // amount of sensors for logging @@ -82,6 +92,10 @@ static void sensors_delete(){ if(!sensors) return; for(int i = 0; i < Nsensors; ++i){ senslog_t *sensor = &sensors[i]; + if(sensor->fd > -1){ + close(sensor->fd); + sensor->fd = -1; + } delete_senskeys(sensor); FREE(sensor->levels); FREE(sensor->sensname); @@ -105,6 +119,25 @@ void set_nettimeout(double dt){ net_timeout = dt; } +/** + * @brief write2fd - tries to write into file; in case of error close this file and set `sensor->fd=-1` + * @param sensor - "sensor" to write + * @param str - input string + * @param len - length of string + * @return true if all OK + */ +static bool write2fd(senslog_t *sensor, const char *str, size_t len){ + if(!sensor || !str || len < 1) return false; + if(write(sensor->fd, str, len) != (ssize_t)len){ + LOGERR("Can't write '%s' to file %s: %s", str, sensor->path, strerror(errno)); + WARNX("Can't write data to file"); + close(sensor->fd); + sensor->fd = -1; + return false; + } + return true; +} + /** * @brief send_request - send request to server * @param cmd - command index @@ -112,12 +145,38 @@ void set_nettimeout(double dt){ */ static bool send_request(sl_sock_t *sock, const char *req){ if(sl_sock_sendstrmessage(sock, req) < 1){ + WARNX("Can't send request '%s'", req); LOGERR("Can't send request '%s'", req); return false; } return true; } +/** + * @brief get_answer_line - read one line of server's answer with timeout + * @param sock - socket to read + * @param buf - input buffer + * @param buflen - its length + * @return length of receiving data, 0 if nothing received, -1 in case of error (and clear all sensor's list) + */ +static ssize_t get_answer_line(sl_sock_t *sock, char *buf, size_t buflen){ + FNAME(); + double t0 = sl_dtime(); + DBG("tstart=%g", t0); + ssize_t len = -1; + while(sl_dtime() - t0 < net_timeout){ + len = sl_sock_readline(sock, buf, buflen-1); + if(len > 0) break; + else if(len < 0){ + WARNX("Seems like server disconnected"); + LOGWARN("Error reading server answer, disconnected?"); + sensors_delete(); + } + } + DBG("len=%zd, tend=%g", len, sl_dtime()); + return len; +} + // reinit logs at start or after rotating // @return false if failed bool reinit_logs(){ @@ -150,11 +209,9 @@ static bool find_old_file(senslog_t *sensor){ if(sensor->fd > -1){ DBG("found opened file %s with fd %d -> close", sensor->path, sensor->fd); close(sensor->fd); - return false; // we need to open new file after logrotating } regex_t regex; - // Compile regex if(regcomp(®ex, dbfilename_regex, REG_EXTENDED | REG_NOSUB) != 0){ LOGERR("find_old_file(): error in regcomp(), %s", strerror(errno)); @@ -191,6 +248,7 @@ static bool find_old_file(senslog_t *sensor){ continue; } const char *stname = line + 2; // station name from comment + DBG("Check name: '%s' and '%s'", stname, sensor->sensname); if(0 == strcmp(stname, sensor->sensname)){ // good, we found this file! DBG("Found existant file %s -> append to it", fname); int newfd = open(fname, O_WRONLY | O_APPEND); @@ -213,9 +271,14 @@ static bool find_old_file(senslog_t *sensor){ } // create new database file in `DBpath` +// `sensor` obligate to be fully initialized static bool create_db_file(senslog_t *sensor){ FNAME(); - int num = 0; + if(!sensor || !sensor->initialized){ + WARNX("create_db_file() should be called only with fully initialized `sensor`"); + return false; + } + int num = sensor->idx; // try to start from sensor's index char path[PATH_MAX]; for(; num <= 99; ++num){ snprintf(path, PATH_MAX, dbfilename_mask, DBpath, num); @@ -236,19 +299,67 @@ static bool create_db_file(senslog_t *sensor){ return false; } DBG("OK, %s opened, try to write header", path); - int len = snprintf(path, PATH_MAX, "# %s\n", sensor->sensname); - if(write(newfd, path, len) != len){ - LOGERR("Can't write sensor's name '%s' to file %s: %s", sensor->sensname, path, strerror(errno)); - WARNX("Can't write header"); - close(newfd); - return false; - } - DBG("%s now have descriptor %d: %s", sensor->sensname, newfd, path); sensor->fd = newfd; + ssize_t len = snprintf(path, PATH_MAX, "# %s\n", sensor->sensname); + if(!write2fd(sensor, path, len)) return false; + len = snprintf(path, PATH_MAX, "# Station #%d, format: KEYWORD[level],...\n# TIMESTAMP, ", sensor->idx); + if(!write2fd(sensor, path, len)) return false; + char *ptr = path; + len = 0; + for(int i = 0; i < sensor->nvalues; ++i){ + ssize_t L = snprintf(ptr, PATH_MAX-len, "%s%s[%d]", i ? ", " : "", sensor->keys[i], sensor->levels[i]); + len += L; + ptr += L; + } + len += snprintf(ptr, PATH_MAX-len, "\n"); + if(!write2fd(sensor, path, len)) return false; + DBG("%s now have descriptor %d: %s", sensor->sensname, newfd, path); + return true; } -static bool get_sensor_keys(senslog_t _U_ *sensor, sl_sock_t _U_ *sock){ +/** + * @brief get_sensor_keys - fill `sensor->keys` and `sensor->levels` using `chklevel` request + * @param sensor - "sensor" with inited `nvalues`, `idx` and `sensname`; allocated `keys` and `levels` + * @param sock - socket for request + * @return + */ +static bool get_sensor_keys(senslog_t *sensor, sl_sock_t *sock){ + if(!sensor || !sock) return false; + char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN]; + snprintf(str, BUFSIZ, "%s=%d\n", commands[CMD_CHKLEVEL], sensor->idx); + if(!send_request(sock, str)){ + LOGERR("Can't send request '%s'", commands[CMD_CHKLEVEL]); + return false; + } + ssize_t got = -1; + int valueidx = 0; + while(valueidx < sensor->nvalues && (got = get_answer_line(sock, str, BUFSIZ)) > 0 ){ + DBG("valueidx=%d, nvalues=%d, got answer: %s", valueidx, sensor->nvalues, str); + if(2 != sl_get_keyval(str, key, value)){ + LOGWARN("Wrong answer for `chklevel`: %s", str); + continue; + } + DBG("key=%s", key); + int sensidx; + if(2 != sscanf(key, "[%[^]]][%d]", str, &sensidx)){ + DBG("header? '%s' (str=%s)", key, str); + continue; // omit two heading lines: `PLUGIN` and `NVALUES` + } + DBG("str=%s, sensidx=%d", str, sensidx); + if(sensor->idx != sensidx){ + LOGWARN("Got wrong key answer for sensor with idx=%d: '%s'", sensor->idx, key); + continue; + } + // OK - it's our value, increment number and fill name + sensor->keys[valueidx] = strdup(str); + sensor->levels[valueidx] = atoi(value); + DBG("now sensor->keys[%d]=%s, levels=%d", valueidx, sensor->keys[valueidx], sensor->levels[valueidx]); + ++valueidx; + } + if(got < 0 || valueidx != sensor->nvalues) return false; + sensor->initialized = true; + sensor->lastvalidx = -1; return true; } @@ -259,7 +370,7 @@ static bool prepare_files(sl_sock_t *sock){ if(!sock || !DBpath || Nsensors < 1) return false; for(int i = 0; i < Nsensors; ++i){ senslog_t *sensor = &sensors[i]; - if(!get_sensor_keys(sensor, sock)) return false; + if(!sensor->initialized && !get_sensor_keys(sensor, sock)) return false; DBG("Check if there's something for sensor[%d]", i); if(!find_old_file(sensor)){ // create new file DBG("Nothing found, try to create new"); @@ -276,17 +387,8 @@ static bool prepare_files(sl_sock_t *sock){ static bool analyse_list(sl_sock_t *sock){ FNAME(); char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN]; - double tlast = sl_dtime(); - while(sl_dtime() - tlast < net_timeout){ - ssize_t len = sl_sock_readline(sock, str, BUFSIZ-1); - if(len == 0) continue; - if(len < 0){ - WARNX("Seems like server disconnected"); - LOGWARN("Server disconnected?"); - sensors_delete(); - return false; - } - tlast = sl_dtime(); + ssize_t got; + while((got = get_answer_line(sock, str, BUFSIZ)) > 0){ DBG("Got answer: %s", str); if(2 != sl_get_keyval(str, key, value)){ LOGWARN("Wrong answer from meteodaemon for 'list' request: %s", str); @@ -332,6 +434,7 @@ static bool analyse_list(sl_sock_t *sock){ sensor->levels = MALLOC(int, nvalues); } } + if(got < 0) return false; // disconnection? // now check all we got if(Nsensors < 1){ LOGWARN("Found 0 sensors in server's answer"); @@ -352,10 +455,6 @@ static bool analyse_list(sl_sock_t *sock){ return ans; } -/*for(int i = 0; i < nvalues; ++i){ - sensor->keys[i] = ; - }*/ - // prepare DB files at start static bool prepare_logfiles(sl_sock_t *sock, const char *path){ FNAME(); @@ -370,7 +469,7 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){ DBG("Store files in %s; send `list` request", DBpath); snprintf(buf, 255, "%s\n", commands[CMD_LIST]); if(!send_request(sock, buf)){ - WARNX("Can't send inited request"); + LOGERR("Can't send inited request"); return false; } // now we have an answer: 2*N strings a la "PLUGIN[i]=...\nNVALUES[i]=...\n" -> @@ -379,6 +478,131 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){ return true; } +// send `get=x` request for each sensor; return true if all sent OK +static bool send_get_req(sl_sock_t *sock){ + char str[128]; + if(!sock || !sensors || Nsensors < 1) return false; + for(int i = 0; i < Nsensors; ++i){ + if(sensors[i].nvalues < 1) continue; + snprintf(str, 128, "%s=%d\n", commands[CMD_GET], sensors[i].idx); + if(!send_request(sock, str)) return false; + } + return true; +} + +// get answers and fill database +// we consider that answers comes in sequence order, from least sensno to timestamp +// @return false only in case of error +static bool poll_server_answers(sl_sock_t *sock){ + char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN];; + ssize_t len = sl_sock_readline(sock, str, BUFSIZ-1); + if(len < 0) return false; // error + else if(len == 0) return true; // nothing to read + DBG("Got line: '%s'", str); + int sensidx; + if(2 != sl_get_keyval(str, key, value) || 2 != sscanf(key, "%[^[][%d]", str, &sensidx)){ + WARNX("not key=value"); + LOGWARN("Wrong answer for `get`: %s", str); + return false; + } + if(sensidx < 0 || sensidx >= Nsensors){ + WARNX("Wrong sensor index"); + LOGWARN("Got station with index (%d) out of bounds [0, %d]", sensidx, Nsensors-1); + return false; + } + // omit header + if(0 == strcmp(key_pluginname, str) || 0 == strcmp(key_nvalues, str)) return true; + senslog_t *sensor = &sensors[sensidx]; + if(0 == strcmp(key_timestamp, str)){ // check timestamps and write data to disk + time_t ts = (time_t)atol(value); + DBG("Got timestamp: %zd (last timestamp: %zd)", ts, sensor->lasttimestamp); + bool allOK = true; + if(ts > sensor->lasttimestamp){ // don't store old data + time_t curt = time(NULL); + DBG("Timestamp minus current time = %zd", ts - curt); + if(ts > curt && ts - curt > TIME_IN_FUTURE_MAX){ + WARNX("timestamp is in future"); + LOGWARN("got future timestamp for sensor %d: %ld", sensidx, ts); + allOK = false; + }else{ // all OK - we can save data + if(sensor->lastvalidx > -1){ // only if at least one keyword received + int trailingcommas = sensor->nvalues - sensor->lastvalidx - 2; // amount of commas after our last record + DBG("trailing commas = %d", trailingcommas); + if(trailingcommas > 0){ + if(sensor->buflen + 2*trailingcommas > BUFSIZ-1){ + WARNX("Sensor's buffer overfull"); + LOGWARN("Sensor %s: writing buffer overfull", sensor->idx); + }else{ // write trailing zeros to buffer + for(int i = 0; i < trailingcommas; ++i){ + sprintf(sensor->buf + sensor->buflen, ", "); + sensor->buflen += 2; + } + } + } + // write timestamp without error checking + size_t l = snprintf(str, BUFSIZ, "%zd, ", ts); + write2fd(sensor, str, l); + // now throw data to disk + DBG("throw '%s' with length %d to disk", sensor->buf, sensor->buflen); + allOK = write2fd(sensor, sensor->buf, sensor->buflen); + write2fd(sensor, "\n", 1); + sensor->lasttimestamp = ts; + DBG("Data for %d is %swritten to disk", sensor->idx, allOK ? "" : "not "); + } + } + } + // clear gathred data + sensor->buflen = 0; + sensor->lastvalidx = -1; + return allOK; + } + // now we need to find index of current keyword + int idx = 0, nvalues = sensor->nvalues; + for(; idx < nvalues; ++idx){ + if(0 == strcmp(str, sensor->keys[idx])) break; + } + DBG("key index=%d", idx); + + if(idx == nvalues){ + WARNX("Not found in keylist"); + LOGWARN("Weird keyword for station %d: '%s'", sensidx, str); + return false; + } + if(sensor->lastvalidx >= idx){ + WARNX("Missed timestamp?"); + LOGWARN("Missed timestamp for station %d?", sensidx); + sensor->lastvalidx = -1; + sensor->buflen = 0; + return false; + } + // add current raw value to sensor's buffer (there's could be a text fields, so don't make atof + char *comment = strchr(value, '/'); + if(comment){ + *comment = 0; // remove comment mark + if(comment != value && comment[-1] == ' ') comment[-1] = 0; // remove last space + } + DBG("VALUE: %s", value); + // add spaces if need + int trailingcommas = idx - sensor->lastvalidx - 1; // amount of commas after our last record before current + DBG("Commas: %d", trailingcommas); + if(trailingcommas > 0){ + if(sensor->buflen + 2*trailingcommas > BUFSIZ-1){ + WARNX("Buffer overfull"); + LOGWARN("Sensor %d: writing buffer overfull", sensor->idx); + }else{ // write trailing zeros to buffer + for(int i = 0; i < trailingcommas; ++i){ + sprintf(sensor->buf + sensor->buflen, ", "); + sensor->buflen += 2; + } + } + } + sensor->buflen += snprintf(sensor->buf + sensor->buflen, BUFSIZ-sensor->buflen, "%s%s", + value, sensor->nvalues-1 != idx ? ", " : ""); + sensor->lastvalidx = idx; + DBG("Now buf[%d]=%s", sensor->idx, sensor->buf); + return true; +} + /** * @brief run_server - run main server: send weather requests and store data * @param node - node to connect @@ -387,6 +611,7 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){ */ void run_server(const char *node, sl_socktype_e type, const char *path){ if(!node || !path) return; + //char str[BUFSIZ]; sl_sock_t *sock = sl_sock_run_client(type, node, BUFSIZ); if(!sock){ DBG("Can't connect"); @@ -397,15 +622,29 @@ void run_server(const char *node, sl_socktype_e type, const char *path){ DBG("Superloop"); int errctr = 0; + double tlast = 0.; while(isrunning){ + bool allOK = true; if(logreinit){ - if(!prepare_files(sock)) ++errctr; + if(!prepare_files(sock)) allOK = false; else logreinit = false; } - if(errctr > 5){ - LOGERR("Too much errors -> exit"); - break; + double tnow = sl_dtime(); + if(tnow - tlast > req_interval){ // send next requests + DBG("\n\n\nSend next request; deltaT=%g", tnow - tlast); + if(!send_get_req(sock)) allOK = false; + else tlast = tnow; } + // now poll everything from server + if(!poll_server_answers(sock)) allOK = false; + ; + if(!allOK){ + if(++errctr > 5){ + LOGERR("Too much errors -> exit"); + WARNX("Too much errors -> exit"); + break; + } + }else errctr = 0; usleep(1000); } if(sock) sl_sock_delete(&sock);