Teensy 4.0 - Corrupted Serial Data

syvo

Member
Technology stack:
Processing (Java), Arduino IDE/Teensy4.0 with the FlexCAN_T4 library

Issue:
Teensy sends a burst of data over Serial to Processing. All good with that. When a Processing code sends a small amount of data (14 bytes) to Teensy4.0 at the same time, Teensy gets the correct data (according to our second hardware COM monitor). But, the data which Teensy sends to Processing gets received corrupted on the Processing side. If I do not send any data from Processing to Teensy, all works fine, and the correct data from Teensy comes to Processing at the maximum speed. It looks like the incoming data from Processing interrupts the outcoming data from Teensy in its Tx Serial buffer, i.e. a part of previous Teensy data is sent, then a new data is added to the Tx Serial buffer, and the rest of the previous messages shifts to the end of Tx Serial buffer. This way, I get a constant shift in the data flow. It seems I need to somehow tell Teensy to complete its current transmission, stop any new transmission, receive the data from Processing over Serial, and resume transmission over Serial to Processing. It seems like both Teensy and Processing prioritize serial reception over transmission. Based on what I observed by using a second FTDI hardware serial, the data is not corrupted when put in the Tx Serial buffer. It gets corrupted specifically at the moment of transmission at low level.

The Teensy code is attached. If you can point me out what is wrong or have any ideas, I would appreciate your comment.

C-like:
#include <FlexCAN_T4.h>

/* ============= defines ============= */
#define CAN_PAYLOAD_SIZE_BYTES 8  // the maximum payload length

#define NUM_RX_MB_CAN1 5          // number of Rx mailboxes (ID pairs) for CAN1
#define NUM_RX_MB_CAN2 5

#define PAYLOAD_START_INDEX 4
#define ID_START_INDEX 0
#define BUS_INDEX 12
#define PAYLOAD_LEN_INDEX 13

#define SERIAL_BAUDRATE 500000

#define CAN1_BAUDRATE 500000
#define CAN2_BAUDRATE 50000  

#define CAN1_FILTERS_ON
#define CAN2_FILTERS_ON

#define HardwareSerial Serial3

unsigned char serialRxBuf[500];

/* ============= data types ============= */
#pragma pack(1) // instruct the compiler to avoid padding
struct CanPacket
{
  uint32_t id;                                // 4 bytes [0 1 2 3]
  uint8_t payload[CAN_PAYLOAD_SIZE_BYTES];    // 8 bytes [4 5 6 7 8 9 10 11]
  uint8_t bus;                                // 1 byte  [12]
  uint8_t payload_length;                     // 1 byte  [13]
};

#pragma pack(1)
union CanPacketByte
{
  struct CanPacket packet;
  unsigned char binary[sizeof(CanPacket)];
};

/* ============= variables ============= */
FlexCAN_T4<CAN1, RX_SIZE_256, TX_SIZE_16> can1;  // CAN2.0: can1 port (500kbps data rate): 256-byte receive buffer / 16 byte transmit buffer
FlexCAN_T4<CAN2, RX_SIZE_256, TX_SIZE_16> can2;  // CAN2.0: can2 port (50kbps data rate)
CAN_message_t msg;                               // a structure from the FlexCAN_T4 library representing a CAN message (used to send and receive CAN messages)
long int cntrMsgSerialPrint = 0;                 // keep track of the sequence number of the messages printed

/* ============= function declarations ============= */
void canSniff20(const CAN_message_t &msg);
int readHardwareSerial(union CanPacketByte& p);
void virtualSerialPrintCanMsg(uint32_t id, uint8_t CAN_Msg_Length, const uint8_t buf_1[8], uint8_t bus);

