10000 Child process queue workers (#450) · NativePHP/laravel@101ddf0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 101ddf0

Browse files
XbNzsimonhamp
andauthored
Child process queue workers (#450)
* Child process queue workers - Remove `ShouldBroadcast` in favor of `ShouldBroadcastNow` in `Events\\ChildProcess` namespace - Implement `QueueWorker::class` - Add new binding to `NativeServiceProvider::class` - Fire up workers: iterate through queue worker config in `NativeServiceProvider::configureApp()` - Test `QueueWorker::up()`, `QueueWorker::down()` - Test `QueueWorkerFake::class` assertions work as expected * Prevent attempting to boot workers on CLI calls * Remove creating workers just by string Rely on keys of the config array to assert uniqueness of worker aliases * Allow workers to be instantiated directly by alias * Fix styling * Fix tests --------- Co-authored-by: Simon Hamp <simon.hamp@me.com> Co-authored-by: simonhamp <simonhamp@users.noreply.github.com>
1 parent 8267087 commit 101ddf0

14 files changed

+394
-8
lines changed

config/nativephp.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,12 @@
114114
],
115115
],
116116
],
117+
118+
'queue_workers' => [
119+
'default' => [
120+
'queues' => ['default'],
121+
'memory_limit' => 128,
122+
'timeout' => 60,
123+
],
124+
],
117125
];

src/Contracts/QueueWorker.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
namespace Native\Laravel\Contracts;
4+
5+
use Native\Laravel\DTOs\QueueConfig;
6+
7+
interface QueueWorker
8+
{
9+
public function up(QueueConfig $config): void;
10+
11+
public function down(string $alias): void;
12+
}

src/DTOs/QueueConfig.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
namespace Native\Laravel\DTOs;
4+
5+
class QueueConfig
6+
{
7+
/**
8+
* @param array<int, string> $queuesToConsume
9+
*/
10+
public function __construct(
11+
public readonly string $alias,
12+
public readonly array $queuesToConsume,
13+
public readonly int $memoryLimit,
14+
public readonly int $timeout,
15+
) {}
16+
17+
/**
18+
* @return array<int, self>
19+
*/
20+
public static function fromConfigArray(array $config): array
21+
{
22+
return array_map(
23+
function (array|string $worker, string $alias) {
24+
return new self(
25+
$alias,
26+
$worker['queues'] ?? ['default'],
27+
$worker['memory_limit'] ?? 128,
28+
$worker['timeout'] ?? 60,
29+
);
30+
},
31+
$config,
32+
array_keys($config),
33+
);
34+
}
35+
}

src/Events/ChildProcess/ErrorReceived.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ErrorReceived implements ShouldBroadcast
10+
class ErrorReceived implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/MessageReceived.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class MessageReceived implements ShouldBroadcast
10+
class MessageReceived implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/ProcessExited.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ProcessExited implements ShouldBroadcast
10+
class ProcessExited implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/ProcessSpawned.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ProcessSpawned implements ShouldBroadcast
10+
class ProcessSpawned implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Facades/QueueWorker.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Native\Laravel\Facades;
4+
5+
use Illuminate\Support\Facades\Facade;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
use Native\Laravel\Fakes\QueueWorkerFake;
9+
10+
/**
11+
* @method static void up(QueueConfig $config)
12+
* @method static void down(string $alias)
13+
*/
14+
class QueueWorker extends Facade
15+
{
16+
public static function fake()
17+
{
18+
return tap(static::getFacadeApplication()->make(QueueWorkerFake::class), function ($fake) {
19+
static::swap($fake);
20+
});
21+
}
22+
23+
protected static function getFacadeAccessor(): string
24+
{
25+
self::clearResolvedInstance(QueueWorkerContract::class);
26+
27+
return QueueWorkerContract::class;
28+
}
29+
}

src/Fakes/QueueWorkerFake.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
namespace Native\Laravel\Fakes;
4+
5+
use Closure;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
use PHPUnit\Framework\Assert as PHPUnit;
9+
10+
class QueueWorkerFake 2851 implements QueueWorkerContract
11+
{
12+
/**
13+
* @var array<int, QueueConfig>
14+
*/
15+
public array $ups = [];
16+
17+
/**
18+
* @var array<int, string>
19+
*/
20+
public array $downs = [];
21+
22+
public function up(QueueConfig $config): void
23+
{
24+
$this->ups[] = $config;
25+
}
26+
27+
public function down(string $alias): void
28+
{
29+
$this->downs[] = $alias;
30+
}
31+
32+
public function assertUp(Closure $callback): void
33+
{
34+
$hit = empty(
35+
array_filter(
36+
$this->ups,
37+
fn (QueueConfig $up) => $callback($up) === true
38+
)
39+
) === false;
40+
41+
PHPUnit::assertTrue($hit);
42+
}
43+
44+
public function assertDown(string|Closure $alias): void
45+
{
46+
if (is_callable($alias) === false) {
47+
PHPUnit::assertContains($alias, $this->downs);
48+
49+
return;
50+
}
51+
52+
$hit = empty(
53+
array_filter(
54+
$this->downs,
55+
fn (string $down) => $alias($down) === true
56+
)
57+
) === false;
58+
59+
PHPUnit::assertTrue($hit);
60+
}
61+
}

