add SDO text parser, fix some functions for multithreading

This commit is contained in:
Edward Emelianov 2020-11-18 17:07:15 +03:00
parent 1f6be7644f
commit 2502c071cc
10 changed files with 211 additions and 149 deletions

10
canserver/4tests.txt Normal file
View File

@ -0,0 +1,10 @@
#raw
register raw 0 raw
mesg raw 0x60a 32 1 96 0 0 0 0 0
#canopen
register x 0x58a canopen
mesg x 10 0x6001 0
#emul
register new 3333 emulation

View File

@ -113,9 +113,13 @@ static CANmesg *mkMesg(SDO *sdo){
return &mesg; return &mesg;
} }
// transform CAN-message to SDO /**
SDO *parseSDO(CANmesg *mesg){ * @brief parseSDO - transform CAN-message to SDO
static SDO sdo; * @param mesg (i) - message
* @param sdo (o) - SDO
* @return sdo or NULL depending on result
*/
SDO *parseSDO(CANmesg *mesg, SDO *sdo){
if(mesg->len != 8){ if(mesg->len != 8){
WARNX("Wrong SDO data length"); WARNX("Wrong SDO data length");
return NULL; return NULL;
@ -125,17 +129,18 @@ SDO *parseSDO(CANmesg *mesg){
DBG("cobid=0x%X, not a TSDO!", cobid); DBG("cobid=0x%X, not a TSDO!", cobid);
return NULL; // not a transmit SDO return NULL; // not a transmit SDO
} }
sdo.NID = mesg->ID & NODEID_MASK; sdo->NID = mesg->ID & NODEID_MASK;
uint8_t spec = mesg->data[0]; uint8_t spec = mesg->data[0];
sdo.ccs = GET_CCS(spec); sdo->ccs = GET_CCS(spec);
sdo.index = (uint16_t)mesg->data[1] | ((uint16_t)mesg->data[2] << 8); sdo->index = (uint16_t)mesg->data[1] | ((uint16_t)mesg->data[2] << 8);
sdo.subindex = mesg->data[3]; sdo->subindex = mesg->data[3];
if((spec & SDO_E) && (spec & SDO_S)) sdo.datalen = SDO_datalen(spec); if((spec & SDO_E) && (spec & SDO_S)) sdo->datalen = SDO_datalen(spec);
else if(sdo.ccs == CCS_ABORT_TRANSFER) sdo.datalen = 4; // error code else if(sdo->ccs == CCS_ABORT_TRANSFER) sdo->datalen = 4; // error code
else sdo.datalen = 0; // no data in message else sdo->datalen = 0; // no data in message
for(uint8_t i = 0; i < sdo.datalen; ++i) sdo.data[i] = mesg->data[4+i]; for(uint8_t i = 0; i < sdo->datalen; ++i) sdo->data[i] = mesg->data[4+i];
DBG("Got TSDO from NID=%d, ccs=%u, index=0x%X, subindex=0x%X, datalen=%d", sdo.NID, sdo.ccs, sdo.index, sdo.subindex, sdo.datalen); DBG("Got TSDO from NID=%d, ccs=%u, index=0x%X, subindex=0x%X, datalen=%d",
return &sdo; sdo->NID, sdo->ccs, sdo->index, sdo->subindex, sdo->datalen);
return sdo;
} }
// send request to read SDO // send request to read SDO
@ -150,21 +155,23 @@ static int ask2read(uint16_t idx, uint8_t subidx, uint8_t NID){
return canbus_write(mesg); return canbus_write(mesg);
} }
static SDO *getSDOans(uint16_t idx, uint8_t subidx, uint8_t NID){ static SDO *getSDOans(uint16_t idx, uint8_t subidx, uint8_t NID, SDO *sdo){
FNAME(); FNAME();
int found = 0;
CANmesg mesg; CANmesg mesg;
SDO *sdo = NULL;
double t0 = dtime(); double t0 = dtime();
while(dtime() - t0 < SDO_ANS_TIMEOUT){ while(dtime() - t0 < SDO_ANS_TIMEOUT){
mesg.ID = TSDO_COBID | NID; // read only from given ID mesg.ID = TSDO_COBID | NID; // read only from given ID
if(canbus_read(&mesg)){ if(canbus_read(&mesg)){
continue; continue;
} }
sdo = parseSDO(&mesg); if(!parseSDO(&mesg, sdo)) continue;
if(!sdo) continue; if(sdo->index == idx && sdo->subindex == subidx){
if(sdo->index == idx && sdo->subindex == subidx) break; found = 1;
break;
} }
if(!sdo || sdo->index != idx || sdo->subindex != subidx){ }
if(!found){
WARNX("No answer from SDO 0x%X/0x%X", idx, subidx); WARNX("No answer from SDO 0x%X/0x%X", idx, subidx);
return NULL; return NULL;
} }
@ -176,15 +183,16 @@ static SDO *getSDOans(uint16_t idx, uint8_t subidx, uint8_t NID){
* @param idx - SDO index * @param idx - SDO index
* @param subidx - SDO subindex * @param subidx - SDO subindex
* @param NID - target node ID * @param NID - target node ID
* @param sdo (i)- SDO to fit
* @return SDO received or NULL if error * @return SDO received or NULL if error
*/ */
SDO *readSDOvalue(uint16_t idx, uint8_t subidx, uint8_t NID){ SDO *readSDOvalue(uint16_t idx, uint8_t subidx, uint8_t NID, SDO *sdo){
FNAME(); FNAME();
if(ask2read(idx, subidx, NID)){ if(ask2read(idx, subidx, NID)){
WARNX("readSDOvalue(): Can't initiate upload"); WARNX("readSDOvalue(): Can't initiate upload");
return NULL; return NULL;
} }
return getSDOans(idx, subidx, NID); return getSDOans(idx, subidx, NID, sdo);
} }
static inline uint32_t mku32(uint8_t data[4]){ static inline uint32_t mku32(uint8_t data[4]){
@ -214,42 +222,42 @@ static inline int8_t mki8(uint8_t data[4]){
// read SDO value, if error - return INT64_MIN // read SDO value, if error - return INT64_MIN
int64_t SDO_read(const SDO_dic_entry *e, uint8_t NID){ int64_t SDO_read(const SDO_dic_entry *e, uint8_t NID){
FNAME(); FNAME();
SDO *sdo = readSDOvalue(e->index, e->subindex, NID); SDO sdo;
if(!sdo){ if(!readSDOvalue(e->index, e->subindex, NID, &sdo)){
return INT64_MIN; return INT64_MIN;
} }
if(sdo->ccs == CCS_ABORT_TRANSFER){ // error if(sdo.ccs == CCS_ABORT_TRANSFER){ // error
WARNX("Got error for SDO 0x%X", e->index); WARNX("Got error for SDO 0x%X", e->index);
uint32_t ac = mku32(sdo->data); uint32_t ac = mku32(sdo.data);
const char *etxt = abortcode_text(ac); const char *etxt = abortcode_text(ac);
if(etxt) WARNX("Abort code 0x%X: %s", ac, etxt); if(etxt) WARNX("Abort code 0x%X: %s", ac, etxt);
return INT64_MIN; return INT64_MIN;
} }
if(sdo->datalen != e->datasize){ if(sdo.datalen != e->datasize){
WARNX("Got SDO with length %d instead of %d (as in dictionary)", sdo->datalen, e->datasize); WARNX("Got SDO with length %d instead of %d (as in dictionary)", sdo.datalen, e->datasize);
} }
int64_t ans = 0; int64_t ans = 0;
if(e->issigned){ if(e->issigned){
switch(sdo->datalen){ switch(sdo.datalen){
case 1: case 1:
ans = mki8(sdo->data); ans = mki8(sdo.data);
break; break;
case 4: case 4:
ans = mki32(sdo->data); ans = mki32(sdo.data);
break; break;
default: // can't be 3! 3->2 default: // can't be 3! 3->2
ans = mki16(sdo->data); ans = mki16(sdo.data);
} }
}else{ }else{
switch(sdo->datalen){ switch(sdo.datalen){
case 1: case 1:
ans = mku8(sdo->data); ans = mku8(sdo.data);
break; break;
case 4: case 4:
ans = mku32(sdo->data); ans = mku32(sdo.data);
break; break;
default: // can't be 3! 3->2 default: // can't be 3! 3->2
ans = mku16(sdo->data); ans = mku16(sdo.data);
} }
} }
return ans; return ans;
@ -276,23 +284,23 @@ int SDO_writeArr(const SDO_dic_entry *e, uint8_t NID, const uint8_t *data){
return 2; return 2;
} }
DBG("get answer"); DBG("get answer");
SDO *sdop = getSDOans(e->index, e->subindex, NID); SDO sdop;
if(!sdop){ if(!getSDOans(e->index, e->subindex, NID, &sdop)){
WARNX("SDO_write(): SDO read error"); WARNX("SDO_write(): SDO read error");
return 3; return 3;
} }
if(sdop->ccs == CCS_ABORT_TRANSFER){ // error if(sdop.ccs == CCS_ABORT_TRANSFER){ // error
WARNX("SDO_write(): Got error for SDO 0x%X", e->index); WARNX("SDO_write(): Got error for SDO 0x%X", e->index);
uint32_t ac = mku32(sdop->data); uint32_t ac = mku32(sdop.data);
const char *etxt = abortcode_text(ac); const char *etxt = abortcode_text(ac);
if(etxt) WARNX("Abort code 0x%X: %s", ac, etxt); if(etxt) WARNX("Abort code 0x%X: %s", ac, etxt);
return 4; return 4;
} }
if(sdop->datalen != 0){ if(sdop.datalen != 0){
WARNX("SDO_write(): got answer with non-zero length"); WARNX("SDO_write(): got answer with non-zero length");
return 5; return 5;
} }
if(sdop->ccs != CCS_SEG_UPLOAD){ if(sdop.ccs != CCS_SEG_UPLOAD){
WARNX("SDO_write(): got wrong answer"); WARNX("SDO_write(): got wrong answer");
return 6; return 6;
} }

View File

@ -78,8 +78,8 @@ typedef struct{
} SDO; } SDO;
const char *abortcode_text(uint32_t abortcode); const char *abortcode_text(uint32_t abortcode);
SDO *parseSDO(CANmesg *mesg); SDO *parseSDO(CANmesg *mesg, SDO *sdo);
SDO *readSDOvalue(uint16_t idx, uint8_t subidx, uint8_t NID); SDO *readSDOvalue(uint16_t idx, uint8_t subidx, uint8_t NID, SDO *sdo);
int64_t SDO_read(const SDO_dic_entry *e, uint8_t NID); int64_t SDO_read(const SDO_dic_entry *e, uint8_t NID);

View File

@ -91,12 +91,6 @@ int main(int argc, char **argv){
} }
} }
#endif #endif
/*
* INSERT CODE HERE
* connection check & device validation
*/
//if(!G->terminal) signals(15); // there's not main controller connected to given terminal
daemonize(GP->port); daemonize(GP->port);
return 0; return 0;
} }