/* ============= initial settings ============= */
void setup() {
  // hardware serial (DEV)
  HardwareSerial.begin(SERIAL_BAUDRATE);
  HardwareSerial.addMemoryForRead(serialRxBuf, 500);
  HardwareSerial.attachRts(18);   // Serial RTS (request to send), flow control
  HardwareSerial.attachCts(19);   // Serial CTS (clear to send), flow control

  /* configure CAN 1 */
  can1.begin();
  can1.setBaudRate(CAN1_BAUDRATE);                  
 
  can1.mailboxStatus();                       // check the status of the mailbox before configuration
  can1.setMaxMB(NUM_RX_MB_CAN1 + 1);          // +1 for Tx
 
  // initialize Tx mailboxes
  can1.setMB((FLEXCAN_MAILBOX)0, TX, EXT);

  // initialize Rx mailboxes
  can1.setMB((FLEXCAN_MAILBOX)1, RX, STD);
  can1.setMB((FLEXCAN_MAILBOX)2, RX, EXT);
  can1.setMB((FLEXCAN_MAILBOX)3, RX, EXT);
  can1.setMB((FLEXCAN_MAILBOX)4, RX, EXT);
  can1.setMB((FLEXCAN_MAILBOX)5, RX, EXT);

  #ifdef CAN1_FILTERS_ON
    can1.setMBFilter(REJECT_ALL);   // block all data before setting filters
  #endif

  can1.enableMBInterrupts();        // make all mailboxes interrupt enabled
  can1.onReceive(canSniff20);   // allows all MBs to be received in the supplied callback

  /* configure filters */
  #ifdef CAN1_FILTERS_ON
    can1.enhanceFilter(MB1);   // prevent the frames "bleeding through" filters (to avoid limitations of the lib)
    can1.enhanceFilter(MB2);
    can1.enhanceFilter(MB3);
    can1.enhanceFilter(MB4);
    can1.enhanceFilter(MB5);
  #endif

  #ifdef CAN1_FILTERS_ON
    can1.setMBFilter(MB1, 0x4E0, 0x620);              
    can1.setMBFilter(MB2, 0x18DA28F1, 0x18DAF128);    
    can1.setMBFilter(MB3, 0x18DA40F1, 0x18DAF140);    
    can1.setMBFilter(MB4, 0x18DA10F1, 0x18DAF110);    
    can1.setMBFilter(MB5, 0x18DB33F1, 0x18DBF133);    
  #endif
 
  can1.mailboxStatus(); // get the updated settings of mailboxes

  /* configure CAN 2 */
  can2.begin();
  can2.setBaudRate(CAN2_BAUDRATE);                

  can2.mailboxStatus(); // get default initialization of mailboxes
  can2.setMaxMB(NUM_RX_MB_CAN2 + 1);         // +1 for Tx
 
  // initialize Tx mailboxes
  can2.setMB((FLEXCAN_MAILBOX)0, TX, EXT);

  // initialize Rx mailboxes
  can2.setMB((FLEXCAN_MAILBOX)1, RX, EXT);
  can2.setMB((FLEXCAN_MAILBOX)2, RX, EXT);
  can2.setMB((FLEXCAN_MAILBOX)3, RX, EXT);
  can2.setMB((FLEXCAN_MAILBOX)4, RX, EXT);
  can2.setMB((FLEXCAN_MAILBOX)5, RX, EXT);

  #ifdef CAN2_FILTERS_ON
    can2.setMBFilter(REJECT_ALL);   // block all data before setting filters
  #endif

  can2.enableMBInterrupts();      // make all mailboxes interrupt enabled

  can2.onReceive(canSniff20);   // allows all MBs to be received in the supplied callback

  /* configure filters */
  #ifdef CAN2_FILTERS_ON
    can2.enhanceFilter(MB1);
    can2.enhanceFilter(MB2);
    can2.enhanceFilter(MB3);
    can2.enhanceFilter(MB4);
    can2.enhanceFilter(MB5);
  #endif

  #ifdef CAN2_FILTERS_ON
    can2.setMBFilter(MB1, 0x18DA98F1, 0x18DAF198);  
    can2.setMBFilter(MB2, 0x18DA60F1, 0x18DAF160);  
    can2.setMBFilter(MB3, 0x18DAF1C0, 0x18DAC0F1);  
    can2.setMBFilter(MB4, 0x18DAA1F1, 0x18DAF1A1);  
    can2.setMBFilter(MB5, 0x18DAF140, 0x18DA40F1);  
  #endif

  can2.mailboxStatus(); // get the updated settings of mailboxes
  delay(10000);   // ms
}

