add dead sensors' reinit, gathering all fresh data, fixed some little bugs

This commit is contained in:
2026-05-06 17:13:00 +03:00
parent c5b9d43797
commit 63f87d2283
17 changed files with 207 additions and 174 deletions

View File

@@ -49,6 +49,7 @@ static glob_pars defconf = {
// only for config
weather_conf_t WeatherConf = {
.ahtung_delay = 30*60, // 30 minutes
.reinit_delay = 60, // each 1 minute
.wind.good = 5., // < 5m/s - good weather
.wind.bad = 10., // > 10m/s - bad weather
.wind.terrible = 15., // > 15m/s - terrible weather
@@ -92,9 +93,9 @@ sl_option_t cmdlnopts[] = {
};
sl_option_t confopts[] = {
{"verbose", NEED_ARG, NULL, 'a', arg_int, APTR(&G.verb), "logfile verbocity level"},
{"ahtung_delay",NEED_ARG, NULL, 'b', arg_int, APTR(&WeatherConf.ahtung_delay), "delay in seconds after bad weather to change to good"},
{"good_wind", NEED_ARG, NULL, 'c', arg_double, APTR(&WeatherConf.wind.good), "good wind while less this"},
{"verbose", NEED_ARG, NULL, 0, arg_int, APTR(&G.verb), "logfile verbocity level"},
{"ahtung_delay",NEED_ARG, NULL, 0, arg_int, APTR(&WeatherConf.ahtung_delay), "delay in seconds after bad weather to change to good"},
{"good_wind", NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.wind.good), "good wind while less this"},
{"bad_wind", NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.wind.bad), "bad wind if more than this"},
{"terrible_wind",NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.wind.terrible), "terrible wind if more than this"},
{"good_humidity",NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.humidity.good), "humidity is good until this"},
@@ -107,6 +108,7 @@ sl_option_t confopts[] = {
{"good_sky", NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.sky.good), "sky-ambient less than this is good"},
{"bad_sky", NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.sky.bad), "sky-ambient greater than this is bad"},
{"terrible_sky",NEED_ARG, NULL, 0, arg_double, APTR(&WeatherConf.sky.terrible), "sky-ambient greater than this is terrible"},
{"reinit_delay",NEED_ARG, NULL, 0, arg_int, APTR(&WeatherConf.reinit_delay), "delay (s) to reinit dead sensors"},
COMMON_OPTS
end_option
};

View File

@@ -7,11 +7,13 @@ verbose = 2
sockpath = "@weather"
# sensors polling time - 1s
pollt = 1
# try to reinit dead sensors each 10s
reinit_delay = 10
# !!! Point plugins in order of meaning: the most important are first !!!
# see help for plugins format
# this should be furst as almost a half of its sensors are broken
plugin = libreinhardt.so:D:/dev/ttyS0
plugin = libwxa100.so:D:/dev/pl2303_0
plugin = libhydreon.so:D:/dev/ch340_0:1200
plugin = libbtameteo.so
# this should be last as almost a half of its sensors are broken
plugin = libreinhardt.so:D:/dev/ttyS0

View File

