From 1f6be7644f2306b763413c832af6185a9eae6749 Mon Sep 17 00:00:00 2001 From: Edward Emelianov Date: Tue, 17 Nov 2020 19:01:49 +0300 Subject: [PATCH] add raw can & canopen commands --- canserver/canbus.c | 81 +++++++------ canserver/canbus.h | 3 +- canserver/canopen.c | 6 - canserver/processmotors.c | 247 +++++++++++++++++++++++++++++++++----- canserver/processmotors.h | 1 + canserver/proto.c | 95 ++++++++------- canserver/socket.c | 21 +--- canserver/threadlist.c | 157 +++++------------------- canserver/threadlist.h | 35 +++--- 9 files changed, 362 insertions(+), 284 deletions(-) diff --git a/canserver/canbus.c b/canserver/canbus.c index 1974640..dea4426 100644 --- a/canserver/canbus.c +++ b/canserver/canbus.c @@ -22,6 +22,7 @@ #include #include +#include "aux.h" #include "canbus.h" #ifndef BUFLEN @@ -43,6 +44,7 @@ This file should provide next functions: static TTY_descr *dev = NULL; // shoul be global to restore if die static int serialspeed = 115200; // speed to open serial device +static int disconnected = 1; // ==1 if disconnected static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static char *read_string(); @@ -55,6 +57,7 @@ static char *read_string(); */ static int read_ttyX(TTY_descr *d){ if(!d || d->comfd < 0) return -1; + if(disconnected) return -1; size_t L = 0; ssize_t l; size_t length = d->bufsz; @@ -71,6 +74,8 @@ static int read_ttyX(TTY_descr *d){ if (!retval) break; if(FD_ISSET(d->comfd, &rfds)){ if((l = read(d->comfd, ptr, length)) < 1){ + WARN("TTY disconnected"); + disconnected = 1; return -1; // disconnect or other error - close TTY & die } ptr += l; L += l; @@ -86,21 +91,18 @@ static int read_ttyX(TTY_descr *d){ // thread-safe writing, add trailing '\n' static int ttyWR(const char *buff, int len){ FNAME(); + if(disconnected) return 1; pthread_mutex_lock(&mutex); //canbus_clear(); read_string(); // clear RX buffer - DBG("Write 2tty %d bytes: ", len); -#ifdef EBUG - int _U_ n = write(STDERR_FILENO, buff, len); - fprintf(stderr, "\n"); - double t0 = dtime(); -#endif int w = write_tty(dev->comfd, buff, (size_t)len); if(!w) w = write_tty(dev->comfd, "\n", 1); - DBG("Written, dt=%g", dtime() - t0); int errctr = 0; while(1){ char *s = read_string(); // clear echo & check + if(disconnected){ + w = 1; break; + } if(!s || strncmp(s, buff, strlen(buff)) != 0){ if(++errctr > 3){ WARNX("wrong answer! Got '%s' instead of '%s'", s, buff); @@ -110,12 +112,12 @@ static int ttyWR(const char *buff, int len){ }else break; } pthread_mutex_unlock(&mutex); - DBG("Success, dt=%g", dtime() - t0); return w; } void canbus_close(){ if(dev) close_tty(&dev); + disconnected = 1; } void setserialspeed(int speed){ @@ -123,7 +125,7 @@ void setserialspeed(int speed){ } void canbus_clear(){ - while(read_ttyX(dev)); + while(read_ttyX(dev) > 0); } int canbus_open(const char *devname){ @@ -131,6 +133,7 @@ int canbus_open(const char *devname){ WARNX("canbus_open(): need device name"); return 1; } + disconnected = 1; if(dev) close_tty(&dev); dev = new_tty((char*)devname, serialspeed, BUFLEN); if(dev){ @@ -140,10 +143,12 @@ int canbus_open(const char *devname){ if(!dev){ return 1; } + disconnected = 0; return 0; } int canbus_setspeed(int speed){ + if(disconnected) return 1; if(speed == 0) return 0; // default - not change char buff[BUFLEN]; if(speed < 10 || speed > 3000){ @@ -153,12 +158,18 @@ int canbus_setspeed(int speed){ int len = snprintf(buff, BUFLEN, "b %d", speed); if(len < 1) return 2; int r = ttyWR(buff, len); - read_string(); // clear RX buf ('Reinit CAN bus with speed XXXXkbps') + canbus_clear(); return r; } +/** + * @brief canbus_write - write message to CAN bus + * @param mesg - raw message + * @return 0 if all OK + */ int canbus_write(CANmesg *mesg){ FNAME(); + if(disconnected) return 1; char buf[BUFLEN]; if(!mesg || mesg->len > 8) return 1; int rem = BUFLEN, len = 0; @@ -178,6 +189,7 @@ int canbus_write(CANmesg *mesg){ * @return NULL if nothing was read or pointer to static buffer */ static char *read_string(){ + if(disconnected) return NULL; static char buf[1024]; int LL = 1023, r = 0, l; char *ptr = NULL; @@ -196,7 +208,9 @@ static char *read_string(){ do{ if((l = read_ttyX(dev))){ if(l < 0){ - ERR("tty disconnected"); + LOGERR("Tty disconnected"); + disconnected = 1; + return NULL; } if(l > LL){ // buffer overflow WARNX("read_string(): buffer overflow"); @@ -206,7 +220,6 @@ static char *read_string(){ memcpy(ptr, dev->buf, dev->buflen); r += l; LL -= l; ptr += l; if(ptr[-1] == '\n'){ - //DBG("Newline detected"); break; } d0 = dtime(); @@ -220,16 +233,20 @@ static char *read_string(){ ++optr; }else{ WARNX("read_string(): no newline found"); - DBG("buf: %s", buf); optr = NULL; return NULL; } - DBG("buf: %s, time: %g", buf, dtime() - d0); return buf; } return NULL; } +/** + * @brief parseCANmesg - message parser + * @param str - string from terminal: time #ID [data] + * @return NULL if error or pointer to static structure + * Not thread safe!!! + */ CANmesg *parseCANmesg(const char *str){ static CANmesg m; int l = sscanf(str, "%d #0x%hx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx", &m.timemark, &m.ID, @@ -239,40 +256,32 @@ CANmesg *parseCANmesg(const char *str){ return &m; } -#ifdef EBUG -void showM(CANmesg *m){ - printf("TS=%d, ID=0x%X", m->timemark, m->ID); - int l = m->len; - if(l) printf(", data="); - for(int i = 0; i < l; ++i) printf(" 0x%02X", m->data[i]); - printf("\n"); -} -#endif - +/** + * @brief canbus_read - read any message from CAN bus + * @param mesg - pointer to message + * @return 0 if all OK + */ int canbus_read(CANmesg *mesg){ if(!mesg) return 1; + if(disconnected) return 1; pthread_mutex_lock(&mutex); - double t0 = dtime(); - int ID = mesg->ID; char *ans; CANmesg *m; + double t0 = dtime(); while(dtime() - t0 < T_POLLING_TMOUT){ // read answer if((ans = read_string())){ // parse new data if((m = parseCANmesg(ans))){ - DBG("Got canbus message (dT=%g):", dtime() - t0); -#ifdef EBUG - showM(m); -#endif - if(ID && m->ID == ID){ - memcpy(mesg, m, sizeof(CANmesg)); - DBG("All OK"); - pthread_mutex_unlock(&mutex); - return 0; - } + memcpy(mesg, m, sizeof(CANmesg)); + pthread_mutex_unlock(&mutex); + return 0; } } + if(disconnected) break; } pthread_mutex_unlock(&mutex); return 1; } +int canbus_disconnected(){ + return disconnected; +} diff --git a/canserver/canbus.h b/canserver/canbus.h index f2c2bc8..c26421e 100644 --- a/canserver/canbus.h +++ b/canserver/canbus.h @@ -22,7 +22,7 @@ #include #ifndef T_POLLING_TMOUT -#define T_POLLING_TMOUT (0.5) +#define T_POLLING_TMOUT (0.01) #endif typedef struct{ @@ -44,5 +44,6 @@ void canbus_clear(); void setserialspeed(int speed); void showM(CANmesg *m); CANmesg *parseCANmesg(const char *str); +int canbus_disconnected(); #endif // CANBUS_H__ diff --git a/canserver/canopen.c b/canserver/canopen.c index 17800d9..a0e5bc1 100644 --- a/canserver/canopen.c +++ b/canserver/canopen.c @@ -110,12 +110,6 @@ static CANmesg *mkMesg(SDO *sdo){ mesg.data[1] = sdo->index & 0xff; // l mesg.data[2] = (sdo->index >> 8) & 0xff; // h mesg.data[3] = sdo->subindex; -#if 0 - FNAME(); - green("Make message to 0x%X: ", mesg.ID); - for(uint8_t i = 0; i < 8; ++i) printf("0x%02X ", mesg.data[i]); - printf("\n"); -#endif return &mesg; } diff --git a/canserver/processmotors.c b/canserver/processmotors.c index f48a389..e2b4214 100644 --- a/canserver/processmotors.c +++ b/canserver/processmotors.c @@ -31,13 +31,22 @@ #include // usleep #include +static int CANspeed = 0; // default speed, if !=0 set it when connected + +// all messages are in format "ID [data]" message CANbusMessages = {0}; // CANserver thread is master +// basic threads +// messages: master - thread, slave - caller static void *stpemulator(void *arg); +static void *rawcommands(void *arg); +static void *canopencmds(void *arg); // handlers for standard types thread_handler CANhandlers[] = { {"emulation", stpemulator}, + {"raw", rawcommands}, + {"canopen", canopencmds}, {NULL, NULL} }; @@ -49,66 +58,244 @@ thread_handler *get_handler(const char *name){ return NULL; } +/** + * @brief parsePacket - convert text into can packet data + * @param packet (o) - pointer to CANpacket or NULL (just to check) + * @param data - text in format "ID [byte0 ... byteN]" + * @return 0 if all OK + */ +static int parsePacket(CANmesg *packet, char *data){ + if(!data || *data == 0){ // no data + return 1; + } + long info[10]={0}; + int N = 0; + char *saveptr = NULL; + for(char *s = data; N < 10; s = NULL, ++N){ + char *nxt = strtok_r(s, " \t,;\r\n", &saveptr); + if(!nxt) break; + if(str2long(nxt, &info[N])) break; + } + if(N > 9 || N == 0) return 1; // ID + >8 data bytes or no at all + if(packet){ + packet->ID = info[0]; + packet->len = N - 1; + } + for(int i = 1; i < N; ++i){ + if(info[i] < 0 || info[i] > 0xff) return 2; + if(packet) packet->data[i-1] = (uint8_t) info[i]; + } + return 0; +} + +// [re]open serial device +static void reopen_device(){ + char *devname = NULL; + double t0 = dtime(); + canbus_close(); + DBG("Try to [re]open serial device"); + while(dtime() - t0 < 5.){ + if((devname = find_device())) break; + usleep(1000); + } + if(!devname || canbus_open(devname)){ + FREE(devname); + LOGERR("Can't find serial device"); + ERRX("Can't find serial device"); + }else {DBG("Opened device: %s", devname);} + if(CANspeed){ // set default speed + canbus_clear(); + canbus_setspeed(CANspeed); + canbus_clear(); + } + FREE(devname); +} + +// do something with can message: send to receiver +static void processCANmessage(CANmesg *mesg){ + threadinfo *ti = findThreadByID(mesg->ID); + if(!ti) return; + DBG("Found"); + char buf[64], *ptr = buf; + int l = 64, x; + x = snprintf(ptr, l, "#0x%03X ", mesg->ID); + l -= x; ptr += x; + for(int i = 0; i < mesg->len; ++i){ + x = snprintf(ptr, l, "0x%02X ", mesg->data[i]); + l -= x; ptr += x; + } + addmesg(&ti->answers, buf); +} + /** * @brief CANserver - main CAN thread; receive/transmit raw messages by CANbusMessages * @param data - unused * @return unused */ void *CANserver(_U_ void *data){ - char *devname = find_device(); - if(!devname){ - LOGERR("Can't find serial device"); - ERRX("Can't find serial device"); - } + reopen_device(); while(1){ - int fd = open(devname, O_RDONLY); - if(fd == -1){ - WARN("open()"); - LOGWARN("Device %s is absent", devname); - FREE(devname); - double t0 = dtime(); - while(dtime() - t0 < 5.){ - if((devname = find_device())) break; - usleep(1000); - } - if(!devname){ - LOGERR("Can't open serial device, kill myself"); - ERRX("Can't open device, kill myself"); - }else LOGMSG("Change device to %s", devname); - }else close(fd); - - char *mesg = getmesg(idxMISO, &CANbusMessages); + CANmesg cm; + char *mesg = getmesg(&CANbusMessages); if(mesg){ - DBG("Received message: %s", mesg); + if(parsePacket(&cm, mesg)){ + LOGMSG("Received wrong CAN message: %s", mesg); + DBG("Bad message: %s", mesg); + }else{ + if(canbus_write(&cm)){ + LOGWARN("Can't write to CANbus, try to reopen"); + WARNX("Can't write to canbus"); + if(canbus_disconnected()) reopen_device(); + } + } FREE(mesg); - // global messages to all clients: - addmesg(idxMISO, &ServerMessages, "CANserver works\n"); } + usleep(1000); + if(!canbus_read(&cm)){ // got raw message from CAN bus - parce it + DBG("Got CAN message from %d, len: %d", cm.ID, cm.len); + // send raw message to 0 + threadinfo *ti = findThreadByID(0); + if(ti){ + char buf[64], *ptr = buf; + int l = 64, x; + x = snprintf(ptr, l, "#0x%03X ", cm.ID); + l -= x; ptr += x; + for(int i = 0; i < cm.len; ++i){ + x = snprintf(ptr, l, "0x%02X ", cm.data[i]); + l -= x; ptr += x; + } + addmesg(&ti->answers, buf); + } + processCANmessage(&cm); + }else if(canbus_disconnected()) reopen_device(); } LOGERR("CANserver(): UNREACHABLE CODE REACHED!"); return NULL; } - +/** + * @brief stpemulator - stepper motor emulator + * @param arg - threadinfo + * @return unused + */ static void *stpemulator(void *arg){ threadinfo *ti = (threadinfo*)arg; while(1){ - char *mesg = getmesg(idxMISO, &ti->mesg); + char *mesg = getmesg(&ti->commands); if(mesg){ DBG("Stepper emulator got: %s", mesg); - addmesg(idxMISO, &ServerMessages, mesg); + addmesg(&ServerMessages, mesg); /* do something */ FREE(mesg); } int r100 = rand() % 10000; if(r100 < 20){ // 20% of probability - addmesg(idxMISO, &ServerMessages, "stpemulator works fine!\n"); + addmesg(&ServerMessages, "stpemulator works fine!"); } if(r100 > 9998){ - addmesg(idxMISO, &ServerMessages, "O that's good!\n"); + addmesg(&ServerMessages, "O that's good!"); } usleep(1000); } LOGERR("stpemulator(): UNREACHABLE CODE REACHED!"); return NULL; } + +/** + * @brief rawcommands - send/receive raw commands + * @param arg - threadinfo + * @return unused + * message format: ID [data]; ID - receiver ID (raw), data - 0..8 bytes of data + * ID == 0 receive everything! + */ +static void *rawcommands(void *arg){ + threadinfo *ti = (threadinfo*)arg; + while(1){ + char *mesg = getmesg(&ti->commands); + if(mesg){ + DBG("Got raw command: %s", mesg); + addmesg(&CANbusMessages, mesg); + FREE(mesg); + } + mesg = getmesg(&ti->answers); + if(mesg){ // got raw answer from bus to thread ID, send it to all + addmesg(&ServerMessages, mesg); + FREE(mesg); + } + usleep(1000); + } + LOGERR("rawcommands(): UNREACHABLE CODE REACHED!"); + return NULL; +} + +// make string for CAN message from command message (NodeID index subindex [data] -> ID data) +static void sendSDO(char *mesg){ + long info[8] = {0}; // 0 - NodeID, 1 - index, 2 - subindex, 3..6 - data[0..4] + int N = 0; + char *saveptr = NULL; + for(char *s = mesg; N < 8; s = NULL, ++N){ + char *nxt = strtok_r(s, " \t,;\r\n", &saveptr); + if(!nxt) break; + if(str2long(nxt, &info[N])) break; + } + if(N > 7 || N < 3){ + WARNX("Got bad CANopen command"); + LOGMSG("Got bad CANopen command"); + return; + } + DBG("User's message have %d ints", N); + + uint8_t data[8] = {0}, datalen = (uint8_t) N - 3; + data[0] = SDO_CCS(CCS_INIT_DOWNLOAD); + if(datalen){ // there's data + data[0] |= SDO_N(datalen) | SDO_E | SDO_S; + for(int i = 0; i < datalen; ++i) data[4+i] = (uint8_t)(info[3+i]); + } + data[1] = info[1] & 0xff; + data[2] = (info[1] >> 8) & 0xff; + data[3] = (uint8_t)(info[2]); + + char buf[64], *ptr = buf; + int l = 64, x; + x = snprintf(ptr, l, "0x%03X ", (uint16_t)(RSDO_COBID + info[0])); + l -= x; ptr += x; + for(int i = 0; i < 8; ++i){ + x = snprintf(ptr, l, "0x%02X ", (uint16_t)(data[i])); + l -= x; ptr += x; + } + addmesg(&CANbusMessages, buf); +} + +// send raw CANopen commands +// message format: NodeID index subindex [data] +static void *canopencmds(void *arg){ + threadinfo *ti = (threadinfo*)arg; + while(1){ + char *mesg = getmesg(&ti->commands); + if(mesg) do{ + DBG("Got CANopen command: %s", mesg); + sendSDO(mesg); + FREE(mesg); + }while(0); + mesg = getmesg(&ti->answers); + if(mesg){ // got raw answer from bus to thread ID, analize it +addmesg(&ServerMessages, mesg); + FREE(mesg); + } + usleep(1000); + } + LOGERR("rawcommands(): UNREACHABLE CODE REACHED!"); + return NULL; +} + + +/** + * @brief setCANspeed - set new speed of CANbus + * @param speed - speed in kbaud + * @return 0 if all OK + */ +int setCANspeed(int speed){ + if(canbus_setspeed(speed)) return 1; + CANspeed = speed; + return 0; +} diff --git a/canserver/processmotors.h b/canserver/processmotors.h index d4477c6..4662b6e 100644 --- a/canserver/processmotors.h +++ b/canserver/processmotors.h @@ -27,5 +27,6 @@ extern thread_handler CANhandlers[]; void *CANserver(void *data); thread_handler *get_handler(const char *name); +int setCANspeed(int speed); #endif // PROCESSMOTORS_H__ diff --git a/canserver/proto.c b/canserver/proto.c index 7cb1861..4e26fa8 100644 --- a/canserver/proto.c +++ b/canserver/proto.c @@ -20,6 +20,7 @@ #include "cmdlnopts.h" #include "processmotors.h" #include "proto.h" +#include "socket.h" #include "threadlist.h" #include @@ -27,15 +28,16 @@ #include // standard answers of processCommand -static const char *ANS_OK = "OK\n"; -static const char *ANS_WRONGCANID = "Wrong CANID\n"; -static const char *ANS_NOTFOUND = "Thread not found\n"; -static const char *ANS_CANTSEND = "Can't send message\n"; +static const char *ANS_OK = "OK"; +static const char *ANS_WRONGCANID = "Wrong CANID"; +static const char *ANS_NOTFOUND = "Thread not found"; +static const char *ANS_CANTSEND = "Can't send message"; -static const char *sendraw(char *id, char *data); +static const char *listthr(_U_ char *par1, _U_ char *par2); static const char *regthr(char *thrname, char *data); static const char *unregthr(char *thrname, char *data); static const char *sendmsg(char *thrname, char *data); +static const char *setspd(char *speed, _U_ char *data); /* * Commands format: @@ -53,39 +55,29 @@ typedef struct{ // array with known functions static cmditem functions[] = { - {"raw", sendraw}, - {"register", regthr}, - {"unregister", unregthr}, - {"mesg", sendmsg}, + {"list", listthr}, // list threads + {"mesg", sendmsg}, // "mesg NAME ID [data]" + {"register", regthr}, // "register NAME ID", ID - RAW CAN ID (not canopen ID)!!! + {"speed", setspd}, // set CANbus speed + {"unregister", unregthr}, // "unregister NAME" {NULL, NULL} }; -/** - * @brief sendraw - send raw data to CANbus - * @param id - CANid (in string format) - * @param data - data to send (delimeters are: space, tab, comma or semicolon) - * WARNING! parameter `data` will be broken after this function - * id & data can be decimal, hexadecimal or octal - * @return answer to client - */ -static const char *sendraw(char *id, char *data){ - char buf[128], *s, *saveptr; - if(!id) return "Need CAN ID\n"; - long ID, info[9]={0}; - int i; - if(str2long(id, &ID)){ - return ANS_WRONGCANID; - } - for(s = data, i = 0; i < 9; s = NULL, ++i){ - char *nxt = strtok_r(s, " \t,;\r\n", &saveptr); - if(!nxt) break; - if(str2long(nxt, &info[i])) break; - } - if(i > 8) return "Not more than 8 data bytes\n"; - snprintf(buf, 128, "ID=%ld, datalen=%d, data={%ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld}\n", - ID, i, info[0], info[1], info[2], info[3], info[4], info[5], info[6], info[7]); - addmesg(idxMISO, &CANbusMessages, buf); - return ANS_OK; +// list all threads +static const char *listthr(_U_ char *par1, _U_ char *par2){ + FNAME(); + char msg[256]; + threadlist *list = NULL; + int empty = 1; + do{ + list = nextThread(list); + if(!list) break; + snprintf(msg, 256, "thread name='%s' role='%s' ID=0x%X", list->ti.name, list->ti.handler.name, list->ti.ID); + addmesg(&ServerMessages, msg); + empty = 0; + }while(1); + if(empty) return "No threads"; + return NULL; } // register new thread @@ -96,13 +88,14 @@ static const char *sendraw(char *id, char *data){ * @return answer to client */ static const char *regthr(char *thrname, char *data){ + FNAME(); threadinfo *ti = findThreadByName(thrname); - if(ti) return "Thread exists\n"; + if(ti) return "Thread exists"; char *saveptr; char *id = strtok_r(data, " \t,;\r\n", &saveptr); if(!id) return ANS_WRONGCANID; char *role = strtok_r(NULL, " \t,;\r\n", &saveptr); - if(!role) return "No thread role\n"; + if(!role) return "No thread role"; DBG("Data='%s'; id='%s', role='%s'", data, id, role); long ID; if(str2long(data, &ID)){ @@ -110,10 +103,10 @@ static const char *regthr(char *thrname, char *data){ } DBG("Check ID"); ti = findThreadByID(ID); - if(ti) return "Thread with given ID exists\n"; + if(ti) return "Thread with given ID exists"; thread_handler *h = get_handler(role); - if(!h) return "Unknown role\n"; - if(!registerThread(thrname, ID, h->handler)) return "Can't register\n"; + if(!h) return "Unknown role"; + if(!registerThread(thrname, ID, h)) return "Can't register"; return ANS_OK; } @@ -124,6 +117,7 @@ static const char *regthr(char *thrname, char *data){ * @return answer */ static const char *unregthr(char *thrname, _U_ char *data){ + FNAME(); if(killThreadByName(thrname)) return ANS_NOTFOUND; return ANS_OK; } @@ -135,9 +129,20 @@ static const char *unregthr(char *thrname, _U_ char *data){ * @return answer */ static const char *sendmsg(char *thrname, char *data){ + FNAME(); threadinfo *ti = findThreadByName(thrname); if(!ti) return ANS_NOTFOUND; - if(!addmesg(idxMISO, &ti->mesg, data)) return ANS_CANTSEND; + if(!addmesg(&ti->commands, data)) return ANS_CANTSEND; + return ANS_OK; +} + +static const char *setspd(char *speed, _U_ char *data){ + FNAME(); + long spd; + if(str2long(speed, &spd) || spd < 1 || spd > 1000 || setCANspeed((int)spd)){ + DBG("Wrong speed: %s", speed); + return "Wrong speed"; + } return ANS_OK; } @@ -163,12 +168,6 @@ const char *processCommand(char *cmd){ for(cmditem *item = functions; item->fname; ++item){ if(0 == strcasecmp(item->fname, fname)) return item->handler(procname, data); } - return "Wrong command\n"; + return "Wrong command"; } -#if 0 -static char buf[1024]; -snprintf(buf, 1024, "FUNC=%s, PROC=%s, CMD=%s\n", fname, procname, data); -DBG("buf: %s", buf); -return buf; -#endif diff --git a/canserver/socket.c b/canserver/socket.c index 33ff998..b648359 100644 --- a/canserver/socket.c +++ b/canserver/socket.c @@ -46,7 +46,7 @@ message ServerMessages = {0}; /**************** SERVER FUNCTIONS ****************/ //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /** - * Send data over socket + * Send data over socket (and add trailing '\n' if absent) * @param sock - socket fd * @param textbuf - zero-trailing buffer with data to send * @return amount of sent bytes @@ -58,25 +58,10 @@ static size_t send_data(int sock, const char *textbuf){ LOGERR("send_data(): write() failed"); return 0; }else LOGDBG("send_data(): sent '%s'", textbuf); + if(textbuf[Len-1] != '\n') Len += write(sock, "\n", 1); return (size_t)Len; } -#if 0 -// search a first word after needle without spaces -static char* stringscan(char *str, char *needle){ - char *a;//, *e; - char *end = str + strlen(str); - a = strstr(str, needle); - if(!a) return NULL; - a += strlen(needle); - while (a < end && (*a == ' ' || *a == '\r' || *a == '\t' || *a == '\r')) a++; - if(a >= end) return NULL; -// e = strchr(a, ' '); -// if(e) *e = 0; - return a; -} -#endif - /** * @brief handle_socket - read and process data from socket * @param sock - socket fd @@ -168,7 +153,7 @@ static void *server(void *asock){ } } } // endfor - char *srvmesg = getmesg(idxMISO, &ServerMessages); // broadcast messages to all clients + char *srvmesg = getmesg(&ServerMessages); // broadcast messages to all clients if(srvmesg){ // send broadcast message to all clients or throw them to /dev/null for(int fdidx = 1; fdidx < nfd; ++fdidx){ send_data(poll_set[fdidx].fd, srvmesg); diff --git a/canserver/threadlist.c b/canserver/threadlist.c index b4ca19d..67105a7 100644 --- a/canserver/threadlist.c +++ b/canserver/threadlist.c @@ -106,10 +106,8 @@ threadinfo *findThreadByName(char *name){ */ threadinfo *findThreadByID(int ID){ if(!thelist) return NULL; // thread list is empty - DBG("Try to find thread with ID=%d", ID); threadlist *lptr = thelist; while(lptr){ - DBG("Check %d", lptr->ti.ID); if(ID == lptr->ti.ID) return &lptr->ti; lptr = lptr->next; } @@ -118,46 +116,36 @@ threadinfo *findThreadByID(int ID){ /** * @brief addmesg - add message to thread's queue - * @param idx - index (MOSI/MISO) * @param msg - message itself * @param txt - data to add * @return data added or NULL if failed */ -char *addmesg(msgidx idx, message *msg, char *txt){ - if(idx < 0 || idx >= idxNUM){ - WARNX("Wrong message index"); - return NULL; - } +char *addmesg(message *msg, char *txt){ if(!msg) return NULL; size_t L = strlen(txt); if(L < 1) return NULL; DBG("Want to add mesg '%s' with length %zd", txt, L); - if(pthread_mutex_lock(&msg->mutex[idx])) return NULL; - msglist *node = pushmessage(&msg->text[idx], txt); + if(pthread_mutex_lock(&msg->mutex)) return NULL; + msglist *node = pushmessage(&msg->text, txt); if(!node){ - pthread_mutex_unlock(&msg->mutex[idx]); + pthread_mutex_unlock(&msg->mutex); return NULL; } - pthread_mutex_unlock(&msg->mutex[idx]); + pthread_mutex_unlock(&msg->mutex); return node->data; } /** * @brief getmesg - get first message from queue (allocates data, should be free'd after usage!) - * @param idx - index (MOSI/MISO) * @param msg - message itself * @return data or NULL if empty */ -char *getmesg(msgidx idx, message *msg){ - if(idx < 0 || idx >= idxNUM){ - WARNX("Wrong message index"); - return NULL; - } +char *getmesg(message *msg){ if(!msg) return NULL; char *text = NULL; - if(pthread_mutex_lock(&msg->mutex[idx])) return NULL; - text = popmessage(&msg->text[idx]); - pthread_mutex_unlock(&msg->mutex[idx]); + if(pthread_mutex_lock(&msg->mutex)) return NULL; + text = popmessage(&msg->text); + pthread_mutex_unlock(&msg->mutex); return text; } @@ -168,7 +156,7 @@ char *getmesg(msgidx idx, message *msg){ * @param handler - thread handler * @return pointer to new threadinfo struct or NULL if failed */ -threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)){ +threadinfo *registerThread(char *name, int ID, thread_handler *handler){ if(!name || strlen(name) < 1 || !handler) return NULL; threadinfo *ti = findThreadByName(name); DBG("Register new thread with name '%s' and ID=%d", name, ID); @@ -189,13 +177,14 @@ threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)){ last->next = MALLOC(threadlist, 1); ti = &last->next->ti; } - ti->handler = handler; + memcpy(&ti->handler, handler, sizeof(thread_handler)); snprintf(ti->name, THREADNAMEMAXLEN+1, "%s", name); ti->ID = ID; - memset(&ti->mesg, 0, sizeof(ti->mesg)); - for(int i = 0; i < 2; ++i) - pthread_mutex_init(&ti->mesg.mutex[i], NULL); - if(pthread_create(&ti->thread, NULL, handler, (void*)ti)){ + memset(&ti->commands, 0, sizeof(ti->commands)); + pthread_mutex_init(&ti->commands.mutex, NULL); + memset(&ti->answers, 0, sizeof(ti->answers)); + pthread_mutex_init(&ti->answers.mutex, NULL); + if(pthread_create(&ti->thread, NULL, handler->handler, (void*)ti)){ WARN("pthread_create()"); return NULL; } @@ -221,16 +210,18 @@ int killThread(threadlist *lptr, threadlist *prev){ threadlist *next = lptr->next; if(lptr == thelist) thelist = next; else if(prev) prev->next = next; - for(int i = 0; i < 2; ++i){ - pthread_mutex_lock(&lptr->ti.mesg.mutex[i]); - char *txt; - while((txt = popmessage(&lptr->ti.mesg.text[i]))) FREE(txt); - pthread_mutex_destroy(&lptr->ti.mesg.mutex[i]); - } + char *txt; + pthread_mutex_lock(&lptr->ti.commands.mutex); + while((txt = popmessage(&lptr->ti.commands.text))) FREE(txt); + pthread_mutex_destroy(&lptr->ti.commands.mutex); + pthread_mutex_lock(&lptr->ti.answers.mutex); + while((txt = popmessage(&lptr->ti.answers.text))) FREE(txt); + pthread_mutex_destroy(&lptr->ti.answers.mutex); if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", lptr->ti.name); FREE(lptr); return 0; } + /** * @brief killThread - kill and unregister thread with given name * @param name - thread's name @@ -249,98 +240,14 @@ int killThreadByName(const char *name){ return 2; // not found } -#if 0 -static void *handler(void *data){ - threadinfo *ti = (threadinfo*)data; - while(1){ - char *got = getmesg(idxMOSI, &ti->mesg); - if(got){ - green("%s got: %s\n", ti->name, got); - FREE(got); - addmesg(idxMISO, &ti->mesg, "received"); - addmesg(idxMISO, &ti->mesg, "need more"); - } - usleep(100); - } - return NULL; +/** + * @brief nextThread - get next thread in `thelist` + * @param curr - pointer to previous thread or NULL for `thelist` + * @return pointer to next thread in list (or NULL if absent) + */ +threadlist *nextThread(threadlist *curr){ + if(!curr) return thelist; + return curr->next; } -static void dividemessages(message *msg, char *longtext){ - char *copy = strdup(longtext), *saveptr = NULL; - for(char *s = copy; ; s = NULL){ - char *nxt = strtok_r(s, " ", &saveptr); - if(!nxt) break; - addmesg(idxMOSI, msg, nxt); - } - FREE(copy); -} - -static void procmesg(char *text){ - if(!text) return; - char *nxt = strchr(text, ' '); - if(!nxt){ - WARNX("Usage: cmd data, where cmd:\n" - "\tnew threadname - create thread\n" - "\tdel threadname - delete thread\n" - "\tsend threadname data - send data to thread\n" - "\tsend all data - send data to all\n"); - return; - } - *nxt++ = 0; - if(strcasecmp(text, "new") == 0){ - registerThread(nxt, handler); - }else if(strcasecmp(text, "del") == 0){ - if(killThread(nxt)) WARNX("Can't delete '%s'", nxt); - }else if(strcasecmp(text, "send") == 0){ - text = strchr(nxt, ' '); - if(!text){ - WARNX("send all/threadname data"); - return; - } - *text++ = 0; - if(strcasecmp(nxt, "all") == 0){ // bcast - threadlist *lptr = thelist; - while(lptr){ - threadinfo *ti = &lptr->ti; - lptr = lptr->next; - green("Bcast send '%s' to thread '%s'\n", text, ti->name); - dividemessages(&ti->mesg, text); - } - }else{ // single - threadinfo *ti = findthread(nxt); - if(!ti){ - WARNX("Thread '%s' not found", nxt); - return; - } - green("Send '%s' to thread '%s'\n", text, nxt); - dividemessages(&ti->mesg, text); - } - } -} - -int main(){ - using_history(); - while(1){ - threadlist *lptr = thelist; - while(lptr){ - threadinfo *ti = &lptr->ti; - lptr = lptr->next; - char *got; - while((got = getmesg(idxMISO, &ti->mesg))){ - red("got from '%s': %s\n", ti->name, got); - fflush(stdout); - FREE(got); - } - } - char *text = readline("mesg > "); - if(!text) break; // ^D - if(strlen(text) < 1) continue; - add_history(text); - procmesg(text); - FREE(text); - } - return 0; -} - -#endif diff --git a/canserver/threadlist.h b/canserver/threadlist.h index bd062c1..0510374 100644 --- a/canserver/threadlist.h +++ b/canserver/threadlist.h @@ -31,26 +31,26 @@ typedef struct msglist_{ struct msglist_ *next, *last; // other elements of list } msglist; -// for all threads MASTER is the thread itself, slaves are all others -typedef enum{ - idxMOSI = 0, // master out, slave in - idxMISO = 1, // master in, slave out - idxNUM = 2 // amount of indexes -} msgidx; - // interthread messages; index 0 - MOSI, index 1 - MISO typedef struct{ - msglist *text[idxNUM]; // stringified text messages - pthread_mutex_t mutex[idxNUM]; // text changing mutex + msglist *text; // stringified text messages + pthread_mutex_t mutex; // text changing mutex } message; +// name - handler pair for threads registering functions +typedef struct{ + const char *name; // handler name + void *(*handler)(void *); // handler function +} thread_handler; + // thread information typedef struct{ char name[THREADNAMEMAXLEN+1]; // thread name int ID; // numeric ID (canopen ID) - message mesg; // inter-thread messages + message commands; // commands from clients + message answers; // answers from CANserver (raw messages to given ID) pthread_t thread; // thread descriptor - void *(*handler)(void *); // handler function + thread_handler handler; // handler name & function } threadinfo; // list of threads member @@ -59,17 +59,12 @@ typedef struct thread_list_{ struct thread_list_ *next; // next element } threadlist; -// name - handler pair for threads registering functions -typedef struct{ - const char *name; // handler name - void *(*handler)(void *); // handler function -} thread_handler; - threadinfo *findThreadByName(char *name); threadinfo *findThreadByID(int ID); -threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)); +threadinfo *registerThread(char *name, int ID, thread_handler *handler); +threadlist *nextThread(threadlist *curr); int killThreadByName(const char *name); -char *getmesg(msgidx idx, message *msg); -char *addmesg(msgidx idx, message *msg, char *txt); +char *getmesg(message *msg); +char *addmesg(message *msg, char *txt); #endif // THREADLIST_H__