/* ============= main loop ============= */
void loop() {
  // In interrupt mode (without events() being used), the message is fired directly to callback without using queues.

  // In interrupt mode (with events() being used), the ISR sends the mailbox frame directly to the queue where it can be dequeued to the callback via events() in loop().

  // In polling mode, message is read directly from mailbox and nothing is queued. Failing to read the messages will cause message overruns if the mailboxes are full.

  CanPacketByte cpb;
  memset(&cpb, 0, sizeof(cpb));
 
  int serialLen = readHardwareSerial(cpb);

  // if we received a packet of correct length
  if (serialLen == sizeof(CanPacket)) {
    // if the size of payload is correct
    if (cpb.packet.payload_length <= CAN_PAYLOAD_SIZE_BYTES) {
      // prepare a CAN message
      for (uint8_t i = 0; i < cpb.packet.payload_length; i++) {
        msg.buf[i] = cpb.packet.payload[i];
      }
     
      msg.len = cpb.packet.payload_length;
      msg.id = cpb.packet.id;
      msg.flags.extended = 1;

      // write to a CAN bus
      if(cpb.packet.bus == 1) can1.write(msg);
      else can2.write(msg);
    //  delay(1);
    }  
  }

  // in interrupt mode, dequeue to the callback (push received interrupt frames from the queue to the callback)
  can1.events();
  can2.events();
}

/* ============= function definition ============= */

/*
 * @brief: a global callback invoked by the FlexCAN library when a CAN message is received.
 * @params:
 *    msg - the received CAN message
 */
void canSniff20(const CAN_message_t &msg) {
  CanPacket cp;
  memset(&cp, 0, sizeof(cp));
  cp.id = msg.id;
  cp.payload_length = msg.len;
  cp.bus = msg.bus;

  for (int i = 0; i < cp.payload_length; i++) {
    cp.payload[i] = msg.buf[i];
  }

  HardwareSerial.write( (byte*)&cp, sizeof(CanPacket) );            // send the packet
  HardwareSerial.flush();                                           // wait for any transmitted data still in buffers to actually transmit (returns immediately if no data is waiting in a buffer to transmit)
}

/*
 * @brief: reads a CAN packet (represented as a union of bytes) from the serial port of the Arduino board
 * @params:
 *  p - represent a CAN packet in two different formats: as a struct of separate fields, or as an array of bytes
 */
int readHardwareSerial(union CanPacketByte& p)
{
  // if a partial packet received, skip
  if (HardwareSerial.available() < (int)sizeof(CanPacket)) return 0;

  size_t nByte = 0;
  while (HardwareSerial.available() > 0) {
    char rc = HardwareSerial.read();
    p.binary[nByte++] = rc;

    if (nByte >= sizeof(CanPacket)) {
      // Log error here
      break;
    }
  }
  return (int)nByte;
}
 
Sorry, I have not done much of anything with CAN. Hopefully one of other members who use it can give
some advice. Not sure if you are allowed to still edit the above message header, but maybe mentioning CAN might
get more attention from those who use it.

You might check up on github: https://github.com/tonton81/FlexCAN_T4
as well to see if there might be any hints, in areas like issues.
 
Is canSniff20 called in the CAN interrupt routine? Probably yes. Is it possible to use Hardwareserial.write in an interrupt context ????
 
I narrowed down the problem with the following code. The issue is that both Processing and Teensy send data asynchronously. I am looking for a solution on how to eliminate the data shift. In most cases, right after I send some data from Processing, I receive broken data from Teensy.

It seems to me that not the entire tx serial buffer on Teensy gets sent, and then it is interrupted by the incoming data from Processing. It causes the tx from Teensy to stop. Then, a new data is added to the tx serial buffer on Teensy while the remnants of the previous message remains in the buffer and gets shifted towards its end.

I have attached an archive with some test code. It contains a Teensy4.0 sketch and a Processing sketch to recreate the issue.
 

Attachments

  • serial_test.zip
    5.6 KB · Views: 5
cpb should be static, and not zeroed everytime round the loop() function. You've coupled the RX and TX state machines together, they need to be independent. Basically loop() should look like this:
Code:
void loop()
{
  progress_read();
  progress_write();
  if (have_whole_incoming_packet())
    process_incoming_packet();
}

