add more functions

This commit is contained in:
Edward Emelianov 2020-11-16 18:17:30 +03:00
parent 6f9c74f166
commit 3dc2627c64
10 changed files with 351 additions and 187 deletions

View File

@ -57,6 +57,8 @@ myoption cmdlnopts[] = {
// common options // common options
{"help", NO_ARGS, NULL, 'h', arg_int, APTR(&help), _("show this help")}, {"help", NO_ARGS, NULL, 'h', arg_int, APTR(&help), _("show this help")},
{"device", NEED_ARG, NULL, 'i', arg_string, APTR(&G.device), _("serial device name (default: none)")}, {"device", NEED_ARG, NULL, 'i', arg_string, APTR(&G.device), _("serial device name (default: none)")},
{"vid", NEED_ARG, NULL, 'V', arg_string, APTR(&G.vid), _("serial device vendor ID (default: none)")},
{"pid", NEED_ARG, NULL, 'P', arg_string, APTR(&G.pid), _("serial device product ID (default: none)")},
{"port", NEED_ARG, NULL, 'p', arg_string, APTR(&G.port), _("network port to connect (default: " DEFAULT_PORT ")")}, {"port", NEED_ARG, NULL, 'p', arg_string, APTR(&G.port), _("network port to connect (default: " DEFAULT_PORT ")")},
{"logfile", NEED_ARG, NULL, 'l', arg_string, APTR(&G.logfile), _("save logs to file (default: none)")}, {"logfile", NEED_ARG, NULL, 'l', arg_string, APTR(&G.logfile), _("save logs to file (default: none)")},
{"echo", NO_ARGS, NULL, 'e', arg_int, APTR(&G.echo), _("echo users commands back")}, {"echo", NO_ARGS, NULL, 'e', arg_int, APTR(&G.echo), _("echo users commands back")},

View File

@ -63,9 +63,9 @@ int main(int argc, char **argv){
signal(SIGTSTP, SIG_IGN); // ignore ctrl+Z signal(SIGTSTP, SIG_IGN); // ignore ctrl+Z
if(GP->logfile){ if(GP->logfile){
Cl_loglevel lvl = LOGLEVEL_ERR; Cl_loglevel lvl = LOGLEVEL_ERR; // default log level - errors
int v = GP->verb; int v = GP->verb;
while(v--){ // increase loglevel while(v--){ // increase loglevel for each "-v"
if(++lvl == LOGLEVEL_ANY) break; if(++lvl == LOGLEVEL_ANY) break;
} }
OPENLOG(GP->logfile, lvl); OPENLOG(GP->logfile, lvl);

View File

@ -22,5 +22,93 @@
#include "cmdlnopts.h" #include "cmdlnopts.h"
#include "processmotors.h" #include "processmotors.h"
#include "pusirobot.h" #include "pusirobot.h"
#include "socket.h"
#include <fcntl.h> // open
#include <stdio.h> // printf
#include <string.h> // strcmp
#include <sys/stat.h> // open
#include <unistd.h> // usleep
#include <usefull_macros.h> #include <usefull_macros.h>
message CANbusMessages = {0}; // CANserver thread is master
static void *stpemulator(void *arg);
// handlers for standard types
thread_handler CANhandlers[] = {
{"emulation", stpemulator},
{NULL, NULL}
};
thread_handler *get_handler(const char *name){
for(thread_handler *ret = CANhandlers; ret->name; ++ret){
if(strcmp(ret->name, name)) continue;
return ret;
}
return NULL;
}
/**
* @brief CANserver - main CAN thread; receive/transmit raw messages by CANbusMessages
* @param data - unused
* @return unused
*/
void *CANserver(_U_ void *data){
char *devname = find_device();
if(!devname){
LOGERR("Can't find serial device");
ERRX("Can't find serial device");
}
while(1){
int fd = open(devname, O_RDONLY);
if(fd == -1){
WARN("open()");
LOGWARN("Device %s is absent", devname);
FREE(devname);
double t0 = dtime();
while(dtime() - t0 < 5.){
if((devname = find_device())) break;
usleep(1000);
}
if(!devname){
LOGERR("Can't open serial device, kill myself");
ERRX("Can't open device, kill myself");
}else LOGMSG("Change device to %s", devname);
}else close(fd);
char *mesg = getmesg(idxMISO, &CANbusMessages);
if(mesg){
DBG("Received message: %s", mesg);
FREE(mesg);
// global messages to all clients:
addmesg(idxMISO, &ServerMessages, "CANserver works\n");
}
}
LOGERR("CANserver(): UNREACHABLE CODE REACHED!");
return NULL;
}
static void *stpemulator(void *arg){
threadinfo *ti = (threadinfo*)arg;
while(1){
char *mesg = getmesg(idxMISO, &ti->mesg);
if(mesg){
DBG("Stepper emulator got: %s", mesg);
addmesg(idxMISO, &ServerMessages, mesg);
/* do something */
FREE(mesg);
}
int r100 = rand() % 10000;
if(r100 < 20){ // 20% of probability
addmesg(idxMISO, &ServerMessages, "stpemulator works fine!\n");
}
if(r100 > 9998){
addmesg(idxMISO, &ServerMessages, "O that's good!\n");
}
usleep(1000);
}
LOGERR("stpemulator(): UNREACHABLE CODE REACHED!");
return NULL;
}

View File

@ -20,6 +20,12 @@
#ifndef PROCESSMOTORS_H__ #ifndef PROCESSMOTORS_H__
#define PROCESSMOTORS_H__ #define PROCESSMOTORS_H__
#include "threadlist.h"
extern message CANbusMessages;
extern thread_handler CANhandlers[];
void *CANserver(void *data);
thread_handler *get_handler(const char *name);
#endif // PROCESSMOTORS_H__ #endif // PROCESSMOTORS_H__

View File

@ -18,39 +18,24 @@
#include "aux.h" #include "aux.h"
#include "cmdlnopts.h" #include "cmdlnopts.h"
#include "processmotors.h"
#include "proto.h" #include "proto.h"
#include "threadlist.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <usefull_macros.h> #include <usefull_macros.h>
/** // standard answers of processCommand
* @brief sendraw - send raw data to CANbus static const char *ANS_OK = "OK\n";
* @param id - CANid (in string format) static const char *ANS_WRONGCANID = "Wrong CANID\n";
* @param data - data to send (delimeters are: space, tab, comma or semicolon) static const char *ANS_NOTFOUND = "Thread not found\n";
* WARNING! parameter `data` will be broken after this function static const char *ANS_CANTSEND = "Can't send message\n";
* id & data can be decimal, hexadecimal or octal
* @return answer to client static const char *sendraw(char *id, char *data);
*/ static const char *regthr(char *thrname, char *data);
char *sendraw(char *id, char *data){ static const char *unregthr(char *thrname, char *data);
char buf[128], *s, *saveptr; static const char *sendmsg(char *thrname, char *data);
if(!id) return strdup("Need CAN ID\n");
long ID, info[8]={0};
int i;
if(str2long(id, &ID)){
snprintf(buf, 128, "Wrong ID: %s\n", id);
return strdup(buf);
}
for(s = data, i = 0; ; s = NULL, ++i){
char *nxt = strtok_r(s, " \t,;\r\n", &saveptr);
if(!nxt) break;
if(str2long(nxt, &info[i])) break;
}
if(i > 8) return strdup("Not more than 8 data bytes\n");
snprintf(buf, 128, "ID=%ld, datalen=%d, data={%ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld}\n",
ID, i, info[0], info[1], info[2], info[3], info[4], info[5], info[6], info[7]);
return strdup(buf);
}
/* /*
* Commands format: * Commands format:
@ -61,24 +46,107 @@ char *sendraw(char *id, char *data){
* - data - function data (e.g. relmove turret1 150) * - data - function data (e.g. relmove turret1 150)
* you can get full list of functions by function `help` * you can get full list of functions by function `help`
*/ */
typedef struct{ typedef struct{
char *fname; const char *fname; // function name
char *(*handler)(char *arg1, char *arg2); const char *(*handler)(char *arg1, char *arg2); // data handler (arg1 and arg2 could be changed)
} cmditem; } cmditem;
// array with known functions // array with known functions
static cmditem functions[] = { static cmditem functions[] = {
{"raw", sendraw}, {"raw", sendraw},
{"register", regthr},
{"unregister", unregthr},
{"mesg", sendmsg},
{NULL, NULL} {NULL, NULL}
}; };
/**
* @brief sendraw - send raw data to CANbus
* @param id - CANid (in string format)
* @param data - data to send (delimeters are: space, tab, comma or semicolon)
* WARNING! parameter `data` will be broken after this function
* id & data can be decimal, hexadecimal or octal
* @return answer to client
*/
static const char *sendraw(char *id, char *data){
char buf[128], *s, *saveptr;
if(!id) return "Need CAN ID\n";
long ID, info[9]={0};
int i;
if(str2long(id, &ID)){
return ANS_WRONGCANID;
}
for(s = data, i = 0; i < 9; s = NULL, ++i){
char *nxt = strtok_r(s, " \t,;\r\n", &saveptr);
if(!nxt) break;
if(str2long(nxt, &info[i])) break;
}
if(i > 8) return "Not more than 8 data bytes\n";
snprintf(buf, 128, "ID=%ld, datalen=%d, data={%ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld}\n",
ID, i, info[0], info[1], info[2], info[3], info[4], info[5], info[6], info[7]);
addmesg(idxMISO, &CANbusMessages, buf);
return ANS_OK;
}
// register new thread
/**
* @brief regthr - register new thread
* @param thrname - thread name
* @param data - CANID and thread role
* @return answer to client
*/
static const char *regthr(char *thrname, char *data){
threadinfo *ti = findThreadByName(thrname);
if(ti) return "Thread exists\n";
char *saveptr;
char *id = strtok_r(data, " \t,;\r\n", &saveptr);
if(!id) return ANS_WRONGCANID;
char *role = strtok_r(NULL, " \t,;\r\n", &saveptr);
if(!role) return "No thread role\n";
DBG("Data='%s'; id='%s', role='%s'", data, id, role);
long ID;
if(str2long(data, &ID)){
return ANS_WRONGCANID;
}
DBG("Check ID");
ti = findThreadByID(ID);
if(ti) return "Thread with given ID exists\n";
thread_handler *h = get_handler(role);
if(!h) return "Unknown role\n";
if(!registerThread(thrname, ID, h->handler)) return "Can't register\n";
return ANS_OK;
}
/**
* @brief unregthr - delete thread
* @param thrname - thread's name
* @param data - unused
* @return answer
*/
static const char *unregthr(char *thrname, _U_ char *data){
if(killThreadByName(thrname)) return ANS_NOTFOUND;
return ANS_OK;
}
/**
* @brief sendmsg - send message to given thread
* @param thrname - thread's naem
* @param data - data to send
* @return answer
*/
static const char *sendmsg(char *thrname, char *data){
threadinfo *ti = findThreadByName(thrname);
if(!ti) return ANS_NOTFOUND;
if(!addmesg(idxMISO, &ti->mesg, data)) return ANS_CANTSEND;
return ANS_OK;
}
/** /**
* @brief processCommand - parse command received by socket * @brief processCommand - parse command received by socket
* @param cmd (io) - text command (after this function its content will be broken!) * @param cmd (io) - text command (after this function its content will be broken!)
* @return answer to user (or NULL if none) !!!ALLOCATED HERE, should be FREEd!!! * @return NULL or error answer to user
*/ */
char *processCommand(char *cmd){ const char *processCommand(char *cmd){
if(!cmd) return NULL; if(!cmd) return NULL;
char *saveptr = NULL, *fname = NULL, *procname = NULL, *data = NULL; char *saveptr = NULL, *fname = NULL, *procname = NULL, *data = NULL;
DBG("Got %s", cmd); DBG("Got %s", cmd);
@ -95,7 +163,7 @@ char *processCommand(char *cmd){
for(cmditem *item = functions; item->fname; ++item){ for(cmditem *item = functions; item->fname; ++item){
if(0 == strcasecmp(item->fname, fname)) return item->handler(procname, data); if(0 == strcasecmp(item->fname, fname)) return item->handler(procname, data);
} }
return strdup("Wrong command\n"); return "Wrong command\n";
} }
#if 0 #if 0

View File

@ -20,6 +20,6 @@
#ifndef PROTO_H__ #ifndef PROTO_H__
#define PROTO_H__ #define PROTO_H__
char *processCommand(char *cmd); const char *processCommand(char *cmd);
#endif // PROTO_H__ #endif // PROTO_H__

View File

@ -18,20 +18,21 @@
#include "aux.h" #include "aux.h"
#include "cmdlnopts.h" // glob_pars #include "cmdlnopts.h" // glob_pars
#include "processmotors.h"
#include "proto.h" #include "proto.h"
#include "socket.h" #include "socket.h"
#include "term.h" #include "term.h"
#include <arpa/inet.h> // inet_ntop #include <arpa/inet.h> // inet_ntop
#include <sys/ioctl.h>
#include <limits.h> // INT_xxx #include <limits.h> // INT_xxx
#include <netdb.h> // addrinfo #include <netdb.h> // addrinfo
#include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <signal.h> // pthread_kill #include <signal.h> // pthread_kill
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/stat.h> // open
#include <sys/syscall.h> // syscall #include <sys/syscall.h> // syscall
#include <fcntl.h> // open
#include <unistd.h> // daemon #include <unistd.h> // daemon
#include <usefull_macros.h> #include <usefull_macros.h>
@ -40,41 +41,7 @@
// Max amount of connections // Max amount of connections
#define BACKLOG (30) #define BACKLOG (30)
extern glob_pars *GP; message ServerMessages = {0};
/*
* Define global data buffers here
*/
/**************** COMMON FUNCTIONS ****************/
/**
* wait for answer from socket
* @param sock - socket fd
* @return 0 in case of error or timeout, 1 in case of socket ready
*/
static int waittoread(int sock){
fd_set fds;
struct timeval timeout;
int rc;
timeout.tv_sec = 1; // wait not more than 1 second
timeout.tv_usec = 0;
FD_ZERO(&fds);
FD_SET(sock, &fds);
do{
rc = select(sock+1, &fds, NULL, NULL, &timeout);
if(rc < 0){
if(errno != EINTR){
WARN("select()");
LOGWARN("waittoread(): select() error");
return 0;
}
continue;
}
break;
}while(1);
if(FD_ISSET(sock, &fds)) return 1;
return 0;
}
/**************** SERVER FUNCTIONS ****************/ /**************** SERVER FUNCTIONS ****************/
//pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@ -84,7 +51,7 @@ static int waittoread(int sock){
* @param textbuf - zero-trailing buffer with data to send * @param textbuf - zero-trailing buffer with data to send
* @return amount of sent bytes * @return amount of sent bytes
*/ */
static size_t send_data(int sock, char *textbuf){ static size_t send_data(int sock, const char *textbuf){
ssize_t Len = strlen(textbuf); ssize_t Len = strlen(textbuf);
if(Len != write(sock, textbuf, Len)){ if(Len != write(sock, textbuf, Len)){
WARN("write()"); WARN("write()");
@ -110,50 +77,35 @@ static char* stringscan(char *str, char *needle){
} }
#endif #endif
static void *handle_socket(void *asock){ /**
* @brief handle_socket - read and process data from socket
* @param sock - socket fd
* @return 0 if all OK, 1 if socket closed
*/
static int handle_socket(int sock){
FNAME(); FNAME();
int sock = *((int*)asock);
char buff[BUFLEN]; char buff[BUFLEN];
ssize_t rd; ssize_t rd = read(sock, buff, BUFLEN-1);
while(1){ if(rd < 1){
if(!waittoread(sock)){ // no data incoming DBG("read() == %zd", rd);
continue; return 1;
}
if(!(rd = read(sock, buff, BUFLEN-1))){
DBG("Client closed socket");
LOGDBG("Socket %d closed", sock);
break;
}
DBG("Got %zd bytes", rd);
if(rd < 0){ // error
LOGDBG("Close socket %d: read=%d", sock, rd);
DBG("Nothing to read from fd %d (ret: %zd)", sock, rd);
break;
}
// add trailing zero to be on the safe side
buff[rd] = 0;
// now we should check what do user want
// here we can process user data
DBG("user send '%s'", buff);
LOGDBG("user send '%s'", buff);
if(GP->echo){
if(!send_data(sock, buff)){
WARN("Can't send data to user, some error occured");
}
}
//pthread_mutex_lock(&mutex);
char *ans = processCommand(buff); // run command parser
if(ans){
send_data(sock, ans); // send answer
FREE(ans);
}
//pthread_mutex_unlock(&mutex);
} }
LOGDBG("Socket %d closed", sock); // add trailing zero to be on the safe side
DBG("Socket closed"); buff[rd] = 0;
close(sock); // now we should check what do user want
pthread_exit(NULL); // here we can process user data
return NULL; DBG("user %d send '%s'", sock, buff);
LOGDBG("user %d send '%s'", sock, buff);
if(GP->echo){
send_data(sock, buff);
}
//pthread_mutex_lock(&mutex);
const char *ans = processCommand(buff); // run command parser
if(ans){
send_data(sock, ans); // send answer
}
//pthread_mutex_unlock(&mutex);
return 0;
} }
// main socket server // main socket server
@ -165,25 +117,63 @@ static void *server(void *asock){
WARN("listen"); WARN("listen");
return NULL; return NULL;
} }
int nfd = 1;
// max amount of opened fd (+1 for server socket)
#define MAX_FDS (3)
struct pollfd poll_set[MAX_FDS];
memset(poll_set, 0, sizeof(poll_set));
poll_set[0].fd = sock;
poll_set[0].events = POLLIN;
while(1){ while(1){
socklen_t size = sizeof(struct sockaddr_in); poll(poll_set, nfd, 1); // poll for 1ms
struct sockaddr_in their_addr; for(int fdidx = 0; fdidx < nfd; ++fdidx){ // poll opened FDs
int newsock; if((poll_set[fdidx].revents & POLLIN) == 0) continue;
if(!waittoread(sock)) continue; poll_set[fdidx].revents = 0;
newsock = accept(sock, (struct sockaddr*)&their_addr, &size); if(fdidx){ // client
if(newsock <= 0){ int fd = poll_set[fdidx].fd;
LOGERR("server(): accept() failed"); //int nread = 0;
WARN("accept()"); //ioctl(fd, FIONREAD, &nread);
continue; if(handle_socket(fd)){ // socket closed - remove it from list
} close(fd);
pthread_t handler_thread; DBG("Client with fd %d closed", fd);
if(pthread_create(&handler_thread, NULL, handle_socket, (void*) &newsock)){ LOGMSG("Client %d disconnected", fd);
LOGERR("server(): pthread_create() failed"); for(int i = fdidx; i < nfd; ++i)
WARN("pthread_create()"); poll_set[i] = poll_set[i + 1];
}else{ --nfd;
LOGDBG("server(): listen thread created"); }
DBG("Thread created, detouch"); }else{ // server
pthread_detach(handler_thread); // don't care about thread state socklen_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
if(newsock <= 0){
LOGERR("server(): accept() failed");
WARN("accept()");
continue;
}
struct in_addr ipAddr = their_addr.sin_addr;
char str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &ipAddr, str, INET_ADDRSTRLEN);
DBG("Connection from %s, give fd=%d", str, newsock);
LOGMSG("Got connection from %s, fd=%d", str, newsock);
if(nfd == MAX_FDS){
LOGWARN("Max amount of connections: disconnect %s (%d)", str, newsock);
send_data(newsock, "Max amount of connections reached!\n");
WARNX("Limit of connections reached");
close(newsock);
}else{
memset(&poll_set[nfd], 0, sizeof(struct pollfd));
poll_set[nfd].fd = newsock;
poll_set[nfd].events = POLLIN;
++nfd;
}
}
} // endfor
char *srvmesg = getmesg(idxMISO, &ServerMessages); // broadcast messages to all clients
if(srvmesg){ // send broadcast message to all clients or throw them to /dev/null
for(int fdidx = 1; fdidx < nfd; ++fdidx){
send_data(poll_set[fdidx].fd, srvmesg);
}
FREE(srvmesg);
} }
} }
LOGERR("server(): UNREACHABLE CODE REACHED!"); LOGERR("server(): UNREACHABLE CODE REACHED!");
@ -192,14 +182,9 @@ static void *server(void *asock){
// data gathering & socket management // data gathering & socket management
static void daemon_(int sock){ static void daemon_(int sock){
if(sock < 0) return; if(sock < 0) return;
double tgot = 0.; pthread_t sock_thread, canserver_thread;
char *devname = find_device(); if(pthread_create(&sock_thread, NULL, server, (void*) &sock) ||
if(!devname){ pthread_create(&canserver_thread, NULL, CANserver, NULL)){
LOGERR("Can't find serial device");
ERRX("Can't find serial device");
}
pthread_t sock_thread;
if(pthread_create(&sock_thread, NULL, server, (void*) &sock)){
LOGERR("daemon_(): pthread_create() failed"); LOGERR("daemon_(): pthread_create() failed");
ERR("pthread_create()"); ERR("pthread_create()");
} }
@ -209,34 +194,22 @@ static void daemon_(int sock){
LOGERR("Sockets thread died"); LOGERR("Sockets thread died");
pthread_join(sock_thread, NULL); pthread_join(sock_thread, NULL);
if(pthread_create(&sock_thread, NULL, server, (void*) &sock)){ if(pthread_create(&sock_thread, NULL, server, (void*) &sock)){
LOGERR("daemon_(): new pthread_create() failed"); LOGERR("daemon_(): new pthread_create(sock_thread) failed");
ERR("pthread_create()"); ERR("pthread_create(sock_thread)");
}
}
if(pthread_kill(canserver_thread, 0) == ESRCH){
WARNX("CANserver thread died");
LOGERR("CANserver thread died");
pthread_join(canserver_thread, NULL);
if(pthread_create(&canserver_thread, NULL, CANserver, NULL)){
LOGERR("daemon_(): new pthread_create(canserver_thread) failed");
ERR("pthread_create(canserver_thread)");
} }
} }
usleep(1000); // sleep a little or thread's won't be able to lock mutex usleep(1000); // sleep a little or thread's won't be able to lock mutex
if(dtime() - tgot < T_INTERVAL) continue;
tgot = dtime();
/*
* INSERT CODE HERE
* Gather data (poll_device)
*/
// copy temporary buffers to main // copy temporary buffers to main
//pthread_mutex_lock(&mutex); //pthread_mutex_lock(&mutex);
int fd = open(devname, O_RDONLY);
if(fd == -1){
WARN("open()");
LOGWARN("Device %s is absent", devname);
FREE(devname);
double t0 = dtime();
while(dtime() - t0 < 5.){
if((devname = find_device())) break;
usleep(1000);
}
if(!devname){
LOGERR("Can't open serial device, kill myself");
ERRX("Can't open device, kill myself");
}else LOGMSG("Change device to %s", devname);
}else close(fd);
/* /*
* INSERT CODE HERE * INSERT CODE HERE
* fill global data buffers * fill global data buffers

View File

@ -20,11 +20,9 @@
#ifndef __SOCKET_H__ #ifndef __SOCKET_H__
#define __SOCKET_H__ #define __SOCKET_H__
// timeout for socket closing #include "threadlist.h"
#define SOCKET_TIMEOUT (5.0)
// time interval for data polling (seconds)
#define T_INTERVAL (10.)
extern message ServerMessages;
void daemonize(char *port); void daemonize(char *port);
#endif // __SOCKET_H__ #endif // __SOCKET_H__

View File

@ -61,8 +61,9 @@ static char *popmessage(msglist **lst){
if(!lst || !*lst) return NULL; if(!lst || !*lst) return NULL;
char *ret; char *ret;
msglist *node = *lst; msglist *node = *lst;
ret = (*lst)->data; if(node->next) node->next->last = node->last; // pop not last message
*lst = (*lst)->next; ret = node->data;
*lst = node->next;
FREE(node); FREE(node);
return ret; return ret;
} }
@ -132,9 +133,13 @@ char *addmesg(msgidx idx, message *msg, char *txt){
if(L < 1) return NULL; if(L < 1) return NULL;
DBG("Want to add mesg '%s' with length %zd", txt, L); DBG("Want to add mesg '%s' with length %zd", txt, L);
if(pthread_mutex_lock(&msg->mutex[idx])) return NULL; if(pthread_mutex_lock(&msg->mutex[idx])) return NULL;
if(!pushmessage(&msg->text[idx], txt)) return NULL; msglist *node = pushmessage(&msg->text[idx], txt);
if(!node){
pthread_mutex_unlock(&msg->mutex[idx]);
return NULL;
}
pthread_mutex_unlock(&msg->mutex[idx]); pthread_mutex_unlock(&msg->mutex[idx]);
return msg->text[idx]->data; return node->data;
} }
/** /**
@ -197,12 +202,41 @@ threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)){
return ti; return ti;
} }
/**
* @brief killThread - kill thread by its descriptor
* @param lptr - pointer to thread descriptor
* @param prev - pointer to previous thread in list or NULL (to found it here)
* @return 0 if all OK
*/
int killThread(threadlist *lptr, threadlist *prev){
if(!lptr) return 1;
if(!prev){
threadlist *t = thelist;
for(; t; t = t->next){
if(t == lptr) break;
prev = lptr;
}
}
DBG("Delete '%s', prev: '%s'", lptr->ti.name, prev->ti.name);
threadlist *next = lptr->next;
if(lptr == thelist) thelist = next;
else if(prev) prev->next = next;
for(int i = 0; i < 2; ++i){
pthread_mutex_lock(&lptr->ti.mesg.mutex[i]);
char *txt;
while((txt = popmessage(&lptr->ti.mesg.text[i]))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.mesg.mutex[i]);
}
if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", lptr->ti.name);
FREE(lptr);
return 0;
}
/** /**
* @brief killThread - kill and unregister thread with given name * @brief killThread - kill and unregister thread with given name
* @param name - thread's name * @param name - thread's name
* @return 0 if all OK * @return 0 if all OK
*/ */
int killThread(const char *name){ int killThreadByName(const char *name){
if(!name || !thelist) return 1; if(!name || !thelist) return 1;
threadlist *lptr = thelist, *prev = NULL; threadlist *lptr = thelist, *prev = NULL;
for(; lptr; lptr = lptr->next){ for(; lptr; lptr = lptr->next){
@ -210,19 +244,7 @@ int killThread(const char *name){
prev = lptr; prev = lptr;
continue; continue;
} }
DBG("Found '%s', prev: '%s', delete", name, prev->ti.name); return killThread(lptr, prev);
threadlist *next = lptr->next;
if(lptr == thelist) thelist = next;
else if(prev) prev->next = next;
for(int i = 0; i < 2; ++i){
pthread_mutex_lock(&lptr->ti.mesg.mutex[i]);
char *txt;
while((txt = popmessage(&lptr->ti.mesg.text[i]))) FREE(txt);
pthread_mutex_destroy(&lptr->ti.mesg.mutex[i]);
}
if(pthread_cancel(lptr->ti.thread)) WARN("Can't kill thread '%s'", name);
FREE(lptr);
return 0;
} }
return 2; // not found return 2; // not found
} }

View File

@ -31,6 +31,7 @@ typedef struct msglist_{
struct msglist_ *next, *last; // other elements of list struct msglist_ *next, *last; // other elements of list
} msglist; } msglist;
// for all threads MASTER is the thread itself, slaves are all others
typedef enum{ typedef enum{
idxMOSI = 0, // master out, slave in idxMOSI = 0, // master out, slave in
idxMISO = 1, // master in, slave out idxMISO = 1, // master in, slave out
@ -58,10 +59,16 @@ typedef struct thread_list_{
struct thread_list_ *next; // next element struct thread_list_ *next; // next element
} threadlist; } threadlist;
// name - handler pair for threads registering functions
typedef struct{
const char *name; // handler name
void *(*handler)(void *); // handler function
} thread_handler;
threadinfo *findThreadByName(char *name); threadinfo *findThreadByName(char *name);
threadinfo *findThreadByID(int ID); threadinfo *findThreadByID(int ID);
threadinfo *registerThread(char *name, int ID, void *(*handler)(void *)); threadinfo *registerThread(char *name, int ID, void *(*handler)(void *));
int killThread(const char *name); int killThreadByName(const char *name);
char *getmesg(msgidx idx, message *msg); char *getmesg(msgidx idx, message *msg);
char *addmesg(msgidx idx, message *msg, char *txt); char *addmesg(msgidx idx, message *msg, char *txt);