View File

@ -17,7 +17,6 @@
*/ */
#include "aux.h" #include "aux.h"
#include "canbus.h"
#include "canopen.h" #include "canopen.h"
#include "cmdlnopts.h" #include "cmdlnopts.h"
#include "processmotors.h" #include "processmotors.h"
@ -34,7 +33,9 @@
static int CANspeed = 0; // default speed, if !=0 set it when connected static int CANspeed = 0; // default speed, if !=0 set it when connected
// all messages are in format "ID [data]" // all messages are in format "ID [data]"
message CANbusMessages = {0}; // CANserver thread is master static message CANbusMessages = {0}; // CANserver thread is master
#define CANBUSPUSH(mesg) mesgAddObj(&CANbusMessages, mesg, sizeof(CANmesg))
#define CANBUSPOP() mesgGetObj(&CANbusMessages, NULL)
// basic threads // basic threads
// messages: master - thread, slave - caller // messages: master - thread, slave - caller
@ -113,9 +114,21 @@ static void reopen_device(){
// do something with can message: send to receiver // do something with can message: send to receiver
static void processCANmessage(CANmesg *mesg){ static void processCANmessage(CANmesg *mesg){
threadinfo *ti = findThreadByID(mesg->ID); threadinfo *ti = findThreadByID(0);
if(!ti) return; if(ti){
DBG("Found"); /*char buf[64], *ptr = buf;
int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", cm.ID);
l -= x; ptr += x;
for(int i = 0; i < cm.len; ++i){
x = snprintf(ptr, l, "0x%02X ", cm.data[i]);
l -= x; ptr += x;
}*/
mesgAddObj(&ti->answers, (void*)mesg, sizeof(CANmesg));
}
ti = findThreadByID(mesg->ID);
if(ti){
/* DBG("Found");
char buf[64], *ptr = buf; char buf[64], *ptr = buf;
int l = 64, x; int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", mesg->ID); x = snprintf(ptr, l, "#0x%03X ", mesg->ID);
@ -124,7 +137,10 @@ static void processCANmessage(CANmesg *mesg){
x = snprintf(ptr, l, "0x%02X ", mesg->data[i]); x = snprintf(ptr, l, "0x%02X ", mesg->data[i]);
l -= x; ptr += x; l -= x; ptr += x;
} }
addmesg(&ti->answers, buf); mesgAddText(&ti->answers, buf);
*/
mesgAddObj(&ti->answers, (void*) mesg, sizeof(CANmesg));
}
} }
/** /**
@ -135,37 +151,19 @@ static void processCANmessage(CANmesg *mesg){
void *CANserver(_U_ void *data){ void *CANserver(_U_ void *data){
reopen_device(); reopen_device();
while(1){ while(1){
CANmesg cm; CANmesg *msg = CANBUSPOP();
char *mesg = getmesg(&CANbusMessages); if(msg){
if(mesg){ if(canbus_write(msg)){
if(parsePacket(&cm, mesg)){
LOGMSG("Received wrong CAN message: %s", mesg);
DBG("Bad message: %s", mesg);
}else{
if(canbus_write(&cm)){
LOGWARN("Can't write to CANbus, try to reopen"); LOGWARN("Can't write to CANbus, try to reopen");
WARNX("Can't write to canbus"); WARNX("Can't write to canbus");
if(canbus_disconnected()) reopen_device(); if(canbus_disconnected()) reopen_device();
} }
} FREE(msg);
FREE(mesg);
} }
usleep(1000); usleep(1000);
CANmesg cm;
if(!canbus_read(&cm)){ // got raw message from CAN bus - parce it if(!canbus_read(&cm)){ // got raw message from CAN bus - parce it
DBG("Got CAN message from %d, len: %d", cm.ID, cm.len); DBG("Got CAN message from %d, len: %d", cm.ID, cm.len);
// send raw message to 0
threadinfo *ti = findThreadByID(0);
if(ti){
char buf[64], *ptr = buf;
int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", cm.ID);
l -= x; ptr += x;
for(int i = 0; i < cm.len; ++i){
x = snprintf(ptr, l, "0x%02X ", cm.data[i]);
l -= x; ptr += x;
}
addmesg(&ti->answers, buf);
}
processCANmessage(&cm); processCANmessage(&cm);
}else if(canbus_disconnected()) reopen_device(); }else if(canbus_disconnected()) reopen_device();
} }
@ -181,19 +179,19 @@ void *CANserver(_U_ void *data){
static void *stpemulator(void *arg){ static void *stpemulator(void *arg){
threadinfo *ti = (threadinfo*)arg; threadinfo *ti = (threadinfo*)arg;
while(1){ while(1){
char *mesg = getmesg(&ti->commands); char *mesg = mesgGetText(&ti->commands);
if(mesg){ if(mesg){
DBG("Stepper emulator got: %s", mesg); DBG("Stepper emulator got: %s", mesg);
addmesg(&ServerMessages, mesg); mesgAddText(&ServerMessages, mesg);
/* do something */ /* do something */
FREE(mesg); FREE(mesg);
} }
int r100 = rand() % 10000; int r100 = rand() % 10000;
if(r100 < 20){ // 20% of probability if(r100 < 1){ // 10% of probability
addmesg(&ServerMessages, "stpemulator works fine!"); mesgAddText(&ServerMessages, "stpemulator works fine!");
} }
if(r100 > 9998){ if(r100 > 9998){
addmesg(&ServerMessages, "O that's good!"); mesgAddText(&ServerMessages, "O that's good!");
} }
usleep(1000); usleep(1000);
} }
@ -211,16 +209,25 @@ static void *stpemulator(void *arg){
static void *rawcommands(void *arg){ static void *rawcommands(void *arg){
threadinfo *ti = (threadinfo*)arg; threadinfo *ti = (threadinfo*)arg;
while(1){ while(1){
char *mesg = getmesg(&ti->commands); char *mesg = mesgGetText(&ti->commands);
if(mesg){ if(mesg){
DBG("Got raw command: %s", mesg); DBG("Got raw command: %s", mesg);
addmesg(&CANbusMessages, mesg); CANmesg cm;
if(!parsePacket(&cm, mesg)) CANBUSPUSH(&cm);
FREE(mesg); FREE(mesg);
} }
mesg = getmesg(&ti->answers); CANmesg *ans = (CANmesg*) mesgGetObj(&ti->answers, NULL);
if(mesg){ // got raw answer from bus to thread ID, send it to all if(ans){ // got raw answer from bus to thread ID, send it to all
addmesg(&ServerMessages, mesg); char buf[64], *ptr = buf;
FREE(mesg); int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", ans->ID);
l -= x; ptr += x;
for(int i = 0; i < ans->len; ++i){
x = snprintf(ptr, l, "0x%02X ", ans->data[i]);
l -= x; ptr += x;
}
mesgAddText(&ServerMessages, buf);
FREE(ans);
} }
usleep(1000); usleep(1000);
} }
@ -245,25 +252,19 @@ static void sendSDO(char *mesg){
} }
DBG("User's message have %d ints", N); DBG("User's message have %d ints", N);
uint8_t data[8] = {0}, datalen = (uint8_t) N - 3; CANmesg comesg;
data[0] = SDO_CCS(CCS_INIT_DOWNLOAD); uint8_t datalen = (uint8_t) N - 3;
comesg.data[0] = SDO_CCS(CCS_INIT_DOWNLOAD);
comesg.len = 8;
if(datalen){ // there's data if(datalen){ // there's data
data[0] |= SDO_N(datalen) | SDO_E | SDO_S; comesg.data[0] |= SDO_N(datalen) | SDO_E | SDO_S;
for(int i = 0; i < datalen; ++i) data[4+i] = (uint8_t)(info[3+i]); for(int i = 0; i < datalen; ++i) comesg.data[4+i] = (uint8_t)(info[3+i]);
} }
data[1] = info[1] & 0xff; comesg.data[1] = info[1] & 0xff;
data[2] = (info[1] >> 8) & 0xff; comesg.data[2] = (info[1] >> 8) & 0xff;
data[3] = (uint8_t)(info[2]); comesg.data[3] = (uint8_t)(info[2]);
comesg.ID = (uint16_t)(RSDO_COBID + info[0]);
char buf[64], *ptr = buf; CANBUSPUSH(&comesg);
int l = 64, x;
x = snprintf(ptr, l, "0x%03X ", (uint16_t)(RSDO_COBID + info[0]));
l -= x; ptr += x;
for(int i = 0; i < 8; ++i){
x = snprintf(ptr, l, "0x%02X ", (uint16_t)(data[i]));
l -= x; ptr += x;
}
addmesg(&CANbusMessages, buf);
} }
// send raw CANopen commands // send raw CANopen commands
@ -271,16 +272,36 @@ static void sendSDO(char *mesg){
static void *canopencmds(void *arg){ static void *canopencmds(void *arg){
threadinfo *ti = (threadinfo*)arg; threadinfo *ti = (threadinfo*)arg;
while(1){ while(1){
char *mesg = getmesg(&ti->commands); char *mesg = mesgGetText(&ti->commands);
if(mesg) do{ if(mesg) do{
DBG("Got CANopen command: %s", mesg); DBG("Got CANopen command: %s", mesg);
sendSDO(mesg); sendSDO(mesg);
FREE(mesg); FREE(mesg);
}while(0); }while(0);
mesg = getmesg(&ti->answers); CANmesg *ans = (CANmesg*)mesgGetObj(&ti->answers, NULL);
if(mesg){ // got raw answer from bus to thread ID, analize it if(ans){ // got raw answer from bus to thread ID, analize it
addmesg(&ServerMessages, mesg); SDO sdo;
FREE(mesg); if(parseSDO(ans, &sdo)){
char buf[128], *ptr = buf;
int rest = 128;
int l = snprintf(ptr, rest, "SDO={nid=0x%02X, idx=0x%04X, subidx=%d, ccs=0x%02X, datalen=%d",
sdo.NID, sdo.index, sdo.subindex, sdo.ccs, sdo.datalen);
ptr += l; rest -= l;
if(sdo.datalen){
l = snprintf(ptr, rest, ", data=[");
ptr += l; rest -= l;
for(int idx = 0; idx < sdo.datalen; ++idx){
if(idx) l = snprintf(ptr, rest, ", 0x%02X", sdo.data[idx]);
else l = snprintf(ptr, rest, "0x%02X", sdo.data[idx]);
ptr += l; rest -= l;
}
l = snprintf(ptr, rest, "]");
ptr += l; rest -= l;
}
snprintf(ptr, rest, "}");
mesgAddText(&ServerMessages, buf);
}
FREE(ans);
} }
usleep(1000); usleep(1000);
} }

View File

@ -20,9 +20,9 @@
#ifndef PROCESSMOTORS_H__ #ifndef PROCESSMOTORS_H__
#define PROCESSMOTORS_H__ #define PROCESSMOTORS_H__
#include "canbus.h"
#include "threadlist.h" #include "threadlist.h"
extern message CANbusMessages;
extern thread_handler CANhandlers[]; extern thread_handler CANhandlers[];
void *CANserver(void *data); void *CANserver(void *data);

View File

@ -73,7 +73,7 @@ static const char *listthr(_U_ char *par1, _U_ char *par2){
list = nextThread(list); list = nextThread(list);
if(!list) break; if(!list) break;
snprintf(msg, 256, "thread name='%s' role='%s' ID=0x%X", list->ti.name, list->ti.handler.name, list->ti.ID); snprintf(msg, 256, "thread name='%s' role='%s' ID=0x%X", list->ti.name, list->ti.handler.name, list->ti.ID);
addmesg(&ServerMessages, msg); mesgAddText(&ServerMessages, msg);
empty = 0; empty = 0;
}while(1); }while(1);
if(empty) return "No threads"; if(empty) return "No threads";
@ -132,7 +132,7 @@ static const char *sendmsg(char *thrname, char *data){
FNAME(); FNAME();
threadinfo *ti = findThreadByName(thrname); threadinfo *ti = findThreadByName(thrname);
if(!ti) return ANS_NOTFOUND; if(!ti) return ANS_NOTFOUND;
if(!addmesg(&ti->commands, data)) return ANS_CANTSEND; if(!mesgAddText(&ti->commands, data)) return ANS_CANTSEND;
return ANS_OK; return ANS_OK;
} }

