Funkcje asynchroniczne#
Future: “[…] an object that acts as a proxy for a result that is initially not known.”
—Baker and Hewitt, 1977
Obiekt future<T>
obsługuje możliwość odczytania przyszłych wartości typu T
, które mogą być wyliczane asynchronicznie w osobnych wątkach lub synchronicznie w tym samym wątku.
Obiekty future mogą pełnić rolę wysokopoziomowych konstrukcji synchronizujących pracę wielu wątków, zastępując przy tym niskopoziomowe konstrukcje takie jak np. zmienne warunkowe.
Obiektów future można używać nie tylko w programowaniu wielowątkowym, ale także w programach jednowątkowych.
Do obsługi wartości typu future wykorzystywane są cztery klasy:
std::future<T>
std::shared_future<T>
std::promise<T>
std::packaged_task<T>
oraz funkcja
std::async()
Futures#
Obiekty typu futures umożliwiają odczytanie wyników funkcji asynchronicznych:
std::future<typename T>
Obiekt typu
std::future<T>
reprezentuje unikalną wartość typu future (unikalne prawo własności do przyszłego wyniku wywołania funkcji).T get()
wstrzymuje bieżący wątek do momentu zakończenia asynchronicznej funkcji i następnie zwraca otrzymaną wartość lub rzuca przechowany w shared state wyjątek
#include <iostream> #include <future> #include <thread> int fibonacci(int n) { if (n < 3) return 1; else return fibonacci(n-1) + fibonacci(n-2); } int main() { std::future<int> f1 = std::async(std::launch::async, [] { return fibonacci(20); }); std::future<int> f2 = std::async(std::launch::async, [] { return fibonacci(25); }); std::cout << "waiting...\n"; std::cout << "f1: " << f1.get() << '\n'; std::cout << "f2: " << f2.get() << '\n'; }
void wait() const
blokuje bieżący wątek dopóki wynik nie zostanie obliczony
std::future_status wait_for(const std::chrono::duration<R, P>& timeout_duration) const
czeka przez określony interwał czasu aż wynik zostanie obliczony. Zwracany jest status, który może przyjąć jedną z trzech wartości:
future_status::deferred
- funkcja nie została jeszcze uruchomionafuture_status::ready
- wynik obliczenia jest gotowyfuture_status::timeout
- czas oczekiwania na wynik się zakończył (funkcja jeszcze nie zwróciła wyniku)
#include <iostream> #include <future> #include <thread> #include <chrono> int main() { std::future<int> future = std::async(std::launch::async, [](){ std::this_thread::sleep_for(std::chrono::seconds(3)); return 8; }); std::cout << "waiting...\n"; std::future_status status; do { status = future.wait_for(std::chrono::seconds(1)); if (status == std::future_status::deferred) { std::cout << "deferred\n"; } else if (status == std::future_status::timeout) { std::cout << "timeout\n"; } else if (status == std::future_status::ready) { std::cout << "ready!\n"; } } while (status != std::future_status::ready); std::cout << "result is " << future.get() << '\n'; }
std::shared_future<typename T>
Implementacja współdzielonych wartości typu future. Może być użyta do sygnalizacji między wieloma wątkami, które mogą współdzielić obiekt future.
Możliwa jest konwersja obiektu std::future<T>
do współdzielonego obiektu std::shared_future<T>
przy pomocy:
konstruktora przenoszącego
std::shared_future<T>(std::future&& other)
metody
std::future<T>::share()
zwracającej obiekt typustd::shared_future<T>
Wywołania asynchroniczne za pomocą funkcji std::async()
#
Funkcja szablonowa async()
umożliwia asynchroniczne uruchomienie obiektu wywoływalnego Callable (potencjalnie w osobnym wątku). Zwracana jest instancja std::future
, która przechowuje przyszły wynik wywołania funkcji.
auto async(std::launch policy, Function&& f, Args&&... args)
Wywołuje funkcję
f
przekazując do niej argumentyargs
zgodnie wybraną polityką uruchomienia:std::launch::async
- wywołanie w osobnym wątku (wywołanie asynchroniczne)std::lauch::deferred
- nie tworzy osobnego wątku. Leniwie wykonuje funkcjęf
(wywołanie następuje w momencie pierwszego wywołania na obiekcie future metodget()
lubwait()
auto async(Function&& f, Args&&... args)
Zachowuje się tak samo jak
async(std::launch::async | std::launch::deferred, f, args...)
, tzn. funkcjaf
może zostać wykonana w osobnym wątku lub synchronicznie w wątku bieżącym, gdy obiekt future zostanie odpytany o wynik operacji. Od wersji C++14 ta wersja funkcjiasync()
jest uznana za deprecated.
Klasa std::promise
#
std::promise<T>
Implementuje niskopoziomowe mechanizmy przechowanie wartości lub wyjątku, które później mogą być odczytane asynchronicznie poprzez obiekt
std::future<T>
wygenerowany przez instancjęstd::promise<T>
.
Obiekt promise<T>
jest związany ze współdzielonym stanem (shared state), który przechowuje pewną informację o
stanie oraz wynik, który może być jeszcze nie obliczony, obliczony do wartości lub obliczony do wyjątku.
Obiekt promise<T>
może zrobić trzy rzeczy ze stanem współdzielonym:
oznaczyć stan współdzielony jako gotowy do odczytu -
promise<T>
zapisuje wartośćset_value(T)
lub wyjątekset_exception(T)
zwolnić go - instancja
promise<T>
zwalnia referencję do stanu współdzielonego. Jeśli to była ostatnia referencja do stanu współdzielonego, stan ten jest niszczony. Operacja ta nie jest blokująca, za wyjątkiem sytuacji, gdy stan współdzielony został utworzony przy pomocy funkcjistd::async()
porzucić go - ustawiając jako wyjątek instancję typu
std::future_error
z kodem błędustd::future_error::broken_promise
Każdy obiekt std::promise<T>
związany jest jednym obiektem std::future<T>
.
Wątek z dostępem do obiektu future po wywołaniu metody wait()
będzie czekać na rezultat zwrócony przez obiekt
std::promise
z innego wątku przy pomocy metody set_value()
.
#include <thread>
#include <future>
#include <chrono>
#include <iostream>
std::string load_file_content()
{
std::string content = "FILE content";
using namespace std::chrono;
if (duration_cast<milliseconds>(
steady_clock::now().time_since_epoch()).count() % 2 == 0)
throw std::runtime_error("I/O error");
return content;
}
class BackgroundTask
{
std::promise<std::string> promise_;
public:
void operator()()
{
std::this_thread::sleep_for(std::chrono::seconds(3));
try
{
std::string result = load_file_content();
promise_.set_value(result);
}
catch (...)
{
promise_.set_exception(std::current_exception());
}
}
std::future<std::string> get_future()
{
return promise_.get_future();
}
};
void waiter(std::future<std::string> f)
{
try
{
std::cout << f.get() << std::endl;
}
catch(const std::runtime_error& e)
{
std::cout << "Exception caught: " << e.what() << std::endl;
}
}
int main()
{
BackgroundTask bt;
std::future<std::string> future = bt.get_future();
std::thread thd1(waiter, std::move(future));
std::thread thd2(std::ref(bt));
thd1.join();
thd2.join();
}
Klasa std::packaged_task
#
std::packaged_task<R(TArgs...)>
Instancja tej klasy jest pomocniczym obiektem opakowującym wywołanie funkcji lub funktora (obiektu wywoływalnego - callable) i implementującym zapisanie wyniku w shared state, który może być odczytany przez obiekt
std::future<T>
.
#include <future>
#include <thread>
int main()
{
// definition of tasks
std::packaged_task<int (int)> pt_1([](int i) { return i * i; });
std::packaged_task<int (int)> pt_2([](int i) { return i * i; });
// getting futures
std::future<int> fresult_1 = pt_1.get_future();
std::future<int> fresult_2 = pt_2.get_future();
// calling task synchronously
pt_1(17);
// calling task in new thread
std::jthread(std::move(pt_2));
// odczytanie wyniku
std::cout << "result#1: " << fresult_1.get();
std::cout << "result#2: " << fresult_2.get();
}
Wrapper std::packaged_task
pozwala oddzielić:
definicję zadania, które ma być wykonane
wywołanie zadania, które może zostać wykonane w osobnym wątku
przetworzenie wyników zadania
Przykład wykorzystania obiektów std::packaged_task
:
#include <iostream>
#include <thread>
#include <future>
#include <vector>
void run_all_tasks(std::vector<std::packaged_task<int (char)>>& tasks)
{
int i = 0;
// iterate over tasks and start them with parameters 1, 2, 3...
for(auto& task : tasks)
{
char c = '1' + i++;
task(c); // run packaged_task with argument c
}
}
int func(char c)
{
for(int i = 0; i < 10; ++i)
{
std::cout.put(c).flush();
std::this_thread::sleep_for(chrono::milliseconds(300));
}
return c;
}
int main()
{
std::vector<std::packaged_task<int (char)>> tasks;
// definition of some tasks
std::packaged_task<int (char)> t1(&func);
tasks.push_back(move(t1));
std::packaged_task<int (char)> t2([](char c) { return c; });
tasks.push_back(move(t2));
tasks.emplace_back(&func);
// start all tasks in one separate thread
auto future_all_tasks = std::async(std::launch::async, &run_all_tasks, std::ref(tasks));
// wait until all tasks are done
future_all_tasks.wait();
// process results of each task
for(auto& task : tasks)
{
cout << "result: " << task.get_future().get() << endl;
}
}
Obiekt packaged_task
może zostać użyty w połączeniu z obiektami shared_future
do implementacji funkcji asynchronicznych.
#include <iostream>
#include <string>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;
typedef std::string FileContent;
FileContent download_file(const std::string& url)
{
std::this_thread::sleep_for(2s);
return "content of file: " + url;
}
std::shared_future<FileContent> download_file_async(const std::string& u
{
std::packaged_task<FileContent ()> task([=] { return download_file(u
std::shared_future<FileContent> sf(task.get_future());
std::thread thd(move(task));
thd.detach();
return sf;
}
void process(std::shared_future<FileContent> file_content)
{
std::cout << "Processing " << file_content.get() << " in a thread#"
}
int main()
{
std::shared_future<FileContent> content =
download_file_async("http://infotraining.pl");
std::thread thd1{&process, content};
std::thread thd2{ [content]() -> void {
cout << "Processed " << content.get()
<< " by lambda in a thread#" << this_thread::get_id() << en
};
thd1.join();
thd2.join();
}