并行版的 std::accumulate

发布时间: 2023-07-19 08:59:07 作者: gcvition
#include <algorithm>
#include <functional>
#include <iostream>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

/**
 * Function object that sums one sub-range; used as the per-thread task of
 * parallel_accumulate.
 *
 * @tparam Iterator  iterator type of the range being summed
 * @tparam T         type of the running total
 */
template <typename Iterator, typename T> struct accumulate_block {
    /**
     * Adds the elements of [first, last) onto `result`.
     *
     * The current value of `result` is used as the starting value and is then
     * overwritten with the total.
     */
    void operator()(Iterator first, Iterator last, T& result) { result = std::accumulate(first, last, result); }
};

/**
 * Parallel counterpart of std::accumulate.
 *
 * Splits [first, last) into contiguous blocks, sums each block on its own
 * thread (the calling thread handles the final block), then folds the
 * per-block totals onto `init` in range order.
 *
 * Requirements visible in the code: T must be default-constructible (the
 * per-block totals are stored in a std::vector<T> and start from T()), so the
 * result only equals std::accumulate's when T() is neutral for operator+ and
 * the addition is associative.
 *
 * @param first  start of the range
 * @param last   end of the range
 * @param init   initial value, combined with the per-block totals at the end
 * @return       init plus the sum of every element; `init` itself when the
 *               range is empty (or the measured distance is not positive)
 *
 * Exceptions thrown on the calling thread (failure to start a worker, or the
 * calling thread's own block throwing) propagate to the caller after every
 * already-started worker has been joined. An exception escaping a worker
 * thread still terminates the program, as with any std::thread.
 */
template <typename Iterator, typename T> T parallel_accumulate(Iterator first, Iterator last, T init)
{
    using difference_type = typename std::iterator_traits<Iterator>::difference_type;

    // Check the signed distance before converting, so a non-positive value
    // cannot wrap around into a huge unsigned length.
    difference_type const distance = std::distance(first, last);
    if(distance <= 0) {
        return init;
    }
    unsigned long const length = static_cast<unsigned long>(distance);

    // Give every thread at least min_per_thread elements; round up so a short
    // range still gets one thread.
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;

    // hardware_concurrency() may return 0 when the value is not computable;
    // fall back to 2 in that case.
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2UL, max_threads);

    // Number of elements handled by each spawned thread.
    unsigned long const block_size = length / num_threads;

    // One slot per block for the partial totals.
    std::vector<T> results(num_threads);

    {
        // The calling thread processes the last block itself, so only
        // num_threads - 1 extra threads are started.
        std::vector<std::thread> threads;
        threads.reserve(num_threads - 1);

        // Joins every started worker when this scope is left, normally or by
        // exception. Destroying a joinable std::thread calls std::terminate,
        // and the workers write into `results`, which must outlive them.
        struct join_guard {
            std::vector<std::thread>& workers;
            ~join_guard()
            {
                for(auto& worker : workers) {
                    if(worker.joinable()) {
                        worker.join();
                    }
                }
            }
        } guard{ threads };

        Iterator block_start = first;
        for(unsigned long i = 0; i < (num_threads - 1); ++i) {
            // Move the end iterator one block further along.
            Iterator block_end = block_start;
            std::advance(block_end, static_cast<difference_type>(block_size));
            // Start a worker that sums this block into its own result slot.
            threads.emplace_back(accumulate_block<Iterator, T>(), block_start, block_end, std::ref(results[i]));

            block_start = block_end;
        }

        // The final block also absorbs the remainder of length / num_threads.
        accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
    } // guard joins all workers here, before the partial totals are read

    // Combine the partial totals and return.
    return std::accumulate(results.begin(), results.end(), init);
}

// Demo driver: sums 100000 ones with parallel_accumulate, starting from 0,
// and prints the total on its own line.
int main()
{
    std::vector<int> ones(100000, 1);
    int const start_value = 0;

    int const total = parallel_accumulate(ones.begin(), ones.end(), start_value);
    std::cout << total << std::endl;

    return 0;
}