Algorytmy współbieżne STL w C++17

W C++17 istnieje możliwość współbieżnego wykonania algorytmów STL. Bazą dla wprowadzenia współbieżności do STL był dokument Parallel TS.

Jak na razie nie ma implementacji dostępnej domyślnie z popularnymi kompilatorami (np. gcc, clang, VS). Istnieje kilka eksperymentalnych implementacji dostarczanych jako biblioteki:

Lista algorytmów współbieżnych

Większość algorytmów STL posiada w C++17 wersje współbieżne:

adjacent_difference inplace_merge replace_copy
adjacent_find is_heap replace_copy_if
all_of is_heap_until replace_if
any_of is_partitioned reverse
copy is_sorted reverse_copy
copy_if is_sorted rotate
copy_n is_sorted_until rotate_copy
count lexicographical_compare search
count_if max_element search_n
equal merge set_difference
exclusive_scan min_element set_intersection
fill minmax_element set_symmetric_difference
fill_n mismatch set_union
find move sort
find_end none_of stable_partition
find_first_of nth_element stable_sort
find_if partial_sort swap_ranges
find_if_not partial_sort_copy transform
for_each partition transform_exclusive_scan
for_each_n partition_copy transform_inclusive_scan
generate reduce transform_reduce
generate_n remove uninitialized_copy
includes remove_copy uninitialized_copy_n
inclusive_scan remove_copy_if uninitialized_fill
inner_product remove_if uninitialized_fill_n
  replace unique
    unique_copy

Funkcje dostępu do danych

Algorytmy współbieżne korzystają z tzw. funkcji dostępu do danych (element access functions). Należą do nich:

  • wszystkie operacje danej kategorii iteratora przekazanego przy wywołaniu algorytmu
  • operacje wykonywane na elementach wymagane przez specyfikację algorytmu
  • obiekty funkcyjne przekazane przez użytkownika oraz operacje na tych obiektach wymagane przez specyfikację

Na przykład algorytm sort():

  • wykorzystuje operacje iteratora o dostępie swobodnym
  • wywołuje funkcję swap() na elementach sekwencji
  • wywołuje funktor Compare przekazany przez użytkownika

Ważne

Funkcje lub obiekty funkcyjne przekazane do algorytmu współbieżnego nie powinny bezpośrednio lub pośrednio modyfikować wartości obiektów przekazanych do nich jako argumenty.

Wytyczne wykonania algorytmów współbieżnych

Dla nowych wersji algorytmów standardowych przyjęto rozwiązanie polegające na tym, że żądanie wykonania algorytmu we współbieżny lub sekwencyjny sposób jest realizowane poprzez przekazanie jako pierwszego argumentu wywołania funkcji tzw. wytycznej wykonania (execution policy).

Standard definiuje trzy podstawowe wytyczne:

  • sequential policy - std::seq

    • jeden wątek (ten, w którym został wywołany algorytm) wykonuje wszystkie zadania sekwencyjnie w określonej kolejności (od początku do końca)
    • tryb wykonania przydatny przy debugowaniu i w testach
      • brak współbieżności wykonania oraz brak reorderingu
    std::transform(std::execution::seq, data.begin(), data.end(), dest.begin(),
                   [](auto x) {
                      log_file << x; // concurrent access is race
                      return x * x;
                   }
    );
    
  • parallel sequenced - std::par

    • wiele wątków może wykonywać zadania (multithreading)
    • zadania są w obrębie swojego wątku roboczego wykonywane sekwencyjnie w określonej kolejności => wszystkie zadania muszą być thread safe
    • istnieje możliwość użycia konstrukcji synchronizujących współbieżny dostęp do danych (np. std::mute, std::atomic<T>)
    std::atomic<int> x{0};
    int a[] = {1,2};
    std::for_each(std::par, std::begin(a), std::end(a), [&](auto item) {
        x.fetch_add(item, std::memory_order_relaxed);
        // spin wait for another iteration to change the value of x
        while (x.load(std::memory_order_relaxed) >= 1) { } // Error: assumes execution order
    });
    
    std::mutex mtx;
    int shared_value{665};
    
    std::transform(std::execution::par, data.begin(), data.end(), dest.begin(),
                   [&](auto x) {
                       std::lock_guard lk{mtx}; // OK for par but not for par_unseq
                       auto new_value = x * shared_value;
                       return new_value;
                   }
    );
    
  • parallel unsequenced - std::par_unseq

    • zadania mogą być wykonywane z wykorzystaniem wielowątkowości (multithreading) i współbieżności wektorowej
    • zadania mogą być wykonywane w różnej kolejności w różnych wątkach
    • wywołania obiektów funkcyjnych mogą być przemieszane (interleaved) w ramach konkretnego wątku - w rezultacie wywołania operacji synchronizujących (np. mutex::lock()) grożą zakleszczeniem
    • nie można używać dynamicznej alokacji i dealokacji pamięci
    int x = 0;
    std::mutex m;
    int a[] = {1,2};
    
    std::for_each(std::par_unseq, std::begin(a), std::end(a),
                  [&](int) {
                      std::lock_guard lk(m); // Error: lock_guard constructor calls m.lock()
                      ++x;
                  }
    );
    
    std::transform(std::execution::par_unseq, data.begin(), data.end(), dest.begin(),
                   [&](auto x) {
                       auto new_value = x * x;
                       return new_value;
                   }
    );
    