@@ -74,6 +74,7 @@ int main(int argc, char **argv){
sl_init();
GP = parse_args(argc, argv);
if(!GP) ERRX("Error parsing args");
sl_check4running((char*)__progname, GP->pidfile);
if(!GP->sockname) ERRX("Point command socket name");
if(GP->logfile){
sl_loglevel_e lvl = LOGLEVEL_ERR + GP->verb;
@@ -99,7 +100,6 @@ int main(int argc, char **argv){
ERRX("Can't find any sensor plugin");
}
if(GP->nplugins && GP->nplugins != nopened) LOGWARN("Work without some plugins");
sl_check4running((char*)__progname, GP->pidfile);
#ifndef EBUG
sl_daemonize();
while(1){ // guard for dead processes
@@ -121,7 +121,7 @@ int main(int argc, char **argv){
signal(SIGUSR1, signals);
signal(SIGUSR2, signals);
if(!start_servers(GP->port, GP->sockname)) ERRX("Can't run server's threads");
while(1) pause();
//while(1) pause();
//WARNX("TEST ends");
//signals(0);
return 0; // never reached

View File

@@ -89,6 +89,10 @@ static val_t collected_data[NAMOUNT_OF_DATA] = {
// {.sense = VAL_OBLIGATORY, .type = VALT_FLOAT, .meaning = IS_OTHER},
};
// additional fields marked as `IS_OTHER` gathered from different stations
static int Nadditional = 0;
static val_t *additional_data = NULL;
/**
* @brief weather_level - set/clear weather level
* @param newlvl - -1 for getter or 0..3 for setter
@@ -231,21 +235,22 @@ static double get_max_forT(sliding_max_t *sm, time_t tcutoff){
}
int collected_amount(){
return NAMOUNT_OF_DATA;
return NAMOUNT_OF_DATA + Nadditional;
}
int get_collected(val_t *val, int N){
if(!val || N < 0 || N >= NAMOUNT_OF_DATA){
if(!val || N < 0 || N >= NAMOUNT_OF_DATA + Nadditional){
DBG("Wrong number (%d) requested or no place for data", N);
return FALSE;
}
pthread_mutex_lock(&datamutex);
val_t *dptr = (N < NAMOUNT_OF_DATA) ? &collected_data[N] : &additional_data[N-NAMOUNT_OF_DATA];
#ifdef EBUG
char buf[KEY_LEN+1];
get_fieldname(&collected_data[N], buf);
DBG("Copied data of %d (u=%d, nm=%s)", N, collected_data[N].value.u, buf);
get_fieldname(dptr, buf);
DBG("Copied data of %d (u=%d, nm=%s, t=%zd)", N, dptr->value.u, buf, dptr->time);
#endif
*val = collected_data[N];
*val = *dptr;
pthread_mutex_unlock(&datamutex);
return TRUE;
}
@@ -318,6 +323,24 @@ static void fix_new_data(val_t *collected, const val_t *fresh, int force){
}
}
// find value.name in `additional_data`, if need - allocate new memory and update
static void update_additional(val_t *value){
if(!value) return;
int idx = 0;
for(; idx < Nadditional; ++idx){
if(0 == strcmp(additional_data[idx].name, value->name)) break;
}
if(idx == Nadditional){ // not found -> allocate
additional_data = realloc(additional_data, sizeof(val_t) * (++Nadditional));
if(!additional_data){
LOGERR("update_additional() can't realloc()");
ERR("realloc()");
}
memcpy(&additional_data[idx], value, sizeof(val_t));
DBG("Allocated new field: %s", value->name);
}else fix_new_data(&additional_data[idx], value, 0);
}
/**
* @brief chkweatherlevel - increase weather level if need, also check force shutdown flags
* @param curlevel - current max weather level
@@ -379,12 +402,14 @@ static weather_cond_t const shtdnflag = {.good = 0.1, .bad = 0.5, .terrible = 0.
void refresh_sensval(sensordata_t *s){
//FNAME();
//static time_t poll_time = 0;
char reason[KEY_LEN+1] = {0}; // reason of weather level increasing
static char reason[KEY_LEN+1] = {0}; // reason of weather level increasing
val_t value;
if(!s || !s->get_value) return;
//if(poll_time == 0) poll_time = get_pollT();
uint32_t curlevel = 0; // this is worse weather leavel, start from best
static uint32_t curlevel = 0; // this is worse weather leavel, start from best (collect by all sensors through 3*tpoll)
static time_t lasttupdate = 0; // last update time of weather level
time_t curtime = time(NULL);
time_t tpoll = get_pollT(), _3tpoll = 3*tpoll;
double dir = -100., dir2 = -100.; // mean wind directions
//DBG("%d meteo values", s->Nvalues);
for(int i = 0; i < s->Nvalues; ++i){
@@ -459,6 +484,10 @@ void refresh_sensval(sensordata_t *s){
break;
default : break;
}
if(value.meaning == IS_OTHER){ // check for new or existant field in `additional_data`
update_additional(&value);
continue;
}
if(idx < 0 || idx >= NAMOUNT_OF_DATA) continue;
//DBG("IDX=%d", idx);
pthread_mutex_lock(&datamutex);
@@ -489,13 +518,18 @@ void refresh_sensval(sensordata_t *s){
collected_data[NWINDDIR2].value.f = (float) dir2;
collected_data[NWINDDIR2].time = curtime;
}
if(curtime - collected_data[NWIND].time < tpoll + 1){
collected_data[NWINDMAX].value.f = (float) get_current_max(&windspeeds);
collected_data[NWINDMAX].time = curtime;
collected_data[NWINDMAX1].value.f = (float) get_max_forT(&windspeeds, curtime - T_ONE_HOUR);
collected_data[NWINDMAX1].time = curtime;
}
//DBG("check ahtung");
if(Forbidden) collected_data[NCOMMWEATH].value.u = WEATHER_PROHIBITED;
else{
time_t _2update = lasttupdate + _3tpoll;
if(Forbidden){
collected_data[NCOMMWEATH].value.u = WEATHER_PROHIBITED;
collected_data[NCOMMWEATH].time = curtime;
}else if(curtime >= _2update){ // need to update weather level
if(collected_data[NCOMMWEATH].value.u > curlevel){ // check timeout to make level lower
// DBG("curtime: %zd, curahtt: %d, diff: %zd, delay: %d", curtime, collected_data[NLASTAHTUNG].value.u, curtime - collected_data[NLASTAHTUNG].value.u, WeatherConf.ahtung_delay);
if(curtime - collected_data[NLASTAHTUNG].value.u > WeatherConf.ahtung_delay){
@@ -521,12 +555,14 @@ void refresh_sensval(sensordata_t *s){
}
if(curlevel){
collected_data[NLASTAHTUNG].value.u = curtime; // refresh last ahtung time only for level > good
collected_data[NLASTAHTUNG].time = curtime;
collected_data[NAHTUNGRSN].time = curtime;
}
}
lasttupdate = curtime;
curlevel = 0; // wait for next collected max level
collected_data[NCOMMWEATH].time = curtime; // refresh `common weather` updating time
}
collected_data[NCOMMWEATH].time = curtime;
collected_data[NLASTAHTUNG].time = curtime;
pthread_mutex_unlock(&datamutex);
//DBG("Refreshed");
}
@@ -546,25 +582,3 @@ void forbid_observations(int f){
// `forbid` flag getter
int is_forbidden(){ return Forbidden; }
#if 0
// main cycle
void run_mainweather(){
int N = get_nplugins();
if(N < 1) return;
poll_time = get_pollT();
while(1){
int nactive = 0;
pthread_mutex_lock(&datamutex);
for(int i = N-1; i > -1; --i){ // the most important is the last
sensordata_t *s = get_plugin(i);
if(!s || !sensor_alive(s)) continue;
++nactive;
}
pthread_mutex_unlock(&datamutex);
if(nactive == 0) break; // no active sensors
usleep(10000);
}
LOGERR("Main weather collector died: all sensors lost");
}
#endif

View File

@@ -39,6 +39,7 @@ typedef struct{
typedef struct{
int ahtung_delay; // delay to change "bad weather" to good after last "bad event"
int reinit_delay; // delay to check all sensors and reinit dead
// wind, m/s
weather_cond_t wind;
// humidity, %%

View File

@@ -41,7 +41,7 @@ static const val_t values[NAMOUNT] = {
static void *mainthread(void *s){
FNAME();
sensordata_t *sensor = (sensordata_t *)s;
while(1){
while(sensor->fdes > -1){
if(check_shm_block(&sdat)){
//DBG("Got next");
time_t tnow = time(NULL);
@@ -64,16 +64,13 @@ static void *mainthread(void *s){
return NULL;
}
sensordata_t *sensor_new(int N, time_t pollt, _U_ const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
sensordata_t *s = common_new();
if(!s) return NULL;
s->PluginNo = N;
if(pollt) s->tpoll = pollt;
if(!s) return FALSE;
if(!get_shm_block(&sdat, ClientSide)){
WARNX("Can't get BTA shared memory block or create main thread");
s->kill(s);
return NULL;
return FALSE;
}
s->values = MALLOC(val_t, NAMOUNT);
for(int i = 0; i < NAMOUNT; ++i) s->values[i] = values[i];
@@ -82,9 +79,9 @@ sensordata_t *sensor_new(int N, time_t pollt, _U_ const char *descr){
if(pthread_create(&s->thread, NULL, mainthread, (void*)s)){
WARN("Can't create main thread");
s->kill(s);
return NULL;
return FALSE;
}
s->fdes = 0;
return s;
return TRUE;
}

View File

@@ -38,7 +38,7 @@ static void *mainthread(void *s){
FNAME();
double t0 = sl_dtime();
sensordata_t *sensor = (sensordata_t *)s;
while(1){
while(sensor->fdes > -1){
//DBG("locked");
pthread_mutex_lock(&sensor->valmutex);
float f = sensor->values[0].value.f + (drand48() - 0.5) / 2.;
@@ -69,13 +69,11 @@ static void *mainthread(void *s){
return NULL;
}
sensordata_t *sensor_new(int N, time_t pollt, _U_ const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
sensordata_t *s = common_new();
if(!s) return NULL;
if(!s) return FALSE;
s->Nvalues = NS;
strncpy(s->name, SENSOR_NAME, NAME_LEN);
if(pollt) s->tpoll = pollt;
s->values = MALLOC(val_t, NS);
for(int i = 0; i < NS; ++i) s->values[i] = values[i];
s->values[0].value.f = 1.;
@@ -85,11 +83,10 @@ sensordata_t *sensor_new(int N, time_t pollt, _U_ const char *descr){
s->values[4].value.f = 89.;
s->values[5].value.u = 0;
s->values[6].value.f = 4.5;
s->PluginNo = N;
if(pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
s->fdes = 0;
return s;
return TRUE;
}

View File

@@ -120,28 +120,24 @@ static void *mainthread(void *s){
return NULL;
}
sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
int fd = getFD(descr);
if(fd < 0) return NULL;
sensordata_t *s = common_new();
if(!s) return NULL;
if(!s) return FALSE;
int fd = getFD(s->path);
if(fd < 0) return FALSE;
s->fdes = fd;
s->PluginNo = N;
if(pollt) s->tpoll = pollt;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->values = MALLOC(val_t, NS);
// don't use memcpy, as `values` could be aligned
for(int i = 0; i < NS; ++i) s->values[i] = values[i];
if(!(s->ringbuffer = sl_RB_new(BUFSIZ))){
WARNX("Can't init ringbuffer!");
s->kill(s);
return NULL;
return FALSE;
}
if(pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -194,25 +194,21 @@ static void *mainthread(void *s){
return NULL;
}
sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
int fd = getFD(descr);
if(fd < 0) return NULL;
sensordata_t *s = common_new();
if(!s) return NULL;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
if(!s) return FALSE;
int fd = getFD(s->path);
if(fd < 0) return FALSE;
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->fdes = fd;
s->PluginNo = N;
s->Nvalues = NAMOUNT;
if(pollt) s->tpoll = pollt;
s->values = MALLOC(val_t, NAMOUNT);
// don't use memcpy, as `values` could be aligned
for(int i = 0; i < NAMOUNT; ++i) s->values[i] = values[i];
if(!(s->ringbuffer = sl_RB_new(BUFSIZ)) ||
pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -111,8 +111,8 @@ static void *mainthread(void *s){
time_t tnow = time(NULL);
if(tnow - tpoll > sensor->tpoll){
int dlen = sprintf(buf, "%s0\n%s1\n", commands[CMD_DISTANCE], commands[CMD_DISTANCE]);
if(sl_tty_write(sensor->fdes, buf, dlen)){
WARN("Can't ask new data");
if(dlen != write(sensor->fdes, buf, dlen)){
WARN("Can't ask new data from lightning monitor");
break;
}
DBG("poll @%zd, pollt=%zd", tnow, sensor->tpoll);
@@ -169,22 +169,24 @@ static void *mainthread(void *s){
sensor->values[NLIGHTNING].time = tnow;
}
pthread_mutex_unlock(&sensor->valmutex);
if(gotfresh && sensor->freshdatahandler) sensor->freshdatahandler(sensor);
if(gotfresh) DBG("got fresh data");
if(gotfresh && sensor->freshdatahandler){
DBG("Run fresh data handler");
sensor->freshdatahandler(sensor);
}
usleep(1000);
}
DBG("suicide");
sensor->kill(sensor);
return NULL;
}
sensordata_t *sensor_new(int N, time_t _U_ pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
int fd = getFD(descr);
if(fd < 0) return NULL;
sensordata_t *s = common_new();
if(!s) return NULL;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
s->PluginNo = N;
if(!s) return FALSE;
int fd = getFD(s->path);
if(fd < 0) return FALSE;
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->fdes = fd;
s->Nvalues = NAMOUNT;
s->tpoll = TCHECK;
@@ -193,7 +195,7 @@ sensordata_t *sensor_new(int N, time_t _U_ pollt, const char *descr){
if(!(s->ringbuffer = sl_RB_new(BUFSIZ)) ||
pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -78,7 +78,7 @@ static void *mainthread(void *s){
while(sensor->fdes > -1){
time_t tnow = time(NULL);
if(tnow - tpoll > sensor->tpoll){
if(sl_tty_write(sensor->fdes, "?U\r\n", 4)){
if(4 != write(sensor->fdes, "?U\r\n", 4)){
WARN("Can't ask new data");
break;
}
@@ -166,24 +166,20 @@ static void *mainthread(void *s){
}
sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
int fd = getFD(descr);
if(fd < 0) return NULL;
sensordata_t *s = common_new();
if(!s) return NULL;
if(!s) return FALSE;
int fd = getFD(s->path);
if(fd < 0) return FALSE;
s->Nvalues = NAMOUNT;
s->PluginNo = N;
s->fdes = fd;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
if(pollt) s->tpoll = pollt;
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->values = MALLOC(val_t, NAMOUNT);
for(int i = 0; i < NAMOUNT; ++i) s->values[i] = values[i];
if(!(s->ringbuffer = sl_RB_new(BUFSIZ)) ||
pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -91,7 +91,6 @@ enum{
static netsnmp_session *snmp_session;
static oid anOID[OID_AMOUNT][MAX_OID_LEN];
static size_t anOID_len[OID_AMOUNT];
static int running = 1;
const char *oids[OID_AMOUNT] = {
[OID_BATT_STATUS] = ".1.3.6.1.2.1.33.1.2.1.0",
@@ -114,7 +113,7 @@ static void *mainthread(void *s){
double t0 = sl_dtime();
sensordata_t *sensor = (sensordata_t *)s;
netsnmp_pdu *pdu, *response;
while(running){
while(sensor->fdes > -1){
//DBG("run");
pdu = snmp_pdu_create(SNMP_MSG_GET);
for(int i = 0; i < OID_AMOUNT; ++i)
@@ -123,19 +122,20 @@ static void *mainthread(void *s){
//DBG("status = %d", status);
if(status == STAT_SUCCESS && response->errstat == SNMP_ERR_NOERROR){
time_t curt = time(NULL);
netsnmp_variable_list *vars = response->variables;
netsnmp_variable_list *vars = response->variables; // OID_BATT_STATUS
pthread_mutex_lock(&sensor->valmutex);
int ival = *vars->val.integer;
if(ival > 0 && ival < BATT_STAT_AMOUNT)
if(ival > 0 && ival < BATT_STAT_AMOUNT){
snprintf(sensor->values[NBATSTAT].value.str, STRT_LEN+1, "%s", batt_stat[ival]);
vars = vars->next_variable;
}
vars = vars->next_variable; // OID_BATT_SECONDS_ONBAT
uint32_t tonbat = (uint32_t) *vars->val.integer;
sensor->values[NTONBAT].value.u = tonbat;
vars = vars->next_variable;
vars = vars->next_variable; // OID_BATT_EST_MINUTES
sensor->values[NTREMAIN].value.u = 60 * (uint32_t) *vars->val.integer;
vars = vars->next_variable;
vars = vars->next_variable; // OID_BATT_CAPACITY
sensor->values[NBATCAP].value.u = (uint32_t) *vars->val.integer;
vars = vars->next_variable;
vars = vars->next_variable; // OID_OUTPUT_SOURCE
ival = *vars->val.integer;
if(ival > 0 && ival < SOURCE_AMOUNT)
snprintf(sensor->values[NSOURCE].value.str, STRT_LEN+1, "%s", sources[ival]);
@@ -144,6 +144,7 @@ static void *mainthread(void *s){
}else sensor->values[NONBAT].value.u = 0;
for(int i = 0; i < NAMOUNT; ++i)
sensor->values[i].time = curt;
//DBG("times updated to %zd", curt);
pthread_mutex_unlock(&sensor->valmutex);
if(sensor->freshdatahandler) sensor->freshdatahandler(sensor);
}else DBG("Error in packet");
@@ -156,15 +157,15 @@ static void *mainthread(void *s){
}
static void snmp_kill(sensordata_t *s){
running = 0;
s->fdes = -1;
pthread_join(s->thread, NULL);
snmp_close(snmp_session);
common_kill(s);
}
sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
if(!s || !s->path[0]) return FALSE;
netsnmp_session session;
init_snmp("snmpapp");
@@ -174,29 +175,24 @@ sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
session.community = (u_char *)"public";
session.community_len = strlen((const char *)session.community);
const char *colon = strchr(descr, ':');
if(colon) descr = colon + 1; // omit "N:" in field "N:host"
session.peername = strdup(descr);
DBG("PATH: %s", s->path);
const char *colon = strchr(s->path, ':');
if(colon) ++colon; // omit "N:" in field "N:host"
session.peername = strdup(colon);
snmp_session = snmp_open(&session);
if(!snmp_session){
snmp_sess_perror("snmp_open", &session);
FREE(session.peername);
return NULL;
return FALSE;
}
sensordata_t *s = common_new();
if(!s){
snmp_close(snmp_session);
return NULL;
}
s->kill = snmp_kill;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
s->PluginNo = N;
s->fdes = -1;
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->fdes = 0;
s->Nvalues = NAMOUNT;
if(pollt) s->tpoll = pollt;
s->values = MALLOC(val_t, NAMOUNT);
for(int i = 0; i < NAMOUNT; ++i) s->values[i] = values[i];
@@ -214,7 +210,7 @@ sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
if(!(s->ringbuffer = sl_RB_new(BUFSIZ)) ||
pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -127,7 +127,7 @@ static void *mainthread(void *s){
while(sensor->fdes > -1){
time_t tnow = time(NULL);
if(tnow - tpoll > sensor->tpoll){
if(sl_tty_write(sensor->fdes, "!0R0\r\n", 6)){
if(6 != write(sensor->fdes, "!0R0\r\n", 6)){
WARN("Can't ask new data");
break;
}
@@ -194,25 +194,21 @@ static void *mainthread(void *s){
return NULL;
}
sensordata_t *sensor_new(int N, time_t pollt, const char *descr){
int sensor_init(sensordata_t *s){
FNAME();
if(!descr || !*descr) return NULL;
int fd = getFD(descr);
if(fd < 0) return NULL;
sensordata_t *s = common_new();
if(!s) return NULL;
snprintf(s->name, NAME_LEN, "%s @ %s", SENSOR_NAME, descr);
s->PluginNo = N;
if(!s) return FALSE;
int fd = getFD(s->path);
if(fd < 0) return FALSE;
snprintf(s->name, NAME_LEN, "%s", SENSOR_NAME);
s->fdes = fd;
s->Nvalues = NAMOUNT;
if(pollt) s->tpoll = pollt;
s->values = MALLOC(val_t, NAMOUNT);
for(int i = 0; i < NAMOUNT; ++i) s->values[i] = values[i];
if(!(s->ringbuffer = sl_RB_new(BUFSIZ)) ||
pthread_create(&s->thread, NULL, mainthread, (void*)s)){
s->kill(s);
return NULL;
return FALSE;
}
return s;
return TRUE;
}

View File

@@ -85,7 +85,7 @@ void *open_plugin(const char *name){
*/
static void dumpsensors(struct sensordata_t* station){
//FNAME();
if(!station || !station->get_value || station->Nvalues < 1 || station->IsMuted) return;
if(!sensor_alive(station) || !station->get_value || station->Nvalues < 1 || station->IsMuted) return;
refresh_sensval(station);
#if 0
DBG("New values...");
@@ -136,14 +136,20 @@ int openplugins(char **paths, int N){
void* dlh = open_plugin(buf);
if(!dlh) continue;
DBG("OPENED");
sensor_new_t sensnew = (sensor_new_t) dlsym(dlh, "sensor_new");
if(sensnew){
sensordata_t *S = sensnew(nplugins, poll_interval, colon); // here nplugins is index in array
if(!S) WARNXL("Can't init plugin %s", paths[i]);
sensor_init_t sensinit = (sensor_init_t) dlsym(dlh, "sensor_init");
if(sensinit){
sensordata_t *S = sensor_new(nplugins, colon);
if(!S) WARNXL("Can't allocate memory for 'sensor' structure");
else{
S->init = sensinit;
S->tpoll = poll_interval;
allplugins[nplugins++] = S;
int inited = sensinit(S); // here nplugins is index in array
if(!inited) WARNXL("Can't init plugin %s", paths[i]);
else{
if(!S->onrefresh || !S->onrefresh(S, dumpsensors)) WARNXL("Can't init refresh funtion");
LOGMSG("Plugin %s nave %d sensors", paths[i], S->Nvalues);
allplugins[nplugins++] = S;
}
}
}else WARNXL("Can't find initing function in plugin %s: %s", paths[i], dlerror());
}
@@ -158,6 +164,7 @@ void closeplugins(){
if(!allplugins || nplugins < 1) return;
for(int i = 0; i < nplugins; ++i){
if(allplugins[i]->kill) allplugins[i]->kill(allplugins[i]);
FREE(allplugins[i]);
}
FREE(allplugins);
nplugins = 0;

View File

@@ -39,16 +39,18 @@ static sl_sock_hresult_e timehandler(sl_sock_t *client, _U_ sl_sock_hitem_t *ite
return RESULT_SILENCE;
}
#define SSZ_ (PATH_MAX + 256)
// show all connected libraries
static sl_sock_hresult_e listhandler(sl_sock_t *client, _U_ sl_sock_hitem_t *item, _U_ const char *req){
if(!client) return RESULT_FAIL;
char buf[256];
char buf[SSZ_];
int N = get_nplugins();
if(N < 1) return RESULT_FAIL;
sensordata_t *d = NULL;
for(int i = 0; i < N; ++i){
if(!(d = get_plugin(i))) continue;
snprintf(buf, 255, "PLUGIN[%d]=%s\nNVALUES[%d]=%d\n", i, d->name, i, d->Nvalues);
if(d->path[0]) snprintf(buf, SSZ_, "PLUGIN[%d]=%s @ %s\nNVALUES[%d]=%d\n", i, d->name, d->path, i, d->Nvalues);
else snprintf(buf, SSZ_, "PLUGIN[%d]=%s\nNVALUES[%d]=%d\n", i, d->name, i, d->Nvalues);
sl_sock_sendstrmessage(client, buf);
}
return RESULT_SILENCE;
@@ -83,7 +85,7 @@ static void showdata(sl_sock_t *client, int N){
if(!s) return;
Ncoll = s->Nvalues;
}
time_t oldest = time(NULL) - WeatherConf.ahtung_delay, mstm = 0;
time_t oldest = time(NULL) - 2 * WeatherConf.ahtung_delay, mstm = 0;
for(int i = 0; i < Ncoll; ++i){
int ans = 0;
@@ -402,7 +404,25 @@ int start_servers(const char *netnode, const char *sockpath){
sl_sock_dischandler(localsocket, disconnected);
sl_sock_defmsghandler(netsocket, defhandler);
sl_sock_defmsghandler(localsocket, defhandler);
return TRUE;
// now run checking cycle
int Nplugins = get_nplugins();
time_t tstart = time(NULL), tcur;
while(1){
for(int i = 0; i < Nplugins; ++i){
sensordata_t *s = get_plugin(i);
if(!s) continue;
if(sensor_alive(s)) continue;
// sensor isn't inited - try to do it
DBG("sensor with path %s isn't inited, try", s->path);
if(s->init){
if(s->init(s)) LOGMSG("Sensor %s reinited @ %s", s->name, s->path);
else DBG("Can't reinit");
}
}
while((tcur = time(NULL)) - tstart < WeatherConf.reinit_delay) sleep(1);
tstart = tcur;
}
return TRUE; // should be never reached
}
void kill_servers(){

View File

@@ -29,15 +29,18 @@
//static int common_init(sensordata_t*, int, time_t, int);
/**
* @brief common_new - call this function from your plugin's `sensor_new`
* @return
* @brief sensor_new - call this function before calling `sensor_init`
* @param N - plugin number in array
* @return pointer to allocated sensor's structure
*/
sensordata_t *common_new(){
sensordata_t *sensor_new(int N, const char *descr){
sensordata_t *s = MALLOC(sensordata_t, 1);
s->fdes = -1; // not inited
s->onrefresh = common_onrefresh;
s->onrefresh = common_onrefresh; // `init` function can redefine basic stubs
s->get_value = common_getval;
s->kill = common_kill;
s->PluginNo = N; // `init` shouldn't change this value
snprintf(s->path, PATH_MAX, "%s", descr); // `init` shouldn't change this value
pthread_mutex_init(&s->valmutex, NULL);
return s;
}
@@ -66,19 +69,26 @@ int common_onrefresh(sensordata_t *s, void (*handler)(sensordata_t *)){
}
/**
* @brief common_kill - common `die` function
* @brief common_kill - common `die` function (close, but don't destroy sensor)
* @param s - sensor
*/
void common_kill(sensordata_t *s){
FNAME();
if(!s) return;
if(s->fdes > -1){ // inited and maybe have opened file/socket
if(pthread_equal(pthread_self(), s->thread)){
DBG("Don't cancel myself");
}else{
DBG("Cancel sensor's thread");
if(0 == pthread_cancel(s->thread)){
DBG("%s main thread canceled, join", s->name);
pthread_join(s->thread, NULL);
DBG("Done");
}
}
close(s->fdes);
s->fdes = -1;
DBG("FD closed");
}
DBG("Delete RB");
if(s->ringbuffer) sl_RB_delete(&s->ringbuffer);
@@ -86,9 +96,8 @@ void common_kill(sensordata_t *s){
if(s->privdatafree) s->privdatafree(s->privdata);
else FREE(s->privdata);
DBG("Sensor '%s' killed", s->name);
LOGERR("Sensor '%s' killed", s->name);
FREE(s);
DBG("There's no more this sensor");
if(s->path[0]) LOGERR("Sensor '%s' @ '%s' killed", s->name, s->path);
else LOGERR("Sensor '%s' killed", s->name);
}
/**

View File

@@ -97,10 +97,12 @@ typedef struct{
// all sensor's data
// all functions have `this` as first arg
typedef struct sensordata_t{
char name[NAME_LEN+1]; // max 31 symbol of sensor's name (e.g. "rain sensor")
char name[NAME_LEN+1]; // sensor's name (e.g. "rain sensor @ localhost")
char path[PATH_MAX]; // path to sensor's device or socket description in format D:/path, U:path, N:host:port
int Nvalues; // amount of values
int PluginNo; // plugin number in array (if several)
int IsMuted; // ==1 for "muted" station (don't refresh sensors' data)
int (*init)(struct sensordata_t*); // main init function
int (*onrefresh)(struct sensordata_t*, void (*handler)(struct sensordata_t*)); // handler of new data; return TRUE if OK
int (*get_value)(struct sensordata_t*, val_t*, int); // getter of Nth value
void (*kill)(struct sensordata_t*); // close everything and remove sensor
@@ -118,14 +120,14 @@ typedef struct sensordata_t{
} sensordata_t;
// type for function extraction
typedef sensordata_t* (*sensor_new_t)(int, time_t, const char*);
typedef int (*sensor_init_t)(sensordata_t *);
// init meteostation with given PluginNo, poll_interval and descriptor
sensordata_t *sensor_new(int PluginNo, time_t poll_interval, const char *descr); // external initial function for any plugin
//sensordata_t *sensor_init(int PluginNo, time_t poll_interval, const char *descr); // external initial function for any plugin
int sensor_alive(sensordata_t *s);
sensordata_t *sensor_new(int N, const char *descr);
// private function (for plugins usage only)
sensordata_t *common_new();
void common_kill(sensordata_t *s);
int common_onrefresh(sensordata_t *s, void (*handler)(sensordata_t *));
int common_getval(struct sensordata_t *s, val_t *o, int N);