This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time in seconds in which the consumers shall sleep.
<?php
use parallel\{Runtime, Channel};
main($argv);
function main(array $argv)
{
if (count($argv) !== 3) {
echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
echo "Example: hello-parallel.php 5 3" . PHP_EOL;
die;
} else {
$numberOfTasks = intval($argv[1]);
$maximumTimeOfSleep = intval($argv[2]);
$t1 = microtime(true);
parallelize($numberOfTasks, $maximumTimeOfSleep);
$endTime = microtime(true) - $t1;
echo PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;
}
}
function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
$channel = new Channel();
$taskIds = array_map(function () use ($maximumTimeOfSleep) {
return $id = uniqid("task::");
}, range(0, $numberOfTasks - 1));
$timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
return rand(1, $maximumTimeOfSleep);
}, $taskIds);
$producer = new Runtime();
$producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
foreach ($timesToSleep as $timeToSleep) {
$channel->send($timeToSleep);
}
}, [$channel, $timesToSleep]);
$consumerFutures = array_map(function (string $id) use ($channel) {
$runtime = new Runtime();
return $runtime->run(function (string $id, Channel $channel) {
$timeToSleep = $channel->recv();
echo "Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;
sleep($timeToSleep);
echo "$id slept for $timeToSleep second(s).".PHP_EOL;
return $timeToSleep;
}, [$id, $channel]);
}, $taskIds);
wait($consumerFutures);
wait([$producerFuture]);
}
function wait(array $futures)
{
return array_map(function ($future) {
return $future->value();
}, $futures);
}parallel\Channel
Почист и полокален преглед на PHP референцата, со задржана структура од PHP.net и подобра читливост за примери, секции и белешки.
parallel\Channel
Референца за `class.parallel-channel.php` со подобрена типографија и навигација.
Класата parallel\Channel
(0.9.0)
Нетампонирани канали
Нетампониран канал ќе блокира при повици до parallel\Channel::send() додека не постои приемник, и ќе блокира при повици до parallel\Channel::recv() додека не постои испраќач. Ова значи дека нетампонираниот канал не е само начин за споделување податоци меѓу задачи, туку и едноставен метод за синхронизација.
Нетампонираниот канал е најбрзиот начин за споделување податоци меѓу задачи, барајќи најмалку копирање.
Тампонирани канали
Тампониран канал нема да блокира при повици до parallel\Channel::send() додека не се достигне капацитетот, повици до parallel\Channel::recv() ќе блокираат додека нема податоци во тампонот.
Затворања преку канали
Моќна карактеристика на паралелните канали е што тие дозволуваат размена на затворања меѓу задачи (и средини за извршување).
Кога затворање се испраќа преку канал, затворањето се тампонира, тоа не ја менува тампонирањето на каналот што го пренесува затворањето, но влијае на статичкиот опсег внатре во затворањето: Истото затворање испратено до различни средини за извршување, или истата средина за извршување, нема да го споделуваат својот статички опсег.
Ова значи дека секогаш кога ќе се изврши затворање што било пренесено преку канал, статичката состојба ќе биде онаква каква што била кога затворањето било тампонирано.
Анонимни канали
Конструкторот за анонимни канали му овозможува на програмерот да избегне доделување имиња на секој канал: parallel ќе генерира уникатно име за анонимни канали.
Синопсис на класата
Содржина
- parallel\Channel::__construct — Resolution
- parallel\Channel::make — Channel Construction
- parallel\Channel::open — Channel Construction
- parallel\Channel::recv — Access
- parallel\Channel::send — Access
- parallel\Channel::close — Sharing
Белешки од корисници 5 белешки
an example used unbuffered channel.
<?php
use parallel\{Channel,Runtime};
$sum=function(array $a, Channel $ch) {
$sum=0;
foreach ($a as $v) {
$sum+=$v;
}
$ch->send($sum);
};
try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
//unbuffered channel
$runtime=new Runtime;
$ch2=new Channel;
$runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
$runtime->run($sum, [array_slice($a, $num), $ch2]);
//receive from channel
$x=$ch2->recv();
$y=$ch2->recv();
$ch2->close();
echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
echo "\nError:", $err->getMessage();
} catch(Exception $e) {
echo "\nException:", $e->getMessage();
}
//output
//ch2:18 23 41<?php
// the very weird way to calculate factorial ^_^
// we create one thread and synching them with buffered channel
// at fact only one thread is executing at the time
use parallel\{Channel, Future, Runtime};
for ($n = 0; $n <= 10; $n++) {
echo "!$n = " . factorial($n) . PHP_EOL;
}
/**
* Creates $n threads.
*/
function factorial(int $n): int
{
// buffered channel - using for sync threads ^_^
$channel = new Channel(1);
$futureList = [];
for ($i = 2; $i <= $n; $i++) {
$runtime = new Runtime();
$futureList[] = $runtime->run(
static function (Channel $channel, $multiplier): void {
$f = $channel->recv();
$channel->send($f * $multiplier);
},
[$channel, $i]
);
}
$channel->send(1);
// waiting until all threads are done
do {
$allDone = array_reduce(
$futureList,
function (bool $c, Future $future): bool {
return $c && $future->done();
},
true
);
} while (false === $allDone);
return $channel->recv();
}
// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800<?php
use parallel\Channel;
function sum(array $a, Channel $ch) {
$sum=0;
foreach ($a as $v) {
$sum+=$v;
}
$ch->send($sum);
}
try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
$ch1=Channel::make('sum', 2);
$ch2=new Channel;
$num=count($a) / 2;
sum(array_slice($a, 0, $num), $ch1);
sum(array_slice($a, $num), $ch1);
//receive from channel
$x=$ch1->recv();
$y=$ch1->recv();
$ch1->close();
echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
echo "\nError:", $err->getMessage();
} catch(Exception $e) {
echo "\nException:", $e->getMessage();
}<?php
/**
* Bzz reloaded!
* Run two simple tasks in parallel and synchronize them with a channel
*
* parallel\Channel(int $capacity): Buffered channel
* Creates a buffered channel for communication between tasks
* @ref https://www.php.net/manual/en/class.parallel-channel.php
*/
echo "zzz... " . PHP_EOL;
// Create new buffered channel
$channel = new \parallel\Channel(2);
\parallel\run(
function($channel) {
$snaps_count = rand (8, 12);
echo "Number of snaps: $snaps_count" . PHP_EOL;
for ($i=1; $i<=$snaps_count; $i++) {
$other_sleep_time = rand(3, 5);
$my_sleep_time = rand(1, 3);
echo "Send sleep time to buffer" . PHP_EOL;
$start = microtime(true);
$channel->send($other_sleep_time);
$wait_time = microtime(true) - $start;
if ($wait_time > .1) {
echo "Buffer was full. I waited " . round($wait_time) . "s" . PHP_EOL;
}
echo "I sleep for {$my_sleep_time}s" . PHP_EOL;
sleep($my_sleep_time);
}
echo "I finished sleeping. Closing channel" . PHP_EOL;
$channel->close();
},
[$channel]
);
\parallel\run(
function($channel) {
try {
while(true) {
$my_sleep_time = $channel->recv();
echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;
sleep($my_sleep_time);
}
} catch(\parallel\Channel\Error\Closed $e) {
echo "Channel is closed. Other die.";
die;
}
},
[$channel]
);