Most robust method to stream UDP using QNEthernet

strud

I have an application where I need to stream 128k packets per second of 18 bytes each over 100 Mbit Ethernet.

I am able to succeed at 64ksps using QNEthernet and the function:

Udp.send(clientIP,loggerConfiguration.udpStreamPort, fileBuffer, bytecount);

I do check the success of this function and at the higher rate it fails very often indeed, often with only 1 to 2 bytes of the packet making it out.

Should I be reverting to the previous 'higher overhead' method using

Udp.beginPacket()
Udp.write()
Udp.endPacket()

So that I can see how many bytes have been written and then keep trying until they are all written?

Is there another approach to obtain a more robust method of getting all the bytes out?
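
A minimal sketch of the "keep trying" idea, assuming the send call reports success as a `bool`. `sendWithRetry`, `SendStats`, and `maxTries` are hypothetical names, not part of QNEthernet; the send function is passed in as a callable so the helper does not depend on any particular library call.

```cpp
#include <cstddef>
#include <cstdint>

// Counters for diagnosing send failures (hypothetical names).
struct SendStats {
  uint32_t attempts = 0;  // total calls to the send function
  uint32_t failures = 0;  // calls that reported failure
  uint32_t dropped = 0;   // datagrams given up on after maxTries attempts
};

// Tries to send one datagram up to maxTries times.
// SendFn is any callable taking (const uint8_t *, size_t) and returning
// true on success, for example a lambda that wraps the UDP send call.
template <typename SendFn>
bool sendWithRetry(SendFn sendFn, const uint8_t *data, size_t len,
                   int maxTries, SendStats &stats) {
  for (int i = 0; i < maxTries; i++) {
    stats.attempts++;
    if (sendFn(data, len)) {
      return true;
    }
    stats.failures++;
  }
  stats.dropped++;  // every attempt failed; the caller decides what to do
  return false;
}
```

In a sketch, the callable could be a lambda wrapping the `Udp.send(clientIP, loggerConfiguration.udpStreamPort, ...)` call shown above. The counters help tell a rare failure from a persistent one. Retrying in a tight loop only helps if whatever caused the failure can clear between attempts, so a small `maxTries` is probably safer than looping forever.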

I do not think that the 128k x 18 is a high byte rate, hence maybe there is a setting change I need to make for this QNEthernet library?

Craig
 
The packet rate has been tried at various multiples of the sample rate, i.e. 10x, 100x and 200x, without any change.

I've determined the issue is with the data source getting corrupted, not the packet sending. That doesn't make the issue easier, but at least I now have it narrowed down a bit more.
 
I can confirm that `EthernetUDP::send()` does just about the exact same thing as `beginPacket()`/`write()`/`endPacket()`. The difference is that you're using less internal memory with `send()` (there's a `std::vector` allocated with a size of (likely) 1472 bytes the first time `beginPacket()` is called). If you already have your data buffered, then `send()` is better.

If `send()` is returning false then it's either a memory issue or an IP stack issue. Could you try to increase `MEM_SIZE` in lwipopts.h and see what happens? If that doesn't work, see what increasing `PBUF_POOL_SIZE` does.
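
For reference, the two options mentioned above are ordinary lwIP macros. The excerpt below only shows their shape; the numbers are placeholders chosen for illustration, not QNEthernet's defaults or recommended values, so the existing definitions in the file should be edited rather than duplicated.

```cpp
// lwipopts.h (excerpt). Values are illustrative assumptions only.
#define MEM_SIZE       (32 * 1024)  // size of the lwIP heap in bytes
#define PBUF_POOL_SIZE 16           // number of buffers in the pbuf pool
```

Changing one value at a time makes it easier to see which of the two, if either, affects the failure rate.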

I'm pondering the idea of changing the `EthernetUDP::send()` API to return an `int` instead of a `bool`. This way, error codes could be returned, and that would give more information about what's failing.
 
There. I just changed `EthernetUDP::send()` to return an `int` and pushed to the GitHub repo (https://github.com/ssilverman/QNEthernet). Could you retry and post which error code (the return value from `send()`) you see?

Since you haven't posted code, I'm not sure specifically which failure type you're seeing. For example, are you not seeing the packets at the other end, or was `send()` returning `false`? Update: Oh, you said above you check the function success, so I guess you mean the second. :)
 
Hi Shawn,

Will give it a go tonight, many thanks.

Hi Shawn, sorry, but I have not checked that response specifically. However, I do believe the issue was caused by the circular buffer being corrupted.

I have implemented my own very lightweight circular buffer and the data coming through is no longer turning to garbage.

I will be adding debug data to my stream so that I can help debug the state of the buffer and what is being returned from the `EthernetUDP::send()` call.

One question for you though: do you have any experience on what average throughput should be possible with your QNEthernet library using UDP?

I need an average throughput of at least 40 Mbit/s to stream 30-byte samples at 128 ksps.
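
A rough back-of-the-envelope check of the line rate, assuming UDP over IPv4 with no IP options and no VLAN tag. The function names are made up for this example. Each datagram carries 8 bytes of UDP header, 20 of IPv4 header, 14 of Ethernet header and 4 of FCS, plus 8 bytes of preamble and a 12-byte inter-frame gap on the wire, and Ethernet pads short frames up to a 46-byte payload.

```cpp
#include <cstdint>

// Bytes occupied on the wire by one UDP/IPv4 datagram over Ethernet,
// including preamble and inter-frame gap (assumes no VLAN tag, no IP options).
constexpr uint32_t wireBytesPerDatagram(uint32_t payloadBytes) {
  const uint32_t ipPacket = payloadBytes + 8 + 20;  // UDP + IPv4 headers
  const uint32_t framePayload = ipPacket < 46 ? 46 : ipPacket;  // minimum-size padding
  return framePayload + 14 + 4 + 8 + 12;  // MAC header, FCS, preamble, inter-frame gap
}

// Line rate in bits per second for a given payload size and datagram rate.
constexpr uint64_t wireBitsPerSecond(uint32_t payloadBytes,
                                     uint32_t datagramsPerSecond) {
  return static_cast<uint64_t>(wireBytesPerDatagram(payloadBytes)) * 8u *
         datagramsPerSecond;
}
```

Under these assumptions, 30-byte samples at 128 ksps are 30.72 Mbit/s of raw payload. Sending one sample per datagram would need about 98.3 Mbit/s on the wire (86.0 Mbit/s for 18-byte samples), which leaves almost no headroom on a 100 Mbit link, while batching 20 samples per datagram (600 bytes at 6400 datagrams per second) brings it down to about 34.1 Mbit/s.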
 
To which circular buffer do you refer? Which library or source file?

If you run the iperf (v2) app on your computer, and the IPerfServer example on the Teensy, you'll likely see something exceeding 70 or 80 Mbps over TCP, and even faster, perhaps close to 95 Mbps, if connecting directly to the Teensy. UDP should be able to maintain high speeds too.
 
Hi Shawn,

After further testing I believe my issues are simply due to running out of processing time between my interrupts at 128 kHz.

I added debugging information to my data stream (going over UDP or Serial) and could see that at the 128 kHz rate there are very odd issues occurring, such as variables not incrementing and the circular buffer falling over as a result.
At slightly lower interrupt rates (e.g. 85 kHz), everything works completely fine, never missing a beat.

I think the only way to improve this is for me to work on methods to reduce processing time in the ISR and on any streaming method.

Just for completeness, here is my ISR (the encoder counter is being directly incremented for debugging to detect loss of packets, and the last three 32-bit values are also for debugging purposes):

Code:
void adcDataReadyIsrStream(){

    adc.readData(&res);
    ultemp = micros() - loggerStatus.logStartTimeMicro;
    //mCurPosValue = myEnc1.read();     // taking 
    mCurPosValue++;
    
    // update digital inputs here as the ISRs for the digital inputs will be disabled 
    bitWrite(digitalInputs, 0,digitalReadFast(DIN0));
    bitWrite(digitalInputs, 1,digitalReadFast(DIN1));
    bitWrite(digitalInputs, 2,digitalReadFast(DIN2));
    bitWrite(digitalInputs, 3,digitalReadFast(DIN3));
    bitWrite(digitalInputs, 4,digitalReadFast(DIN4));
    bitWrite(digitalInputs, 5,digitalReadFast(DIN5));
    bitWrite(digitalInputs, 6,digitalReadFast(DIN6));
    bitWrite(digitalInputs, 7,digitalReadFast(DIN7));
  
    myQueueWrite(0xAA);
    myQueueWrite((uint8_t)(0xFF & ultemp));               // elapsed time
    myQueueWrite((uint8_t)(ultemp>>8)&0xFF);
    myQueueWrite((uint8_t)(ultemp>>16)&0xFF);
    myQueueWrite((uint8_t)(ultemp>>24)&0xFF);

    myQueueWrite((uint8_t)(res.chan1_16&0xFF));       // analog channel 1
    myQueueWrite((uint8_t)(res.chan1_16>>8)&0xFF);
    myQueueWrite((uint8_t)(res.chan2_16&0xFF));       // analog channel 2
    myQueueWrite((uint8_t)(res.chan2_16>>8)&0xFF);
    myQueueWrite((uint8_t)(res.chan3_16&0xFF));       // analog channel 3
    myQueueWrite((uint8_t)(res.chan3_16>>8)&0xFF);
    myQueueWrite((uint8_t)(res.chan4_16&0xFF));       // analog channel 4
    myQueueWrite((uint8_t)(res.chan4_16>>8)&0xFF);

    myQueueWrite((uint8_t)(mCurPosValue)&0xFF);      // 32 bit encoder counter
    myQueueWrite((uint8_t)(mCurPosValue>>8)&0xFF);
    myQueueWrite((uint8_t)(mCurPosValue>>16)&0xFF);
    myQueueWrite((uint8_t)(mCurPosValue>>24)&0xFF);
    myQueueWrite(digitalInputs);             // digital inputs, automatically update in own ISRs  
    // all added for debugging
    myQueueWrite((uint8_t)(myCircBuffer.length)&0xFF);      // 32 bit buffer length
    myQueueWrite((uint8_t)(myCircBuffer.length>>8)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.length>>16)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.length>>24)&0xFF);

    myQueueWrite((uint8_t)(myCircBuffer.writeIndex)&0xFF);      // 32 bit buffer head index
    myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>8)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>16)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>24)&0xFF);

    myQueueWrite((uint8_t)(myCircBuffer.readIndex)&0xFF);      // 32 bit buffer tail index
    myQueueWrite((uint8_t)(myCircBuffer.readIndex>>8)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.readIndex>>16)&0xFF);
    myQueueWrite((uint8_t)(myCircBuffer.readIndex>>24)&0xFF);

    
}
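
One possible way to cut the per-sample cost is to serialise the record into a small local array first and then hand it to the queue in one go, instead of making one function call per byte. The sketch below covers only the packing step for the first 18 bytes (marker, elapsed time, four channels, encoder count, digital inputs) in the same little-endian order as the ISR above. `packSample`, `put16`, `put32`, and `kSampleSize` are hypothetical names, and the channel values are assumed to be 16-bit unsigned. Whether this actually saves time would need measuring.

```cpp
#include <cstddef>
#include <cstdint>

constexpr size_t kSampleSize = 18;  // bytes per sample without debug fields

// Stores v at dst in little-endian order and returns the next write position.
inline uint8_t *put16(uint8_t *dst, uint16_t v) {
  dst[0] = static_cast<uint8_t>(v);
  dst[1] = static_cast<uint8_t>(v >> 8);
  return dst + 2;
}

inline uint8_t *put32(uint8_t *dst, uint32_t v) {
  dst[0] = static_cast<uint8_t>(v);
  dst[1] = static_cast<uint8_t>(v >> 8);
  dst[2] = static_cast<uint8_t>(v >> 16);
  dst[3] = static_cast<uint8_t>(v >> 24);
  return dst + 4;
}

// Serialises one sample into out, which must hold kSampleSize bytes.
inline void packSample(uint8_t *out, uint32_t elapsedMicros,
                       const uint16_t chan[4], uint32_t encoder,
                       uint8_t digitalInputs) {
  uint8_t *p = out;
  *p++ = 0xAA;                  // start-of-sample marker
  p = put32(p, elapsedMicros);  // elapsed time
  for (int i = 0; i < 4; i++) {
    p = put16(p, chan[i]);      // analog channels 1..4
  }
  p = put32(p, encoder);        // 32-bit encoder counter
  *p = digitalInputs;           // digital inputs
}
```

The packed array could then be copied into the ring buffer with a single bulk write (at most two copies when the write wraps around the end of the buffer), so the index and wrap checks run once per sample rather than once per byte.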


And here is the section of my state machine responsible for UDP streaming

Code:
    case st_stream_udp: {
         if ((loggerControl.execute==0)||(timeAnt <= millis())) {
            // shut off interrupts
            noInterrupts();
            detachInterrupt(ADC_DRDY);
            interrupts();
            if (bytecount>0) Udp.send(clientIP,loggerConfiguration.udpStreamPort, udpStreamBuffer, bytecount);
            Serial.println("->st_stream_completed");
            state = st_stream_completed;
         }
         
         else {
            if (myCircBuffer.length>0) { 
                bytecount=0;
                while ((myCircBuffer.length>0)&&(loggerControl.execute==1)&&(timeAnt >= millis())){
                    udpStreamBuffer[bytecount++]=myQueueDeque(); 
                    if ((bytecount >= 20*(SAMPLE_SIZE_ALL+13))){
                      bytesWritten=0;
                      attempts=0;
                      Udp.send(clientIP,loggerConfiguration.udpStreamPort, udpStreamBuffer, bytecount);
                      bytecount=0;
                    }
                    if (myQueueFull()) loggerStatus.queueFull=1;
                    else loggerStatus.queueFull=0;

                    if (millis()> timeAnt) {
                      noInterrupts();
                      detachInterrupt(ADC_DRDY);
                      interrupts();
                    }
                }
            }
         } 
    }
    break;
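
One thing that stands out in the state machine above: `bytecount=0` runs every time the `else` branch is entered with data in the queue, so a partly filled `udpStreamBuffer` left over from the previous pass appears to be overwritten rather than sent. A minimal sketch of a batcher that keeps its fill level between calls is below. `DatagramBatcher` is a hypothetical helper, not something from QNEthernet or the posted code.

```cpp
#include <cstddef>
#include <cstdint>

// Accumulates bytes into a fixed-size datagram buffer. The fill level is
// kept between calls, so a partial batch is never silently discarded.
template <size_t Capacity>
class DatagramBatcher {
 public:
  // Appends one byte. Returns false (and stores nothing) if already full.
  bool push(uint8_t b) {
    if (count_ == Capacity) {
      return false;
    }
    buf_[count_++] = b;
    return true;
  }

  bool full() const { return count_ == Capacity; }
  const uint8_t *data() const { return buf_; }
  size_t size() const { return count_; }

  // Call after the contents have been handed to the send function.
  void clear() { count_ = 0; }

 private:
  uint8_t buf_[Capacity];
  size_t count_ = 0;
};
```

In the streaming state, each dequeued byte would go through `push()`; when `full()` becomes true the contents are sent and `clear()` is called. When streaming stops, whatever remains (`size() > 0`) is sent once, as the first branch of the state machine already does.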

Here is my super simple circular buffer implementation

Code:
volatile uint8_t uresult;

// Reset the queue to empty.
uint8_t myQueueReset() {
  myCircBuffer.writeIndex = 0;
  myCircBuffer.readIndex = 0;
  myCircBuffer.length = 0;
  myCircBuffer.capacity = BIG_BUFFER_SIZE;  // CIRC_BUFFER_SIZE;
  myCircBuffer.full = 0;
  myCircBuffer.behind = 0;
  return 0;
}

// Append one byte at the write index, wrapping at the end of the buffer.
// Does not check whether the queue is full.
void myQueueWrite(uint8_t value) {
  //circularBuffer[myCircBuffer.writeIndex++] = value;
  bigbuffer[myCircBuffer.writeIndex] = value;
  myCircBuffer.writeIndex++;
  myCircBuffer.length++;
  if (myCircBuffer.writeIndex == BIG_BUFFER_SIZE) myCircBuffer.writeIndex = 0;
}

// Remove and return the byte at the read index, wrapping at the end of the buffer.
// Does not check whether the queue is empty.
uint8_t myQueueDeque(void) {
  //uresult = circularBuffer[myCircBuffer.readIndex++];
  uresult = bigbuffer[myCircBuffer.readIndex];
  myCircBuffer.readIndex++;
  myCircBuffer.length--;
  if (myCircBuffer.readIndex == BIG_BUFFER_SIZE) myCircBuffer.readIndex = 0;

  return uresult;
}

// Return 1 if there is unread data in the queue.
uint8_t myQueueBehind(void) {
  if (myCircBuffer.length > 0) return 1;
  else return 0;
}

// Return 1 if the queue holds at least 'capacity' bytes.
uint8_t myQueueFull(void) {
  if (myCircBuffer.length >= myCircBuffer.capacity) return 1;
  else return 0;
}

// Return the number of free bytes left in the queue.
uint32_t myQueueSpaceRemaining(void) {
  return BIG_BUFFER_SIZE - myCircBuffer.length;
}
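
In the code above, `myCircBuffer.length` is incremented in the ISR (via `myQueueWrite()`) and decremented in the main loop (via `myQueueDeque()`). A decrement is a read-modify-write, so if the ISR fires between the read and the write, its increment is lost. That may be one contributor to the "variables not incrementing" symptom, though it is only a guess without seeing the rest of the program. A minimal sketch of a single-producer/single-consumer queue that avoids a shared counter is below, assuming one ISR writer and one main-loop reader on a single core. `SpscQueue` is a hypothetical name, and the capacity is assumed to be a power of two.

```cpp
#include <cstddef>
#include <cstdint>

// Single-producer (ISR) / single-consumer (main loop) byte queue.
// head_ is written only by the producer and tail_ only by the consumer,
// so neither side performs a read-modify-write on shared state.
// Indices run freely and wrap with unsigned arithmetic; N must be a power of two.
template <uint32_t N>
class SpscQueue {
  static_assert(N > 0 && (N & (N - 1)) == 0, "N must be a power of two");

 public:
  // Producer side. Returns false if the queue is full.
  bool write(uint8_t v) {
    const uint32_t head = head_;
    if (head - tail_ == N) {
      return false;
    }
    buf_[head & (N - 1)] = v;
    head_ = head + 1;  // publish only after the byte is stored
    return true;
  }

  // Consumer side. Returns false if the queue is empty.
  bool read(uint8_t &v) {
    const uint32_t tail = tail_;
    if (head_ == tail) {
      return false;
    }
    v = buf_[tail & (N - 1)];
    tail_ = tail + 1;  // release the slot only after the byte is read
    return true;
  }

  // Number of bytes currently queued, derived from the two indices.
  uint32_t length() const { return head_ - tail_; }

 private:
  volatile uint8_t buf_[N];
  volatile uint32_t head_ = 0;
  volatile uint32_t tail_ = 0;
};
```

Because the length is computed from the indices rather than stored, there is nothing for the two contexts to overwrite in each other's data. The alternative is to keep the existing layout and wrap the main-loop update of `length` in a short interrupts-disabled section.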
 