Teensy 3.x multithreading library first release

Hello,

I played around a bit with this library.
WOW is what I think :)

But I have a question.
I am using collin's FlexCAN library (https://github.com/collin80/FlexCAN_Library) and I put the CAN signal handling in a thread so that it always runs without interrupting my loop.
It looks good until some CAN messages arrive after I send a request; then the CAN reads stop.

Example:
Code:
void thread_CanBus()
{
        while(1)
        {
               canbus();
        }
}

void setup()
{
// Do some can init stuff here
Can0.begin(125000);
// Set Filter and so on...


threads.addThread(thread_CanBus);

}

void loop()
{
// Do some Loop Stuff here...

}

And my CanBus task looks like this:
Code:
void canbus()
{
  while (Can0.available())
  {
    Can0.read(rxmsg);
  }
}

When I put a counter inside the while (Can0.available()) loop, it counts fine.
But when I send something over the CAN bus, after sending 3 times the message counter drops to 0, even though I am 100% sure there are still messages on the bus.
Before that it was about 250.
When I put canbus(); in the loop instead, everything runs fine; I can send as many messages as I want and it does not stop.

Is this CAN bus library not compatible with the multithreading library?

Thanks a lot

Thomas.
 
You need to add mutex locks if you're accessing (writing to) the port from outside the thread.

For example, if you're doing writes from the loop and not from the thread, while your thread is constantly polling the same hardware port, the two are bound to collide and cause undefined behaviour.
 
Thank you very much for your answer.

I am not as good a programmer as you are, sorry for that, but I am still trying to learn :)


Here is my test sketch:

Code:
#include <Arduino.h>
#include <FlexCan.h>
#include <EEPROM.h>
#include <TeensyThreads.h>

static CAN_message_t msg, rxmsg;

unsigned char canmessage1[8] = { 0x12, 0x34, 0x56, 0x78, 0x90, 0xFF, 0xFF, 0xFF };
unsigned char buf[8];

long actualmilliseconds = 0;

long lLoopMillis = 0;
long lLastLoopMillis = 0;
long lLoops = 0;

long lCanLoopMillis = 0;
long lLastCanLoopMillis = 0;
long lCanLoops = 0;

String debugString;

void canbus()
{
        while (Can0.read(rxmsg))
        {
                //Can0.read(rxmsg);
                lCanLoopMillis = millis();
                lCanLoops++;
                if (lCanLoopMillis - lLastCanLoopMillis > 1000)
                {
                        Serial.print("CanLoops: ");
                        Serial.println(lCanLoops);
                        lCanLoops = 0;
                        lLastCanLoopMillis = lCanLoopMillis;
                }
        }
}

void thread_CanBus()
{
        while(1)
        {
                canbus();
        }
}

void setup()
{
        Serial.begin(15200);

        Can0.begin(125000);

        msg.len = 8;
        msg.id = 0x10;

        Serial.println("SETUP DONE");

        threads.addThread(thread_CanBus);

}     // END SETUP




void loop()
{

        actualmilliseconds = millis();

        lLoops++;
        if (actualmilliseconds - lLastLoopMillis > 1000)
        {
                Serial.print("Loops: ");
                Serial.println(lLoops);
                lLastLoopMillis = actualmilliseconds;
                lLoops = 0;
        }

        while (Serial.available())
        {
                char c = Serial.read();
                debugString += c;
        }
        debugString.trim();
        if (debugString.length() > 0)
        {
          if (debugString == "send")
          {
                  Serial.println("Sending canbus message");
                  memcpy(msg.buf, canmessage1, msg.len);
                  Can0.write(msg);
          }

                debugString = "";
        }

}     // END LOOP

And this is the output:
Code:
SETUP DONE
Loops: 79724
CanLoops: 159
Loops: 130941
CanLoops: 253
Loops: 132772
CanLoops: 253
Loops: 132775
CanLoops: 253
Loops: 132759
CanLoops: 253
Loops: 132770
CanLoops: 254
Loops: 132772
CanLoops: 253
Sending canbus message
Loops: 131782
CanLoops: 253
Loops: 131314
CanLoops: 253
Sending canbus message
Loops: 131314
CanLoops: 253
Sending canbus message
Loops: 131363
Loops: 131461
Loops: 131462
Loops: 131461
Loops: 131462
Loops: 131463
Loops: 131461

I read a bit about mutex locks and will have to play with them, because I understand nothing at the moment :)

Thanks
Thomas
 
In your loop() you're doing Can0.write, which means both loop and the thread are accessing the hardware port at the same time; that's why it's crashing. If you implement mutex locks for the same hardware port (Can0), you won't crash anymore, since each one waits for the other to unlock the mutex before locking it for its own use.

So to simplify it:

loop() is one thread
thread_CanBus is another

You have two choices here:

1) implement mutex locks
or
2) only use Can0 from within a single thread

I posted some examples of mutex locking quite a few posts back in this thread, if you still need to find them.
 
I will post a stripped-down version of the example from a few pages back:

Code:
#include "TeensyThreads.h"
#include <i2c_t3.h>

Threads::Mutex wire1; // example lock
Threads::Mutex spi1; // another lock
Threads::Mutex serial; // another lock...
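Threads::Mutex can0; // <-- pay attention to your own lock!

const int LED = 13;  // assumed: on-board LED pin (not defined in the original post)
int myCounter = 0;   // assumed: shared counter, only touched inside "serial" locked scopes
void blinkthread();  // forward declaration, needed if this is built as a plain .cpp file
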

void setup() {
  Wire.begin(I2C_MASTER, 0x00, I2C_PINS_18_19, I2C_PULLUP_EXT, 100000);
  Wire.setDefaultTimeout(200000); // 200ms
  pinMode(LED, OUTPUT);

  auto blinky = threads.addThread(blinkthread);
  threads.setTimeSlice(0, 1); // <-- loop() thread
  threads.setTimeSlice(1, 1); // blinkthread() thread
}


void loop() {
  byte x, y;
  { Threads::Scope scope(serial);   // <----
    Serial.println("1");            // <---- whatever is enclosed within the braces is locked; the closing brace unlocks it. For example, blinkthread will NOT be able to enter "serial" locked scopes until this thread leaves the scope
  }                                 // <----
  { Threads::Scope scope(wire1);  // <-- this locks the I2C (Wire) port; "wire1" is just the name of the lock and has no attachment to the port itself, you can call it anything you want
    Wire.beginTransmission(32); Wire.write(0x0C); Wire.write(0xFF); Wire.write(0xFF); Wire.endTransmission();
    Wire.beginTransmission(32); Wire.write(0x12); Wire.endTransmission(); Wire.requestFrom(32, 2);
    x = Wire.read();
    y = Wire.read();
  }
  { Threads::Scope scope(serial); // scope starts lock on "serial" mutex
    Serial.println("2");
    if ( x == 191 && y == 127 ) {
      myCounter++;
      digitalWriteFast(LED, 1);
      Serial.print("Wire Register Read Thread1: ");
      Serial.print(x);
      Serial.print(" : ");
      Serial.print(y);
      Serial.print(" GOOD! : Counter: ");
      Serial.println(myCounter);
      Serial.println("3");
    }
    else {
      Serial.print("Wire Register Read Thread1: ");
      Serial.print(x);
      Serial.print(" : ");
      Serial.print(y);
      Serial.println(" FAIL!");
    }
    Serial.println("4");
  } // <-- lock is released since the scope ends here! anyone can use "serial" mutex now
}

void blinkthread() {
  while (1) {
    byte x, y;
    { Threads::Scope scope(wire1); // <-- wire1 is used here as well; the threads take turns accessing it one at a time, never at the same time
      Wire.beginTransmission(32); Wire.write(0x0C); Wire.write(0xFF); Wire.write(0xFF); Wire.endTransmission();
      Wire.beginTransmission(32); Wire.write(0x13); Wire.endTransmission(); Wire.requestFrom(32, 1);
      x = Wire.read();
      Wire.beginTransmission(32); Wire.write(0x12); Wire.endTransmission(); Wire.requestFrom(32, 1);
      y = Wire.read();
    }
    { Threads::Scope scope(serial);
      Serial.println("Wire Written");
      if ( x == 127 && y == 191 ) {
        myCounter--;
        digitalWriteFast(LED, 0);
        Serial.print("Wire Register Read Thread2: ");
        Serial.print(x);
        Serial.print(" : ");
        Serial.print(y);
        Serial.print(" GOOD! : Counter: ");
        Serial.println(myCounter);
      }
      else
      {
        Serial.print("Wire Register Read Thread2: ");
        Serial.print(x);
        Serial.print(" : ");
        Serial.print(y);
        Serial.println(" FAIL!");
      }
    }


  }
}
 
So if I take your code above, it should look something like this (untested):

Code:
#include <Arduino.h>
#include <FlexCan.h>
#include <EEPROM.h>
#include <TeensyThreads.h>
Threads::Mutex can0_lock;
static CAN_message_t msg, rxmsg;

unsigned char canmessage1[8] = { 0x12, 0x34, 0x56, 0x78, 0x90, 0xFF, 0xFF, 0xFF };
unsigned char buf[8];

long actualmilliseconds = 0;

long lLoopMillis = 0;
long lLastLoopMillis = 0;
long lLoops = 0;

long lCanLoopMillis = 0;
long lLastCanLoopMillis = 0;
long lCanLoops = 0;

String debugString;

void canbus()
{
  while (1) {
    { Threads::Scope scope(can0_lock); // lock the mutex
      while (Can0.read(rxmsg))
      {
        //Can0.read(rxmsg);
        lCanLoopMillis = millis();
        lCanLoops++;
        if (lCanLoopMillis - lLastCanLoopMillis > 1000)
        {
          Serial.print("CanLoops: ");
          Serial.println(lCanLoops);
          lCanLoops = 0;
          lLastCanLoopMillis = lCanLoopMillis;
        }
      }
    } // <-- can0_lock unlocks here
  }
}

void setup()
{
  Serial.begin(15200);

  Can0.begin(125000); // always be careful here: this is OKAY because you did not "addThread" yet, so Can0 is only accessed from one place and no mutex lock is needed here. :)

  msg.len = 8;
  msg.id = 0x10;

  Serial.println("SETUP DONE");

  threads.addThread(canbus);

}     // END SETUP




void loop()
{

  actualmilliseconds = millis();

  lLoops++;
  if (actualmilliseconds - lLastLoopMillis > 1000)
  {
    Serial.print("Loops: ");
    Serial.println(lLoops);
    lLastLoopMillis = actualmilliseconds;
    lLoops = 0;
  }

  while (Serial.available())
  {
    char c = Serial.read();
    debugString += c;
  }
  debugString.trim();
  if (debugString.length() > 0)
  {
    if (debugString == "send")
    {
      Serial.println("Sending canbus message");
      { Threads::Scope scope(can0_lock); // we start the lock here
        memcpy(msg.buf, canmessage1, msg.len);
        Can0.write(msg);
      } // it automatically unlocks here
    }

    debugString = "";
  }

}     // END LOOP
 
Thank you soooo much :)

I use Can0.write so often, in a lot of tasks, that maybe I will just set a variable and do the writing in the thread.

But I will test both: mutex locks, and writing only in the thread.

I will see which works better for me.
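
A minimal sketch of the "set a variable and do the write stuff in the thread" idea, written as plain host-side C++ so it can be compiled off-target. Everything here is an assumption for illustration: `std::mutex` stands in for `Threads::Mutex`, `Frame` stands in for `CAN_message_t`, and `TxQueue`, `push` and `pop` are hypothetical names that are not part of FlexCAN or TeensyThreads.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for CAN_message_t (id, length, payload).
struct Frame {
    uint32_t id = 0;
    uint8_t len = 0;
    uint8_t buf[8] = {};
};

// Fixed-size ring buffer: any thread may push, only the CAN thread pops.
class TxQueue {
public:
    // Returns false when the queue is full, so the caller can retry or drop.
    bool push(const Frame& f) {
        std::lock_guard<std::mutex> guard(lock_);
        if (count_ == slots_.size()) return false;
        slots_[(head_ + count_) % slots_.size()] = f;
        ++count_;
        assert(count_ <= slots_.size());
        return true;
    }

    // Returns false when there is nothing to send.
    bool pop(Frame& out) {
        std::lock_guard<std::mutex> guard(lock_);
        if (count_ == 0) return false;
        out = slots_[head_];
        head_ = (head_ + 1) % slots_.size();
        --count_;
        return true;
    }

private:
    std::mutex lock_;                 // protects the fields below
    std::array<Frame, 8> slots_{};    // oldest frame lives at head_
    std::size_t head_ = 0;
    std::size_t count_ = 0;
};
```

On the Teensy, the idea would be that loop() (and any other task) only calls push(), while the CAN thread is the only place that calls pop() followed by Can0.write() and Can0.read(). The lock is then held just long enough to copy one frame, never across a bus operation.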

Thanks again :)
Thomas
 
No prob. You also had an additional function in the thread calling another one; above I took it out, pointed addThread at canbus() directly, and put the while(1) in there for you.
 
After you've got it working, I would suggest dropping the String class and looking into character arrays, unless you want your heap to look like Swiss cheese :)
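
A rough sketch of what the character-array version of the command reader could look like, in plain C++ so it can be tried off-target. `LineBuffer`, `feed` and `isCommand` are hypothetical names, and the sketch assumes commands arrive terminated by a newline, which the original String-based code did not require.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical fixed-size command buffer; it never allocates from the heap.
struct LineBuffer {
    char text[32] = {};
    std::size_t len = 0;

    // Feed one received character.
    // Returns true when a complete line is available in `text`.
    bool feed(char c) {
        if (c == '\r') return false;        // ignore carriage returns
        if (c == '\n') {
            text[len] = '\0';               // terminate the finished line
            len = 0;                        // the next feed() starts a new line
            return true;
        }
        if (len < sizeof(text) - 1) {
            text[len++] = c;                // store the character
        }                                   // else: buffer full, extra characters are dropped
        return false;
    }
};

// True if the completed line equals the given command word.
inline bool isCommand(const LineBuffer& line, const char* word) {
    return std::strcmp(line.text, word) == 0;
}
```

In the sketch from the thread, loop() would call feed(Serial.read()) while Serial.available() is true, and test isCommand(line, "send") whenever feed() returns true.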
 
I am trying to set up a structure where the whole structure is atomic. I found there are some issues with doing it this way, e.g. not being able to reference or change individual members of the atomic structure, which I managed to figure out from a Stack Overflow question. Doing it this way avoids any race conditions (according to what I read) if you are reading the data from a thread into a global variable, etc.

So essentially I read the data into a temp structure so I can make any changes, and then assign it to the atomic version of the structure. In the main loop I copy it back into another structure of the same type so the main loop can use it (or another thread doing some calculations, in future updates).

I am getting an "undefined reference" error for atomic_store or atomic_load if I try to set the atomic struct equal to the non-atomic struct. I probably can't do it this way, but I'm not sure how to do it. Can anyone tell me a way to do this, or whether there is a better way? I am using a T3.5 for this project.

Thanks
Mike
Happy Holidays to All


Here is the code:

Main:
Code:
//============================================================================
// -------------------------------------------------------------------
#include <EEPROM.h>
#include <NMEAGPS.h>
#include <Wire.h>
#include <Streaming.h>
#include <StopWatch.h>
#include <elapsedMillis.h>
#include <atomic>

#include "TeensyThreads.h"

#include <Adafruit_Sensor.h>
#include <Adafruit_BNO055.h>
#include <utility/imumaths.h>

NMEAGPS  gps; // This parses the GPS characters
#define gpsPort Serial1
gps_fix  fix;

#define telem Serial

elapsedMillis sensorIMU;
elapsedMillis sensorGPS;

StopWatch etm_millis;

// the interval in mS 
unsigned long currentTime;
unsigned long currentTime_avg;
unsigned long sensorRunTime;

/* Set the delay between fresh samples for BNO055*/
#define BNO055_SAMPLERATE_DELAY_MS (100)
Adafruit_BNO055 bno = Adafruit_BNO055(55);

#define EVENT_IMU 100
#define EVENT_GPS 1000

#define rad2deg 57.295779513082320876798154814105f
#define m2ft 3.280839895f
#define toMps 0.44704f

struct imuSensor {
    float roll;
    float pitch;
    float heading;
    float rotx;
    float roty;
    float rotz;
    float accelx;
    float accely;
    float accelz;   
};
struct imuSensor tempAhrs, ahrs;
std::atomic<imuSensor> threadAhrs;

struct ublox {
    int32_t lat;
    int32_t lng;
    int fType;
    int fQuality;
    float alt;
    int cog;
    float sog;
    int satno;
    int hdop;
    int pdop;
} rtk;

int IMU, GPS;

void setup() {
    telem.begin(57600);
    while ( !telem && millis() < 2000 ) {};

    Wire.begin();
    Wire.setClock(400000L);
    
    gpsPort.setRX(27);
    gpsPort.begin(57600);

    delay(2000);
    
    //==================================================
    // Initial Adafruit BNO055
    //==================================================
    
    BNO055_Init();
    
    delay(100);

    IMU = threads.addThread(readIMU);
    GPS = threads.addThread(readGPS);
    
    threads.setTimeSlice(0, 1);
    threads.setTimeSlice(IMU, 50);
    threads.setTimeSlice(GPS, 50);
    if (threads.getState(IMU) == Threads::RUNNING) Serial.println("BNO055 thread started");
    if (threads.getState(GPS) == Threads::RUNNING) Serial.println("GPS thread started");  

}

void loop() {
    ahrs = threadAhrs;
    if(sensorIMU > EVENT_IMU)
        printIMU();
    if(sensorGPS > EVENT_GPS)
        printGPS();


}

void printIMU() {
  telem << "IMU DATA (attitude, rotation, accel): " << endl;
  telem << _FLOAT(ahrs.roll, 6) << ", " << _FLOAT(ahrs.pitch,6) << ", " << _FLOAT(ahrs.heading,6) << endl;
  telem << _FLOAT(ahrs.rotx, 6) << ", " << _FLOAT(ahrs.roty,6) << ", " << _FLOAT(ahrs.rotz,6) << endl;
  telem << _FLOAT(ahrs.accelx, 6) << ", " << _FLOAT(ahrs.accely,6) << ", " << _FLOAT(ahrs.accelz,6) << endl;
}

void printGPS() {
  telem << "GPS DATA:" << endl;
  telem.printf("%12lf, %12lf\n",rtk.lat/1e7L, rtk.lng/1e7L);
  Serial.print( fix.latitudeL() );
  telem << rtk.fQuality << endl;
  telem << _FLOAT(rtk.alt, 6) << ", " << rtk.cog << ", ";
  telem << _FLOAT(rtk.sog, 6) << endl;
  telem << rtk.satno << endl;
}

Thread:
Code:
void readIMU(){
  while(1){
    sensors_event_t sensor;
    bno.getEvent(&sensor);
    imu::Vector<3> euler = bno.getVector(Adafruit_BNO055::VECTOR_EULER);
    imu::Vector<3> accel_linear = bno.getVector(Adafruit_BNO055::VECTOR_LINEARACCEL);
    imu::Vector<3> rot_rate = bno.getVector(Adafruit_BNO055::VECTOR_GYROSCOPE);
    
    tempAhrs.roll = (float)sensor.orientation.y;
    tempAhrs.pitch = (float)sensor.orientation.z;
    tempAhrs.heading = (float)sensor.orientation.x;

    tempAhrs.rotx = (float) rot_rate.x() * rad2deg;
    tempAhrs.roty = (float) rot_rate.y() * rad2deg;
    tempAhrs.rotz = (float) rot_rate.z() * rad2deg;  //heading
    
    tempAhrs.accelx = (float) accel_linear.x();
    tempAhrs.accely = (float) accel_linear.y();
    tempAhrs.accelz = (float) accel_linear.z();

    // Adjust heading to account for declination
    //tempAhrs.heading += DEC_ANGLE;
    
    //telem << "Compass Yar/dec Heading: " << yar_heading << " , " << heading << endl;
    
    // Correct for when signs are reversed.
    if(tempAhrs.heading < 0)
      tempAhrs.heading += 360.;
    
    // Check for wrap due to addition of declination.
    if(tempAhrs.heading > 360.)
      tempAhrs.heading -= 360.;

    threadAhrs = tempAhrs;
    
    //telem << roll << "\t" << pitch << "\t" << yar_heading << endl;
    //telem << "Changed heading: " << (float)sensor.orientation.x << endl;
    threads.delay(BNO055_SAMPLERATE_DELAY_MS);
  }
}

void readGPS(){
 while(1){
  while (gps.available( gpsPort )) {
    fix = gps.read();

    rtk.fQuality = fix.status;
    rtk.satno = fix.satellites;
    rtk.hdop = fix.hdop;
    rtk.lat = fix.latitudeL();
    rtk.lng = fix.longitudeL();
    rtk.cog = fix.heading_cd();
    rtk.sog = fix.speed_mph() * toMps;
    rtk.alt = fix.altitude();
    //cout << fix.satellites << ", " << fix.hdop << ", " << fix.pdop << ", ";
    //telem << "LAT: " << _FLOAT(fix.latitude(),8)                << ", " << _FLOAT(fix.longitude(),8) << ", "  << fix.heading() << ", ";
    //cout << fix.speed_kph() << ", " << fix.altitude() << ", " << fix.lat_err() << ", ";
    //cout << fix.lon_err() << endl;
  } 
 }
}
 
What I did for the CAN bus global variables, since I had different functions reading different buses and handling the data: I put each variable inside the same scope lock, so the code doesn't run unless it's inside the scope that prevents others from touching it.
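
As a side note on the linker error: a struct of nine floats is too large for the Cortex-M4 to load or store in one instruction, so std::atomic on it is most likely not lock-free, and the compiler then calls out to atomic helper routines that the embedded toolchain probably does not link in. A lock-protected copy avoids that. Below is a minimal host-side sketch of the idea; `std::mutex` stands in for `Threads::Mutex`, and `ImuSample`, `SharedImu`, `publish` and `snapshot` are hypothetical names, not part of any library used in this thread.

```cpp
#include <mutex>

// Cut-down version of the imuSensor struct from the post above.
struct ImuSample {
    float roll = 0, pitch = 0, heading = 0;
};

// Holds the latest sample; every access goes through the same lock.
class SharedImu {
public:
    // Called by the sensor thread once its local sample is complete.
    void publish(const ImuSample& s) {
        std::lock_guard<std::mutex> guard(lock_);
        latest_ = s;                 // whole-struct copy happens under the lock
    }

    // Called by any reader; returns a private copy to work on.
    ImuSample snapshot() {
        std::lock_guard<std::mutex> guard(lock_);
        return latest_;
    }

private:
    std::mutex lock_;                // protects latest_
    ImuSample latest_;
};
```

This keeps the same shape as the atomic attempt: the thread fills a temporary struct and publishes it in one step, and loop() takes a snapshot and then works on its own copy without holding the lock.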
 
Hi tonton81. Let's see if I understand your approach with a little example on scope locks (they still give me a headache), using something like thomas7's example:

Code:
float roll;

Threads::Mutex GPS;
Threads::Mutex IMU;

void readGPS() {
  while (1) {
    { Threads::Scope scope(IMU);  // sets lock
       // Read global variables from imu
       // wire1 and any global variables should be locked
       // do any mods I want to the variable
       roll = roll + 180;
    }  // lock gets released
  }
}

void updateWaypoint(){
    Threads::Scope scope(GPS);  // sets lock
       // any global variables used here should be locked
       lat = lat - oldlat;
       // lat is locked until the thread finishes (there's a lot more to this routine of course)
     new heading = cccc;
      // lock gets released and then readIMU is free to update the data
}

void setup(){
      // add thread GPS - reads GPS data
      // add thread IMU - reads IMU so I need another scope lock
}
      

void loop(){
     { (Threads::Scope scope(GPS) && Threads::Scope scope(IMU));  // sets lock. How do I add a second lock??
             updateWaypoint();
             getSteeringAngle();
      }  // both locks get released???
}

Or would it be better to put all the sensor reads in loop(), since I will have a third one, for wheel encoders, that needs to be added?

Thanks
Mike
 
For reading it's no problem, I think, even for a uint16_t. The IMU scope lock above protects roll, as long as roll is not updated anywhere else of course. You'd have to put those other accesses to roll inside a lock as well, and it has to be the same named lock. If you have 2 different locks on 2 different scopes and roll is in both, both have the right to edit it at the same time. *boom*

I prefer to put the mutex lock on the port itself, not just on the devices; think of it as grouping, if you will.
Use SPI0_lock, for example, and use those locks instead. That way, if you're using multiple devices on the same bus with a single lock, it doesn't matter who edits "roll", because everyone who uses the group lock SPI0_lock has to wait their turn.
 
Ok so if I do something like this:

Code:
Threads::Mutex wire_lock;
Threads::Mutex serial1_lock;

struct ublox {
    double lat;
    double lng;
    int fType;
    int fQuality;
    float alt;
    int cog;
    float sog;
    int satno;
    int hdop;
    int pdop;
} rtk;

void readGPS() {
  while (1) {
    { Threads::Scope scope(serial1_lock);  //sets lock
       //Read global variables from imu
       //wire1 and any global variables should be locked
       //do any mods i want to the variable
       rtk.lat = fix.latitudeL() / 1e7;  // fix is the gps_fix from the earlier sketch; dividing by 1e7 keeps the decimals
    }  // lock gets released
  }
}

void readIMU() {
  while (1) {
    { Threads::Scope scope(wire_lock);  //sets lock
       //Read global variables from imu
       //wire1 and any global variables should be locked
       //do any mods i want to the variable
       imu.yaw = imu.roll + 180;
    }  // lock gets released
  }
}

void updateWaypoint(){
       //use any global variables should be locked
       // might better to use local copies of any variables that I might change
       lat = lat - oldlat;
       // lat is locked until thread finishes (theres a lot more to this routing of course
     new heading = cccc;
      // lock gets released and then readIMU is free to update the data
}

void setup(){
      //add thread GPS - reads GPS data
      //add thread IMU - reads IMU so i need another scope lock
}
      

void loop(){
     { Threads::Scope scope(serial1_lock ) ;
          {Threads::Scope scope(wire_lock);  //sets lock  How do i add a second lock??
             updateWaypoint();
             getSteeringAngle();
           }
      }
}
 
Try to use only single locks, never double. If you use 2 or more you must always take them in exactly the same order; it has the potential to deadlock and is hard to keep track of. Do you really need wire1 and serial locked at the same time? I usually do one and close it, then do the other; this gives other threads the possibility to use serial while you're in the wire1 lock.

But yeah, to add a series of locks you did it right; it's still dangerous to do, though.
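
A small host-side sketch of the "do one and close, then do another" pattern, assuming the goal is to gather GPS and IMU data for a calculation. `std::mutex` stands in for `Threads::Mutex`, and `GpsData`, `ImuData`, `NavInput` and `collectInputs` are hypothetical names made up for this example.

```cpp
#include <mutex>

struct GpsData { double lat = 0, lng = 0; };
struct ImuData { float heading = 0; };

std::mutex gps_lock;    // guards gps_shared
std::mutex imu_lock;    // guards imu_shared
GpsData gps_shared;     // written by the GPS thread
ImuData imu_shared;     // written by the IMU thread

struct NavInput { GpsData gps; ImuData imu; };

// Copies both data sets while never holding more than one lock at a time.
NavInput collectInputs() {
    NavInput in;
    {
        std::lock_guard<std::mutex> guard(gps_lock);
        in.gps = gps_shared;
    }   // gps_lock is released before imu_lock is taken
    {
        std::lock_guard<std::mutex> guard(imu_lock);
        in.imu = imu_shared;
    }   // imu_lock is released here
    return in;  // the caller computes on private copies with no lock held
}
```

Because the two locks are never held together, there is no lock order to get wrong. The trade-off is that the GPS and IMU copies can come from slightly different moments, which may or may not matter for the waypoint calculation.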
 
"Do you really need wire1 and serial locked at the same time?"
Now that you mention it, not really. I could do it with one. That would simplify the whole thing.

"Try to use only single locks, never double. If you use 2 or more you must always take them in exactly the same order; it has the potential to deadlock and is hard to keep track of."
Yeah, that's what I was reading when I looked up multiple locks.

Thanks for the help and the lesson on locks. They always got me confused.
Mike
 
Guys, when using TeensyThreads with a thread that has an infinite loop as its body, should that loop call yield() on every pass, or will the threading library know when to preempt it so that all threads at the same priority get a chance to run in a timely manner?
 
You don't need to call yield in threads; the library does all the switching work for you, regardless of what's in your infinite loop.
 
"You don't need to call yield in threads; the library does all the switching work for you, regardless of what's in your infinite loop."

OK, this is good to know and is what I would expect. We had a threads.sleep(1) at the end of all our infinite loops.

In our system we have 3 threads running at the default priority, and one thread running at a higher priority to service the UI (capture button presses, update the LCD etc.). As an experiment I removed the sleeps from all our loops, but when I re-ran the system, I am pretty sure I observed a slowdown in the button/UI responsiveness. Does this sound right?
 
No, that doesn't sound right. If you sleep the threads, it just temporarily stops that thread from being queued for switching. But if 2 threads are fighting over a hardware interface, that could be your problem. Are two threads accessing your LCD port or library? :)

Mutex locks would resolve this if the answer is yes, as they make the threads take turns instead of fighting over it.
 
Thanks - I don't think we have > 1 thread trying to access each of the hw resources, but will verify this tonight.

Does Teensy Threads support task time-slicing (e.g. giving fair and prompt scheduling to two threads at the same priority level)?

Teensy Threads is working well for us overall, nice work guys :)
 