Don't have any blocking code (such as flush()), then each direction can run independently and your packet handling is triggered when a full packet is present.
 
cpb should be static, and not zeroed everytime round the loop() function. You've coupled the RX and TX state machines together, they need to be independent. Basically loop() should look like this:
Code:
void loop()
{
  progress_read();
  progress_write();
  if (have_whole_incoming_packet())
    process_incoming_packet();
}

Don't have any blocking code (such as flush()), then each direction can run independently and your packet handling is triggered when a full packet is present.
Thank you. The issue is not with serial Rx itself, where cpb is used. Serial Rx always receives correct data. The issue is with serial Tx which gets interrupted by serial Rx. I made cpb a global variable, not to recreate it in each loop. I also erased flush(). However, the issue still remains.
 
I read some older threads, and tried to use yield() (with the wrapper attachYieldFunc() from this library). It did not help. Then, I tried serialEvent() and it seems not to help either. I am posting my most recent Teensy code.


C-like:
#include <FlexCAN_T4.h>
// #include "attachYieldFunc.h"

/* ============= defines ============= */
#define CAN_PAYLOAD_SIZE_BYTES 8  // the maximum payload length
#define SERIAL_BAUDRATE 500000

#define CustomSerial Serial
#define MonitorSerial Serial3

/* ============= data types ============= */
#pragma pack(1) // instruct the compiler to avoid padding
struct CanPacket
{
  uint32_t id;                                // 4 bytes [0 1 2 3]
  uint8_t payload[CAN_PAYLOAD_SIZE_BYTES];    // 8 bytes [4 5 6 7 8 9 10 11]
  uint8_t bus;                                // 1 byte  [12]
  uint8_t payload_length;                     // 1 byte  [13]
};

#pragma pack(1)
union CanPacketByte
{
  struct CanPacket packet;
  unsigned char binary[sizeof(CanPacket)];
};

/* ============= variables ============= */
// static unsigned char serialRxBuf[500];
long int cntrMsgSerialPrint = 0;                 // keep track of the sequence number of the messages printed
CanPacket cp;
CAN_message_t msg;                               // a structure from the FlexCAN_T4 library representing a CAN message (used to send and receive CAN messages)
CanPacketByte cpb;
bool sendDataFlag;
int txMsgCounter;

/* ============= function declarations ============= */
void txCanPacketViaSerial();
int readCustomSerial(union CanPacketByte& p);
void MonitorSerialPrintCanMsg(uint32_t id, uint8_t CAN_Msg_Length, const uint8_t buf_1[8], uint8_t bus);
void myCallback();

/* ============= initial settings ============= */
void setup() {
  sendDataFlag = false;
  txMsgCounter = 0;

  // prepare the test message
  memset(&cp, 0, sizeof(cp));
  cp.id             = 0x18DABBCC;
  cp.payload_length = 8;
  cp.bus            = 2;
  cp.payload[0]     = 0x10;
  cp.payload[1]     = 0x01;
  cp.payload[2]     = 0x02;
  cp.payload[3]     = 0x03;
  cp.payload[4]     = 0x04;
  cp.payload[5]     = 0x05;
  cp.payload[6]     = 0x06;
  cp.payload[7]     = 0x07;

  // attachYieldFunc(myCallback); // attach our callback to the yield stack
 
  // main serial
  CustomSerial.begin(SERIAL_BAUDRATE);
}

/* ============= main loop ============= */
void loop() { 
  serialEvent();

  /* send data over via serial */
  if (sendDataFlag) {
    txCanPacketViaSerial();
    txCanPacketViaSerial();
    txCanPacketViaSerial();
    txMsgCounter++;
    if (txMsgCounter == 1000) {
      sendDataFlag = false;
      txMsgCounter = 0;
    }
  }
}

void serialEvent() {
  myCallback();
}

