diff --git a/Broadcast.php b/Broadcast/Broadcast.php similarity index 69% rename from Broadcast.php rename to Broadcast/Broadcast.php index c484a60..68a9fb9 100644 --- a/Broadcast.php +++ b/Broadcast/Broadcast.php @@ -1,8 +1,10 @@ setting['worker_num'] + $server->setting['task_worker_num']; for ($i = 0; $i < $total; $i++) { - $server->sendMessage($message, $i); + $server->sendMessage(serialize(new Message($message)), $i); } } diff --git a/Broadcast/Message.php b/Broadcast/Message.php new file mode 100644 index 0000000..e39bd0e --- /dev/null +++ b/Broadcast/Message.php @@ -0,0 +1,29 @@ +data); + } + +} diff --git a/Handler/OnPipeMessage.php b/Handler/OnPipeMessage.php index 65bd07a..2209d72 100644 --- a/Handler/OnPipeMessage.php +++ b/Handler/OnPipeMessage.php @@ -2,11 +2,11 @@ namespace Kiri\Server\Handler; -use Kiri\Annotation\Inject; -use Kiri\Server\Abstracts\Server; use Exception; -use Kiri\Server\Contract\OnPipeMessageInterface; +use Kiri\Annotation\Inject; use Kiri\Events\EventDispatch; +use Kiri\Server\Abstracts\Server; +use Kiri\Server\Contract\OnPipeMessageInterface; /** * @@ -15,7 +15,7 @@ class OnPipeMessage extends Server { - /** @var EventDispatch */ + /** @var EventDispatch */ #[Inject(EventDispatch::class)] public EventDispatch $eventDispatch; @@ -28,6 +28,9 @@ class OnPipeMessage extends Server */ public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed $message) { + if (is_string($message)) { + $message = unserialize($message); + } if (!is_object($message) || !($message instanceof OnPipeMessageInterface)) { return; } diff --git a/ProcessManager.php b/ProcessManager.php index ed687ce..f9fb2d6 100644 --- a/ProcessManager.php +++ b/ProcessManager.php @@ -7,6 +7,7 @@ use Kiri\Context; use Kiri\Exception\ConfigException; use Kiri\Kiri; use Kiri\Server\Abstracts\BaseProcess; +use Kiri\Server\Broadcast\Message; use Kiri\Server\Contract\OnProcessInterface; use Swoole\Coroutine; use Swoole\Process; @@ -48,13 +49,16 @@ class ProcessManager if (Kiri::getPlatform()->isLinux()) { $process->name($system . '(' . $customProcess->getName() . ')'); } - $channel = Coroutine::create(function () use ($process, $customProcess) { while (!$customProcess->isStop()) { $message = $process->read(); if (!empty($message)) { - $customProcess->onBroadcast($message); + $message = unserialize($message); } + if (is_null($message)) { + continue; + } + $customProcess->onBroadcast($message); } }); Context::setContext('waite:process:message', $channel); @@ -93,7 +97,7 @@ class ProcessManager $processes = [$this->_process[$name]]; } foreach ($processes as $process) { - $process->write($message); + $process->write(serialize(new Message($message))); } } diff --git a/ServerManager.php b/ServerManager.php index 2f48085..df8d968 100644 --- a/ServerManager.php +++ b/ServerManager.php @@ -6,19 +6,15 @@ use Exception; use Kiri\Abstracts\Component; use Kiri\Abstracts\Config; use Kiri\Annotation\Inject; -use Kiri\Context; use Kiri\Error\Logger; use Kiri\Events\EventDispatch; use Kiri\Exception\ConfigException; -use Kiri\Kiri; -use Kiri\Server\Abstracts\BaseProcess; use Kiri\Server\Contract\OnCloseInterface; use Kiri\Server\Contract\OnConnectInterface; use Kiri\Server\Contract\OnDisconnectInterface; use Kiri\Server\Contract\OnHandshakeInterface; use Kiri\Server\Contract\OnMessageInterface; use Kiri\Server\Contract\OnPacketInterface; -use Kiri\Server\Contract\OnProcessInterface; use Kiri\Server\Contract\OnReceiveInterface; use Kiri\Server\Events\OnServerBeforeStart; use Kiri\Server\Handler\OnPipeMessage; @@ -30,7 +26,6 @@ use Kiri\Server\Tasker\OnServerTask; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; use ReflectionException; -use Swoole\Coroutine; use Swoole\Http\Server as HServer; use Swoole\Process; use Swoole\Server; @@ -282,6 +277,8 @@ class ServerManager extends Component $this->server = new $match($host, $port, SWOOLE_PROCESS, $mode); $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'])); + $this->container->setBindings(SwooleServerInterface::class, $this->server); + $id = Config::get('id', 'system-service'); $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port)); @@ -300,7 +297,6 @@ class ServerManager extends Component if (($this->server->setting['task_worker_num'] ?? 0) > 0) { $this->addTaskListener($settings['events']); } - $this->container->setBindings(SwooleServerInterface::class, $this->server); $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); if (!empty($settings['events']) && is_array($settings['events'])) { $this->addServiceEvents($settings['events'], $this->server);