Thread: Most robust method to stream UDP using QNEthernet

  1. #1

    Most robust method to stream UDP using QNEthernet

    I have an application where I need to stream 128k packets per second, 18 bytes each, over 100 Mbit Ethernet.

    I am able to succeed at 64ksps using QNEthernet and the function:

    Udp.send(clientIP,loggerConfiguration.udpStreamPort, fileBuffer, bytecount);

    I do check the success of this function and at the higher rate it fails very often indeed, often with only 1 to 2 bytes of the packet making it out.

    Should I be reverting to the previous 'higher overhead' method using

    Udp.beginPacket()
    Udp.write()
    Udp.endPacket()

    So that I can see how many bytes have been written and then keep trying until they are all written?

    Is there another approach to obtain a more robust method of getting all the bytes out?

    I do not think that the 128k x 18 is a high byte rate, hence maybe there is a setting change I need to make for this QNEthernet library?

    Craig

  2. #2
    Senior Member
    Join Date
    Oct 2016
    Posts
    1,048
    I do not think that the 128k x 18 is a high byte rate
    It may not be a high data rate, but it's a high packet rate, and a lot of overhead. Can you combine samples and send fewer packets?
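
    To put rough numbers on the overhead point: every UDP datagram pays a fixed framing cost on the wire regardless of payload size. The sketch below estimates the link bandwidth a stream occupies. It assumes IPv4 without options, no VLAN tag, and standard Ethernet framing; `wireBitsPerSecond` and the constant names are made up for this illustration and are not part of QNEthernet.

```cpp
#include <cassert>
#include <cstdint>

// Fixed per-frame costs for UDP over IPv4 over Ethernet, in bytes.
// Assumptions: no VLAN tag, no IP options.
constexpr uint32_t kEthHeader    = 14;    // dst MAC + src MAC + EtherType
constexpr uint32_t kEthFcs       = 4;     // frame check sequence
constexpr uint32_t kMinFrame     = 64;    // minimum frame size, header and FCS included
constexpr uint32_t kPreambleGap  = 20;    // 8 preamble/SFD + 12 inter-frame gap
constexpr uint32_t kIpUdpHeaders = 28;    // 20 IPv4 + 8 UDP
constexpr uint32_t kMaxPayload   = 1472;  // 1500-byte MTU minus IPv4 and UDP headers

// Bits per second occupied on the wire by a stream of equally sized
// UDP datagrams. (Hypothetical helper for illustration only.)
inline uint64_t wireBitsPerSecond(uint32_t payloadBytes, uint32_t packetsPerSecond) {
  assert(payloadBytes <= kMaxPayload);  // larger payloads would be fragmented
  uint32_t frame = kEthHeader + kIpUdpHeaders + payloadBytes + kEthFcs;
  if (frame < kMinFrame) {
    frame = kMinFrame;  // short frames are padded up to the minimum
  }
  return static_cast<uint64_t>(frame + kPreambleGap) * 8u * packetsPerSecond;
}
```

    Under these assumptions, one 18-byte sample per datagram at 128,000 packets per second gives `wireBitsPerSecond(18, 128000)` = 86,016,000 bit/s, most of a 100 Mbit/s link, although the payload alone is only about 18.4 Mbit/s. Packing 20 samples into each datagram (360 bytes at 6,400 packets per second) gives 21,811,200 bit/s.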

  3. #3
    The packet rate has been tried at various multiples of the sample rate, i.e. 10x, 100x and 200x, without any change.

    I've determined that the issue is the data source getting corrupted, not the packet sending. That doesn't make the issue easier, but at least I now have it narrowed down a bit more.

  4. #4
    Senior Member
    Join Date
    Mar 2017
    Location
    Oakland, CA, USA
    Posts
    702
    I can confirm that `EthernetUDP::send()` does just about the exact same thing as beginPacket()/write()/endPacket(). The difference being you're using less internal memory with `send()` (there's a `std::vector` allocated with a size of (likely) 1472 bytes the first time when `beginPacket()` is called). If you already have your data buffered, then `send()` is better.

    If `send()` is returning false then it's either a memory issue or an IP stack issue. Could you try to increase `MEM_SIZE` in lwipopts.h and see what happens? If that doesn't work, see what increasing `PBUF_POOL_SIZE` does.
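
    Both settings are compile-time macros in lwIP's options header. As a sketch of what such an edit might look like (the numbers are placeholders picked for illustration, not QNEthernet's defaults and not recommendations, so compare them against the values already present in your copy of lwipopts.h and change those definitions rather than adding second ones):

```cpp
// lwipopts.h (excerpt). The values below are illustrative assumptions only.

// Size in bytes of lwIP's heap. Outgoing data that has to be copied
// is allocated from this heap.
#define MEM_SIZE        (32 * 1024)

// Number of buffers in the pbuf pool.
#define PBUF_POOL_SIZE  32
```

    Changing one value at a time and retesting makes it easier to tell which pool, if either, was being exhausted.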

    I'm pondering the idea of changing the `EthernetUDP::send()` API to return an `int` instead of a `bool`. This way, error codes could be returned, and that would give more information about what's failing.

  5. #5
    Senior Member
    Join Date
    Mar 2017
    Location
    Oakland, CA, USA
    Posts
    702
    There. I just changed `EthernetUDP::send()` to return an `int` and pushed to the GitHub repo (https://github.com/ssilverman/QNEthernet). Could you retry and post which error code (the return value from `send()`) you see?

    Since you haven't posted code, I'm not sure specifically which failure type you're seeing. For example, are you not seeing the packets at the other end, or was `send()` returning `false`? Update: Oh, you said above you check the function success, so I guess you mean the second.
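
    On the retry question from the first post: a UDP datagram is handed to the stack as a whole, so "keep trying until all the bytes are written" amounts to retrying the entire datagram. A minimal sketch of a bounded retry with failure counters follows. `SendStats` and `sendWithRetry` are hypothetical names, and the send call is passed in as a callable so the sketch does not depend on any particular return type of `send()`.

```cpp
#include <cassert>
#include <cstdint>

// Counters for diagnosing send failures (hypothetical names).
struct SendStats {
  uint32_t sent = 0;     // datagrams accepted by the stack
  uint32_t retries = 0;  // extra attempts beyond the first
  uint32_t dropped = 0;  // datagrams given up on
};

// Calls trySend() until it reports success or maxAttempts is reached.
// trySend is any callable returning true on success, for example a
// lambda wrapping the application's UDP send call.
template <typename TrySend>
bool sendWithRetry(TrySend trySend, uint32_t maxAttempts, SendStats &stats) {
  assert(maxAttempts > 0);
  for (uint32_t attempt = 0; attempt < maxAttempts; ++attempt) {
    if (trySend()) {
      stats.sent++;
      stats.retries += attempt;  // attempts that failed before this one
      return true;
    }
  }
  stats.retries += maxAttempts - 1;
  stats.dropped++;
  return false;
}
```

    A call might look like `sendWithRetry([&] { return Udp.send(ip, port, buf, len); }, 3, stats);`, with the lambda adapted to whatever success value your version of `send()` returns. Streaming the counters out alongside the data shows whether failures cluster at the higher sample rates.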

  6. #6
    Hi Shawn,

    Will give it a go tonight, many thanks.

  7. #7
    Hi Shawn, sorry, but I have not checked the `send()` return value specifically. However, I do believe the issue was caused by the circular buffer being corrupted.

    I have implemented my own very lightweight circular buffer and the data coming through is no longer turning to garbage.

    I will be adding debug data to my stream so that I can see the state of the buffer and what is being returned from the `EthernetUDP::send()` call.

    One question for you though: do you have any experience on what average throughput should be possible with your QNEthernet library using UDP?

    I need an average throughput of at least 40 Mbit/s to stream 30-byte samples at 128 ksps.

  8. #8
    Senior Member
    Join Date
    Mar 2017
    Location
    Oakland, CA, USA
    Posts
    702
    To which circular buffer do you refer? Which library or source file?

    If you run the iperf (v2) app on your computer, and the IPerfServer example on the Teensy, you'll likely see something exceeding 70 or 80Mbps over TCP, and even faster, perhaps close to 95Mbps, if connecting directly to the Teensy. UDP should be able to maintain high speeds too.

  9. #9
    Hi Shawn,

    After further testing, I believe my issue is simply that I am running out of processing time between my interrupts at 128 kHz.

    I added debugging information to my data stream (going over UDP or Serial) and could see that at the 128 kHz rate there are very odd issues occurring, such as variables not incrementing and the circular buffer falling over because of this.
    At slightly lower interrupt rates (e.g. 85 kHz), everything works completely fine, never missing a beat.

    I think the only way to improve this is for me to work on methods to reduce processing time in the ISR and on any streaming method.
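
    As a rough way to size that budget, the sketch below converts an interrupt rate into CPU cycles per interrupt and a measured ISR cost into a load percentage. The 600 MHz clock used in the example figures is an assumption (the thread does not state the board or clock speed), and both function names are made up for illustration.

```cpp
#include <cassert>
#include <cstdint>

// CPU cycles available between two consecutive interrupts (rounded down).
inline uint32_t cyclesPerInterrupt(uint32_t cpuHz, uint32_t interruptHz) {
  assert(interruptHz > 0);
  return cpuHz / interruptHz;
}

// Percentage of total CPU time spent inside an ISR that costs isrCycles
// per invocation. Whatever remains is all the main loop gets for
// draining the buffer and sending packets.
inline double isrLoadPercent(uint32_t isrCycles, uint32_t cpuHz, uint32_t interruptHz) {
  assert(cpuHz > 0);
  return 100.0 * isrCycles * interruptHz / cpuHz;
}
```

    With a 600 MHz core, 128 kHz leaves about 4,687 cycles per interrupt for the ISR and the main loop combined, while 85 kHz leaves about 7,058. An ISR that took 3,000 cycles, for example, would consume 64% of the CPU at 128 kHz.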

    Just for completeness, here is my ISR. (The encoder counter is being directly incremented for debugging to detect loss of packets, and the last 3 long ints are also for debugging purposes.)

    Code:
    void adcDataReadyIsrStream(){
    
        adc.readData(&res);
        ultemp = micros() - loggerStatus.logStartTimeMicro;
        //mCurPosValue = myEnc1.read();     // taking 
        mCurPosValue++;
        
        // update digital inputs here as the ISRs for the digital inputs will be disabled 
        bitWrite(digitalInputs, 0,digitalReadFast(DIN0));
        bitWrite(digitalInputs, 1,digitalReadFast(DIN1));
        bitWrite(digitalInputs, 2,digitalReadFast(DIN2));
        bitWrite(digitalInputs, 3,digitalReadFast(DIN3));
        bitWrite(digitalInputs, 4,digitalReadFast(DIN4));
        bitWrite(digitalInputs, 5,digitalReadFast(DIN5));
        bitWrite(digitalInputs, 6,digitalReadFast(DIN6));
        bitWrite(digitalInputs, 7,digitalReadFast(DIN7));
      
        myQueueWrite(0xAA);
        myQueueWrite((uint8_t)(0xFF & ultemp));               // elapsed time
        myQueueWrite((uint8_t)(ultemp>>8)&0xFF);
        myQueueWrite((uint8_t)(ultemp>>16)&0xFF);
        myQueueWrite((uint8_t)(ultemp>>24)&0xFF);
    
        myQueueWrite((uint8_t)(res.chan1_16&0xFF));       // analog channel 1
        myQueueWrite((uint8_t)(res.chan1_16>>8)&0xFF);
        myQueueWrite((uint8_t)(res.chan2_16&0xFF));       // analog channel 2
        myQueueWrite((uint8_t)(res.chan2_16>>8)&0xFF);
        myQueueWrite((uint8_t)(res.chan3_16&0xFF));       // analog channel 3
        myQueueWrite((uint8_t)(res.chan3_16>>8)&0xFF);
        myQueueWrite((uint8_t)(res.chan4_16&0xFF));       // analog channel 4
        myQueueWrite((uint8_t)(res.chan4_16>>8)&0xFF);
    
        myQueueWrite((uint8_t)(mCurPosValue)&0xFF);      // 32 bit encoder counter
        myQueueWrite((uint8_t)(mCurPosValue>>8)&0xFF);
        myQueueWrite((uint8_t)(mCurPosValue>>16)&0xFF);
        myQueueWrite((uint8_t)(mCurPosValue>>24)&0xFF);
        myQueueWrite(digitalInputs);             // digital inputs, automatically update in own ISRs  
        // all added for debugging
        myQueueWrite((uint8_t)(myCircBuffer.length)&0xFF);      // 32 bit buffer length
        myQueueWrite((uint8_t)(myCircBuffer.length>>8)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.length>>16)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.length>>24)&0xFF);
    
        myQueueWrite((uint8_t)(myCircBuffer.writeIndex)&0xFF);      // 32 bit buffer head index
        myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>8)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>16)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.writeIndex>>24)&0xFF);
    
        myQueueWrite((uint8_t)(myCircBuffer.readIndex)&0xFF);      // 32 bit buffer tail index
        myQueueWrite((uint8_t)(myCircBuffer.readIndex>>8)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.readIndex>>16)&0xFF);
        myQueueWrite((uint8_t)(myCircBuffer.readIndex>>24)&0xFF);
    
        
    }
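
    One possible way to trim the ISR is to build the record in a small local array and hand it to the queue in a single block copy, rather than making one function call per byte. The sketch below packs the 18-byte record (without the debug fields) in the same little-endian byte order as the ISR above. It assumes the ADC channel values are 16 bits wide; `packSample`, `putLE16` and `putLE32` are hypothetical helpers, and the block copy into the queue is left to the application.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// 1 sync + 4 elapsed time + 4 x 2 ADC + 4 encoder + 1 digital inputs
constexpr size_t kSampleSize = 18;

// Store a value least-significant byte first and return the position
// just past the bytes written.
inline uint8_t *putLE16(uint8_t *p, uint16_t v) {
  p[0] = static_cast<uint8_t>(v);
  p[1] = static_cast<uint8_t>(v >> 8);
  return p + 2;
}

inline uint8_t *putLE32(uint8_t *p, uint32_t v) {
  p = putLE16(p, static_cast<uint16_t>(v));
  return putLE16(p, static_cast<uint16_t>(v >> 16));
}

// Packs one sample into out (at least kSampleSize bytes) and returns
// the number of bytes written.
inline size_t packSample(uint8_t *out, uint32_t elapsedMicros,
                         const uint16_t adc[4], uint32_t encoder,
                         uint8_t digitalInputs) {
  assert(out != nullptr);
  uint8_t *p = out;
  *p++ = 0xAA;                    // sync byte
  p = putLE32(p, elapsedMicros);  // elapsed time
  for (int ch = 0; ch < 4; ++ch) {
    p = putLE16(p, adc[ch]);      // analog channels 1 to 4
  }
  p = putLE32(p, encoder);        // 32-bit encoder counter
  *p++ = digitalInputs;           // digital inputs
  return static_cast<size_t>(p - out);
}
```

    Whether this actually saves time depends on how the compiler inlines the existing per-byte calls, so it is worth measuring before and after.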

    And here is the section of my state machine responsible for UDP streaming

    Code:
        case st_stream_udp: {
             if ((loggerControl.execute==0)||(timeAnt <= millis())) {
                // shut off interrupts
                noInterrupts();
                detachInterrupt(ADC_DRDY);
                interrupts();
                if (bytecount>0) Udp.send(clientIP,loggerConfiguration.udpStreamPort, udpStreamBuffer, bytecount);
                Serial.println("->st_stream_completed");
                state = st_stream_completed;
             }
             
             else {
                if (myCircBuffer.length>0) { 
                    bytecount=0;
                    while ((myCircBuffer.length>0)&&(loggerControl.execute==1)&&(timeAnt >= millis())){
                        udpStreamBuffer[bytecount++]=myQueueDeque(); 
                        if ((bytecount >= 20*(SAMPLE_SIZE_ALL+13))){
                          bytesWritten=0;
                          attempts=0;
                          Udp.send(clientIP,loggerConfiguration.udpStreamPort, udpStreamBuffer, bytecount);
                          bytecount=0;
                        }
                        if (myQueueFull()) loggerStatus.queueFull=1;
                        else loggerStatus.queueFull=0;
    
                        if (millis()> timeAnt) {
                          noInterrupts();
                          detachInterrupt(ADC_DRDY);
                          interrupts();
                        }
                    }
                }
             } 
        }
        break;
    Here is my super simple circular buffer implementation

    Code:
    volatile uint8_t uresult;
    
    // Resets the queue to empty. Always returns 0.
    uint8_t myQueueReset(){
      myCircBuffer.writeIndex=0;
      myCircBuffer.readIndex=0;
      myCircBuffer.length=0;
      myCircBuffer.capacity = BIG_BUFFER_SIZE;//  CIRC_BUFFER_SIZE;
      myCircBuffer.full=0;
      myCircBuffer.behind=0;
      return 0;
    }
    // Appends one byte (called from the ADC ISR). Does not check for overflow.
    void myQueueWrite(uint8_t value){
      //circularBuffer[myCircBuffer.writeIndex++] = value;
      bigbuffer[myCircBuffer.writeIndex] = value;
      myCircBuffer.writeIndex++;
      myCircBuffer.length++;
      if (myCircBuffer.writeIndex == BIG_BUFFER_SIZE) myCircBuffer.writeIndex=0;
    }
    // Removes and returns the oldest byte (called from the main loop).
    // The caller must check that length > 0 first.
    uint8_t myQueueDeque(void){
      //uresult = circularBuffer[myCircBuffer.readIndex++];
      uresult = bigbuffer[myCircBuffer.readIndex];
      myCircBuffer.readIndex++;
      myCircBuffer.length--;   // length is also incremented by myQueueWrite() in the ISR
      if (myCircBuffer.readIndex == BIG_BUFFER_SIZE) myCircBuffer.readIndex=0;

      return uresult;
    }
    // Returns 1 if there is unread data in the queue.
    uint8_t myQueueBehind(void){
      if (myCircBuffer.length >0) return 1;
      else return 0;
    }
    // Returns 1 if the queue holds at least `capacity` bytes.
    uint8_t myQueueFull(void){
      if (myCircBuffer.length >= myCircBuffer.capacity) return 1;
      else return 0;
    }
    // Returns the number of bytes that can still be written.
    uint32_t myQueueSpaceRemaining(void){
      return BIG_BUFFER_SIZE - myCircBuffer.length;
    }
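
    One thing that may be worth ruling out in a queue like this: `length` is modified from two contexts, incremented in the ISR by `myQueueWrite()` and decremented in the main loop by `myQueueDeque()`. Each of those is a read-modify-write, so if the interrupt fires between the main loop's read and its write-back, the ISR's increment is overwritten. That would be more likely at higher interrupt rates. A common way to avoid the shared counter is a single-producer/single-consumer queue in which each side writes only its own index. The sketch below is one such design, not the poster's code; it assumes a power-of-two capacity and that 32-bit `std::atomic` is lock-free on the target.

```cpp
#include <atomic>
#include <cstdint>

// Single-producer/single-consumer byte queue. The producer (e.g. an ISR)
// is the only writer of head_; the consumer (e.g. the main loop) is the
// only writer of tail_. The indices run freely and are masked on access,
// so there is no shared length counter.
template <uint32_t N>
class SpscByteQueue {
  static_assert(N > 1 && (N & (N - 1)) == 0, "N must be a power of two");

 public:
  // Producer side. Returns false, dropping the byte, if the queue is full.
  bool push(uint8_t value) {
    const uint32_t head = head_.load(std::memory_order_relaxed);
    const uint32_t tail = tail_.load(std::memory_order_acquire);
    if (head - tail == N) return false;
    data_[head & (N - 1)] = value;
    head_.store(head + 1, std::memory_order_release);
    return true;
  }

  // Consumer side. Returns false if the queue is empty.
  bool pop(uint8_t &value) {
    const uint32_t tail = tail_.load(std::memory_order_relaxed);
    const uint32_t head = head_.load(std::memory_order_acquire);
    if (head == tail) return false;
    value = data_[tail & (N - 1)];
    tail_.store(tail + 1, std::memory_order_release);
    return true;
  }

  // Bytes currently queued. Intended to be called from the producer or
  // the consumer context only.
  uint32_t size() const {
    return head_.load(std::memory_order_acquire) -
           tail_.load(std::memory_order_acquire);
  }

 private:
  uint8_t data_[N];
  std::atomic<uint32_t> head_{0};
  std::atomic<uint32_t> tail_{0};
};
```

    Because `push()` reports a full queue instead of overwriting unread data, the ISR can count dropped bytes, which gives a direct indication of the main loop falling behind.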