void myCallback() {
  /* read incoming data from serial */
  memset(&cpb, 0, sizeof(cpb));
  int serialLen = readCustomSerial(cpb);
 
  /* process incoming packet */
  if (serialLen == sizeof(CanPacket)) {
    if ( (!sendDataFlag) && (cpb.packet.bus == 2)) {
      // start
      sendDataFlag = true;
    } else {
      if (cpb.packet.bus == 1) {
        // prepare a CAN message
        for (uint8_t i = 0; i < cpb.packet.payload_length; i++) {
          msg.buf[i] = cpb.packet.payload[i];
        }
        
        msg.len = cpb.packet.payload_length;
        msg.id = cpb.packet.id;
        msg.flags.extended = 1;

      }
    }
  }
}

void txCanPacketViaSerial() {
  CustomSerial.write( (byte*)&cp, sizeof(CanPacket) );            // send the packet
  // CustomSerial.flush();                                           // wait for any transmitted data still in buffers to actually transmit (returns immediately if no data is waiting in a buffer to transmit)
}

int readCustomSerial(union CanPacketByte& p)
{
  // if a partial packet received, skip
  if (CustomSerial.available() < (int)sizeof(CanPacket)) return 0;

  size_t nByte = 0;
  while (CustomSerial.available() > 0) {
    char rc = CustomSerial.read();
    p.binary[nByte++] = rc;

    if (nByte >= sizeof(CanPacket)) {
      // Log error here
      break;
    }
  }
  return (int)nByte;
}

void MonitorSerialPrintCanMsg(uint32_t id, uint8_t CAN_Msg_Length, const uint8_t buf_1[8], uint8_t bus) {
  unsigned long prevTime_ms = 0;
  static unsigned long currTime_ms = 0;   // keeps its value between function calls, created/initialized only once

  prevTime_ms = currTime_ms;
  currTime_ms = millis();
  float dt_s = (currTime_ms - prevTime_ms) / 1000.0f; // msec --> sec
 
   // print CAN identifier
  MonitorSerial.print(id, HEX);
  MonitorSerial.print("\t\t");

  // print CAN data
  for (int i = 0; i < CAN_Msg_Length; i++) {
    if (buf_1[i] <= 15) MonitorSerial.print("0");
    MonitorSerial.print(buf_1[i], HEX);
    MonitorSerial.print(" ");
  }

  // add stubs if data length is less than CAN_PAYLOAD_SIZE_BYTES
  if (CAN_Msg_Length < CAN_PAYLOAD_SIZE_BYTES) {
    for(int k = 0; k < (CAN_PAYLOAD_SIZE_BYTES - CAN_Msg_Length); k++) {
      MonitorSerial.print("xx");
      MonitorSerial.print(" ");
    }
  }
 
  MonitorSerial.print("  ");
 
  // print CAN message length (in bytes)
  MonitorSerial.print(CAN_Msg_Length, DEC);
  MonitorSerial.print("  ");

  // print the bus number
  MonitorSerial.print(bus);
  MonitorSerial.print("\t");

  // print the message counter
  MonitorSerial.print(cntrMsgSerialPrint);
  MonitorSerial.print("\t");

  // print the time interval between 2 messages
  MonitorSerial.print(dt_s, 3);
  MonitorSerial.print(" s\r\n");
  cntrMsgSerialPrint++;;
}
 
Envisioned solution:
- 2 threads on Teensy: one for Serial Tx in the main loop, another one for Serial Rx in a second loop, e.g. yield(), serialEvent(), or TeensyThread.
- synchronization between Teensy and Processing Tx buffers: either hardware (RTS/CTS pins of UART) or software (XON/XOF or a lightweight custom protocol with acknowlendgements)

Any thoughts?
 
Thank you. The issue is not with serial Rx itself, where cpb is used. Serial Rx always receives correct data. The issue is with serial Tx which gets interrupted by serial Rx. I made cpb a global variable, not to recreate it in each loop. I also erased flush(). However, the issue still remains.
You are clobbering cpb everytime round loop()...

You don't need threads with loop(), just make sure to handle serial reading, and serial writing, independently, everytime round loop.

The logic should be like this:

in loop, if there is a character to read, read it. If there is then a whole packet to process, process it, and _only then_ clear the input buffer again
in loop, if there is still characters in the output buffer to send, and space in the transmit buffer, write those characters _but no more_.

That way loop never blocks. Make sure that packet processing is quick and doesn't block (say by trying to write lots and lots immediately).
 
Back
Top