From ba815049424c59e33a5647f95af1dc07c9cdc6bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Sat, 22 Apr 2023 02:04:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=98=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Abstracts/AsyncServer.php | 128 +++++++++++------ Abstracts/TraitServer.php | 95 +++++++------ Abstracts/WebSocketServer.php | 166 ++++++++++++++++++++++ CoroutineServer.php | 253 ---------------------------------- Server.php | 78 +++-------- ServerInterface.php | 8 -- Task/OnTaskFinish.php | 17 +++ Task/Task.php | 85 ++++++++++++ Task/TaskInterface.php | 17 +++ 9 files changed, 446 insertions(+), 401 deletions(-) create mode 100644 Abstracts/WebSocketServer.php delete mode 100644 CoroutineServer.php create mode 100644 Task/OnTaskFinish.php create mode 100644 Task/Task.php create mode 100644 Task/TaskInterface.php diff --git a/Abstracts/AsyncServer.php b/Abstracts/AsyncServer.php index a16d562..3ddb196 100644 --- a/Abstracts/AsyncServer.php +++ b/Abstracts/AsyncServer.php @@ -5,21 +5,21 @@ namespace Kiri\Server\Abstracts; use Exception; use Kiri; use Kiri\Abstracts\Config; -use Kiri\Server\Handler\OnServer; +use Kiri\Abstracts\Logger; use Kiri\Exception\ConfigException; +use Kiri\Exception\NotFindClassException; +use Kiri\Server\Config as SConfig; +use Kiri\Server\Constant; +use Kiri\Server\Events\OnServerBeforeStart; use Kiri\Server\Events\OnShutdown; +use Kiri\Server\Handler\OnServer; +use Kiri\Server\ServerInterface; +use Kiri\Server\Task\TaskInterface; +use Kiri\Server\Task\Task; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; use ReflectionException; -use Kiri\Server\Config as SConfig; -use Kiri\Di\LocalService; use Swoole\Server; -use Kiri\Server\ServerInterface; -use Kiri\Server\Constant; -use Kiri\Events\EventDispatch; -use Kiri\Exception\NotFindClassException; -use Kiri\Server\Events\OnServerBeforeStart; -use Kiri\Abstracts\Logger; /** * @@ -33,46 +33,63 @@ class AsyncServer implements ServerInterface /** * @var Server|null */ - private Server|null $server = null; + private ?Server $server = null; /** * @param array $service * @param int $daemon * @return void - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFindClassException - * @throws NotFoundExceptionInterface * @throws Exception */ public function initCoreServers(array $service, int $daemon = 0): void { - $service = $this->genConfigService($service); - - $this->createBaseServer(array_shift($service), $daemon); - foreach ($service as $value) { - $this->addListener($value); - } - $rpcService = Config::get('rpc', []); - if (!empty($rpcService)) { - $this->addListener(instance(SConfig::class, [], $rpcService)); - } - - $processManager = Kiri::getDi()->get(ProcessManager::class); - $processManager->batch(Config::get('processes', [])); - - $this->onSignal(Config::get('signal', [])); + $this->listener($service, $daemon); + $this->initRpcListen(); + $this->initProcess(); + $this->onSignal(); } /** - * @param string $name - * @return Server|null + * @param array $service + * @param $daemon + * @return void + * @throws */ - public function getServer(string $name = ''): Server|null + private function listener(array $service, $daemon): void { - return $this->server; + $service = $this->genConfigService($service); + foreach ($service as $value) { + if (is_null($this->server)) { + $this->createBaseServer($value, $daemon); + } else { + $this->addListener($value); + } + } + } + + + /** + * @return void + * @throws Exception + */ + private function initRpcListen(): void + { + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->addListener(instance(SConfig::class, [], $rpcService)); + } + } + + /** + * @return void + */ + private function initProcess(): void + { + foreach ($this->_process as $process) { + $this->server->addProcess($process); + } } @@ -85,8 +102,7 @@ class AsyncServer implements ServerInterface { $this->server->shutdown(); - $processManager = Kiri::getDi()->get(EventDispatch::class); - $processManager->dispatch(new OnShutdown()); + event(new OnShutdown()); return true; } @@ -106,6 +122,23 @@ class AsyncServer implements ServerInterface if (is_null($match)) { throw new NotFindClassException('Unknown server type ' . $config->type); } + $this->initServer($match, $config, $daemon); + $this->onEventListen($this->server, Config::get('server.events', [])); + $this->onEventListen($this->server, $config->events); + $this->onTaskListen(); + } + + + /** + * @param $match + * @param $config + * @param $daemon + * @return void + * @throws ConfigException + * @throws ReflectionException + */ + private function initServer($match, $config, $daemon): void + { $this->server = new $match($config->host, $config->port, $config->mode, $config->socket); $this->server->set($this->systemConfig($config, $daemon)); @@ -113,8 +146,23 @@ class AsyncServer implements ServerInterface if (!isset($config->events[Constant::SHUTDOWN])) { $config->events[Constant::SHUTDOWN] = [OnServer::class, 'onShutdown']; } - $this->onEventListen($this->server, Config::get('server.events', [])); - $this->onEventListen($this->server, $config->events); + Kiri::service()->set('server', $this->server); + } + + + /** + * @return void + * @throws + */ + private function onTaskListen(): void + { + if (!isset($this->server->setting[Constant::OPTION_TASK_WORKER_NUM])) { + return; + } + $container = Kiri::getDi(); + $task = $container->get(Task::class); + $container->set(TaskInterface::class, $task); + $task->initTaskWorker($this->server); } @@ -150,12 +198,11 @@ class AsyncServer implements ServerInterface throw new Exception('Listen port fail.' . swoole_last_error()); } - \Kiri::getLogger()->alert('Listen ' . $config->type . ' address ' . $config->host . '::' . $config->port); + Logger::_waring('Listen ' . $config->type . ' address ' . $config->host . '::' . $config->port); $port->set($this->resetSettings($config->type, $config->settings)); $this->onEventListen($port, $config->getEvents()); - Kiri::getDi()->get(LocalService::class)->set($config->getName(), $port); } /** @@ -167,7 +214,7 @@ class AsyncServer implements ServerInterface public function onSigint($no, array $signInfo): void { try { - \Kiri::getLogger()->alert('Pid ' . getmypid() . ' get signo ' . $no); + Logger::_alert('Pid ' . getmypid() . ' get signo ' . $no); $this->shutdown(); } catch (\Throwable $exception) { error($exception); @@ -179,6 +226,7 @@ class AsyncServer implements ServerInterface * @param string $type * @param array $settings * @return array + * @throws */ private function resetSettings(string $type, array $settings): array { diff --git a/Abstracts/TraitServer.php b/Abstracts/TraitServer.php index 25f68ce..d77137b 100644 --- a/Abstracts/TraitServer.php +++ b/Abstracts/TraitServer.php @@ -3,12 +3,11 @@ namespace Kiri\Server\Abstracts; use Exception; -use Kiri\Server\CoroutineServer; -use Psr\Container\ContainerExceptionInterface; -use Psr\Container\NotFoundExceptionInterface; +use Kiri; use ReflectionException; use Swoole\Coroutine; use Swoole\Http\Server as HServer; +use Swoole\Process; use Swoole\Server; use Kiri\Server\Constant; use Kiri\Server\Config; @@ -16,11 +15,11 @@ use Swoole\WebSocket\Server as WServer; trait TraitServer { - - + + private array $_process = []; - - + + /** * @param string|array|BaseProcess $class * @return void @@ -28,30 +27,48 @@ trait TraitServer */ public function addProcess(string|array|BaseProcess $class): void { - $container = \Kiri::getDi()->get(ProcessManager::class); - if (!is_array($class)) { $class = [$class]; } foreach ($class as $name) { - $container->add($name); + if (is_string($name)) { + $name = Kiri::getDi()->get($name); + } + if (isset($this->_process[$name->getName()])) { + throw new Exception('Process(' . $name->getName() . ') is exists.'); + } + $this->_process[$name->getName()] = $this->genProcess($name); } } - - + + + /** + * @param BaseProcess $name + * @return Process + */ + private function genProcess(BaseProcess $name): Process + { + return new Process(function (Process $process) use ($name) { + $process->name($name->getName()); + $name->onSigterm()->process($process); + }, + $name->getRedirectStdinAndStdout(), + $name->getPipeType(), + $name->isEnableCoroutine()); + } + + /** - * @param array $signal * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface * @throws Exception */ - public function onSignal(array $signal): void + public function onSignal(): void { + $signal = \Kiri\Abstracts\Config::get('signal', []); $this->onPcntlSignal(SIGINT, [$this, 'onSigint']); foreach ($signal as $sig => $value) { if (is_array($value) && is_string($value[0])) { - $value[0] = $this->container->get($value[0]); + $value[0] = \Kiri::getDi()->get($value[0]); } if (!is_callable($value, true)) { throw new Exception('Register signal callback must can exec.'); @@ -59,8 +76,8 @@ trait TraitServer $this->onPcntlSignal($sig, $value); } } - - + + /** * @param $signal * @param $callback @@ -68,19 +85,19 @@ trait TraitServer */ private function onPcntlSignal($signal, $callback): void { - if (get_called_class() != CoroutineServer::class) { - pcntl_signal(SIGINT, [$this, 'onSigint']); - } else { - Coroutine::create(static function () use ($signal, $callback) { - $data = Coroutine::waitSignal($signal); - if ($data) { - $callback($signal, [true]); - } - }); - } +// if (get_called_class() != CoroutineServer::class) { + pcntl_signal(SIGINT, [$this, 'onSigint']); +// } else { +// Coroutine::create(static function () use ($signal, $callback) { +// $data = Coroutine::waitSignal($signal); +// if ($data) { +// $callback($signal, [true]); +// } +// }); +// } } - - + + /** * @return array */ @@ -88,8 +105,8 @@ trait TraitServer { return $this->_process; } - - + + /** * @param array $ports * @return array @@ -139,8 +156,8 @@ trait TraitServer } return $array; } - - + + /** * @param $type * @return string|null @@ -155,8 +172,8 @@ trait TraitServer default => null }; } - - + + /** * @param $type * @return string|null @@ -169,6 +186,6 @@ trait TraitServer default => null }; } - - + + } diff --git a/Abstracts/WebSocketServer.php b/Abstracts/WebSocketServer.php new file mode 100644 index 0000000..cca8285 --- /dev/null +++ b/Abstracts/WebSocketServer.php @@ -0,0 +1,166 @@ +has('server')) { + $server = new \Swoole\WebSocket\Server($this->host, $this->port, SWOOLE_PROCESS); + $application->set('server', $server); + } else { + $server = $application->get('server'); + } + + $socket = $server->addlistener($this->host, $this->port, $this->socket_type); + $socket->set($this->settings); + $socket->on('handshake', [$this, 'onHandshake']); + $socket->on('message', [$this, 'onMessage']); + $socket->on('disconnect', [$this, 'onDisconnect']); + + $this->collector = \Kiri::getDi()->get(DataGrip::class)->get('wss'); + + $this->handler = $this->collector->query('/', 'GET'); + } + + + /** + * @param Request $request + * @param Response $response + * @return void + * @throws Exception + */ + public function onHandshake(Request $request, Response $response): void + { + // TODO: Implement onHandshake() method. + $secWebSocketKey = $request->header['sec-websocket-key']; + $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; + if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) { + throw new Exception('protocol error.', 500); + } + $key = base64_encode(sha1($request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', TRUE)); + $headers = [ + 'Upgrade' => 'websocket', + 'Connection' => 'Upgrade', + 'Sec-websocket-Accept' => $key, + 'Sec-websocket-Version' => '13', + ]; + if (isset($request->header['sec-websocket-protocol'])) { + $explode = explode(',', $request->header['sec-websocket-protocol']); + $headers['Sec-websocket-Protocol'] = $explode[0]; + } + foreach ($headers as $key => $val) { + $response->setHeader($key, $val); + } + if ($this->handler->implement(OnHandshakeInterface::class)) { + $handler = $this->handler->getClass(); + + call_user_func([$handler, 'onHandshake'], $request, $response); + } else { + $response->setStatusCode(101); + $response->end(); + } + } + + + /** + * @param Server $server + * @param Frame $frame + * @return void + */ + public function onMessage(Server $server, Frame $frame): void + { + // TODO: Implement onMessage() method. + if (!$this->handler->implement(OnMessageInterface::class)) { + return; + } + + $handler = $this->handler->getClass(); + + call_user_func([$handler, 'onMessage'], $server, $frame); + } + + + /** + * @param \Swoole\WebSocket\Server $server + * @param int $fd + * @return void + */ + public function onDisconnect(\Swoole\WebSocket\Server $server, int $fd): void + { + // TODO: Implement onDisconnect() method. + if (!$this->handler->implement(OnDisconnectInterface::class)) { + return; + } + + $handler = $this->handler->getClass(); + + call_user_func([$handler, 'onDisconnect'], $server, $fd); + } + + + /** + * @param Server $server + * @param int $fd + * @return void + */ + public function onClose(Server $server, int $fd): void + { + // TODO: Implement onDisconnect() method. + if (!$this->handler->implement(OnCloseInterface::class)) { + return; + } + + $handler = $this->handler->getClass(); + + call_user_func([$handler, 'onClose'], $server, $fd); + } + +} diff --git a/CoroutineServer.php b/CoroutineServer.php deleted file mode 100644 index d8ef78c..0000000 --- a/CoroutineServer.php +++ /dev/null @@ -1,253 +0,0 @@ - */ - private array $servers = []; - - - /** - * @var Server|null - */ - private Server|null $server = null; - - - /** - * @param Config $config - * @param ContainerInterface $container - * @param EventDispatch $dispatch - * @param LoggerInterface $logger - * @param ProcessManager $processManager - */ - public function __construct(#[Container(Config::class)] public Config $config, - #[Container(ContainerInterface::class)] public ContainerInterface $container, - #[Container(EventDispatch::class)] public EventDispatch $dispatch, - #[Container(LoggerInterface::class)] public LoggerInterface $logger, - #[Container(ProcessManager::class)] public ProcessManager $processManager) - { - } - - - /** - * @param array $service - * @param int $daemon - * @return void - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws Exception - */ - public function initCoreServers(array $service, int $daemon = 0): void - { - $service = $this->genConfigService($service); - foreach ($service as $value) { - if ($value->getType() == Constant::SERVER_TYPE_HTTP) { - $this->addListener($value); - } - } - $rpcService = Config::get('rpc', []); - if (!empty($rpcService)) { - $this->addListener(instance(SConfig::class, [], $rpcService)); - } - - \Kiri::service()->set('server', $this); - - $processManager = \Kiri::getDi()->get(ProcessManager::class); - $processManager->batch(Config::get('processes', [])); - } - - - /** - * @param SConfig $config - * @throws ReflectionException - */ - public function addListener(SConfig $config): void - { - $server = new SchServer($config->getHost(), $config->getPort(), false, true); - - $events = $config->getEvents()[Constant::REQUEST] ?? null; - if (is_null($events)) { - $events = [\Kiri\Router\Server::class, 'onRequest']; - } - - $events[0] = \Kiri::getDi()->get($events[0]); - $server->handle('/', $events); - - $this->servers[] = $server; - } - - - /** - * @param string $name - * @return ScServer|SchServer|null - */ - public function getServer(string $name = ''): ScServer|SchServer|null - { - return $this->servers[$name] ?? null; - } - - - /** - * @return bool - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function shutdown(): bool - { - foreach ($this->servers as $server) { - $server->shutdown(); - } - - $dispatch = \Kiri::getDi()->get(EventDispatch::class); - $dispatch->dispatch(new OnShutdown()); - - return true; - } - - - /** - * @param $no - * @param array $signInfo - * @return void - */ - public function onSigint($no, array $signInfo): void - { - try { - \Kiri::getLogger()->alert('Pid ' . getmypid() . ' get signo ' . $no); - $this->shutdown(); - } catch (\Throwable $exception) { - error($exception); - } - } - - - /** - * @param Server\Port|Server $base - * @param array $events - * @return void - * @throws ReflectionException - */ - private function onEventListen(Server\Port|Server $base, array $events): void - { - foreach ($events as $name => $event) { - if (is_array($event) && is_string($event[0])) { - $event[0] = \Kiri::getDi()->get($event[0]); - } - $base->on($name, $event); - } - } - - - /** - * @return void - */ - public function start(): void - { - Coroutine\run(function () { - $dispatch = \Kiri::getDi()->get(EventDispatch::class); - $dispatch->dispatch(new OnServerBeforeStart()); - - $this->onSignal(Config::get('signal', [])); - - $this->onTasker(); - foreach ($this->servers as $server) { - Coroutine::create(static function () use ($server) { - -// $this->dispatch->dispatch(new OnWorkerStart($server, 0)); - - $server->start(); - -// $this->dispatch->dispatch(new OnWorkerExit($server, 0)); - }); - } - }); - } - - - private Coroutine\Channel $channel; - - /** - * @return void - * @throws ReflectionException - */ - private function onTasker(): void - { - $config = Config::get('server.settings', []); - - if (isset($config[Constant::OPTION_TASK_WORKER_NUM])) { - if ($config[Constant::OPTION_TASK_WORKER_NUM] < 1) { - return; - } - } - - $taskEvents = $config['events'][Constant::TASK] ?? null; - $finishEvents = $config['events'][Constant::FINISH] ?? null; - - if (is_null($taskEvents)) { - return; - } - - $container = \Kiri::getDi(); - $taskEvents[0] = $container->get($taskEvents[0]); - if (!is_null($finishEvents)) { - $finishEvents[0] = $container->get($finishEvents[0]); - } - - $this->channel = new Coroutine\Channel($config[Constant::OPTION_TASK_WORKER_NUM]); - for ($i = 0; $i < $config[Constant::OPTION_TASK_WORKER_NUM]; $i++) { - Coroutine::create(static fn() => $this->taskRunner($i, $taskEvents, $finishEvents)); - } - } - - - /** - * @param $taskId - * @param $callback - * @param $finishEvents - * @return void - */ - private function taskRunner($taskId, $callback, $finishEvents): void - { - $taskData = $this->channel->pop(); - if (!is_null($taskData)) { - $result = $callback($taskId, $taskData); - if (is_callable($finishEvents, true)) { - $finishEvents($taskId, $result); - } - } - $this->taskRunner($taskId, $callback, $finishEvents); - } - -} diff --git a/Server.php b/Server.php index 44b47d4..59525a4 100644 --- a/Server.php +++ b/Server.php @@ -7,21 +7,18 @@ use Exception; use Kiri; use Kiri\Abstracts\Config; use Kiri\Events\EventDispatch; -use Kiri\Events\EventProvider; use Kiri\Exception\ConfigException; use Kiri\Router\Router; -use Kiri\Server\Events\OnShutdown; -use Kiri\Server\Events\OnWorkerStart; -use Kiri\Server\Events\OnTaskerStart; -use Psr\Container\ContainerExceptionInterface; -use Psr\Container\ContainerInterface; -use Psr\Container\NotFoundExceptionInterface; -use Kiri\Server\Events\OnWorkerStop; -use ReflectionException; -use Swoole\Coroutine; use Kiri\Server\Abstracts\ProcessManager; +use Kiri\Server\Events\OnShutdown; +use Kiri\Server\Events\OnTaskerStart; +use Kiri\Server\Events\OnWorkerStart; +use Kiri\Server\Events\OnWorkerStop; use Kiri\Server\Abstracts\AsyncServer; -use Kiri\Di\Inject\Container; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; +use ReflectionException; +use Swoole\Timer; defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid'); @@ -49,42 +46,21 @@ class Server /** - * @return AsyncServer|CoroutineServer * @throws ReflectionException */ - private function manager(): AsyncServer|CoroutineServer + private function manager(): AsyncServer { return Kiri::getDi()->get($this->class); } - /** - * @return void - */ - public function init(): void - { - $enable_coroutine = Config::get('server.settings.enable_coroutine', false); - if (!$enable_coroutine) { - return; - } - Coroutine::set([ - 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, - 'enable_deadlock_check' => FALSE, - 'exit_condition' => function () { - return Coroutine::stats()['coroutine_num'] === 0; - } - ]); - } - - /** * @param $process * @throws Exception */ public function addProcess($process): void { - $manager = Kiri::getDi()->get(ProcessManager::class); - $manager->add($process); + $this->manager()->addProcess($process); } @@ -97,29 +73,21 @@ class Server */ public function start(): void { - $this->onHotReload(); + on(OnWorkerStop::class, [Timer::class, 'clearAll'], 9999); + on(OnWorkerStart::class, [$this, 'setWorkerName']); + on(OnTaskerStart::class, [$this, 'setTaskerName']); + + $manager = Kiri::getDi()->get(Router::class); + $manager->scan_build_route(); + $manager = $this->manager(); $manager->initCoreServers(Config::get('server', [], true), $this->daemon); $manager->start(); } - /** - * @return void - * @throws Exception - */ - protected function onWorkerListener(): void - { - $manager = Kiri::getDi()->get(EventProvider::class); - $manager->on(OnWorkerStop::class, '\Swoole\Timer::clearAll', 9999); - $manager->on(OnWorkerStart::class, [$this, 'setWorkerName']); - $manager->on(OnTaskerStart::class, [$this, 'setTaskerName']); - } - - /** * @param OnWorkerStart $onWorkerStart - * @throws ConfigException */ public function setWorkerName(OnWorkerStart $onWorkerStart): void { @@ -148,18 +116,6 @@ class Server } - /** - * @return void - * @throws Exception - */ - public function onHotReload(): void - { - $this->onWorkerListener(); - $manager = Kiri::getDi()->get(Router::class); - $manager->scan_build_route(); - } - - /** * @return void * @throws ContainerExceptionInterface diff --git a/ServerInterface.php b/ServerInterface.php index e371bbb..c4436b9 100644 --- a/ServerInterface.php +++ b/ServerInterface.php @@ -11,14 +11,6 @@ use Swoole\Server; interface ServerInterface { - - /** - * @param string $name - * @return Server|\Swoole\Coroutine\Server|\Swoole\Coroutine\Http\Server|null - */ - public function getServer(string $name = ''): Server|\Swoole\Coroutine\Server|\Swoole\Coroutine\Http\Server|null; - - /** * @param array $service * @param int $daemon diff --git a/Task/OnTaskFinish.php b/Task/OnTaskFinish.php new file mode 100644 index 0000000..e240b45 --- /dev/null +++ b/Task/OnTaskFinish.php @@ -0,0 +1,17 @@ +setting[Constant::OPTION_TASK_WORKER_NUM])) { + return; + } + if ($server->setting[Constant::OPTION_TASK_WORKER_NUM] < 1) { + return; + } + $server->on('finish', [$this, 'onFinish']); + $server->on('task', [$this, 'onTask']); + } + + + /** + * @param Server $server + * @param int $task_id + * @param mixed $data + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ReflectionException + */ + private function onFinish(Server $server, int $task_id, mixed $data): void + { + event(new OnTaskFinish($task_id, $data)); + } + + + /** + * @param Server $server + * @param int $task_id + * @param int $src_worker_id + * @param mixed $data + * @return mixed + * @throws ReflectionException + */ + private function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): mixed + { + $data = json_decode($data, true); + if (is_null($data)) { + return null; + } + $data[0] = \Kiri::getDi()->get($data[0]); + return call_user_func($data, $task_id, $src_worker_id); + } + + + /** + * @param array $handler + * @param int|null $workerId + * @return void + * @throws ReflectionException + */ + public function dispatch(array $handler, ?int $workerId = null): void + { + /** @var Server $server */ + $server = \Kiri::service()->get('server'); + if (is_null($workerId)) { + $worker = $server->setting[Constant::OPTION_TASK_WORKER_NUM]; + $workerId = rand(0, $worker); + } + $server->task(json_encode($handler), $workerId); + } + + +} diff --git a/Task/TaskInterface.php b/Task/TaskInterface.php new file mode 100644 index 0000000..333f35e --- /dev/null +++ b/Task/TaskInterface.php @@ -0,0 +1,17 @@ +