## An approach to Walsh pais with SIMD

### 11/19/2023

### 5 minutes

In a previous posts I looked into using Single instruction, multiple data, SIMD to find the `Median`

value in a sample integers.

The definition of median in case of

an odd sample size, find 'middle' element of the ordered samples.

an even sample size, find the 2 elements in the middle of an ordered sample size, and calculate the average of those.

I have recently came across a blog post from Andrey Akinshin proposing to benefits of using Hodges-Lehmann location estimator to calculate a pseudo median.

The Hodges-Lehmann estimator uses Walsh (pairwise) averages as the base of the median. In this blog post I will investigate creating pairwise averages of an array of positive integers.

## Walsh averages

Pairwise averages are averages calculated from each pair in a set, including a pair matched with itself. In case a set contains {1, 2, 3} the pairs would be {{1,1}, {1,2}, {1,3}, {2,2}, {2,3}, {3,3}} which results the following averages: {1, 1.5, 2, 2, 2.5, 3}. In the case I am solving, the input integers are *large enough*, so that the fractional parts of results are negligible.

There are 3 approaches detailed in this post: a traditional non-SIMD solution and two SIMD solutions.

The following regular approach uses two `for`

loops. The outer loop iterates over the input and takes the current element. An inner loop iterates over the remaining part of the array (including the current element). It calculates the average of the current item of the outer loop and the item indexed by the inner loop.

The input data is read from the `_source`

field and the results are written to the `_destination`

field. As the destination field is larger than required a `ReadOnlyMemory<int>`

is returned which refers to the segment of the `_destination`

containing the results.

[Benchmark] public ReadOnlyMemory<int> RegularSum() { int destIndex = 0; for (int i = 0; i < _source.Length; i++) { var current = _source[i]; for (int j = i; j < _source.Length; j++) _destination[destIndex++] = (current + _source[j]) / 2; } return _destination.AsMemory(0, Length * (Length + 1) / 2); }

The second approach uses SIMD. The idea is similar to the previous approach. The outer loop iterates over the input `_source`

field. A vector is created from each element, `vCurrent`

. All elements of this vector is set to the same value, `_source[i]`

. Then an inner loop iterates over the remaining portion of the input array in step sizes matching the vector's length. The average is calculated with the division replaced by a right shift operation. Each element of the vector is right shifted with one bit. The results are written to the `_destination`

field. Finally, the loop needs to calculate the `destIndex`

variable, which points to the end of the written data in the `_destination`

field. This is required because the `_source`

array's length may not be a multiple of the `Vector<int>.Count`

. While the very last iteration of the inner loop still process `Vector<int>.Count`

number of items (LoadUnsafe, StoreUnsafe loads/stores data), the index of valid data can be only be bumped by the number of elements remaining in `_source`

. For example, if the inner loop iterates 8 items at a time, but only 3 items left in `_source`

, we shall only bump the `destIndex`

by 3, as the remaining items written is garbage.

Notice, for this reason it is required for this solution to have a `_destination`

array that is at least *a vector's length* larger than otherwise required. With a smaller array it might try to write data beyond the array's length resulting in an `IndexOutOfRangeException`

.

[Benchmark] public ReadOnlyMemory<int> SimdSumLinear() { int destIndex = 0; for (int i = 0; i < _source.Length; i++) { var vCurrent = new Vector<int>(_source[i]); int j; for (j = i; j < _source.Length; j += Vector<int>.Count) { Vector.StoreUnsafe(Vector.ShiftRightArithmetic(vCurrent + Vector.LoadUnsafe(ref _source[j]), 1), ref _destination[destIndex]); destIndex += int.Min(_source.Length - j, Vector<int>.Count); } } return _destination.AsMemory(0, Length * (Length + 1) / 2); }

I have a second proposal for the SIMD solution. While the overall idea is the same as for the previous solution, this one uses different step sizes for the `for`

loops. The outer loop's step size is increased to `Vector<int>.Count`

. In each outer iteration, a vector is created from the subsequent elements of `_source`

. At the same time, the inner loop step count is set to 1. This way we still generate the same pairs as before but in a different order.

[Benchmark] public ReadOnlyMemory<int> SimdSum() { int destIndex = 0; for (int i = 0; i < _source.Length; i += Vector<int>.Count) { var vCurrent = Vector.LoadUnsafe(ref _source[i]); for (int j = i; j < _source.Length; j++) { Vector.StoreUnsafe(Vector.ShiftRightArithmetic(vCurrent + Vector.LoadUnsafe(ref _source[j]), 1), ref _destination[destIndex]); destIndex += int.Min(_source.Length - j, Vector<int>.Count); } } return _destination.AsMemory(0, Length * (Length + 1) / 2); }

Let's assume we have an input array of the following items {1, 2, 3}. The first SIMD solution generates pairs in the following order:

{1,1}, {1,2}, {1,3}

{2,2}, {2,3}

{3,3}

where each line corresponds to an outer loop iteration, and the inner loop iterates once in all cases.

The second SIMD solution generates items as:

{1,1}, {2,2}, {3,3}

{1,2}, {2,3}

{1,3}

Notice, that in this case the outer loop iterates once, while the inner loop iterates three times.

## Benchmarks

I compared the solutions with BenchmarkDotNet. The input `_source`

has a length of 40000.

The first observations to make: the SIMD solutions are generally faster to the non-SIMD one.

| Method | Mean | Error | StdDev | |-------------- |---------:|--------:|--------:| | RegularSum | 602.3 ms | 8.54 ms | 7.57 ms | | SimdSumLinear | 238.3 ms | 2.06 ms | 1.93 ms | | SimdSum | 232.2 ms | 1.18 ms | 0.92 ms |

The second observation is that `SimdSum`

is slightly faster compared to `SimdSumLinear`

. This is due to how memory is cached by the CPU. The first case iterates over the whole array *n* times, the second approach only has to do it *n / Vector.Count* times. This means memory that the first case has to bring data into the CPU cache less frequently as the second solution, hence gaining a slight performance benefit.