Revert "改名"

This reverts commit fdf58326
This commit is contained in:
2022-01-12 11:29:50 +08:00
parent 33a01eda28
commit 97dc2f559b
5 changed files with 49 additions and 15 deletions
+4 -2
View File
@@ -1,8 +1,10 @@
<?php
namespace Kiri\Server;
namespace Kiri\Server\Broadcast;
use Kiri\Kiri;
use Kiri\Server\ProcessManager;
use Kiri\Server\SwooleServerInterface;
class Broadcast
{
@@ -21,7 +23,7 @@ class Broadcast
$total = $server->setting['worker_num'] + $server->setting['task_worker_num'];
for ($i = 0; $i < $total; $i++) {
$server->sendMessage($message, $i);
$server->sendMessage(serialize(new Message($message)), $i);
}
}
+29
View File
@@ -0,0 +1,29 @@
<?php
namespace Kiri\Server\Broadcast;
use Kiri\Server\Contract\OnPipeMessageInterface;
class Message implements OnPipeMessageInterface
{
/**
* @param mixed $data
*/
public function __construct(public mixed $data)
{
}
/**
* @return void
*/
public function process(): void
{
$workerId = func_get_args()[1];
var_dump($workerId, $this->data);
}
}
+7 -4
View File
@@ -2,11 +2,11 @@
namespace Kiri\Server\Handler;
use Kiri\Annotation\Inject;
use Kiri\Server\Abstracts\Server;
use Exception;
use Kiri\Server\Contract\OnPipeMessageInterface;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Server\Abstracts\Server;
use Kiri\Server\Contract\OnPipeMessageInterface;
/**
*
@@ -15,7 +15,7 @@ class OnPipeMessage extends Server
{
/** @var EventDispatch */
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
@@ -28,6 +28,9 @@ class OnPipeMessage extends Server
*/
public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed $message)
{
if (is_string($message)) {
$message = unserialize($message);
}
if (!is_object($message) || !($message instanceof OnPipeMessageInterface)) {
return;
}
+7 -3
View File
@@ -7,6 +7,7 @@ use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Broadcast\Message;
use Kiri\Server\Contract\OnProcessInterface;
use Swoole\Coroutine;
use Swoole\Process;
@@ -48,13 +49,16 @@ class ProcessManager
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
$channel = Coroutine::create(function () use ($process, $customProcess) {
while (!$customProcess->isStop()) {
$message = $process->read();
if (!empty($message)) {
$customProcess->onBroadcast($message);
$message = unserialize($message);
}
if (is_null($message)) {
continue;
}
$customProcess->onBroadcast($message);
}
});
Context::setContext('waite:process:message', $channel);
@@ -93,7 +97,7 @@ class ProcessManager
$processes = [$this->_process[$name]];
}
foreach ($processes as $process) {
$process->write($message);
$process->write(serialize(new Message($message)));
}
}
+2 -6
View File
@@ -6,19 +6,15 @@ use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Context;
use Kiri\Error\Logger;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnDisconnectInterface;
use Kiri\Server\Contract\OnHandshakeInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Kiri\Server\Contract\OnPacketInterface;
use Kiri\Server\Contract\OnProcessInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Events\OnServerBeforeStart;
use Kiri\Server\Handler\OnPipeMessage;
@@ -30,7 +26,6 @@ use Kiri\Server\Tasker\OnServerTask;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
@@ -282,6 +277,8 @@ class ServerManager extends Component
$this->server = new $match($host, $port, SWOOLE_PROCESS, $mode);
$this->server->set(array_merge(Config::get('server.settings', []), $settings['settings']));
$this->container->setBindings(SwooleServerInterface::class, $this->server);
$id = Config::get('id', 'system-service');
$this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
@@ -300,7 +297,6 @@ class ServerManager extends Component
if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
$this->addTaskListener($settings['events']);
}
$this->container->setBindings(SwooleServerInterface::class, $this->server);
$this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server);
if (!empty($settings['events']) && is_array($settings['events'])) {
$this->addServiceEvents($settings['events'], $this->server);