Help me optimise this code

frankzappa

Well-known member
I wrote a class that does some calculations. It compares a 1d signal with a template of a peak and calculates the absolute difference after normalisation. I have a function that triggers based on a threshold, but I want it to only trigger on signals of a certain shape, and this class does that.

I'm using the circular buffer library to store previous values; it's held in the m_dataStorage reference member.

Note that I'm only using the ComputeManhattan method, called from the "ExecuteAndStore" method, and I'm creating the object at compile time. Can I speed this up somehow? It works correctly but it's too slow. I'm planning to prune the values because I can calculate on every other sensor reading, but if something more can be done I'm all ears. I'm using the Teensy 4.

Here is the .h file:

Code:
#pragma once
#include <cmath>
#include <algorithm>
#include "ProcessedSensorData.h"

class CrossCorrelation {
private:
    float* m_templateSignal;
    float* m_targetSignal;
    const int m_signalLength;
    ProcessedSensorData& m_dataStorage;

public:
    CrossCorrelation(const float* templateSignal, const int signalLength, ProcessedSensorData& dataStorage);
    ~CrossCorrelation() {
        delete[] m_templateSignal;
        delete[] m_targetSignal;
    }
    
    float ExecuteAndStore(int sensorID) {
        //calculate similarity of signals with a template signal
        float xCorr = 0.0f;
        xCorr = ComputeManhattan(sensorID);
        xCorr = std::min(xCorr, 30.0f);
        //store the result in buffer
        m_dataStorage.headPeakSimilarity.at(sensorID).push_front(xCorr);
        return xCorr;
    }
private:
    void NormalizeSignal(int sensorID);
    float Compute(int sensorID);
    inline float ComputeManhattan(int sensorID) {
        NormalizeSignal(sensorID);
        float result = 0.0f;
        for (int i = 0; i < m_signalLength; i++) {
            result += std::fabs(m_targetSignal[i] - m_templateSignal[i]);
        }
        return result;
    }
};

Here is the .cpp file:

Code:
#include "CrossCorrelation.h"

CrossCorrelation::CrossCorrelation(const float* templateSignal, const int signalLength, ProcessedSensorData& dataStorage) 
    :
    m_signalLength(signalLength),
    m_dataStorage(dataStorage)
{
    m_templateSignal = new float[signalLength];
    m_targetSignal = new float[signalLength];
    
    for (int i = 0; i < m_signalLength; i++) {
        m_templateSignal[i] = templateSignal[i];
        m_targetSignal[i] = 0.0f;
    }
    std::reverse(m_templateSignal, m_templateSignal + m_signalLength);
    //normalise template signal
    float max = 0.0f;
    //float average = 0.0f;
    for (int i = 0; i < m_signalLength; i++) {
        max = std::max(m_templateSignal[i], max);
        //average += m_templateSignal[i];
    }
    //average /= m_signalLength;
    //max -= average;
    //std::max(max, 10.0f);

    for (int i = 0; i < m_signalLength; i++) {
        m_templateSignal[i] /= max;
    }
}

float CrossCorrelation::Compute(int sensorID) {
    NormalizeSignal(sensorID);
    float result = 0.0f;
    for (int i = 0; i < m_signalLength; i++) {
        result += m_targetSignal[i] * m_templateSignal[i];
    }
    return result;
}



void CrossCorrelation::NormalizeSignal(int sensorID) {
    float max = 0.0f;
    //float average = 0.0f;

    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] = m_dataStorage.headSimilaritySignal.at(sensorID).peek(i);
        if(m_targetSignal[i] > max) {
            max = m_targetSignal[i];
        }
        //average += m_targetSignal[i];
    }
    //average /= m_signalLength;
    //max -= average;
    max = std::max(max, 10.0f);
    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] /= max;
    }
}
 
These are the functions that run continuously, and I'm wondering if there is anything I'm doing that's stupid and wastes cycles, because it's pretty slow right now.
I'm normalising a 1d signal of 60 elements and calculating the absolute difference between that and a template of the same size. It takes about 2100 cycles, which seems kind of high. I need to do this between sensor reads and it's too slow.

Code:
    inline float ComputeManhattan(int sensorID) {
        NormalizeSignal(sensorID);
        float result = 0.0f;
        for (int i = 0; i < m_signalLength; i++) {
            result += std::fabs(m_targetSignal[i] - m_templateSignal[i]);
        }
        return result;
    }



Code:
void CrossCorrelation::NormalizeSignal(int sensorID) {
    float max = 0.0f;

    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] = m_dataStorage.headSimilaritySignal.at(sensorID).peek(i);
        if (m_targetSignal[i] > max) {
            max = m_targetSignal[i];
        }
    }

    max = std::max(max, 10.0f);
    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] /= max;
    }
}
 
I've solved it (sort of). Made it go from 3.5 microseconds per sensor to 1.7, which is good enough. I got rid of one of the loops by folding the normalisation into the function. I tried various things, but unrolling the loop basically cut it in half, and that works for my use case although it's a monstrosity. Also, std::array was more performant than C arrays, strangely. std::fabs was faster than any home-cooked version I could come up with, like x < 0 ? -x : x for instance.
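Roughly the shape it ended up as (not my exact code, just a sketch with a 4-way unroll; it assumes m_signalLength is a multiple of 4):

Code:
    inline float ComputeManhattan(int sensorID) {
        auto& signal = m_dataStorage.headSimilaritySignal.at(sensorID);
        // one pass to copy out of the circular buffer and find the max
        float max = 0.0f;
        for (int i = 0; i < m_signalLength; i++) {
            m_targetSignal[i] = signal.peek(i);
            if (m_targetSignal[i] > max) max = m_targetSignal[i];
        }
        max = std::max(max, 10.0f);
        // normalisation folded into the difference loop, unrolled 4 elements at a time
        float result = 0.0f;
        for (int i = 0; i < m_signalLength; i += 4) {
            result += std::fabs(m_targetSignal[i]     / max - m_templateSignal[i]);
            result += std::fabs(m_targetSignal[i + 1] / max - m_templateSignal[i + 1]);
            result += std::fabs(m_targetSignal[i + 2] / max - m_templateSignal[i + 2]);
            result += std::fabs(m_targetSignal[i + 3] / max - m_templateSignal[i + 3]);
        }
        return result;
    }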

BTW this is not premature optimisation, it was a bottleneck.
 
Cool. 35 cycles per float element didn't seem unexpectedly out of bounds
Wonder if the array diff is because it might use *ptr indexing versus [ii] indexing ...
 
After pruning so that every other sample is used, and unrolling, it's now really fast. Went from 3.5 to below 1 micros per call. It's fewer elements, but sufficient for my use case.

std::array was faster, but negligibly so. I did 14 calls with arrays of size 55 and measured the time, and it was 3 micros faster, from 48 to 45 micros. I think *ptr indexing is faster.

I have one thing yet to try: copying the whole chunk of data from the circular buffer into the array (there is a built-in function, I think). Right now I'm peek()ing into every element in the unrolled loop, so that might save a few cycles as well.
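What I mean is something like this (just a sketch; I haven't checked whether the buffer has a bulk-copy method, so this only uses peek() in one place):

Code:
    // sketch: one plain copy pass out of the circular buffer...
    auto& buffer = m_dataStorage.headSimilaritySignal.at(sensorID);
    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] = buffer.peek(i);
    }
    // ...so that the unrolled max/normalise/abs-diff code afterwards only touches
    // m_targetSignal[] and never calls peek() again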
 
Before you posted the enormous gains I was going to ask how the samples are acquired? ... Snippets don't show that ...

And given floats - maybe it doesn't involve an ADC digital value read at all ...

If reading is waiting on ADC to complete - at least with the ADC library that has a non-blocking read Start - the Read could be started - then MATH done on PRIOR Read array element - then read the NEW value 'when ADC complete'.

At the end of that the prior values would be normalized for use - and the next set of RAW values would be ready for the next pass.

This would always put the normalization one step behind the most recent data - but those MATH cycles would be done 'for free' while the ADC sample was acquired.
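A sketch of that pattern only - assuming the pedvide ADC library's non-blocking calls (startSingleRead / isComplete / readSingle); the pin and the math placeholder are made up:

Code:
#include <Arduino.h>
#include <ADC.h>

ADC *adc = new ADC();
const int sensorPin = A0;          // made-up pin
float previousSample = 0.0f;       // value captured on the previous pass

// hypothetical placeholder for the normalisation / correlation work
void doMathOnPreviousSample(float sample) { (void)sample; /* ... */ }

void readAndProcess() {
    // start the conversion, non-blocking
    adc->adc0->startSingleRead(sensorPin);

    // do the MATH on the PRIOR sample while the ADC is busy
    doMathOnPreviousSample(previousSample);

    // pick up the NEW value once the conversion has completed
    while (!adc->adc0->isComplete()) { /* spin, or do more work */ }
    previousSample = (float)adc->adc0->readSingle();
}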
 
I'm reading sensor pairs and processing the two previous pairs, so it's as good as it can get, I think. I'm doing all calculations while waiting for sensor reads. The problem before was that I had about 5 micros before the sensor pair was ready, but the calculations took 7 micros, so it was slowing things down and throwing my filters off, because they are pre-calculated for a certain sample rate.

It's actually based on ADC code you wrote for me when I had just started programming. Still using that technique for reading sensors haha. I'm filtering the values before the normalisation and curve-fitting stuff, so they are floats.

Now, since I'm running this code only every other loop cycle, I'm processing only one sensor of each pair at a time, so it's super fast, like 1 microsecond or so. Problem is solved.
 
Code:
    max = std::max(max, 10.0f);
    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] /= max;
    }

Divisions are slow. I have not tried it, but maybe it helps to multiply with 1.0f/max instead:
Code:
    max = std::max(max, 10.0f);
    max = 1.0f/max;
    for (int i = 0; i < m_signalLength; i++) {
        m_targetSignal[i] *= max;
    }

Not sure if this is helpful for your one-loop solution.
 
Thanks. I'll see if it makes a difference. I think division and multiplication are the same on the Teensy 4, but I haven't measured it.
 
No, a floating point multiply needs up to 3 cycles, division up to 18.

https://www.quinapalus.com/cm7cycles.html

Really? In that case it should make a big difference. I think I read in a post on the forum that it was 1 cycle for division/multiplication/addition/subtraction with floats and two cycles for doubles. I believe it was PaulStoffregen who wrote it, but I guess I'm mistaken. He also said the Teensy will use double precision if you don't append literals with "f".
 
1 cycle for a div is not true even for 16/32-bit integers :) udiv is 3 cycles minimum, in most cases more.
A div is expensive.
 
I will try it; it would explain why it is so slow. I was expecting a lot fewer cycles. I assumed it was all the comparisons for the loop. Unrolling the loop made it twice as fast, but if this works and is much faster, maybe I can keep the for loop, or make a loop that executes 10 calculations per iteration. A loop is much more flexible.
 
It will not be _that_ much faster, because the number of loop iterations matters too. An unrolled loop has far fewer jumps and compares. Jumps are a bit expensive, too (not as much as a division).
But both together is OK :)
 
Well, I should be able to save like 1000 cycles per sensor pair, and that is a lot. I'm doing this for 5 sensor pairs in between the sensor readings, and I only have 5 microseconds of total time to calculate a sensor pair before the next pair is ready.
 
I changed the division to a multiplication as suggested and the program seems to work the same. I'll have to measure it.
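For the measuring part, a sketch of how it could be timed with the cycle counter (ARM_DWT_CYCCNT; as far as I know the Teensy 4 core already enables it at startup, and "xcorr" is just a stand-in name for my object):

Code:
#include <Arduino.h>
#include "CrossCorrelation.h"

void benchmark(CrossCorrelation& xcorr) {
    uint32_t start = ARM_DWT_CYCCNT;      // free-running CPU cycle counter
    volatile float r = 0.0f;              // volatile so the result isn't optimised away
    for (int n = 0; n < 10; n++) {
        r = xcorr.ExecuteAndStore(0);     // sensorID 0, as an example
    }
    uint32_t cycles = ARM_DWT_CYCCNT - start;
    (void)r;
    Serial.print("cycles for 10 calls: ");
    Serial.println(cycles);
}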
 
I executed 10 of these calculations and printed the result, it's like 10 times faster lol.

Thanks for the link and the suggestion for the optimization. This knowledge will come in handy later when I have to do a lot of similar calculations.
 
You could remove the second loop in NormalizeSignal() and just return the max, then perform the normalization while calculating the abs delta in ComputeManhattan().
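As a sketch of what I mean (FindMaxAndFillTarget is just a made-up name for what would remain of NormalizeSignal()):

Code:
    // NormalizeSignal() shrinks to one loop: fill m_targetSignal and return the max
    float FindMaxAndFillTarget(int sensorID);   // hypothetical helper

    inline float ComputeManhattan(int sensorID) {
        const float scale = 1.0f / FindMaxAndFillTarget(sensorID);   // reciprocal, as suggested above
        float result = 0.0f;
        for (int i = 0; i < m_signalLength; i++) {
            // normalise and accumulate the abs delta in the same pass
            result += std::fabs(m_targetSignal[i] * scale - m_templateSignal[i]);
        }
        return result;
    }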
 
Just out of curiosity, have you tried asking OpenAI for suggestions on how to optimize it? That seems like it would be one of its ideal intended purposes.
 
You could remove the second loop in NormalizeSignal() and just return the max, then perform the normalization while calculating the abs delta in ComputeManhattan().

I already did that if you read my comments above :)

Thanks for the suggestion.
 