Wyjątki w algorytmach współbieżnych

Jeśli jakikolwiek wyjątek wydostanie się z algorytmu współbieżnego, wywołana zostanie funkcja terminate().

Nowe algorytmy współbieżne

std::reduce

  • T reduce(ExecutionPolicy&& policy, FwdIt first, FwdIt last, T init)
  • typename std::iterator_traits<FwdIt>::value_type reduce(ExecutionPolicy&& policy, FwdIt first, FwdIt last)
  • T reduce(ExecutionPolicy&& policy, FwdIt first, FwdIt last, T init, BinaryOp binary_op)

Działa jak algorytm std::accumulate(), ale aplikuje funktor binary_op w nieokreślonej kolejności

  • rezultat w przypadku przekazania funktora, który nie jest przechodni i komutatywny, jest nieokreślony
    • np: dodawanie zmiennych typu float

std::transform_reduce

  • T transform_reduce(ExecutionPolicy&& policy, FwdIt1 first1, FwdIt1 last1, FwdIt2 first2, T init)
  • T transform_reduce(ExecutionPolicy&& policy, FwdIt1 first1, FwdIt1 last1, FwdIt2 first2, T init, BinaryOp1 binary_op1, BinaryOp2 binary_op2)
  • T transform_reduce(ExecutionPolicy&& policy, FwdIt first, FwdIt last, T init, BinaryOp binary_op, UnaryOp unary_op)

Aplikuje transformację funktorem binary_op2() lub unary_op(), a następnie redukuje wyniki transformacji funktorem binary_op1.

Domyślnymi funktorami są odpowiednio: std::multiplies<> i std::plus<>.

std::exclusive_scan

  • FwdIt2 exclusive_scan( ExecutionPolicy&& policy, FwdIt1 first, FwdIt1 last, FwdIt2 d_first, T init)
  • FwdIt2 exclusive_scan( ExecutionPolicy&& policy, FwdIt1 first, FwdIt1 last, FwdIt2 d_first, T init, BinaryOperation binary_op )

Oblicza sumę poprzedzającą używając funktora binary_op() (używając wartości początkowej init) i zapisuje wyniki do sekwencji wskazywanej iteratorem d_first.

Przedrostek exlusive oznacza, że i-ty element nie jest uwzględniany w i-tej sumie.

std::inclusive_scan

  • FwdIt2 inclusive_scan( ExecutionPolicy&& policy, FwdIt1 first, FwdIt1 last, FwdIt2 d_first, T init)
  • FwdIt2 inclusive_scan( ExecutionPolicy&& policy, FwdIt1 first, FwdIt1 last, FwdIt2 d_first, T init, BinaryOperation binary_op )

Oblicza sumę poprzedzającą używając funktora binary_op() (używając wartości początkowej init) i zapisuje wyniki do sekwencji wskazywanej iteratorem d_first.

Przedrostek inclusive oznacza, że i-ty element jest uwzględniany w i-tej sumie.

std::transform_inclusive_scan oraz transform_exclusive_scan

Algorytmy będące połączeniem transformacji i i odpowiedniego algorytmu skanującego.