src/NativeServiceProvider.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
1818
use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract;
1919
use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract;
20+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
2021
use Native\Laravel\Contracts\WindowManager as WindowManagerContract;
22+
use Native\Laravel\DTOs\QueueConfig;
2123
use Native\Laravel\Events\EventWatcher;
2224
use Native\Laravel\Exceptions\Handler;
2325
use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation;
@@ -73,6 +75,10 @@ public function packageRegistered()
7375
return $app->make(PowerMonitorImplementation::class);
7476
});
7577

78+
$this->app->bind(QueueWorkerContract::class, function (Foundation $app) {
79+
return $app->make(QueueWorker::class);
80+
});
81+
7682
if (config('nativephp-internal.running')) {
7783
$this->app->singleton(
7884
\Illuminate\Contracts\Debug\ExceptionHandler::class,
@@ -112,6 +118,11 @@ protected function configureApp()
112118

113119
config(['session.driver' => 'file']);
114120
config(['queue.default' => 'database']);
121+
122+
// XXX: This logic may need to change when we ditch the internal web server
123+
if (! $this->app->runningInConsole()) {
124+
$this->fireUpQueueWorkers();
125+
}
115126
}
116127

117128
protected function rewriteStoragePath()
@@ -210,4 +221,13 @@ protected function configureDisks(): void
210221
]);
211222
}
212223
}
224+
225+
protected function fireUpQueueWorkers(): void
226+
{
227+
$queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers'));
228+
229+
foreach ($queueConfigs as $queueConfig) {
230+
$this->app->make(QueueWorkerContract::class)->up($queueConfig);
231+
}
232+
}
213233
}

src/QueueWorker.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Native\Laravel;
4+
5+
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
9+
class QueueWorker implements QueueWorkerContract
10+
{
11+
public function __construct(
12+
private readonly ChildProcessContract $childProcess,
13+
) {}
14+
15+
public function up(string|QueueConfig $config): void
16+
{
17+
if (is_string($config) && config()->has("nativephp.queue_workers.{$config}")) {
18+
$config = QueueConfig::fromConfigArray([
19+
$config => config("nativephp.queue_workers.{$config}"),
20+
])[0];
21+
}
22+
23+
if (! $config instanceof QueueConfig) {
24+
throw new \InvalidArgumentException("Invalid queue configuration alias [$config]");
25+
}
26+
27+
$this->childProcess->php(
28+
[
29+
'-d',
30+
"memory_limit={$config->memoryLimit}M",
31+
'artisan',
32+
'queue:work',
33+
"--name={$config->alias}",
34+
'--queue='.implode(',', $config->queuesToConsume),
35+
"--memory={$config->memoryLimit}",
36+
"--timeout={$config->timeout}",
37+
],
38+
$config->alias,
39+
persistent: true,
40+
);
41+
}
42+
43+
public function down(string $alias): void
44+
{
45+
$this->childProcess->stop($alias);
46+
}
47+
}

tests/DTOs/QueueWorkerTest.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
use Illuminate\Support\Arr;
4+
use Native\Laravel\DTOs\QueueConfig;
5+
6+
test('the factory method generates an array of config objects for several formats', function (array $config) {
7+
$configObject = QueueConfig::fromConfigArray($config);
8+
9+
expect($configObject)->toBeArray();
10+
expect($configObject)->toHaveCount(count($config));
11+
12+
foreach ($config as $alias => $worker) {
13+
if (is_string($worker)) {
14+
expect(
15+
Arr::first(
16+
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker))
17+
)->queuesToConsume->toBe(['default']
18+
);
19+
20+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128);
21+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60);
22+
23+
continue;
24+
}
25+
26+
expect(
27+
Arr::first(
28+
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias))
29+
)->queuesToConsume->toBe($worker['queues'] ?? ['default']
30+
);
31+
32+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128);
33+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60);
34+
}
35+
})->with([
36+
[
37+
'queue_workers' => [
38+
'some_worker' => [
39+
'queues' => ['default'],
40+
'memory_limit' => 64,
41+
'timeout' => 60,
42+
],
43+
],
44+
],
45+
[
46+
'queue_workers' => [
47+
'some_worker' => [],
48+
'another_worker' => [],
49+
],
50+
],
51+
[
52+
'queue_workers' => [
53+
'some_worker' => [
54+
],
55+
'another_worker' => [
56+
'queues' => ['default', 'another'],
57+
],
58+
'yet_another_worker' => [
59+
'memory_limit' => 256,
60+
],
61+
'one_more_worker' => [
62+
'timeout' => 120,
63+
],
64+
],
65+
],
66+
]);

0 commit comments

Comments
 (0)
0