An approach to Walsh pais with SIMD

In a previous posts I looked into using Single instruction, multiple data, SIMD to find the Median value in a sample integers.

The definition of median in case of

  • an odd sample size, find 'middle' element of the ordered samples.

  • an even sample size, find the 2 elements in the middle of an ordered sample size, and calculate the average of those.

I have recently came across a blog post from Andrey Akinshin proposing to benefits of using Hodges-Lehmann location estimator to calculate a pseudo median.

The Hodges-Lehmann estimator uses Walsh (pairwise) averages as the base of the median. In this blog post I will investigate creating pairwise averages of an array of positive integers.

Walsh averages

Pairwise averages are averages calculated from each pair in a set, including a pair matched with itself. In case a set contains {1, 2, 3} the pairs would be {{1,1}, {1,2}, {1,3}, {2,2}, {2,3}, {3,3}} which results the following averages: {1, 1.5, 2, 2, 2.5, 3}. In the case I am solving, the input integers are large enough, so that the fractional parts of results are negligible.

There are 3 approaches detailed in this post: a traditional non-SIMD solution and two SIMD solutions.

The following regular approach uses two for loops. The outer loop iterates over the input and takes the current element. An inner loop iterates over the remaining part of the array (including the current element). It calculates the average of the current item of the outer loop and the item indexed by the inner loop.

The input data is read from the _source field and the results are written to the _destination field. As the destination field is larger than required a ReadOnlyMemory<int> is returned which refers to the segment of the _destination containing the results.

[Benchmark]
public ReadOnlyMemory<int> RegularSum()
{
    int destIndex = 0;
    for (int i = 0; i < _source.Length; i++)
    {
        var current = _source[i];
        for (int j = i; j < _source.Length; j++)
            _destination[destIndex++] = (current + _source[j]) / 2;
    }
    return _destination.AsMemory(0, Length * (Length + 1) / 2);
}

The second approach uses SIMD. The idea is similar to the previous approach. The outer loop iterates over the input _source field. A vector is created from each element, vCurrent. All elements of this vector is set to the same value, _source[i]. Then an inner loop iterates over the remaining portion of the input array in step sizes matching the vector's length. The average is calculated with the division replaced by a right shift operation. Each element of the vector is right shifted with one bit. The results are written to the _destination field. Finally, the loop needs to calculate the destIndex variable, which points to the end of the written data in the _destination field. This is required because the _source array's length may not be a multiple of the Vector<int>.Count. While the very last iteration of the inner loop still process Vector<int>.Count number of items (LoadUnsafe, StoreUnsafe loads/stores data), the index of valid data can be only be bumped by the number of elements remaining in _source. For example, if the inner loop iterates 8 items at a time, but only 3 items left in _source, we shall only bump the destIndex by 3, as the remaining items written is garbage.

Notice, for this reason it is required for this solution to have a _destination array that is at least a vector's length larger than otherwise required. With a smaller array it might try to write data beyond the array's length resulting in an IndexOutOfRangeException.

[Benchmark]
public ReadOnlyMemory<int> SimdSumLinear()
{
    int destIndex = 0;
    for (int i = 0; i < _source.Length; i++)
    {
        var vCurrent = new Vector<int>(_source[i]);
        int j;
        for (j = i; j < _source.Length; j += Vector<int>.Count)
        {
            Vector.StoreUnsafe(Vector.ShiftRightArithmetic(vCurrent + Vector.LoadUnsafe(ref _source[j]), 1), ref _destination[destIndex]);
            destIndex += int.Min(_source.Length - j, Vector<int>.Count);
        }
    }
    return _destination.AsMemory(0, Length * (Length + 1) / 2);
}

I have a second proposal for the SIMD solution. While the overall idea is the same as for the previous solution, this one uses different step sizes for the for loops. The outer loop's step size is increased to Vector<int>.Count. In each outer iteration, a vector is created from the subsequent elements of _source. At the same time, the inner loop step count is set to 1. This way we still generate the same pairs as before but in a different order.

    [Benchmark]
    public ReadOnlyMemory<int> SimdSum()
    {
        int destIndex = 0;
        for (int i = 0; i < _source.Length; i += Vector<int>.Count)
        {
            var vCurrent = Vector.LoadUnsafe(ref _source[i]);
            for (int j = i; j < _source.Length; j++)
            {
                Vector.StoreUnsafe(Vector.ShiftRightArithmetic(vCurrent + Vector.LoadUnsafe(ref _source[j]), 1), ref _destination[destIndex]);
                destIndex += int.Min(_source.Length - j, Vector<int>.Count);
            }
        }
        return _destination.AsMemory(0, Length * (Length + 1) / 2);
    }

Let's assume we have an input array of the following items {1, 2, 3}. The first SIMD solution generates pairs in the following order:

  • {1,1}, {1,2}, {1,3}

  • {2,2}, {2,3}

  • {3,3}

where each line corresponds to an outer loop iteration, and the inner loop iterates once in all cases.

The second SIMD solution generates items as:

  • {1,1}, {2,2}, {3,3}

  • {1,2}, {2,3}

  • {1,3}

Notice, that in this case the outer loop iterates once, while the inner loop iterates three times.

Benchmarks

I compared the solutions with BenchmarkDotNet. The input _source has a length of 40000.

The first observations to make: the SIMD solutions are generally faster to the non-SIMD one.

|        Method |     Mean |   Error |  StdDev |
|-------------- |---------:|--------:|--------:|
|    RegularSum | 602.3 ms | 8.54 ms | 7.57 ms |
| SimdSumLinear | 238.3 ms | 2.06 ms | 1.93 ms |
|       SimdSum | 232.2 ms | 1.18 ms | 0.92 ms |

The second observation is that SimdSum is slightly faster compared to SimdSumLinear. This is due to how memory is cached by the CPU. The first case iterates over the whole array n times, the second approach only has to do it n / Vector.Count times. This means memory that the first case has to bring data into the CPU cache less frequently as the second solution, hence gaining a slight performance benefit.