Hello!
I'm working on a project that requires running a large number of digital low-pass filters (moving averages) online. I use an array of uint16_t ring buffers to store the samples for all of these moving averages, along with the supporting data members (current sum, size, etc.). I used to keep all of these in either RAM1 or RAM2, depending on which had more free space, but I wanted to extend the maximum filter length by using the external memory chip option. The problem I am facing is that reading and writing PSRAM takes time (I believe the communication is handled over SPI, if I'm not mistaken), and since this is a real-time system, even microseconds (or tens of nanoseconds) of added delay in my ADC sampling loop can affect system performance. When I move my ring buffer array from RAM1 to PSRAM using the EXTMEM keyword, I see a large amount of variability in my system's sampling rate: what used to be a 60 us ± 1 us period is now ~80 us ± 20 us.
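To put numbers on the period variability described above, one option is to log a timestamp per loop iteration and summarize the differences afterwards. The following is only a minimal sketch in portable C++: `PeriodStats` and `periodStats` are hypothetical names, and it assumes the timestamps are microsecond counts in a `uint32_t` array (for example, values captured with `micros()`).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Summary of the sampling period observed over a run of loop timestamps.
struct PeriodStats {
    uint32_t minUs;   // shortest period seen
    uint32_t maxUs;   // longest period seen
    uint32_t meanUs;  // mean period, rounded to nearest
};

// Compute min/max/mean period from n timestamps given in microseconds.
// Unsigned subtraction keeps each difference correct across a single
// 32-bit timer wrap. Returns all zeros for fewer than two timestamps.
inline PeriodStats periodStats(const uint32_t *stampsUs, size_t n)
{
    PeriodStats s = {0, 0, 0};
    if (n < 2) return s;
    assert(stampsUs != nullptr);

    uint64_t total = 0;
    s.minUs = UINT32_MAX;
    for (size_t k = 1; k < n; ++k) {
        uint32_t dt = stampsUs[k] - stampsUs[k - 1];
        if (dt < s.minUs) s.minUs = dt;
        if (dt > s.maxUs) s.maxUs = dt;
        total += dt;
    }
    uint64_t count = n - 1;
    s.meanUs = (uint32_t)((total + count / 2) / count);
    return s;
}
```

Comparing `minUs` and `maxUs` for the RAM1 build and the EXTMEM build gives a direct measure of the added jitter, independent of the mean slowdown.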
Relevant Code:
Here is the ring buffer class (modified lib from Jean-Luc)
Code:
/*
* Ring Buffer Library for Arduino
*
* Copyright Jean-Luc Béchennec 2018
*
* This software is distributed under the GNU Public Licence v2 (GPLv2)
*
* Please read the LICENCE file
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
/*
* Note about interrupt safe implementation
*
* To be safe from interrupts, a sequence of C instructions must be framed
* by a pair of interrupt disable and enable instructions and ensure that the
* compiler will not move writing of variables to memory outside the protected
* area. This is called a critical section. Usually the manipulated variables
* receive the volatile qualifier so that any changes are immediately written
* to memory. Here the approach is different. First of all you have to know
* that volatile is useless if the variables are updated in a function and
* that this function is called within the critical section. Indeed, the
* semantics of the C language require that the variables in memory be updated
* before returning from the function. But beware of function inlining because
* the compiler may decide to delete a function call in favor of simply
* inserting its code in the caller. To force the compiler to use a real
* function call, __attribute__((noinline)) have been added to the push and
* pop functions. In this way the lockedPush and lockedPop functions ensure
* that in the critical section a push and pop function call respectively will
* be used by the compiler. This ensures that, because of the function call,
* the variables are written to memory in the critical section and also
* ensures that, despite the reorganization of the instructions due to
* optimizations, the critical section will be well opened and closed at the
* right place because function calls, due to potential side effects, are not
* subject to such reorganizations.
*/
#ifndef __RINGBUF_H__
#define __RINGBUF_H__
#include <Arduino.h>
/*
* Set the integer size used to store the size of the buffer according to
* the size given in the template instantiation. Thanks to Niklas Gürtler
* for sharing his knowledge of C++ template metaprogramming.
* https://niklas-guertler.de/
*
* The Index argument is (S > 255). If it is false, the ring buffer has a
* size and an index stored in an uint8_t (Type below) because its size is
* within [1,255]. Intermediate computation may need an uint16_t (BiggerType below).
* If it is true, the ring buffer has a size and an index
* stored in an uint16_t (Type below) because its size is within [256,65535].
* Intermediate computation may need an uint32_t (BiggerType below).
*/
namespace RingBufHelper {
template<bool needs_uint16_t> struct Index {
using Type = uint16_t; /* index of the buffer */
using BiggerType = uint32_t; /* for intermediate calculation */
};
template<> struct Index<false> {
using Type = uint8_t; /* index of the buffer */
using BiggerType = uint16_t; /* for intermediate calculation */
};
}
template <
typename ET,
size_t S,
typename IT = typename RingBufHelper::Index<(S > 255)>::Type,
typename BT = typename RingBufHelper::Index<(S > 255)>::BiggerType
>
class RingBuf
{
/*
* check the size is greater than 0, otherwise emit a compile time error
*/
static_assert(S > 0, "RingBuf with size 0 are forbidden");
/*
* check the size is lower or equal to the maximum uint16_t value,
* otherwise emit a compile time error
*/
static_assert(S <= UINT16_MAX, "RingBuf with size greater than 65535 are forbidden");
private:
ET mBuffer[S];
IT mReadIndex;
IT mSize;
uint32_t curSum = 0;
IT writeIndex();
public:
/* Constructor. Init mReadIndex to 0 and mSize to 0 */
RingBuf();
//Return the curSum value
uint32_t getCurSum();
/* Push a large number of elements to the end of the buffer. */
bool largePush(const ET * const inElement, uint16_t numElements);
/* Push a data at the end of the buffer */
bool push(const ET inElement) __attribute__ ((noinline));
/* Push a data at the end of the buffer. Copy it from its pointer */
bool push(const ET * const inElement) __attribute__ ((noinline));
/* Push a data at the end of the buffer with interrupts disabled */
bool lockedPush(const ET inElement);
/* Push a data at the end of the buffer with interrupts disabled. Copy it from its pointer */
bool lockedPush(const ET * const inElement);
/* Pop a chunk of data (of size numElements) from the ring buffer */
bool largePop(ET *outElement, uint16_t numElements);
/* Pop the data at the beginning of the buffer */
bool pop(ET &outElement) __attribute__ ((noinline));
/* Pop the data at the beginning of the buffer with interrupt disabled */
bool lockedPop(ET &outElement);
/* Return true if the buffer is full */
bool isFull() { return mSize == S; }
/* Return true if the buffer is empty */
bool isEmpty() { return mSize == 0; }
/* Reset the buffer to an empty state */
void clear() { mSize = 0; curSum = 0; } /* also reset the running sum so getCurSum() matches the empty buffer */
/* return the size of the buffer */
IT size() { return mSize; }
/* return the maximum size of the buffer */
IT maxSize() { return S; }
/* access the buffer using array syntax, not interrupt safe */
ET &operator[](IT inIndex);
bool peek(ET &outElement, const std::size_t distance = 0) __attribute__ ((noinline));
bool lockedPeek(ET &outElement, const std::size_t distance = 0);
};
template <typename ET, size_t S, typename IT, typename BT>
IT RingBuf<ET, S, IT, BT>::writeIndex()
{
BT wi = (BT)mReadIndex + (BT)mSize;
if (wi >= (BT)S) wi -= (BT)S;
return (IT)wi;
}
template <typename ET, size_t S, typename IT, typename BT>
RingBuf<ET, S, IT, BT>::RingBuf() :
mReadIndex(0),
mSize(0)
{
}
template <typename ET, size_t S, typename IT, typename BT>
uint32_t RingBuf<ET, S, IT, BT>::getCurSum()
{
return curSum;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::push(const ET inElement)
{
if (isFull()) return false;
mBuffer[writeIndex()] = inElement;
mSize++;
curSum += inElement;//May want to check here for overflow?
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::largePush(const ET * const inElement, uint16_t numElements)
{
//Reject the push only if it would overflow the buffer; filling it exactly is allowed
if ((uint32_t)mSize + (uint32_t)numElements > S) return false;
//mBuffer[writeIndex()] = *inElement;
uint16_t wi = writeIndex();
uint32_t tmp = (uint32_t)wi + (uint32_t)numElements;
uint32_t size1 = numElements;
uint32_t size2 = 0;
if(tmp >= S){
//The write wraps past the end of mBuffer, so copy two chunks of data from inElement
size2 = tmp-S;
size1 = numElements - size2;
memcpy(&mBuffer[wi],inElement,size1*sizeof(ET));//sizeof(ET) is two bytes for uint16_t
memcpy(&mBuffer[0],inElement+size1,size2*sizeof(ET));//wi+size1 == S here, so the second chunk starts at index 0
}
else{
memcpy(&mBuffer[wi],inElement,numElements*sizeof(ET));
}
//Keep the running sum consistent with push()
for (uint16_t k = 0; k < numElements; k++) curSum += inElement[k];
mSize += numElements;
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::push(const ET * const inElement)
{
if (isFull()) return false;
mBuffer[writeIndex()] = *inElement;
mSize++;
curSum += *inElement;//Keep the running sum consistent with push(const ET)
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::lockedPush(const ET inElement)
{
noInterrupts();
bool result = push(inElement);
interrupts();
return result;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::lockedPush(const ET * const inElement)
{
noInterrupts();
bool result = push(inElement);
interrupts();
return result;
}
//Pop numElements elements, after checking that this many are actually in the ring buffer
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::largePop(ET *outElement, uint16_t numElements)
{
if (uint32_t(mSize) < uint32_t(numElements)) return false;
uint32_t tmp = (uint32_t)mReadIndex + (uint32_t)numElements;
uint32_t size1 = numElements;
uint32_t size2 = 0;
if(tmp >= S){
//The read wraps past the end of mBuffer, so copy two chunks of data into outElement
size2 = tmp - S;
size1 = numElements - size2;
memcpy(outElement,&mBuffer[mReadIndex],size1*sizeof(ET));
memcpy(outElement+size1,&mBuffer[0],size2*sizeof(ET));//mReadIndex+size1 == S here, so the second chunk starts at index 0
tmp -= S;//Wrap the new read index in 32 bits, before it is narrowed to IT
}
else{
memcpy(outElement, &mBuffer[mReadIndex], numElements*sizeof(ET));
}
//Serial.write((byte*)mBuffer[mReadIndex], numElements*2);
//Keep the running sum consistent with pop(); summing the copy avoids re-reading mBuffer
for (uint16_t k = 0; k < numElements; k++) curSum -= outElement[k];
mReadIndex = (IT)tmp;
mSize -= numElements;
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::pop(ET &outElement)
{
if (isEmpty()) return false;
outElement = mBuffer[mReadIndex];
mReadIndex++;
mSize--;
curSum -= uint32_t(outElement);
if (mReadIndex == S) mReadIndex = 0;
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::lockedPop(ET &outElement)
{
noInterrupts();
bool result = pop(outElement);
interrupts();
return result;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::peek(ET &outElement, const std::size_t distance)
{
if (isEmpty() || size() <= distance) return false;//distance == size() would read one past the newest element
//Take care of the wrap around
std::size_t temp_read_index = mReadIndex + distance;
if(temp_read_index >= S) {
temp_read_index -= S;
}
outElement = mBuffer[temp_read_index];
return true;
}
template <typename ET, size_t S, typename IT, typename BT>
bool RingBuf<ET, S, IT, BT>::lockedPeek(ET &outElement, const std::size_t distance)
{
noInterrupts();
bool result = peek(outElement,distance);
interrupts();
return result;
}
template <typename ET, size_t S, typename IT, typename BT>
ET &RingBuf<ET, S, IT, BT>::operator[](IT inIndex)
{
if (inIndex >= mSize) return mBuffer[0];
BT index = (BT)mReadIndex + (BT)inIndex;
if (index >= (BT)S) index -= (BT)S;
return mBuffer[(IT)index];
}
#endif /* __RINGBUF_H__ */
Here is the ring buffer declaration:
Code:
// Rolling average allocation space size
#define maxRollingAvgLengthFilter 8000
EXTMEM RingBuf<uint16_t, maxRollingAvgLengthFilter> rollingAvgDAC[16][16];
And an example of usage (noting that typically the "i" here is defined within some loop, and this code would be contained within a helper function in the main code):
Code:
i = 0;
uint32_t tmpRaDAC = 0, tmpSize = rollingAvgDAC[curCol][i].size();
//Calculate the new filtered DAC value (rounded to nearest); skipped while the buffer is empty to avoid dividing by zero
if (tmpSize > 0) tmpRaDAC = ((rollingAvgDAC[curCol][i].getCurSum() + tmpSize/2) / tmpSize);
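The averaging step in the usage example relies on adding half the divisor before an integer division, which rounds to the nearest integer instead of truncating. A small standalone sketch of that arithmetic follows; `roundedMean` is a hypothetical helper name, not part of the ring buffer library.

```cpp
#include <cassert>
#include <cstdint>

// Rounded integer mean: adding count/2 before dividing rounds to nearest
// instead of truncating. Returns 0 for an empty buffer so the caller never
// divides by zero.
inline uint32_t roundedMean(uint32_t sum, uint32_t count)
{
    if (count == 0) return 0;
    // With uint16_t samples and at most 65535 of them, the sum is at most
    // 65535 * 65535, which leaves enough headroom for count/2 in 32 bits.
    assert(sum <= UINT32_MAX - count / 2);
    return (sum + count / 2) / count;
}
```

Note that both inputs come from member variables of the buffer object (`getCurSum()` and `size()`), so this step reads the sample array only indirectly, through wherever the whole object has been placed.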
My question is this:
Other than reducing the number of function calls (i.e. memory reads and writes), is there a way to speed up memory access or make it more efficient so that I don't run into severe latency issues here? One thought was to move only the ring buffer storage into PSRAM and keep all helper data members in RAM, but I'm not sure how to do this, since declaring data members as EXTMEM inside a class definition throws a compile error.
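One way the "storage in PSRAM, bookkeeping in RAM" idea could be expressed is to have the filter object hold only a pointer to its sample array, so the array can be declared separately with whatever placement attribute is needed. The sketch below is an illustration under that assumption, not a drop-in replacement for the library above: `ExtMovingAverage` and its members are hypothetical names, and the code is plain C++ with an ordinary array standing in for the external memory.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Moving-average filter whose sample storage lives outside the object.
// Only the pointer, indices and running sum are members, so the object can
// sit in fast RAM while `storage` points at a larger, slower region.
class ExtMovingAverage {
public:
    ExtMovingAverage(uint16_t *storage, size_t capacity)
        : mBuf(storage), mCap(capacity), mHead(0), mCount(0), mSum(0)
    {
        assert(storage != nullptr && capacity > 0);
    }

    // Add a sample; once the window is full the oldest sample is replaced.
    // Touches the external buffer at most twice (one read, one write).
    void add(uint16_t sample)
    {
        if (mCount == mCap) {
            mSum -= mBuf[mHead];   // read the oldest sample from storage
        } else {
            ++mCount;
        }
        mBuf[mHead] = sample;      // write the newest sample to storage
        mSum += sample;
        if (++mHead == mCap) mHead = 0;
    }

    // Rounded mean of the current window; needs no access to the storage.
    uint32_t mean() const
    {
        if (mCount == 0) return 0;
        return (uint32_t)((mSum + mCount / 2) / mCount);
    }

    size_t size() const { return mCount; }

private:
    uint16_t *mBuf;    // sample storage, owned by the caller
    size_t    mCap;    // window length in samples
    size_t    mHead;   // next slot to write; the oldest slot once full
    size_t    mCount;  // samples currently in the window
    uint64_t  mSum;    // running sum of the window
};
```

In a sketch for the board, the storage could presumably be a file-scope array such as `EXTMEM uint16_t dacSamples[16][16][8000];` (a hypothetical name), with each filter object constructed in regular RAM from a pointer to its slice. The class never reads a slot before writing it, so the storage does not need to be cleared first.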
Looking at the PSRAM chip (APS6404L_3SQR) datasheet, it seems there is a fast read mode and various clock settings as well so perhaps there's hope for some optimizing here? Any input here would be greatly appreciated!
Thanks!
Andy