View File

@ -153,7 +153,7 @@ static void *server(void *asock){
} }
} }
} // endfor } // endfor
char *srvmesg = getmesg(&ServerMessages); // broadcast messages to all clients char *srvmesg = mesgGetText(&ServerMessages); // broadcast messages to all clients
if(srvmesg){ // send broadcast message to all clients or throw them to /dev/null if(srvmesg){ // send broadcast message to all clients or throw them to /dev/null
for(int fdidx = 1; fdidx < nfd; ++fdidx){ for(int fdidx = 1; fdidx < nfd; ++fdidx){
send_data(poll_set[fdidx].fd, srvmesg); send_data(poll_set[fdidx].fd, srvmesg);

View File

@ -32,16 +32,18 @@ static threadlist *thelist = NULL;
* @param v (i) - data to push * @param v (i) - data to push
* @return pointer to just pushed node * @return pointer to just pushed node
*/ */
static msglist *pushmessage(msglist **lst, char *v){ static msglist *pushmessage(msglist **lst, void *v, size_t size){
if(!lst || !v) return NULL; if(!lst || !v) return NULL;
msglist *node; msglist *node;
if((node = MALLOC(msglist, 1)) == 0) if((node = MALLOC(msglist, 1)) == 0)
return NULL; // allocation error return NULL; // allocation error
node->data = strdup(v); node->data = malloc(size);
if(!node->data){ if(!node->data){
FREE(node); FREE(node);
return NULL; return NULL;
} }
memcpy(node->data, v, size);
node->size = size;
if(!*lst){ if(!*lst){
*lst = node; *lst = node;
(*lst)->last = node; (*lst)->last = node;
@ -55,14 +57,16 @@ static msglist *pushmessage(msglist **lst, char *v){
/** /**
* @brief popmessage - get data from head of list * @brief popmessage - get data from head of list
* @param lst (io) - list * @param lst (io) - list
* @param size (o) - data size
* @return data from first node or NULL if absent (SHOULD BE FREEd AFER USAGE!) * @return data from first node or NULL if absent (SHOULD BE FREEd AFER USAGE!)
*/ */
static char *popmessage(msglist **lst){ static void *popmessage(msglist **lst, size_t *size){
if(!lst || !*lst) return NULL; if(!lst || !*lst) return NULL;
char *ret; char *ret;
msglist *node = *lst; msglist *node = *lst;
if(node->next) node->next->last = node->last; // pop not last message if(node->next) node->next->last = node->last; // pop not last message
ret = node->data; ret = node->data;
if(size) *size = node->size;
*lst = node->next; *lst = node->next;
FREE(node); FREE(node);
return ret; return ret;
@ -115,18 +119,17 @@ threadinfo *findThreadByID(int ID){
} }
/** /**
* @brief addmesg - add message to thread's queue * @brief mesgAddObj - add any object to message
* @param msg - message itself * @param msg (i) - message
* @param txt - data to add * @param data (i) - any data
* @return data added or NULL if failed * @param size - it's size
* @return pointer to message data if success (NULL if failed)
*/ */
char *addmesg(message *msg, char *txt){ void *mesgAddObj(message *msg, void *data, size_t size){
if(!msg) return NULL; if(!msg || !data || size == 0) return NULL;
size_t L = strlen(txt); DBG("Want to add mesg with length %zd", size);
if(L < 1) return NULL;
DBG("Want to add mesg '%s' with length %zd", txt, L);
if(pthread_mutex_lock(&msg->mutex)) return NULL; if(pthread_mutex_lock(&msg->mutex)) return NULL;
msglist *node = pushmessage(&msg->text, txt); msglist *node = pushmessage(&msg->msg, data, size);
if(!node){ if(!node){
pthread_mutex_unlock(&msg->mutex); pthread_mutex_unlock(&msg->mutex);
return NULL; return NULL;
@ -136,19 +139,42 @@ char *addmesg(message *msg, char *txt){
} }
/** /**
* @brief getmesg - get first message from queue (allocates data, should be free'd after usage!) * @brief mesgAddText - add message to thread's queue
* @param msg - message itself * @param msg - message itself
* @return data or NULL if empty * @param txt - data to add
* @return data added or NULL if failed
*/ */
char *getmesg(message *msg){ char *mesgAddText(message *msg, char *txt){
if(!txt) return NULL;
DBG("mesg add text '%s'", txt);
size_t l = strlen(txt) + 1;
return mesgAddObj(msg, (void*)txt, l);
}
/**
* @brief mesgGetObj - get object from message
* @param msg (i) - message
* @param size (o) - object's size or NULL
* @return pointer to object or NULL if absent
*/
void *mesgGetObj(message *msg, size_t *size){
if(!msg) return NULL; if(!msg) return NULL;
char *text = NULL; char *text = NULL;
if(pthread_mutex_lock(&msg->mutex)) return NULL; if(pthread_mutex_lock(&msg->mutex)) return NULL;
text = popmessage(&msg->text); text = popmessage(&msg->msg, size);
pthread_mutex_unlock(&msg->mutex); pthread_mutex_unlock(&msg->mutex);
return text; return text;
} }
/**
* @brief mesgGetText - get first message from queue (allocates data, should be free'd after usage!)
* @param msg - message itself
* @return data or NULL if empty
*/
char *mesgGetText(message *msg){
return (char*) mesgGetObj(msg, NULL);
}
/** /**
* @brief registerThread - register new thread * @brief registerThread - register new thread
* @param name - thread name * @param name - thread name
@ -212,10 +238,10 @@ int killThread(threadlist *lptr, threadlist *prev){
else if(prev) prev->next = next; else if(prev) prev->next = next;
char *txt; char *txt;
pthread_mutex_lock(&lptr->ti.commands.mutex); pthread_mutex_lock(&lptr->ti.commands.mutex);
while((txt = popmessage(&lptr->ti.commands.text))) FREE(txt); while((txt = popmessage(&lptr->ti.commands.msg, NULL))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.commands.mutex); pthread_mutex_destroy(&lptr->ti.commands.mutex);
pthread_mutex_lock(&lptr->ti.answers.mutex); pthread_mutex_lock(&lptr->ti.answers.mutex);
while((txt = popmessage(&lptr->ti.answers.text))) FREE(txt); while((txt = popmessage(&lptr->ti.answers.msg, NULL))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.answers.mutex); pthread_mutex_destroy(&lptr->ti.answers.mutex);
if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", lptr->ti.name); if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", lptr->ti.name);
FREE(lptr); FREE(lptr);

View File

@ -27,13 +27,14 @@
// messages FIFO // messages FIFO
typedef struct msglist_{ typedef struct msglist_{
char *data; // message itself void *data; // message itself
size_t size; // message length in bytes
struct msglist_ *next, *last; // other elements of list struct msglist_ *next, *last; // other elements of list
} msglist; } msglist;
// interthread messages; index 0 - MOSI, index 1 - MISO // interthread messages; index 0 - MOSI, index 1 - MISO
typedef struct{ typedef struct{
msglist *text; // stringified text messages msglist *msg; // stringified text messages
pthread_mutex_t mutex; // text changing mutex pthread_mutex_t mutex; // text changing mutex
} message; } message;
@ -47,8 +48,8 @@ typedef struct{
typedef struct{ typedef struct{
char name[THREADNAMEMAXLEN+1]; // thread name char name[THREADNAMEMAXLEN+1]; // thread name
int ID; // numeric ID (canopen ID) int ID; // numeric ID (canopen ID)
message commands; // commands from clients message commands; // commands from clients (char *)
message answers; // answers from CANserver (raw messages to given ID) message answers; // answers from CANserver (CANmesg *)
pthread_t thread; // thread descriptor pthread_t thread; // thread descriptor
thread_handler handler; // handler name & function thread_handler handler; // handler name & function
} threadinfo; } threadinfo;
@ -64,7 +65,9 @@ threadinfo *findThreadByID(int ID);
threadinfo *registerThread(char *name, int ID, thread_handler *handler); threadinfo *registerThread(char *name, int ID, thread_handler *handler);
threadlist *nextThread(threadlist *curr); threadlist *nextThread(threadlist *curr);
int killThreadByName(const char *name); int killThreadByName(const char *name);
char *getmesg(message *msg); char *mesgGetText(message *msg);
char *addmesg(message *msg, char *txt); char *mesgAddText(message *msg, char *txt);
void *mesgGetObj(message *msg, size_t *size);
void *mesgAddObj(message *msg, void *data, size_t size);
#endif // THREADLIST_H__ #endif // THREADLIST_H__