add raw can & canopen commands

This commit is contained in:
Edward Emelianov 2020-11-17 19:01:49 +03:00
parent 3dc2627c64
commit 1f6be7644f
9 changed files with 362 additions and 284 deletions

View File

@ -22,6 +22,7 @@
#include <sys/select.h>
#include <usefull_macros.h>
#include "aux.h"
#include "canbus.h"
#ifndef BUFLEN
@ -43,6 +44,7 @@ This file should provide next functions:
static TTY_descr *dev = NULL; // shoul be global to restore if die
static int serialspeed = 115200; // speed to open serial device
static int disconnected = 1; // ==1 if disconnected
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static char *read_string();
@ -55,6 +57,7 @@ static char *read_string();
*/
static int read_ttyX(TTY_descr *d){
if(!d || d->comfd < 0) return -1;
if(disconnected) return -1;
size_t L = 0;
ssize_t l;
size_t length = d->bufsz;
@ -71,6 +74,8 @@ static int read_ttyX(TTY_descr *d){
if (!retval) break;
if(FD_ISSET(d->comfd, &rfds)){
if((l = read(d->comfd, ptr, length)) < 1){
WARN("TTY disconnected");
disconnected = 1;
return -1; // disconnect or other error - close TTY & die
}
ptr += l; L += l;
@ -86,21 +91,18 @@ static int read_ttyX(TTY_descr *d){
// thread-safe writing, add trailing '\n'
static int ttyWR(const char *buff, int len){
FNAME();
if(disconnected) return 1;
pthread_mutex_lock(&mutex);
//canbus_clear();
read_string(); // clear RX buffer
DBG("Write 2tty %d bytes: ", len);
#ifdef EBUG
int _U_ n = write(STDERR_FILENO, buff, len);
fprintf(stderr, "\n");
double t0 = dtime();
#endif
int w = write_tty(dev->comfd, buff, (size_t)len);
if(!w) w = write_tty(dev->comfd, "\n", 1);
DBG("Written, dt=%g", dtime() - t0);
int errctr = 0;
while(1){
char *s = read_string(); // clear echo & check
if(disconnected){
w = 1; break;
}
if(!s || strncmp(s, buff, strlen(buff)) != 0){
if(++errctr > 3){
WARNX("wrong answer! Got '%s' instead of '%s'", s, buff);
@ -110,12 +112,12 @@ static int ttyWR(const char *buff, int len){
}else break;
}
pthread_mutex_unlock(&mutex);
DBG("Success, dt=%g", dtime() - t0);
return w;
}
void canbus_close(){
if(dev) close_tty(&dev);
disconnected = 1;
}
void setserialspeed(int speed){
@ -123,7 +125,7 @@ void setserialspeed(int speed){
}
void canbus_clear(){
while(read_ttyX(dev));
while(read_ttyX(dev) > 0);
}
int canbus_open(const char *devname){
@ -131,6 +133,7 @@ int canbus_open(const char *devname){
WARNX("canbus_open(): need device name");
return 1;
}
disconnected = 1;
if(dev) close_tty(&dev);
dev = new_tty((char*)devname, serialspeed, BUFLEN);
if(dev){
@ -140,10 +143,12 @@ int canbus_open(const char *devname){
if(!dev){
return 1;
}
disconnected = 0;
return 0;
}
int canbus_setspeed(int speed){
if(disconnected) return 1;
if(speed == 0) return 0; // default - not change
char buff[BUFLEN];
if(speed < 10 || speed > 3000){
@ -153,12 +158,18 @@ int canbus_setspeed(int speed){
int len = snprintf(buff, BUFLEN, "b %d", speed);
if(len < 1) return 2;
int r = ttyWR(buff, len);
read_string(); // clear RX buf ('Reinit CAN bus with speed XXXXkbps')
canbus_clear();
return r;
}
/**
* @brief canbus_write - write message to CAN bus
* @param mesg - raw message
* @return 0 if all OK
*/
int canbus_write(CANmesg *mesg){
FNAME();
if(disconnected) return 1;
char buf[BUFLEN];
if(!mesg || mesg->len > 8) return 1;
int rem = BUFLEN, len = 0;
@ -178,6 +189,7 @@ int canbus_write(CANmesg *mesg){
* @return NULL if nothing was read or pointer to static buffer
*/
static char *read_string(){
if(disconnected) return NULL;
static char buf[1024];
int LL = 1023, r = 0, l;
char *ptr = NULL;
@ -196,7 +208,9 @@ static char *read_string(){
do{
if((l = read_ttyX(dev))){
if(l < 0){
ERR("tty disconnected");
LOGERR("Tty disconnected");
disconnected = 1;
return NULL;
}
if(l > LL){ // buffer overflow
WARNX("read_string(): buffer overflow");
@ -206,7 +220,6 @@ static char *read_string(){
memcpy(ptr, dev->buf, dev->buflen);
r += l; LL -= l; ptr += l;
if(ptr[-1] == '\n'){
//DBG("Newline detected");
break;
}
d0 = dtime();
@ -220,16 +233,20 @@ static char *read_string(){
++optr;
}else{
WARNX("read_string(): no newline found");
DBG("buf: %s", buf);
optr = NULL;
return NULL;
}
DBG("buf: %s, time: %g", buf, dtime() - d0);
return buf;
}
return NULL;
}
/**
* @brief parseCANmesg - message parser
* @param str - string from terminal: time #ID [data]
* @return NULL if error or pointer to static structure
* Not thread safe!!!
*/
CANmesg *parseCANmesg(const char *str){
static CANmesg m;
int l = sscanf(str, "%d #0x%hx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx 0x%hhx", &m.timemark, &m.ID,
@ -239,40 +256,32 @@ CANmesg *parseCANmesg(const char *str){
return &m;
}
#ifdef EBUG
void showM(CANmesg *m){
printf("TS=%d, ID=0x%X", m->timemark, m->ID);
int l = m->len;
if(l) printf(", data=");
for(int i = 0; i < l; ++i) printf(" 0x%02X", m->data[i]);
printf("\n");
}
#endif
/**
* @brief canbus_read - read any message from CAN bus
* @param mesg - pointer to message
* @return 0 if all OK
*/
int canbus_read(CANmesg *mesg){
if(!mesg) return 1;
if(disconnected) return 1;
pthread_mutex_lock(&mutex);
double t0 = dtime();
int ID = mesg->ID;
char *ans;
CANmesg *m;
double t0 = dtime();
while(dtime() - t0 < T_POLLING_TMOUT){ // read answer
if((ans = read_string())){ // parse new data
if((m = parseCANmesg(ans))){
DBG("Got canbus message (dT=%g):", dtime() - t0);
#ifdef EBUG
showM(m);
#endif
if(ID && m->ID == ID){
memcpy(mesg, m, sizeof(CANmesg));
DBG("All OK");
pthread_mutex_unlock(&mutex);
return 0;
}
memcpy(mesg, m, sizeof(CANmesg));
pthread_mutex_unlock(&mutex);
return 0;
}
}
if(disconnected) break;
}
pthread_mutex_unlock(&mutex);
return 1;
}
int canbus_disconnected(){
return disconnected;
}

View File

@ -22,7 +22,7 @@
#include <stdint.h>
#ifndef T_POLLING_TMOUT
#define T_POLLING_TMOUT (0.5)
#define T_POLLING_TMOUT (0.01)
#endif
typedef struct{
@ -44,5 +44,6 @@ void canbus_clear();
void setserialspeed(int speed);
void showM(CANmesg *m);
CANmesg *parseCANmesg(const char *str);
int canbus_disconnected();
#endif // CANBUS_H__

View File

@ -110,12 +110,6 @@ static CANmesg *mkMesg(SDO *sdo){
mesg.data[1] = sdo->index & 0xff; // l
mesg.data[2] = (sdo->index >> 8) & 0xff; // h
mesg.data[3] = sdo->subindex;
#if 0
FNAME();
green("Make message to 0x%X: ", mesg.ID);
for(uint8_t i = 0; i < 8; ++i) printf("0x%02X ", mesg.data[i]);
printf("\n");
#endif
return &mesg;
}

View File

@ -31,13 +31,22 @@
#include <unistd.h> // usleep
#include <usefull_macros.h>
static int CANspeed = 0; // default speed, if !=0 set it when connected
// all messages are in format "ID [data]"
message CANbusMessages = {0}; // CANserver thread is master
// basic threads
// messages: master - thread, slave - caller
static void *stpemulator(void *arg);
static void *rawcommands(void *arg);
static void *canopencmds(void *arg);
// handlers for standard types
thread_handler CANhandlers[] = {
{"emulation", stpemulator},
{"raw", rawcommands},
{"canopen", canopencmds},
{NULL, NULL}
};
@ -49,66 +58,244 @@ thread_handler *get_handler(const char *name){
return NULL;
}
/**
* @brief parsePacket - convert text into can packet data
* @param packet (o) - pointer to CANpacket or NULL (just to check)
* @param data - text in format "ID [byte0 ... byteN]"
* @return 0 if all OK
*/
static int parsePacket(CANmesg *packet, char *data){
if(!data || *data == 0){ // no data
return 1;
}
long info[10]={0};
int N = 0;
char *saveptr = NULL;
for(char *s = data; N < 10; s = NULL, ++N){
char *nxt = strtok_r(s, " \t,;\r\n", &saveptr);
if(!nxt) break;
if(str2long(nxt, &info[N])) break;
}
if(N > 9 || N == 0) return 1; // ID + >8 data bytes or no at all
if(packet){
packet->ID = info[0];
packet->len = N - 1;
}
for(int i = 1; i < N; ++i){
if(info[i] < 0 || info[i] > 0xff) return 2;
if(packet) packet->data[i-1] = (uint8_t) info[i];
}
return 0;
}
// [re]open serial device
static void reopen_device(){
char *devname = NULL;
double t0 = dtime();
canbus_close();
DBG("Try to [re]open serial device");
while(dtime() - t0 < 5.){
if((devname = find_device())) break;
usleep(1000);
}
if(!devname || canbus_open(devname)){
FREE(devname);
LOGERR("Can't find serial device");
ERRX("Can't find serial device");
}else {DBG("Opened device: %s", devname);}
if(CANspeed){ // set default speed
canbus_clear();
canbus_setspeed(CANspeed);
canbus_clear();
}
FREE(devname);
}
// do something with can message: send to receiver
static void processCANmessage(CANmesg *mesg){
threadinfo *ti = findThreadByID(mesg->ID);
if(!ti) return;
DBG("Found");
char buf[64], *ptr = buf;
int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", mesg->ID);
l -= x; ptr += x;
for(int i = 0; i < mesg->len; ++i){
x = snprintf(ptr, l, "0x%02X ", mesg->data[i]);
l -= x; ptr += x;
}
addmesg(&ti->answers, buf);
}
/**
* @brief CANserver - main CAN thread; receive/transmit raw messages by CANbusMessages
* @param data - unused
* @return unused
*/
void *CANserver(_U_ void *data){
char *devname = find_device();
if(!devname){
LOGERR("Can't find serial device");
ERRX("Can't find serial device");
}
reopen_device();
while(1){
int fd = open(devname, O_RDONLY);
if(fd == -1){
WARN("open()");
LOGWARN("Device %s is absent", devname);
FREE(devname);
double t0 = dtime();
while(dtime() - t0 < 5.){
if((devname = find_device())) break;
usleep(1000);
}
if(!devname){
LOGERR("Can't open serial device, kill myself");
ERRX("Can't open device, kill myself");
}else LOGMSG("Change device to %s", devname);
}else close(fd);
char *mesg = getmesg(idxMISO, &CANbusMessages);
CANmesg cm;
char *mesg = getmesg(&CANbusMessages);
if(mesg){
DBG("Received message: %s", mesg);
if(parsePacket(&cm, mesg)){
LOGMSG("Received wrong CAN message: %s", mesg);
DBG("Bad message: %s", mesg);
}else{
if(canbus_write(&cm)){
LOGWARN("Can't write to CANbus, try to reopen");
WARNX("Can't write to canbus");
if(canbus_disconnected()) reopen_device();
}
}
FREE(mesg);
// global messages to all clients:
addmesg(idxMISO, &ServerMessages, "CANserver works\n");
}
usleep(1000);
if(!canbus_read(&cm)){ // got raw message from CAN bus - parce it
DBG("Got CAN message from %d, len: %d", cm.ID, cm.len);
// send raw message to 0
threadinfo *ti = findThreadByID(0);
if(ti){
char buf[64], *ptr = buf;
int l = 64, x;
x = snprintf(ptr, l, "#0x%03X ", cm.ID);
l -= x; ptr += x;
for(int i = 0; i < cm.len; ++i){
x = snprintf(ptr, l, "0x%02X ", cm.data[i]);
l -= x; ptr += x;
}
addmesg(&ti->answers, buf);
}
processCANmessage(&cm);
}else if(canbus_disconnected()) reopen_device();
}
LOGERR("CANserver(): UNREACHABLE CODE REACHED!");
return NULL;
}
/**
* @brief stpemulator - stepper motor emulator
* @param arg - threadinfo
* @return unused
*/
static void *stpemulator(void *arg){
threadinfo *ti = (threadinfo*)arg;
while(1){
char *mesg = getmesg(idxMISO, &ti->mesg);
char *mesg = getmesg(&ti->commands);
if(mesg){
DBG("Stepper emulator got: %s", mesg);
addmesg(idxMISO, &ServerMessages, mesg);
addmesg(&ServerMessages, mesg);
/* do something */
FREE(mesg);
}
int r100 = rand() % 10000;
if(r100 < 20){ // 20% of probability
addmesg(idxMISO, &ServerMessages, "stpemulator works fine!\n");
addmesg(&ServerMessages, "stpemulator works fine!");
}
if(r100 > 9998){
addmesg(idxMISO, &ServerMessages, "O that's good!\n");
addmesg(&ServerMessages, "O that's good!");
}
usleep(1000);
}
LOGERR("stpemulator(): UNREACHABLE CODE REACHED!");
return NULL;
}
/**
* @brief rawcommands - send/receive raw commands
* @param arg - threadinfo
* @return unused
* message format: ID [data]; ID - receiver ID (raw), data - 0..8 bytes of data
* ID == 0 receive everything!
*/
static void *rawcommands(void *arg){
threadinfo *ti = (threadinfo*)arg;
while(1){
char *mesg = getmesg(&ti->commands);
if(mesg){
DBG("Got raw command: %s", mesg);
addmesg(&CANbusMessages, mesg);
FREE(mesg);
}
mesg = getmesg(&ti->answers);
if(mesg){ // got raw answer from bus to thread ID, send it to all
addmesg(&ServerMessages, mesg);
FREE(mesg);
}
usleep(1000);
}
LOGERR("rawcommands(): UNREACHABLE CODE REACHED!");
return NULL;
}
// make string for CAN message from command message (NodeID index subindex [data] -> ID data)
static void sendSDO(char *mesg){
long info[8] = {0}; // 0 - NodeID, 1 - index, 2 - subindex, 3..6 - data[0..4]
int N = 0;
char *saveptr = NULL;
for(char *s = mesg; N < 8; s = NULL, ++N){
char *nxt = strtok_r(s, " \t,;\r\n", &saveptr);
if(!nxt) break;
if(str2long(nxt, &info[N])) break;
}
if(N > 7 || N < 3){
WARNX("Got bad CANopen command");
LOGMSG("Got bad CANopen command");
return;
}
DBG("User's message have %d ints", N);
uint8_t data[8] = {0}, datalen = (uint8_t) N - 3;
data[0] = SDO_CCS(CCS_INIT_DOWNLOAD);
if(datalen){ // there's data
data[0] |= SDO_N(datalen) | SDO_E | SDO_S;
for(int i = 0; i < datalen; ++i) data[4+i] = (uint8_t)(info[3+i]);
}
data[1] = info[1] & 0xff;
data[2] = (info[1] >> 8) & 0xff;
data[3] = (uint8_t)(info[2]);
char buf[64], *ptr = buf;
int l = 64, x;
x = snprintf(ptr, l, "0x%03X ", (uint16_t)(RSDO_COBID + info[0]));
l -= x; ptr += x;
for(int i = 0; i < 8; ++i){
x = snprintf(ptr, l, "0x%02X ", (uint16_t)(data[i]));
l -= x; ptr += x;
}
addmesg(&CANbusMessages, buf);
}
// send raw CANopen commands
// message format: NodeID index subindex [data]
static void *canopencmds(void *arg){
threadinfo *ti = (threadinfo*)arg;
while(1){
char *mesg = getmesg(&ti->commands);
if(mesg) do{
DBG("Got CANopen command: %s", mesg);
sendSDO(mesg);
FREE(mesg);
}while(0);
mesg = getmesg(&ti->answers);
if(mesg){ // got raw answer from bus to thread ID, analize it
addmesg(&ServerMessages, mesg);
FREE(mesg);
}
usleep(1000);
}
LOGERR("rawcommands(): UNREACHABLE CODE REACHED!");
return NULL;
}
/**
* @brief setCANspeed - set new speed of CANbus
* @param speed - speed in kbaud
* @return 0 if all OK
*/
int setCANspeed(int speed){
if(canbus_setspeed(speed)) return 1;
CANspeed = speed;
return 0;
}

View File

@ -27,5 +27,6 @@ extern thread_handler CANhandlers[];
void *CANserver(void *data);
thread_handler *get_handler(const char *name);
int setCANspeed(int speed);
#endif // PROCESSMOTORS_H__

View File

@ -20,6 +20,7 @@
#include "cmdlnopts.h"
#include "processmotors.h"
#include "proto.h"
#include "socket.h"
#include "threadlist.h"
#include <stdio.h>
@ -27,15 +28,16 @@
#include <usefull_macros.h>
// standard answers of processCommand
static const char *ANS_OK = "OK\n";
static const char *ANS_WRONGCANID = "Wrong CANID\n";
static const char *ANS_NOTFOUND = "Thread not found\n";
static const char *ANS_CANTSEND = "Can't send message\n";
static const char *ANS_OK = "OK";
static const char *ANS_WRONGCANID = "Wrong CANID";
static const char *ANS_NOTFOUND = "Thread not found";
static const char *ANS_CANTSEND = "Can't send message";
static const char *sendraw(char *id, char *data);
static const char *listthr(_U_ char *par1, _U_ char *par2);
static const char *regthr(char *thrname, char *data);
static const char *unregthr(char *thrname, char *data);
static const char *sendmsg(char *thrname, char *data);
static const char *setspd(char *speed, _U_ char *data);
/*
* Commands format:
@ -53,39 +55,29 @@ typedef struct{
// array with known functions
static cmditem functions[] = {
{"raw", sendraw},
{"register", regthr},
{"unregister", unregthr},
{"mesg", sendmsg},
{"list", listthr}, // list threads
{"mesg", sendmsg}, // "mesg NAME ID [data]"
{"register", regthr}, // "register NAME ID", ID - RAW CAN ID (not canopen ID)!!!
{"speed", setspd}, // set CANbus speed
{"unregister", unregthr}, // "unregister NAME"
{NULL, NULL}
};
/**
* @brief sendraw - send raw data to CANbus
* @param id - CANid (in string format)
* @param data - data to send (delimeters are: space, tab, comma or semicolon)
* WARNING! parameter `data` will be broken after this function
* id & data can be decimal, hexadecimal or octal
* @return answer to client
*/
static const char *sendraw(char *id, char *data){
char buf[128], *s, *saveptr;
if(!id) return "Need CAN ID\n";
long ID, info[9]={0};
int i;
if(str2long(id, &ID)){
return ANS_WRONGCANID;
}
for(s = data, i = 0; i < 9; s = NULL, ++i){
char *nxt = strtok_r(s, " \t,;\r\n", &saveptr);
if(!nxt) break;
if(str2long(nxt, &info[i])) break;
}
if(i > 8) return "Not more than 8 data bytes\n";
snprintf(buf, 128, "ID=%ld, datalen=%d, data={%ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld}\n",
ID, i, info[0], info[1], info[2], info[3], info[4], info[5], info[6], info[7]);
addmesg(idxMISO, &CANbusMessages, buf);
return ANS_OK;
// list all threads
static const char *listthr(_U_ char *par1, _U_ char *par2){
FNAME();
char msg[256];
threadlist *list = NULL;
int empty = 1;
do{
list = nextThread(list);
if(!list) break;
snprintf(msg, 256, "thread name='%s' role='%s' ID=0x%X", list->ti.name, list->ti.handler.name, list->ti.ID);
addmesg(&ServerMessages, msg);
empty = 0;
}while(1);
if(empty) return "No threads";
return NULL;
}
// register new thread
@ -96,13 +88,14 @@ static const char *sendraw(char *id, char *data){
* @return answer to client
*/
static const char *regthr(char *thrname, char *data){
FNAME();
threadinfo *ti = findThreadByName(thrname);
if(ti) return "Thread exists\n";
if(ti) return "Thread exists";
char *saveptr;
char *id = strtok_r(data, " \t,;\r\n", &saveptr);
if(!id) return ANS_WRONGCANID;
char *role = strtok_r(NULL, " \t,;\r\n", &saveptr);
if(!role) return "No thread role\n";
if(!role) return "No thread role";
DBG("Data='%s'; id='%s', role='%s'", data, id, role);
long ID;
if(str2long(data, &ID)){
@ -110,10 +103,10 @@ static const char *regthr(char *thrname, char *data){
}
DBG("Check ID");
ti = findThreadByID(ID);
if(ti) return "Thread with given ID exists\n";
if(ti) return "Thread with given ID exists";
thread_handler *h = get_handler(role);
if(!h) return "Unknown role\n";
if(!registerThread(thrname, ID, h->handler)) return "Can't register\n";
if(!h) return "Unknown role";
if(!registerThread(thrname, ID, h)) return "Can't register";
return ANS_OK;
}
@ -124,6 +117,7 @@ static const char *regthr(char *thrname, char *data){
* @return answer
*/
static const char *unregthr(char *thrname, _U_ char *data){
FNAME();
if(killThreadByName(thrname)) return ANS_NOTFOUND;
return ANS_OK;
}
@ -135,9 +129,20 @@ static const char *unregthr(char *thrname, _U_ char *data){
* @return answer
*/
static const char *sendmsg(char *thrname, char *data){
FNAME();
threadinfo *ti = findThreadByName(thrname);
if(!ti) return ANS_NOTFOUND;
if(!addmesg(idxMISO, &ti->mesg, data)) return ANS_CANTSEND;
if(!addmesg(&ti->commands, data)) return ANS_CANTSEND;
return ANS_OK;
}
static const char *setspd(char *speed, _U_ char *data){
FNAME();
long spd;
if(str2long(speed, &spd) || spd < 1 || spd > 1000 || setCANspeed((int)spd)){
DBG("Wrong speed: %s", speed);
return "Wrong speed";
}
return ANS_OK;
}
@ -163,12 +168,6 @@ const char *processCommand(char *cmd){
for(cmditem *item = functions; item->fname; ++item){
if(0 == strcasecmp(item->fname, fname)) return item->handler(procname, data);
}
return "Wrong command\n";
return "Wrong command";
}
#if 0
static char buf[1024];
snprintf(buf, 1024, "FUNC=%s, PROC=%s, CMD=%s\n", fname, procname, data);
DBG("buf: %s", buf);
return buf;
#endif

View File

@ -46,7 +46,7 @@ message ServerMessages = {0};
/**************** SERVER FUNCTIONS ****************/
//pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/**
* Send data over socket
* Send data over socket (and add trailing '\n' if absent)
* @param sock - socket fd
* @param textbuf - zero-trailing buffer with data to send
* @return amount of sent bytes
@ -58,25 +58,10 @@ static size_t send_data(int sock, const char *textbuf){
LOGERR("send_data(): write() failed");
return 0;
}else LOGDBG("send_data(): sent '%s'", textbuf);
if(textbuf[Len-1] != '\n') Len += write(sock, "\n", 1);
return (size_t)Len;
}
#if 0
// search a first word after needle without spaces
static char* stringscan(char *str, char *needle){
char *a;//, *e;
char *end = str + strlen(str);
a = strstr(str, needle);
if(!a) return NULL;
a += strlen(needle);
while (a < end && (*a == ' ' || *a == '\r' || *a == '\t' || *a == '\r')) a++;
if(a >= end) return NULL;
// e = strchr(a, ' ');
// if(e) *e = 0;
return a;
}
#endif
/**
* @brief handle_socket - read and process data from socket
* @param sock - socket fd
@ -168,7 +153,7 @@ static void *server(void *asock){
}
}
} // endfor
char *srvmesg = getmesg(idxMISO, &ServerMessages); // broadcast messages to all clients
char *srvmesg = getmesg(&ServerMessages); // broadcast messages to all clients
if(srvmesg){ // send broadcast message to all clients or throw them to /dev/null
for(int fdidx = 1; fdidx < nfd; ++fdidx){
send_data(poll_set[fdidx].fd, srvmesg);

View File

@ -106,10 +106,8 @@ threadinfo *findThreadByName(char *name){
*/
threadinfo *findThreadByID(int ID){
if(!thelist) return NULL; // thread list is empty
DBG("Try to find thread with ID=%d", ID);
threadlist *lptr = thelist;
while(lptr){
DBG("Check %d", lptr->ti.ID);
if(ID == lptr->ti.ID) return &lptr->ti;
lptr = lptr->next;
}
@ -118,46 +116,36 @@ threadinfo *findThreadByID(int ID){
/**
* @brief addmesg - add message to thread's queue
* @param idx - index (MOSI/MISO)
* @param msg - message itself
* @param txt - data to add
* @return data added or NULL if failed
*/
char *addmesg(msgidx idx, message *msg, char *txt){
if(idx < 0 || idx >= idxNUM){
WARNX("Wrong message index");
return NULL;
}
char *addmesg(message *msg, char *txt){
if(!msg) return NULL;
size_t L = strlen(txt);
if(L < 1) return NULL;
DBG("Want to add mesg '%s' with length %zd", txt, L);
if(pthread_mutex_lock(&msg->mutex[idx])) return NULL;
msglist *node = pushmessage(&msg->text[idx], txt);
if(pthread_mutex_lock(&msg->mutex)) return NULL;
msglist *node = pushmessage(&msg->text, txt);
if(!node){
pthread_mutex_unlock(&msg->mutex[idx]);
pthread_mutex_unlock(&msg->mutex);
return NULL;
}
pthread_mutex_unlock(&msg->mutex[idx]);
pthread_mutex_unlock(&msg->mutex);
return node->data;
}
/**
* @brief getmesg - get first message from queue (allocates data, should be free'd after usage!)
* @param idx - index (MOSI/MISO)
* @param msg - message itself
* @return data or NULL if empty
*/
char *getmesg(msgidx idx, message *msg){
if(idx < 0 || idx >= idxNUM){
WARNX("Wrong message index");
return NULL;
}
char *getmesg(message *msg){
if(!msg) return NULL;
char *text = NULL;
if(pthread_mutex_lock(&msg->mutex[idx])) return NULL;
text = popmessage(&msg->text[idx]);
pthread_mutex_unlock(&msg->mutex[idx]);
if(pthread_mutex_lock(&msg->mutex)) return NULL;
text = popmessage(&msg->text);
pthread_mutex_unlock(&msg->mutex);
return text;
}
@ -168,7 +156,7 @@ char *getmesg(msgidx idx, message *msg){
* @param handler - thread handler
* @return pointer to new threadinfo struct or NULL if failed
*/
threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)){
threadinfo *registerThread(char *name, int ID, thread_handler *handler){
if(!name || strlen(name) < 1 || !handler) return NULL;
threadinfo *ti = findThreadByName(name);
DBG("Register new thread with name '%s' and ID=%d", name, ID);
@ -189,13 +177,14 @@ threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)){
last->next = MALLOC(threadlist, 1);
ti = &last->next->ti;
}
ti->handler = handler;
memcpy(&ti->handler, handler, sizeof(thread_handler));
snprintf(ti->name, THREADNAMEMAXLEN+1, "%s", name);
ti->ID = ID;
memset(&ti->mesg, 0, sizeof(ti->mesg));
for(int i = 0; i < 2; ++i)
pthread_mutex_init(&ti->mesg.mutex[i], NULL);
if(pthread_create(&ti->thread, NULL, handler, (void*)ti)){
memset(&ti->commands, 0, sizeof(ti->commands));
pthread_mutex_init(&ti->commands.mutex, NULL);
memset(&ti->answers, 0, sizeof(ti->answers));
pthread_mutex_init(&ti->answers.mutex, NULL);
if(pthread_create(&ti->thread, NULL, handler->handler, (void*)ti)){
WARN("pthread_create()");
return NULL;
}
@ -221,16 +210,18 @@ int killThread(threadlist *lptr, threadlist *prev){
threadlist *next = lptr->next;
if(lptr == thelist) thelist = next;
else if(prev) prev->next = next;
for(int i = 0; i < 2; ++i){
pthread_mutex_lock(&lptr->ti.mesg.mutex[i]);
char *txt;
while((txt = popmessage(&lptr->ti.mesg.text[i]))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.mesg.mutex[i]);
}
char *txt;
pthread_mutex_lock(&lptr->ti.commands.mutex);
while((txt = popmessage(&lptr->ti.commands.text))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.commands.mutex);
pthread_mutex_lock(&lptr->ti.answers.mutex);
while((txt = popmessage(&lptr->ti.answers.text))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.answers.mutex);
if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", lptr->ti.name);
FREE(lptr);
return 0;
}
/**
* @brief killThread - kill and unregister thread with given name
* @param name - thread's name
@ -249,98 +240,14 @@ int killThreadByName(const char *name){
return 2; // not found
}
#if 0
static void *handler(void *data){
threadinfo *ti = (threadinfo*)data;
while(1){
char *got = getmesg(idxMOSI, &ti->mesg);
if(got){
green("%s got: %s\n", ti->name, got);
FREE(got);
addmesg(idxMISO, &ti->mesg, "received");
addmesg(idxMISO, &ti->mesg, "need more");
}
usleep(100);
}
return NULL;
/**
* @brief nextThread - get next thread in `thelist`
* @param curr - pointer to previous thread or NULL for `thelist`
* @return pointer to next thread in list (or NULL if absent)
*/
threadlist *nextThread(threadlist *curr){
if(!curr) return thelist;
return curr->next;
}
static void dividemessages(message *msg, char *longtext){
char *copy = strdup(longtext), *saveptr = NULL;
for(char *s = copy; ; s = NULL){
char *nxt = strtok_r(s, " ", &saveptr);
if(!nxt) break;
addmesg(idxMOSI, msg, nxt);
}
FREE(copy);
}
static void procmesg(char *text){
if(!text) return;
char *nxt = strchr(text, ' ');
if(!nxt){
WARNX("Usage: cmd data, where cmd:\n"
"\tnew threadname - create thread\n"
"\tdel threadname - delete thread\n"
"\tsend threadname data - send data to thread\n"
"\tsend all data - send data to all\n");
return;
}
*nxt++ = 0;
if(strcasecmp(text, "new") == 0){
registerThread(nxt, handler);
}else if(strcasecmp(text, "del") == 0){
if(killThread(nxt)) WARNX("Can't delete '%s'", nxt);
}else if(strcasecmp(text, "send") == 0){
text = strchr(nxt, ' ');
if(!text){
WARNX("send all/threadname data");
return;
}
*text++ = 0;
if(strcasecmp(nxt, "all") == 0){ // bcast
threadlist *lptr = thelist;
while(lptr){
threadinfo *ti = &lptr->ti;
lptr = lptr->next;
green("Bcast send '%s' to thread '%s'\n", text, ti->name);
dividemessages(&ti->mesg, text);
}
}else{ // single
threadinfo *ti = findthread(nxt);
if(!ti){
WARNX("Thread '%s' not found", nxt);
return;
}
green("Send '%s' to thread '%s'\n", text, nxt);
dividemessages(&ti->mesg, text);
}
}
}
int main(){
using_history();
while(1){
threadlist *lptr = thelist;
while(lptr){
threadinfo *ti = &lptr->ti;
lptr = lptr->next;
char *got;
while((got = getmesg(idxMISO, &ti->mesg))){
red("got from '%s': %s\n", ti->name, got);
fflush(stdout);
FREE(got);
}
}
char *text = readline("mesg > ");
if(!text) break; // ^D
if(strlen(text) < 1) continue;
add_history(text);
procmesg(text);
FREE(text);
}
return 0;
}
#endif

View File

@ -31,26 +31,26 @@ typedef struct msglist_{
struct msglist_ *next, *last; // other elements of list
} msglist;
// for all threads MASTER is the thread itself, slaves are all others
typedef enum{
idxMOSI = 0, // master out, slave in
idxMISO = 1, // master in, slave out
idxNUM = 2 // amount of indexes
} msgidx;
// interthread messages; index 0 - MOSI, index 1 - MISO
typedef struct{
msglist *text[idxNUM]; // stringified text messages
pthread_mutex_t mutex[idxNUM]; // text changing mutex
msglist *text; // stringified text messages
pthread_mutex_t mutex; // text changing mutex
} message;
// name - handler pair for threads registering functions
typedef struct{
const char *name; // handler name
void *(*handler)(void *); // handler function
} thread_handler;
// thread information
typedef struct{
char name[THREADNAMEMAXLEN+1]; // thread name
int ID; // numeric ID (canopen ID)
message mesg; // inter-thread messages
message commands; // commands from clients
message answers; // answers from CANserver (raw messages to given ID)
pthread_t thread; // thread descriptor
void *(*handler)(void *); // handler function
thread_handler handler; // handler name & function
} threadinfo;
// list of threads member
@ -59,17 +59,12 @@ typedef struct thread_list_{
struct thread_list_ *next; // next element
} threadlist;
// name - handler pair for threads registering functions
typedef struct{
const char *name; // handler name
void *(*handler)(void *); // handler function
} thread_handler;
threadinfo *findThreadByName(char *name);
threadinfo *findThreadByID(int ID);
threadinfo *registerThread(char *name, int ID, void *(*handler)(void *));
threadinfo *registerThread(char *name, int ID, thread_handler *handler);
threadlist *nextThread(threadlist *curr);
int killThreadByName(const char *name);
char *getmesg(msgidx idx, message *msg);
char *addmesg(msgidx idx, message *msg, char *txt);
char *getmesg(message *msg);
char *addmesg(message *msg, char *txt);
#endif // THREADLIST_H__