mirror of
https://github.com/eddyem/small_tel.git
synced 2026-06-21 11:26:30 +03:00
Compare commits
2 Commits
bff6e06684
...
51869f0137
| Author | SHA1 | Date | |
|---|---|---|---|
| 51869f0137 | |||
| b28832fe41 |
127
Daemons/weather_logger/Readme.md
Normal file
127
Daemons/weather_logger/Readme.md
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
# meteologger
|
||||||
|
|
||||||
|
**meteologger** is a client that collects meteorological data from the
|
||||||
|
[`weatherdaemon_multimeteo`](https://github.com/eddyem/small_tel/tree/master/Daemons/weatherdaemon_multimeteo)
|
||||||
|
daemon. It connects to the daemon over a TCP or UNIX socket, periodically requests data from all
|
||||||
|
available weather stations, and saves the received values into separate files (one per station).
|
||||||
|
|
||||||
|
The project is written in C and uses the
|
||||||
|
[`usefull_macros`](https://github.com/eddyem/snippets_library) library for socket handling,
|
||||||
|
logging, argument parsing, and helper macros.
|
||||||
|
|
||||||
|
## Repository
|
||||||
|
|
||||||
|
Source code is located in the [small_tel](https://github.com/eddyem/small_tel) repository under
|
||||||
|
`Daemons/weather_logger`.
|
||||||
|
|
||||||
|
### Dependencies
|
||||||
|
|
||||||
|
- Linux (uses Linux‑specific calls: `prctl`, `fork`, `waitpid`, signals)
|
||||||
|
- CMake ≥ 4.0
|
||||||
|
- C compiler (GCC, Clang)
|
||||||
|
- `usefull_macros` library ≥ 0.3.5 (must be installed and discoverable via `pkg-config`)
|
||||||
|
|
||||||
|
### Building
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git clone --depth=1 https://github.com/eddyem/small_tel.git
|
||||||
|
cd small_tel/Daemons/weather_logger
|
||||||
|
mkdir build && cd build
|
||||||
|
cmake .. # or -DDEBUG=on for debug build
|
||||||
|
make
|
||||||
|
```
|
||||||
|
|
||||||
|
The executable `meteologger` will be placed in `build/` (or `bin/` after installation).
|
||||||
|
|
||||||
|
### Installation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
su -c "make install"
|
||||||
|
```
|
||||||
|
|
||||||
|
By default the program is installed into `bin` (usually `/usr/local/bin`).
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
```
|
||||||
|
meteologger -n <node> -o <directory> [options]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Required arguments
|
||||||
|
|
||||||
|
| Option | Long form | Description |
|
||||||
|
|--------|----------------|-------------|
|
||||||
|
| `-n` | `--node` | Node to connect to: `<host>:<port>` (e.g., `127.0.0.1:5555`) or a UNIX socket path when `-u` is given. |
|
||||||
|
| `-o` | `--output` | Directory where database files will be stored (must exist and be writable). |
|
||||||
|
|
||||||
|
### Optional arguments
|
||||||
|
|
||||||
|
| Option | Long form | Default | Description |
|
||||||
|
|--------|----------------|-----------------------------|-------------|
|
||||||
|
| `-h` | `--help` | - | Show help and exit. |
|
||||||
|
| `-u` | `--isunix` | - | Use a UNIX socket instead of TCP. |
|
||||||
|
| `-l` | `--logfile` | (none) | File to write logs |
|
||||||
|
| `-p` | `--pidfile` | `/tmp/meteologger.pid` | File where the process PID is written. |
|
||||||
|
| `-v` | `--verbose` | 0 | Increase logging verbosity (each `-v` raises the level). |
|
||||||
|
| `-i` | `--interval` | `0.5` | Request interval in seconds. Allowed range: `[0.2, 900]`. |
|
||||||
|
| `-t` | `--timeout` | `1.0` | Network timeout for server responses (seconds). Allowed range: `[0.1, 30]`. |
|
||||||
|
|
||||||
|
### Examples
|
||||||
|
|
||||||
|
1. Connect to a local daemon on port 5555 and store data in `/var/log/weather`:
|
||||||
|
```bash
|
||||||
|
meteologger -n 127.0.0.1:5555 -o /var/log/weather
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Use a UNIX socket `/tmp/weather.sock`, write PID to `/run/meteologger.pid`:
|
||||||
|
```bash
|
||||||
|
meteologger -n /tmp/weather.sock -o /data/weather -u -p /run/meteologger.pid
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Enable verbose logging to a file:
|
||||||
|
```bash
|
||||||
|
meteologger -n localhost:5555 -o /srv/weather -vv -l /var/log/meteologger.log
|
||||||
|
```
|
||||||
|
|
||||||
|
## Output file format
|
||||||
|
|
||||||
|
Files are stored in the directory given by `-o` and are named `weatherXX.log`, where `XX` is a
|
||||||
|
two‑digit station number (from `00` to `99`). If a file already exists for a station (matching the
|
||||||
|
station name in the header comment), new data is appended.
|
||||||
|
|
||||||
|
File structure:
|
||||||
|
|
||||||
|
```
|
||||||
|
# <station_name>
|
||||||
|
# Station #<number>, format: KEYWORD[level],...
|
||||||
|
# TIMESTAMP, <key1>[<level1>], <key2>[<level2>], ...
|
||||||
|
<timestamp>, <value1>, <value2>, ...
|
||||||
|
```
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
```
|
||||||
|
# WXA100-06 ultrasonic meteostation @ D:/dev/pl2303_0
|
||||||
|
# Station #0, format: KEYWORD[level],...
|
||||||
|
# TIMESTAMP, WIND[1], WINDDIR[2], HUMIDITY[1], EXTTEMP[1], PRESSURE[1], PRECIP[3], PRECIPLV[3], PRECRATE[3]
|
||||||
|
1779369905, 0.50, 120.70, 76.90, 8.40, 590.00, 1, 374.70, 0.00
|
||||||
|
1779369907, 0.60, 128.20, 76.90, 8.40, 590.00, 1, 374.70, 0.00
|
||||||
|
1779369909, 0.70, 130.30, 76.90, 8.40, 590.00, 1, 374.70, 0.00
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
- First line: comment with station name.
|
||||||
|
- Second line: station number and format description.
|
||||||
|
- Third line: column headers: `TIMESTAMP` followed by keys with their levels.
|
||||||
|
- Following lines: data – Unix timestamp (integer) and values (floating‑point, integer or string).
|
||||||
|
|
||||||
|
-----
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
Copyright © 2026 Edward V. Emelianov.
|
||||||
|
**GNU General Public License v3.0** or later.
|
||||||
|
|
||||||
|
See the `LICENSE` file in repository's root directory or [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/)
|
||||||
|
for details.
|
||||||
|
|
||||||
@@ -79,6 +79,15 @@ int main(int argc, char **argv){
|
|||||||
signal(SIGUSR1, signals); // reload DB
|
signal(SIGUSR1, signals); // reload DB
|
||||||
#ifndef EBUG
|
#ifndef EBUG
|
||||||
if(sl_daemonize()) ERRX("Can't daemonize");
|
if(sl_daemonize()) ERRX("Can't daemonize");
|
||||||
|
#endif
|
||||||
|
if(G->logfile){
|
||||||
|
sl_loglevel_e lvl = LOGLEVEL_ERR + G->verb;
|
||||||
|
if(lvl >= LOGLEVEL_AMOUNT) lvl = LOGLEVEL_AMOUNT - 1;
|
||||||
|
DBG("Loglevel: %d", lvl);
|
||||||
|
OPENLOG(G->logfile, lvl, 1);
|
||||||
|
}
|
||||||
|
LOGMSG("Started");
|
||||||
|
#ifndef EBUG
|
||||||
catchsig = INT_MAX; // now `signals` won't run exit()
|
catchsig = INT_MAX; // now `signals` won't run exit()
|
||||||
while(catchsig == INT_MAX){ // guard for dead processes
|
while(catchsig == INT_MAX){ // guard for dead processes
|
||||||
childpid = fork();
|
childpid = fork();
|
||||||
@@ -114,8 +123,10 @@ int main(int argc, char **argv){
|
|||||||
}else{
|
}else{
|
||||||
pid_t self = getpid();
|
pid_t self = getpid();
|
||||||
prepare_and_run(G);
|
prepare_and_run(G);
|
||||||
if(catchsig == INT_MAX) LOGERR("Child process %d died", self);
|
if(catchsig == INT_MAX){
|
||||||
else LOGERR("Child process %d exits with status %d", self, catchsig);
|
LOGERR("Child process %d died", self);
|
||||||
|
catchsig = 0; // like normal exit
|
||||||
|
}else LOGERR("Child process %d exits with status %d", self, catchsig);
|
||||||
; // cleanup child here
|
; // cleanup child here
|
||||||
#ifdef EBUG
|
#ifdef EBUG
|
||||||
if(G->pidfile) unlink(G->pidfile); // unlink PID-file in debug-mode
|
if(G->pidfile) unlink(G->pidfile); // unlink PID-file in debug-mode
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
CMakeLists.txt
|
CMakeLists.txt
|
||||||
|
Readme.md
|
||||||
main.c
|
main.c
|
||||||
parseargs.c
|
parseargs.c
|
||||||
parseargs.h
|
parseargs.h
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ glob_pars *parseargs(int *argc, char ***argv){
|
|||||||
// remove trailing '/'
|
// remove trailing '/'
|
||||||
int eol = strlen(G.bddir) - 1;
|
int eol = strlen(G.bddir) - 1;
|
||||||
DBG("eol=%d", eol);
|
DBG("eol=%d", eol);
|
||||||
while(eol > 0){
|
while(eol > 0){ // don't remove leading slash in case of "/"
|
||||||
DBG("before: %s", G.bddir);
|
DBG("before: %s", G.bddir);
|
||||||
if(G.bddir[eol] == '/') G.bddir[eol] = 0;
|
if(G.bddir[eol] == '/') G.bddir[eol] = 0;
|
||||||
else break;
|
else break;
|
||||||
@@ -73,10 +73,7 @@ glob_pars *parseargs(int *argc, char ***argv){
|
|||||||
if(G.net_timeout < MIN_NET_TMOUT || G.net_timeout > MAX_NET_TMOUT)
|
if(G.net_timeout < MIN_NET_TMOUT || G.net_timeout > MAX_NET_TMOUT)
|
||||||
ERRX("Wrong network timeout %g, should be in [%g, %g]", G.net_timeout, MIN_NET_TMOUT, MAX_NET_TMOUT);
|
ERRX("Wrong network timeout %g, should be in [%g, %g]", G.net_timeout, MIN_NET_TMOUT, MAX_NET_TMOUT);
|
||||||
if(G.logfile){
|
if(G.logfile){
|
||||||
sl_loglevel_e lvl = LOGLEVEL_ERR + G.verb;
|
if(*G.logfile != '/') ERRX("Logging file path should be absolute!");
|
||||||
if(lvl >= LOGLEVEL_AMOUNT) lvl = LOGLEVEL_AMOUNT - 1;
|
|
||||||
DBG("Loglevel: %d", lvl);
|
|
||||||
if(!OPENLOG(G.logfile, lvl, 1)) ERRX("Can't open log file %s", G.logfile);
|
|
||||||
}
|
}
|
||||||
return &G;
|
return &G;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,9 +27,14 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
|
// max available delta for timestamp if it's in future to current time
|
||||||
|
// USE NTP everywhere!!11111
|
||||||
|
#define TIME_IN_FUTURE_MAX (60)
|
||||||
|
|
||||||
// some "standard" keys from server
|
// some "standard" keys from server
|
||||||
static const char *key_pluginname = "PLUGIN";
|
static const char *key_pluginname = "PLUGIN";
|
||||||
static const char *key_nvalues = "NVALUES";
|
static const char *key_nvalues = "NVALUES";
|
||||||
|
static const char *key_timestamp = "TWEATH";
|
||||||
|
|
||||||
// sensor's db filename mask: DBpath/weatherXX.log
|
// sensor's db filename mask: DBpath/weatherXX.log
|
||||||
static const char *dbfilename_mask = "%s/weather%02d.log";
|
static const char *dbfilename_mask = "%s/weather%02d.log";
|
||||||
@@ -46,10 +51,15 @@ typedef struct{
|
|||||||
int fd; // log file descriptor
|
int fd; // log file descriptor
|
||||||
int idx; // index of station for requests
|
int idx; // index of station for requests
|
||||||
int nvalues; // maximal amount of values
|
int nvalues; // maximal amount of values
|
||||||
|
bool initialized; // `sensor` is fully initialized
|
||||||
char path[PATH_MAX];// path to file
|
char path[PATH_MAX];// path to file
|
||||||
char *sensname; // sensor's name
|
char *sensname; // sensor's name
|
||||||
char **keys; // keyword names for header and index in string
|
char **keys; // keyword names for header and index in string
|
||||||
int *levels; // array of `weather level` for each keyword
|
int *levels; // array of `weather level` for each keyword
|
||||||
|
time_t lasttimestamp;// timestamp of last received data
|
||||||
|
char buf[BUFSIZ]; // string buffer for file writing operations
|
||||||
|
int buflen; // current length of `buf`
|
||||||
|
int lastvalidx; // index of last value stored in `buf`
|
||||||
} senslog_t;
|
} senslog_t;
|
||||||
|
|
||||||
static int Nsensors = 0; // amount of sensors for logging
|
static int Nsensors = 0; // amount of sensors for logging
|
||||||
@@ -82,6 +92,10 @@ static void sensors_delete(){
|
|||||||
if(!sensors) return;
|
if(!sensors) return;
|
||||||
for(int i = 0; i < Nsensors; ++i){
|
for(int i = 0; i < Nsensors; ++i){
|
||||||
senslog_t *sensor = &sensors[i];
|
senslog_t *sensor = &sensors[i];
|
||||||
|
if(sensor->fd > -1){
|
||||||
|
close(sensor->fd);
|
||||||
|
sensor->fd = -1;
|
||||||
|
}
|
||||||
delete_senskeys(sensor);
|
delete_senskeys(sensor);
|
||||||
FREE(sensor->levels);
|
FREE(sensor->levels);
|
||||||
FREE(sensor->sensname);
|
FREE(sensor->sensname);
|
||||||
@@ -105,6 +119,33 @@ void set_nettimeout(double dt){
|
|||||||
net_timeout = dt;
|
net_timeout = dt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief write2fd - tries to write into file; in case of error close this file and set `sensor->fd=-1`
|
||||||
|
* @param sensor - "sensor" to write
|
||||||
|
* @param str - input string
|
||||||
|
* @param len - length of string
|
||||||
|
* @return true if all OK
|
||||||
|
*/
|
||||||
|
static bool write2fd(senslog_t *sensor, const char *str, size_t len){
|
||||||
|
if(!sensor || !str || len < 1) return false;
|
||||||
|
const char *ptr = str;
|
||||||
|
size_t written = 0;
|
||||||
|
while(len){
|
||||||
|
ssize_t l = write(sensor->fd, ptr, len);
|
||||||
|
if(l < 0) break;
|
||||||
|
len -= l;
|
||||||
|
ptr += l;
|
||||||
|
}
|
||||||
|
if(written != len){
|
||||||
|
LOGERR("Can't write '%s' to file %s: %s", str, sensor->path, strerror(errno));
|
||||||
|
WARNX("Can't write data to file");
|
||||||
|
close(sensor->fd);
|
||||||
|
sensor->fd = -1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief send_request - send request to server
|
* @brief send_request - send request to server
|
||||||
* @param cmd - command index
|
* @param cmd - command index
|
||||||
@@ -112,12 +153,38 @@ void set_nettimeout(double dt){
|
|||||||
*/
|
*/
|
||||||
static bool send_request(sl_sock_t *sock, const char *req){
|
static bool send_request(sl_sock_t *sock, const char *req){
|
||||||
if(sl_sock_sendstrmessage(sock, req) < 1){
|
if(sl_sock_sendstrmessage(sock, req) < 1){
|
||||||
|
WARNX("Can't send request '%s'", req);
|
||||||
LOGERR("Can't send request '%s'", req);
|
LOGERR("Can't send request '%s'", req);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief get_answer_line - read one line of server's answer with timeout
|
||||||
|
* @param sock - socket to read
|
||||||
|
* @param buf - input buffer
|
||||||
|
* @param buflen - its length
|
||||||
|
* @return length of receiving data, 0 if nothing received, -1 in case of error (and clear all sensor's list)
|
||||||
|
*/
|
||||||
|
static ssize_t get_answer_line(sl_sock_t *sock, char *buf, size_t buflen){
|
||||||
|
FNAME();
|
||||||
|
double t0 = sl_dtime();
|
||||||
|
DBG("tstart=%g", t0);
|
||||||
|
ssize_t len = -1;
|
||||||
|
while(sl_dtime() - t0 < net_timeout){
|
||||||
|
len = sl_sock_readline(sock, buf, buflen-1);
|
||||||
|
if(len > 0) break;
|
||||||
|
else if(len < 0){
|
||||||
|
WARNX("Seems like server disconnected");
|
||||||
|
LOGWARN("Error reading server answer, disconnected?");
|
||||||
|
sensors_delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DBG("len=%zd, tend=%g", len, sl_dtime());
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
// reinit logs at start or after rotating
|
// reinit logs at start or after rotating
|
||||||
// @return false if failed
|
// @return false if failed
|
||||||
bool reinit_logs(){
|
bool reinit_logs(){
|
||||||
@@ -150,11 +217,9 @@ static bool find_old_file(senslog_t *sensor){
|
|||||||
if(sensor->fd > -1){
|
if(sensor->fd > -1){
|
||||||
DBG("found opened file %s with fd %d -> close", sensor->path, sensor->fd);
|
DBG("found opened file %s with fd %d -> close", sensor->path, sensor->fd);
|
||||||
close(sensor->fd);
|
close(sensor->fd);
|
||||||
return false; // we need to open new file after logrotating
|
|
||||||
}
|
}
|
||||||
|
|
||||||
regex_t regex;
|
regex_t regex;
|
||||||
|
|
||||||
// Compile regex
|
// Compile regex
|
||||||
if(regcomp(®ex, dbfilename_regex, REG_EXTENDED | REG_NOSUB) != 0){
|
if(regcomp(®ex, dbfilename_regex, REG_EXTENDED | REG_NOSUB) != 0){
|
||||||
LOGERR("find_old_file(): error in regcomp(), %s", strerror(errno));
|
LOGERR("find_old_file(): error in regcomp(), %s", strerror(errno));
|
||||||
@@ -191,6 +256,7 @@ static bool find_old_file(senslog_t *sensor){
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const char *stname = line + 2; // station name from comment
|
const char *stname = line + 2; // station name from comment
|
||||||
|
DBG("Check name: '%s' and '%s'", stname, sensor->sensname);
|
||||||
if(0 == strcmp(stname, sensor->sensname)){ // good, we found this file!
|
if(0 == strcmp(stname, sensor->sensname)){ // good, we found this file!
|
||||||
DBG("Found existant file %s -> append to it", fname);
|
DBG("Found existant file %s -> append to it", fname);
|
||||||
int newfd = open(fname, O_WRONLY | O_APPEND);
|
int newfd = open(fname, O_WRONLY | O_APPEND);
|
||||||
@@ -213,9 +279,14 @@ static bool find_old_file(senslog_t *sensor){
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create new database file in `DBpath`
|
// create new database file in `DBpath`
|
||||||
|
// `sensor` obligate to be fully initialized
|
||||||
static bool create_db_file(senslog_t *sensor){
|
static bool create_db_file(senslog_t *sensor){
|
||||||
FNAME();
|
FNAME();
|
||||||
int num = 0;
|
if(!sensor || !sensor->initialized){
|
||||||
|
WARNX("create_db_file() should be called only with fully initialized `sensor`");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
int num = sensor->idx; // try to start from sensor's index
|
||||||
char path[PATH_MAX];
|
char path[PATH_MAX];
|
||||||
for(; num <= 99; ++num){
|
for(; num <= 99; ++num){
|
||||||
snprintf(path, PATH_MAX, dbfilename_mask, DBpath, num);
|
snprintf(path, PATH_MAX, dbfilename_mask, DBpath, num);
|
||||||
@@ -236,19 +307,67 @@ static bool create_db_file(senslog_t *sensor){
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
DBG("OK, %s opened, try to write header", path);
|
DBG("OK, %s opened, try to write header", path);
|
||||||
int len = snprintf(path, PATH_MAX, "# %s\n", sensor->sensname);
|
|
||||||
if(write(newfd, path, len) != len){
|
|
||||||
LOGERR("Can't write sensor's name '%s' to file %s: %s", sensor->sensname, path, strerror(errno));
|
|
||||||
WARNX("Can't write header");
|
|
||||||
close(newfd);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
DBG("%s now have descriptor %d: %s", sensor->sensname, newfd, path);
|
|
||||||
sensor->fd = newfd;
|
sensor->fd = newfd;
|
||||||
|
ssize_t len = snprintf(path, PATH_MAX, "# %s\n", sensor->sensname);
|
||||||
|
if(!write2fd(sensor, path, len)) return false;
|
||||||
|
len = snprintf(path, PATH_MAX, "# Station #%d, format: KEYWORD[level],...\n# TIMESTAMP, ", sensor->idx);
|
||||||
|
if(!write2fd(sensor, path, len)) return false;
|
||||||
|
char *ptr = path;
|
||||||
|
len = 0;
|
||||||
|
for(int i = 0; i < sensor->nvalues; ++i){
|
||||||
|
ssize_t L = snprintf(ptr, PATH_MAX-len, "%s%s[%d]", i ? ", " : "", sensor->keys[i], sensor->levels[i]);
|
||||||
|
len += L;
|
||||||
|
ptr += L;
|
||||||
|
}
|
||||||
|
len += snprintf(ptr, PATH_MAX-len, "\n");
|
||||||
|
if(!write2fd(sensor, path, len)) return false;
|
||||||
|
DBG("%s now have descriptor %d: %s", sensor->sensname, newfd, path);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool get_sensor_keys(senslog_t _U_ *sensor, sl_sock_t _U_ *sock){
|
/**
|
||||||
|
* @brief get_sensor_keys - fill `sensor->keys` and `sensor->levels` using `chklevel` request
|
||||||
|
* @param sensor - "sensor" with inited `nvalues`, `idx` and `sensname`; allocated `keys` and `levels`
|
||||||
|
* @param sock - socket for request
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
static bool get_sensor_keys(senslog_t *sensor, sl_sock_t *sock){
|
||||||
|
if(!sensor || !sock) return false;
|
||||||
|
char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN];
|
||||||
|
snprintf(str, BUFSIZ, "%s=%d\n", commands[CMD_CHKLEVEL], sensor->idx);
|
||||||
|
if(!send_request(sock, str)){
|
||||||
|
LOGERR("Can't send request '%s'", commands[CMD_CHKLEVEL]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ssize_t got = -1;
|
||||||
|
int valueidx = 0;
|
||||||
|
while(valueidx < sensor->nvalues && (got = get_answer_line(sock, str, BUFSIZ)) > 0 ){
|
||||||
|
DBG("valueidx=%d, nvalues=%d, got answer: %s", valueidx, sensor->nvalues, str);
|
||||||
|
if(2 != sl_get_keyval(str, key, value)){
|
||||||
|
LOGWARN("Wrong answer for `chklevel`: %s", str);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
DBG("key=%s", key);
|
||||||
|
int sensidx;
|
||||||
|
if(2 != sscanf(key, "[%[^]]][%d]", str, &sensidx)){
|
||||||
|
DBG("header? '%s' (str=%s)", key, str);
|
||||||
|
continue; // omit two heading lines: `PLUGIN` and `NVALUES`
|
||||||
|
}
|
||||||
|
DBG("str=%s, sensidx=%d", str, sensidx);
|
||||||
|
if(sensor->idx != sensidx){
|
||||||
|
LOGWARN("Got wrong key answer for sensor with idx=%d: '%s'", sensor->idx, key);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// OK - it's our value, increment number and fill name
|
||||||
|
sensor->keys[valueidx] = strdup(str);
|
||||||
|
sensor->levels[valueidx] = atoi(value);
|
||||||
|
DBG("now sensor->keys[%d]=%s, levels=%d", valueidx, sensor->keys[valueidx], sensor->levels[valueidx]);
|
||||||
|
++valueidx;
|
||||||
|
}
|
||||||
|
if(got < 0 || valueidx != sensor->nvalues) return false;
|
||||||
|
sensor->initialized = true;
|
||||||
|
sensor->lastvalidx = -1;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -259,7 +378,7 @@ static bool prepare_files(sl_sock_t *sock){
|
|||||||
if(!sock || !DBpath || Nsensors < 1) return false;
|
if(!sock || !DBpath || Nsensors < 1) return false;
|
||||||
for(int i = 0; i < Nsensors; ++i){
|
for(int i = 0; i < Nsensors; ++i){
|
||||||
senslog_t *sensor = &sensors[i];
|
senslog_t *sensor = &sensors[i];
|
||||||
if(!get_sensor_keys(sensor, sock)) return false;
|
if(!sensor->initialized && !get_sensor_keys(sensor, sock)) return false;
|
||||||
DBG("Check if there's something for sensor[%d]", i);
|
DBG("Check if there's something for sensor[%d]", i);
|
||||||
if(!find_old_file(sensor)){ // create new file
|
if(!find_old_file(sensor)){ // create new file
|
||||||
DBG("Nothing found, try to create new");
|
DBG("Nothing found, try to create new");
|
||||||
@@ -276,17 +395,8 @@ static bool prepare_files(sl_sock_t *sock){
|
|||||||
static bool analyse_list(sl_sock_t *sock){
|
static bool analyse_list(sl_sock_t *sock){
|
||||||
FNAME();
|
FNAME();
|
||||||
char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN];
|
char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN];
|
||||||
double tlast = sl_dtime();
|
ssize_t got;
|
||||||
while(sl_dtime() - tlast < net_timeout){
|
while((got = get_answer_line(sock, str, BUFSIZ)) > 0){
|
||||||
ssize_t len = sl_sock_readline(sock, str, BUFSIZ-1);
|
|
||||||
if(len == 0) continue;
|
|
||||||
if(len < 0){
|
|
||||||
WARNX("Seems like server disconnected");
|
|
||||||
LOGWARN("Server disconnected?");
|
|
||||||
sensors_delete();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
tlast = sl_dtime();
|
|
||||||
DBG("Got answer: %s", str);
|
DBG("Got answer: %s", str);
|
||||||
if(2 != sl_get_keyval(str, key, value)){
|
if(2 != sl_get_keyval(str, key, value)){
|
||||||
LOGWARN("Wrong answer from meteodaemon for 'list' request: %s", str);
|
LOGWARN("Wrong answer from meteodaemon for 'list' request: %s", str);
|
||||||
@@ -303,8 +413,10 @@ static bool analyse_list(sl_sock_t *sock){
|
|||||||
if(Nsensors <= idx){
|
if(Nsensors <= idx){
|
||||||
int oldN = Nsensors;
|
int oldN = Nsensors;
|
||||||
Nsensors = idx + 1;
|
Nsensors = idx + 1;
|
||||||
|
senslog_t *old_sensors = sensors;
|
||||||
sensors = realloc(sensors, Nsensors * sizeof(senslog_t));
|
sensors = realloc(sensors, Nsensors * sizeof(senslog_t));
|
||||||
if(!sensors){
|
if(!sensors){
|
||||||
|
sensors = old_sensors; // restore old pointer instead of NULL
|
||||||
sensors_delete();
|
sensors_delete();
|
||||||
LOGERR("analyse_list(): error in realloc()");
|
LOGERR("analyse_list(): error in realloc()");
|
||||||
WARNX("analyse_list(): error in realloc()");
|
WARNX("analyse_list(): error in realloc()");
|
||||||
@@ -332,6 +444,7 @@ static bool analyse_list(sl_sock_t *sock){
|
|||||||
sensor->levels = MALLOC(int, nvalues);
|
sensor->levels = MALLOC(int, nvalues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(got < 0) return false; // disconnection?
|
||||||
// now check all we got
|
// now check all we got
|
||||||
if(Nsensors < 1){
|
if(Nsensors < 1){
|
||||||
LOGWARN("Found 0 sensors in server's answer");
|
LOGWARN("Found 0 sensors in server's answer");
|
||||||
@@ -352,10 +465,6 @@ static bool analyse_list(sl_sock_t *sock){
|
|||||||
return ans;
|
return ans;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*for(int i = 0; i < nvalues; ++i){
|
|
||||||
sensor->keys[i] = ;
|
|
||||||
}*/
|
|
||||||
|
|
||||||
// prepare DB files at start
|
// prepare DB files at start
|
||||||
static bool prepare_logfiles(sl_sock_t *sock, const char *path){
|
static bool prepare_logfiles(sl_sock_t *sock, const char *path){
|
||||||
FNAME();
|
FNAME();
|
||||||
@@ -370,7 +479,7 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){
|
|||||||
DBG("Store files in %s; send `list` request", DBpath);
|
DBG("Store files in %s; send `list` request", DBpath);
|
||||||
snprintf(buf, 255, "%s\n", commands[CMD_LIST]);
|
snprintf(buf, 255, "%s\n", commands[CMD_LIST]);
|
||||||
if(!send_request(sock, buf)){
|
if(!send_request(sock, buf)){
|
||||||
WARNX("Can't send inited request");
|
LOGERR("Can't send inited request");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// now we have an answer: 2*N strings a la "PLUGIN[i]=...\nNVALUES[i]=...\n" ->
|
// now we have an answer: 2*N strings a la "PLUGIN[i]=...\nNVALUES[i]=...\n" ->
|
||||||
@@ -379,6 +488,148 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send `get=x` request for each sensor; return true if all sent OK
|
||||||
|
static bool send_get_req(sl_sock_t *sock){
|
||||||
|
char str[128];
|
||||||
|
if(!sock || !sensors || Nsensors < 1) return false;
|
||||||
|
for(int i = 0; i < Nsensors; ++i){
|
||||||
|
if(sensors[i].nvalues < 1) continue;
|
||||||
|
snprintf(str, 128, "%s=%d\n", commands[CMD_GET], sensors[i].idx);
|
||||||
|
if(!send_request(sock, str)) return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// get answers and fill database
|
||||||
|
// we consider that answers comes in sequence order, from least sensno to timestamp
|
||||||
|
// @return false only in case of error
|
||||||
|
static bool poll_server_answers(sl_sock_t *sock){
|
||||||
|
char str[BUFSIZ], key[SL_KEY_LEN], value[SL_VAL_LEN];;
|
||||||
|
ssize_t len = sl_sock_readline(sock, str, BUFSIZ-1);
|
||||||
|
if(len < 0) return false; // error
|
||||||
|
else if(len == 0) return true; // nothing to read
|
||||||
|
DBG("Got line: '%s'", str);
|
||||||
|
int sensidx;
|
||||||
|
if(2 != sl_get_keyval(str, key, value) || 2 != sscanf(key, "%[^[][%d]", str, &sensidx)){
|
||||||
|
WARNX("not key=value");
|
||||||
|
LOGWARN("Wrong answer for `get`: %s", str);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if(sensidx < 0 || sensidx >= Nsensors){
|
||||||
|
WARNX("Wrong sensor index");
|
||||||
|
LOGWARN("Got station with index (%d) out of bounds [0, %d]", sensidx, Nsensors-1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// omit header
|
||||||
|
if(0 == strcmp(key_pluginname, str) || 0 == strcmp(key_nvalues, str)) return true;
|
||||||
|
senslog_t *sensor = &sensors[sensidx];
|
||||||
|
if(0 == strcmp(key_timestamp, str)){ // check timestamps and write data to disk
|
||||||
|
time_t ts = (time_t)atol(value);
|
||||||
|
DBG("Got timestamp: %zd (last timestamp: %zd)", ts, sensor->lasttimestamp);
|
||||||
|
bool allOK = true;
|
||||||
|
if(ts > sensor->lasttimestamp){ // don't store old data
|
||||||
|
time_t curt = time(NULL);
|
||||||
|
DBG("Timestamp minus current time = %zd", ts - curt);
|
||||||
|
if(ts > curt && ts - curt > TIME_IN_FUTURE_MAX){
|
||||||
|
WARNX("timestamp is in future");
|
||||||
|
LOGWARN("got future timestamp for sensor %d: %ld", sensidx, ts);
|
||||||
|
allOK = false;
|
||||||
|
}else{ // all OK - we can save data
|
||||||
|
if(sensor->lastvalidx > -1){ // only if at least one keyword received
|
||||||
|
int trailingcommas = sensor->nvalues - sensor->lastvalidx - 2; // amount of commas after our last record
|
||||||
|
DBG("trailing commas = %d", trailingcommas);
|
||||||
|
if(trailingcommas > 0){
|
||||||
|
if(sensor->buflen + 2*trailingcommas > BUFSIZ-1){
|
||||||
|
WARNX("Sensor's buffer overfull");
|
||||||
|
LOGWARN("Sensor %s: writing buffer overfull", sensor->idx);
|
||||||
|
}else{ // write trailing zeros to buffer
|
||||||
|
for(int i = 0; i < trailingcommas; ++i){
|
||||||
|
sprintf(sensor->buf + sensor->buflen, ", ");
|
||||||
|
sensor->buflen += 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// write timestamp without error checking
|
||||||
|
size_t l = snprintf(str, BUFSIZ, "%zd, ", ts);
|
||||||
|
write2fd(sensor, str, l);
|
||||||
|
// now throw data to disk
|
||||||
|
DBG("throw '%s' with length %d to disk", sensor->buf, sensor->buflen);
|
||||||
|
allOK = write2fd(sensor, sensor->buf, sensor->buflen);
|
||||||
|
if(allOK){
|
||||||
|
write2fd(sensor, "\n", 1);
|
||||||
|
fsync(sensor->fd); // sync last full record
|
||||||
|
sensor->lasttimestamp = ts;
|
||||||
|
}
|
||||||
|
DBG("Data for %d is %swritten to disk", sensor->idx, allOK ? "" : "not ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// clear gathred data
|
||||||
|
sensor->buflen = 0;
|
||||||
|
sensor->lastvalidx = -1;
|
||||||
|
return allOK;
|
||||||
|
}
|
||||||
|
// now we need to find index of current keyword
|
||||||
|
int idx = 0, nvalues = sensor->nvalues;
|
||||||
|
for(; idx < nvalues; ++idx){
|
||||||
|
if(0 == strcmp(str, sensor->keys[idx])) break;
|
||||||
|
}
|
||||||
|
DBG("key index=%d", idx);
|
||||||
|
|
||||||
|
if(idx == nvalues){
|
||||||
|
WARNX("Not found in keylist");
|
||||||
|
LOGWARN("Weird keyword for station %d: '%s'", sensidx, str);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if(sensor->lastvalidx >= idx){
|
||||||
|
WARNX("Missed timestamp?");
|
||||||
|
LOGWARN("Missed timestamp for station %d?", sensidx);
|
||||||
|
sensor->lastvalidx = -1;
|
||||||
|
sensor->buflen = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// add current raw value to sensor's buffer (there's could be a text fields, so don't make atof
|
||||||
|
char *comment = strchr(value, '/');
|
||||||
|
if(comment){
|
||||||
|
*comment = 0; // remove comment mark
|
||||||
|
if(comment != value && comment[-1] == ' ') comment[-1] = 0; // remove last space
|
||||||
|
}
|
||||||
|
DBG("VALUE: %s", value);
|
||||||
|
// add spaces if need
|
||||||
|
int trailingcommas = idx - sensor->lastvalidx - 1; // amount of commas after our last record before current
|
||||||
|
DBG("Commas: %d", trailingcommas);
|
||||||
|
if(trailingcommas > 0){
|
||||||
|
if(sensor->buflen + 2*trailingcommas > BUFSIZ-1){
|
||||||
|
WARNX("Buffer overfull");
|
||||||
|
LOGWARN("Sensor %d: writing buffer overfull", sensor->idx);
|
||||||
|
}else{ // write trailing zeros to buffer
|
||||||
|
for(int i = 0; i < trailingcommas; ++i){
|
||||||
|
sprintf(sensor->buf + sensor->buflen, ", ");
|
||||||
|
sensor->buflen += 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int avail = BUFSIZ - sensor->buflen;
|
||||||
|
if(avail <= 0){
|
||||||
|
LOGWARN("Sensor %d: writing buffer overfull", sensor->idx);
|
||||||
|
WARNX("Overfull; clear all");
|
||||||
|
sensor->lastvalidx = -1;
|
||||||
|
sensor->buflen = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
int n = snprintf(sensor->buf + sensor->buflen, avail, "%s%s",
|
||||||
|
value, sensor->nvalues-1 != idx ? ", " : "");
|
||||||
|
if(n >= avail){
|
||||||
|
// Data would be truncated handle error or discard
|
||||||
|
n = avail - 1;
|
||||||
|
LOGWARN("Sensor %d: writing buffer truncated", sensor->idx);
|
||||||
|
}
|
||||||
|
sensor->buflen += n;
|
||||||
|
sensor->lastvalidx = idx;
|
||||||
|
DBG("Now buf[%d]=%s", sensor->idx, sensor->buf);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief run_server - run main server: send weather requests and store data
|
* @brief run_server - run main server: send weather requests and store data
|
||||||
* @param node - node to connect
|
* @param node - node to connect
|
||||||
@@ -387,6 +638,7 @@ static bool prepare_logfiles(sl_sock_t *sock, const char *path){
|
|||||||
*/
|
*/
|
||||||
void run_server(const char *node, sl_socktype_e type, const char *path){
|
void run_server(const char *node, sl_socktype_e type, const char *path){
|
||||||
if(!node || !path) return;
|
if(!node || !path) return;
|
||||||
|
//char str[BUFSIZ];
|
||||||
sl_sock_t *sock = sl_sock_run_client(type, node, BUFSIZ);
|
sl_sock_t *sock = sl_sock_run_client(type, node, BUFSIZ);
|
||||||
if(!sock){
|
if(!sock){
|
||||||
DBG("Can't connect");
|
DBG("Can't connect");
|
||||||
@@ -397,15 +649,29 @@ void run_server(const char *node, sl_socktype_e type, const char *path){
|
|||||||
|
|
||||||
DBG("Superloop");
|
DBG("Superloop");
|
||||||
int errctr = 0;
|
int errctr = 0;
|
||||||
|
double tlast = 0.;
|
||||||
while(isrunning){
|
while(isrunning){
|
||||||
|
bool allOK = true;
|
||||||
if(logreinit){
|
if(logreinit){
|
||||||
if(!prepare_files(sock)) ++errctr;
|
if(!prepare_files(sock)) allOK = false;
|
||||||
else logreinit = false;
|
else logreinit = false;
|
||||||
}
|
}
|
||||||
if(errctr > 5){
|
double tnow = sl_dtime();
|
||||||
LOGERR("Too much errors -> exit");
|
if(tnow - tlast > req_interval){ // send next requests
|
||||||
break;
|
DBG("\n\n\nSend next request; deltaT=%g", tnow - tlast);
|
||||||
|
if(!send_get_req(sock)) allOK = false;
|
||||||
|
else tlast = tnow;
|
||||||
}
|
}
|
||||||
|
// now poll everything from server
|
||||||
|
if(!poll_server_answers(sock)) allOK = false;
|
||||||
|
;
|
||||||
|
if(!allOK){
|
||||||
|
if(++errctr > 5){
|
||||||
|
LOGERR("Too much errors -> exit");
|
||||||
|
WARNX("Too much errors -> exit");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}else errctr = 0;
|
||||||
usleep(1000);
|
usleep(1000);
|
||||||
}
|
}
|
||||||
if(sock) sl_sock_delete(&sock);
|
if(sock) sl_sock_delete(&sock);
|
||||||
|
|||||||
Reference in New Issue
Block a user