std::execution: Inclusive Scan
Before I present the asynchronous inclusive scan, I introduce the inclusive scan, aka prefix sum.
Prefix Sum
In computer science, the prefix sum, cumulative sum, inclusive scan, or simply scan of a sequence of numbers x0, x1, x2, … is a second sequence of numbers y0, y1, y2, …, the sums of prefixes (running totals) of the input sequence:
y0 = x0
y1 = x0 + x1
y2 = x0 + x1+ x2
...
Proposal P2300R10 has an asynchronous implementation of inclusive scan.
Asynchronous Inclusive Scan
using namespace std::execution;
sender auto async_inclusive_scan(scheduler auto sch, // 2
std::span<const double> input, // 1
std::span<double> output, // 1
double init, // 1
std::size_t tile_count) // 3
{
std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;
std::vector<double> partials(tile_count + 1); // 4
partials[0] = init; // 4
return just(std::move(partials)) // 5
| continues_on(sch)
| bulk(tile_count, // 6
[ = ](std::size_t i, std::vector<double>& partials) { // 7
auto start = i * tile_size; // 8
auto end = std::min(input.size(), (i + 1) * tile_size); // 8
partials[i + 1] = *--std::inclusive_scan(begin(input) + start, //9
begin(input) + end, // 9
begin(output) + start); // 9
}) // 10
| then( // 11
[](std::vector<double>&& partials) {
std::inclusive_scan(begin(partials), end(partials), // 12
begin(partials)); // 12
return std::move(partials); // 13
})
| bulk(tile_count, // 14
[ = ](std::size_t i, std::vector<double>& partials) { // 14
auto start = i * tile_size; // 14
auto end = std::min(input.size(), (i + 1) * tile_size); // 14
std::for_each(begin(output) + start, begin(output) + end, // 14
[&] (double& e) { e = partials[i] + e; } // 14
);
})
| then( // 15
[ = ](std::vector<double>&& partials) { // 15
return output; // 15
}); // 15
}
Here’s the explanation from the proposal P2300R10:
Sender
Would it not be nice to see this program in action? Currently (December 2024), no compiler supports std::execution or the concepts sender and scheduler.
Here comes the reference implementation stdexec to our rescue:
I changed the data type of the processed elements from double to int.
Recommended by LinkedIn
Modernes C++ Mentoring
Do you want to stay informed: Subscribe.
// inclusiveScanExecution.cpp
#include <algorithm>
#include <exec/static_thread_pool.hpp>
#include <iostream>
#include <numeric>
#include <span>
#include <stdexec/execution.hpp>
#include <vector>
auto async_inclusive_scan(auto sch, // 2
std::span<const int> input, // 1
std::span<int> output, // 1
int init, // 1
std::size_t tile_count) // 3
{
std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;
std::vector<int> partials(tile_count + 1); // 4
partials[0] = init; // 4
return stdexec::just(std::move(partials)) // 5
| stdexec::continues_on(sch) |
stdexec::bulk(tile_count, // 6
[=](std::size_t i, std::vector<int> &partials) { // 7
auto start = i * tile_size; // 8
auto end =
std::min(input.size(), (i + 1) * tile_size); // 8
partials[i + 1] =
*--std::inclusive_scan(begin(input) + start, // 9
begin(input) + end, // 9
begin(output) + start); // 9
}) // 10
| stdexec::then( // 11
[](std::vector<int> &&partials) {
std::inclusive_scan(begin(partials), end(partials), // 12
begin(partials)); // 12
return std::move(partials); // 13
}) |
stdexec::bulk(
tile_count, // 14
[=](std::size_t i, std::vector<int> &partials) { // 14
auto start = i * tile_size; // 14
auto end = std::min(input.size(), (i + 1) * tile_size); // 14
std::for_each(begin(output) + start, begin(output) + end, // 14
[&](int &e) { e = partials[i] + e; } // 14
);
}) |
stdexec::then( // 15
[=](std::vector<int> &&partials) { // 15
return output; // 15
}); // 15
}
int main() {
std::cout << '\n';
std::vector<int> input(30);
std::iota(begin(input), end(input), 0);
for (auto e : input) {
std::cout << e << ' ';
}
std::cout << '\n';
std::vector<int> output(input.size());
exec::static_thread_pool pool(8);
auto sch = pool.get_scheduler();
auto [out] =
stdexec::sync_wait(async_inclusive_scan(sch, input, output, 0, 4))
.value();
for (auto e : out) {
std::cout << e << ' ';
}
std::cout << '\n';
}
Here is an explanation of the main program:
In the main a std::vector<int> input is then created with 30 elements. The std::iota function fills the input vector with sequential integers starting from 0. The program then prints the contents of the input vector to the console.
Next, a std::vector<int> output is created with the same size as the input vector to store the results of the inclusive scan operation. The exec::static_thread_pool pool has 8 threads, which will be used to execute tasks concurrently. The get_scheduler member function of the thread pool creates a scheduler object sch.
The async_inclusive_scan function takes the scheduler sch, the input vector, the output vector, an initial value of 0, and a tile count of 4. This function performs the inclusive scan operation asynchronously using the specified scheduler and returns a future-like object. The stdexec::sync_wait function synchronously wait for the completion of the async_inclusive_scan operation, and the result is unpacked into the variable out.
Finally, the program prints the contents of the out vector to the console:
What’s Next?
In my next blog post, I will step one step back and explain the composition of senders via operator |.