Here's a more substantial example of how to use the run functional API.
<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers. Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers. The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/
use \parallel\{Runtime, Future, Channel, Events};
$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;
print "Total IDs: " . $totalIds . "\n";
print "Batch Size: " . $batchSize . "\n";
print "Last Batch: " . $lastBatch . "\n";
$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;
print "Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";
while($tempMinId < $endId) {
for($i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print "Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
// Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
break;
}
// Now we move on to the next sub-batch for this worker
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if($tempMaxId > $endId) {
$tempMaxId = $endId;
}
// Introduce some timing randomness
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}
// This worker has completed their entire batch
print "Worker " . $worker . " finished\n";
};
// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if($i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}
?>parallel\run
Почист и полокален преглед на PHP референцата, со задржана структура од PHP.net и подобра читливост за примери, секции и белешки.
parallel\run
Референца за `parallel.run.php` со подобрена типографија и навигација.
parallel\run
(1.0.0)
parallel\run — Извршување
= NULL
Ќе закаже task ќе се распоредат
Ќе закаже task за извршување паралелно. argv за извршување паралелно, поминувајќи
Автоматско закажување
Ако \parallel\Runtime внатрешно креирано и кеширано од претходен повик до parallel\run() е неактивен, ќе се користи за извршување на задачата. Ако не \parallel\Runtime е неактивен, паралелно ќе креира и кешира \parallel\Runtime.
Забелешка: \parallel\Runtime објекти креирани од програмерот не се користат за автоматско закажување.
Параметри
на аргументи со специфични карактеристики што треба да се поминат на
Карактеристики на задачи
- Затворањата распоредени за паралелно извршување не смеат:
- прифаќаат или враќаат преку референца
- прифаќаат или враќаат внатрешни објекти (види белешки)
извршуваат ограничен сет на инструкции
- yield
- користи по референца
- декларирај класа
- декларирај именувана функција
Забелешка: Вгнездени затворачи може да дадат или користат по референца, но не смеат да содржат декларации за класи или именувани функции.
Забелешка: Ниту една инструкција не е забранета во датотеките што може да ги вклучува задачата.
Инструкциите забранети во затворањата наменети за паралелно извршување се:
Карактеристики на аргументи
- Аргументите не смеат:
- да содржат референци
- да содржат ресурси
Забелешка: да содржат внатрешни објекти (види белешки) int Во случај на ресурси од стрим на датотеки, ресурсот ќе биде префрлен на дескрипторот на датотеката и ќе биде поминат како
каде што е можно, ова не е поддржано на Windows.
Внатрешните објекти генерално користат прилагодена структура што не може безбедно да се копира по вредност, PHP моментално нема механизми за ова (без серијализација) и затоа само објекти што не користат прилагодена структура може да се споделат.
Некои внатрешни објекти не користат прилагодена структура, на пример parallel\Events\Event и затоа може да се споделат.
Затворањата се посебен вид внатрешен објект и поддржуваат копирање по вредност, и затоа може да се споделат.
Каналите се централни за пишување паралелен код и поддржуваат истовремен пристап и извршување по потреба, и затоа може да се споделат.
Корисничка класа што наследува внатрешна класа може да користи прилагодена структура како што е дефинирана од внатрешната класа, во кој случај тие не можат безбедно да се копираат по вредност, и затоа не може да се споделат.
Вратени вредности
Враќањето parallel\Future не смее да се игнорира кога задачата содржи изјава за враќање или фрлање.
Исклучоци
Тековната Дефиниција parallel\Runtime\Error\Closed if parallel\Runtime беше затворено.
Тековната Дефиниција parallel\Runtime\Error\IllegalFunction if task е затворање создадено од внатрешна функција.
Тековната Дефиниција parallel\Runtime\Error\IllegalInstruction if task содржи недозволени инструкции.
Тековната Дефиниција parallel\Runtime\Error\IllegalParameter if task прифаќа или argv содржи недозволени променливи.
Тековната Дефиниција parallel\Runtime\Error\IllegalReturn if task враќа недозволено.
Види Исто така
Белешки од корисници 3 белешки
Although function declaration is not allowed inside thread exec code, include is allowed. So if we want to declare a function, we could write another file that contain the function and include it.
# main.php
<?php
$runtime = new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include "included.php";
return add (1, 3);
}, [ ] );
echo $future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
return $a + $b;
}<?php
/**
* Sample parralel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/
use \parallel\{Runtime, Future, Channel, Events};
// Generate list of items to process with a generator
function generator(int $item_count) {
for ($i=1; $i <= $item_count; $i++) {
yield $i;
}
}
function testConcurrency(int $concurrency, int $item_count) {
$generator = generator($item_count);
// Function executing in each thread. Have a snap for a random time for example !
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return ['item_id' => $item_id, 'sleep_seconds' => $seconds];
};
// Fill up threads with initial 'inactive' state
$threads = array_fill(1, $concurrency, ['is_active' => false]);
while (true) {
// Loop through threads until all threads are finished
foreach ($threads as $thread_id => $thread) {
if (!$thread['is_active'] and $generator->valid()) {
// Thread is inactive and generator still has values : run something in the thread
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo "ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset($threads[$thread_id]['run'])) {
// Destroy supplementary threads in case generator closes sooner than number of threads
echo "Destroy ThreadId: $thread_id\n";
unset($threads[$thread_id]);
} elseif ($threads[$thread_id]['run']->done()) {
// Thread finished. Get results
$item = $threads[$thread_id]['run']->value();
echo "ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";
if (!$generator->valid()) {
// Generator is closed then destroy thread
echo "Destroy ThreadId: $thread_id\n";
unset($threads[$thread_id]);
} else {
// Thread is ready to run again
$threads[$thread_id]['is_active'] = false;
}
}
}
// Escape loop when all threads are destroyed
if (empty($threads)) break;
}
}
$concurrency = 5;
$item_count = 50;
testConcurrency($concurrency, $item_count);
?>