Compare commits

...

26 Commits

Author SHA1 Message Date
as2252258 14db451313 改名 2021-12-02 18:24:23 +08:00
as2252258 ced92a10e1 改名 2021-12-02 14:16:57 +08:00
as2252258 0d12baf49f 改名 2021-12-01 19:04:59 +08:00
as2252258 b798270bb7 改名 2021-12-01 17:21:15 +08:00
as2252258 955f358577 改名 2021-12-01 16:09:49 +08:00
as2252258 97a5deda23 改名 2021-12-01 15:28:59 +08:00
as2252258 8ae395e9cd 改名 2021-12-01 15:18:34 +08:00
as2252258 44a9a507f6 改名 2021-12-01 15:16:08 +08:00
as2252258 903ee0d6ce 改名 2021-11-30 16:00:39 +08:00
as2252258 a36943cd2d 改名 2021-11-30 15:10:00 +08:00
as2252258 26e0ce7778 改名 2021-11-30 14:59:51 +08:00
as2252258 5a9f9da347 改名 2021-11-30 14:43:55 +08:00
as2252258 a1bf157408 改名 2021-11-30 14:34:15 +08:00
as2252258 27834d37ed 改名 2021-11-30 14:32:56 +08:00
as2252258 c260ac6df4 1 2021-11-27 17:43:28 +08:00
as2252258 7479bed4e4 改名 2021-11-19 17:37:50 +08:00
as2252258 39394f76d5 改名 2021-11-19 14:46:07 +08:00
as2252258 065555a80e 改名 2021-11-18 18:06:45 +08:00
as2252258 13a8ca73bd 改名 2021-11-18 18:01:03 +08:00
as2252258 8456e1b7d5 改名 2021-11-18 17:05:00 +08:00
as2252258 82b5650c8a 改名 2021-11-18 16:59:45 +08:00
as2252258 03601f2db6 改名 2021-11-18 16:54:15 +08:00
as2252258 a7082b76fd 改名 2021-11-18 16:40:57 +08:00
as2252258 e872e04da1 改名 2021-11-18 15:59:46 +08:00
as2252258 370b255862 改名 2021-11-18 15:57:37 +08:00
as2252258 d4b5bed960 改名 2021-11-18 15:37:10 +08:00
16 changed files with 265 additions and 232 deletions
-60
View File
@@ -80,64 +80,4 @@ abstract class BaseProcess implements OnProcessInterface
} }
/**
* @return bool
*/
public function checkProcessIsStop(): bool
{
return $this->isStop === true;
}
/**
* @param Process $process
*/
public function signListen(Process $process): void
{
}
/**
*
*/
protected function exit(): void
{
putenv('process.status=idle');
}
/**
* @return bool
*/
#[Pure] public function isWorking(): bool
{
return env('process.status', 'working') == 'working';
}
/**
*
*/
private function waiteExit(Process $process): void
{
$this->onProcessStop();
while ($this->isWorking()) {
$this->sleep();
}
$process->exit(0);
}
/**
*
*/
private function sleep(): void
{
if ($this->enable_coroutine) {
Coroutine::sleep(0.1);
} else {
usleep(100);
}
}
} }
+1 -1
View File
@@ -4,7 +4,7 @@
namespace Server\Abstracts; namespace Server\Abstracts;
use Annotation\Inject; use Note\Inject;
use Exception; use Exception;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
-12
View File
@@ -1,12 +0,0 @@
<?php
namespace Server\Contract;
use Swoole\Http\Response;
interface OnDownloadInterface
{
public function dispatch(Response $response);
}
+1 -19
View File
@@ -17,27 +17,9 @@ interface OnProcessInterface
/** /**
* @param Process $process * @param Process $process
* @return string
*/ */
public function getProcessName(Process $process): string; public function process(Process $process): void;
/**
* @param Process $process
*/
public function signListen(Process $process): void;
/**
* @param Process $process
*/
public function onHandler(Process $process): void;
/**
*
*/
public function onProcessStop(): void;
} }
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Server\Events;
class OnServerBeforeStart
{
}
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler; namespace Server\Handler;
use Annotation\Inject; use Note\Inject;
use Server\Abstracts\Server; use Server\Abstracts\Server;
use Exception; use Exception;
use Server\Contract\OnPipeMessageInterface; use Server\Contract\OnPipeMessageInterface;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler; namespace Server\Handler;
use Annotation\Inject; use Note\Inject;
use Kiri\Events\EventDispatch; use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Server\Abstracts\Server; use Server\Abstracts\Server;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler; namespace Server\Handler;
use Annotation\Inject; use Note\Inject;
use Kiri\Events\EventDispatch; use Kiri\Events\EventDispatch;
use Server\Abstracts\Server; use Server\Abstracts\Server;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler; namespace Server\Handler;
use Annotation\Inject; use Note\Inject;
use Kiri\Events\EventDispatch; use Kiri\Events\EventDispatch;
use Server\Events\OnAfterReload; use Server\Events\OnAfterReload;
use Server\Events\OnBeforeReload; use Server\Events\OnBeforeReload;
+1 -3
View File
@@ -2,13 +2,12 @@
namespace Server\Handler; namespace Server\Handler;
use Annotation\Inject; use Note\Inject;
use Exception; use Exception;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Core\Help; use Kiri\Core\Help;
use Kiri\Events\EventDispatch; use Kiri\Events\EventDispatch;
use Kiri\Kiri; use Kiri\Kiri;
use Kiri\Runtime;
use Server\Events\OnAfterWorkerStart; use Server\Events\OnAfterWorkerStart;
use Server\Events\OnBeforeWorkerStart; use Server\Events\OnBeforeWorkerStart;
use Server\Events\OnTaskerStart as OnTaskStart; use Server\Events\OnTaskerStart as OnTaskStart;
@@ -16,7 +15,6 @@ use Server\Events\OnWorkerError;
use Server\Events\OnWorkerExit; use Server\Events\OnWorkerExit;
use Server\Events\OnWorkerStart; use Server\Events\OnWorkerStart;
use Server\Events\OnWorkerStop; use Server\Events\OnWorkerStop;
use Server\ServerManager;
use Swoole\Server; use Swoole\Server;
use Swoole\Timer; use Swoole\Timer;
+17 -7
View File
@@ -3,14 +3,15 @@
namespace Server; namespace Server;
use Annotation\Inject;
use Exception; use Exception;
use Http\Handler\Abstracts\HttpService; use Http\Handler\Abstracts\HttpService;
use JetBrains\PhpStorm\Pure; use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Error\LoggerProcess;
use Kiri\Events\EventDispatch; use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Server\Events\OnShutdown; use Server\Events\OnShutdown;
@@ -42,6 +43,13 @@ class Server extends HttpService
public EventDispatch $eventDispatch; public EventDispatch $eventDispatch;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/** /**
* *
*/ */
@@ -60,10 +68,11 @@ class Server extends HttpService
/** /**
* @return string start server * @return string
*
* start server
* @throws ConfigException * @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws Exception * @throws Exception
*/ */
public function start(): string public function start(): string
@@ -95,7 +104,7 @@ class Server extends HttpService
{ {
$configs = Config::get('server', [], true); $configs = Config::get('server', [], true);
foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) { foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) {
$this->manager->stopServer($config['port']); $this->state->exit($config['port']);
} }
$this->eventDispatch->dispatch(new OnShutdown()); $this->eventDispatch->dispatch(new OnShutdown());
} }
@@ -104,10 +113,11 @@ class Server extends HttpService
/** /**
* @return bool * @return bool
* @throws ConfigException * @throws ConfigException
* @throws Exception
*/ */
public function isRunner(): bool public function isRunner(): bool
{ {
return $this->manager->isRunner(); return $this->state->isRunner();
} }
+8 -7
View File
@@ -4,12 +4,13 @@ declare(strict_types=1);
namespace Server; namespace Server;
use Annotation\Inject; use Note\Inject;
use Exception; use Exception;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Events\EventProvider; use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Kiri\Kiri; use Kiri\Kiri;
use Server\Events\OnServerBeforeStart;
use Swoole\Coroutine; use Swoole\Coroutine;
use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputArgument;
@@ -29,10 +30,10 @@ class ServerCommand extends Command
/** /**
* @var EventProvider * @var EventDispatch
*/ */
#[Inject(EventProvider::class)] #[Inject(EventDispatch::class)]
public EventProvider $eventProvider; public EventDispatch $eventProvider;
/** /**
@@ -68,7 +69,7 @@ class ServerCommand extends Command
} }
$manager->shutdown(); $manager->shutdown();
if ($input->getArgument('action') == 'stop') { if ($input->getArgument('action') == 'stop') {
throw new Exception('shutdown success'); return 0;
} }
return $this->generate_runtime_builder($manager); return $this->generate_runtime_builder($manager);
} }
@@ -104,7 +105,7 @@ class ServerCommand extends Command
$this->configure_set(); $this->configure_set();
Kiri::app()->getRouter()->read_files(); Kiri::app()->getRouter()->read_files();
$manager->start(); $manager->start();
return 1; return 1;
+61 -107
View File
@@ -2,22 +2,19 @@
namespace Server; namespace Server;
use Annotation\Inject;
use Closure;
use Exception; use Exception;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface; use Kiri\Di\Container;
use Kiri\Error\Logger; use Kiri\Error\Logger;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Kiri\Kiri; use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException; use ReflectionException;
use Server\Abstracts\BaseProcess; use Server\Abstracts\BaseProcess;
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerTask;
use Server\Handler\OnServerWorker;
use Server\Contract\OnCloseInterface; use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface; use Server\Contract\OnConnectInterface;
use Server\Contract\OnDisconnectInterface; use Server\Contract\OnDisconnectInterface;
@@ -26,7 +23,13 @@ use Server\Contract\OnMessageInterface;
use Server\Contract\OnPacketInterface; use Server\Contract\OnPacketInterface;
use Server\Contract\OnProcessInterface; use Server\Contract\OnProcessInterface;
use Server\Contract\OnReceiveInterface; use Server\Contract\OnReceiveInterface;
use Server\Contract\OnTaskInterface; use Server\Events\OnServerBeforeStart;
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerWorker;
use Server\Tasker\OnServerTask;
use Swoole\Http\Server as HServer; use Swoole\Http\Server as HServer;
use Swoole\Process; use Swoole\Process;
use Swoole\Server; use Swoole\Server;
@@ -50,10 +53,20 @@ class ServerManager
public int $port = 0; public int $port = 0;
/**
* @var Logger
*/
#[Inject(Logger::class)] #[Inject(Logger::class)]
public Logger $logger; public Logger $logger;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/** @var array<string,Port> */ /** @var array<string,Port> */
public array $ports = []; public array $ports = [];
@@ -64,7 +77,7 @@ class ServerManager
/** /**
* @var ContainerInterface * @var Container
*/ */
#[Inject(ContainerInterface::class)] #[Inject(ContainerInterface::class)]
public ContainerInterface $container; public ContainerInterface $container;
@@ -98,9 +111,11 @@ class ServerManager
/** /**
* @return Server|WServer|HServer|null * @return Server|WServer|HServer|null
* @throws ReflectionException
*/ */
public function getServer(): Server|WServer|HServer|null public function getServer(): Server|WServer|HServer|null
{ {
di(EventDispatch::class)->dispatch(new OnServerBeforeStart());
return $this->server; return $this->server;
} }
@@ -111,9 +126,10 @@ class ServerManager
* @param int $port * @param int $port
* @param int $mode * @param int $mode
* @param array $settings * @param array $settings
* @throws ReflectionException
* @throws ConfigException * @throws ConfigException
* @throws Exception * @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/ */
public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{ {
@@ -129,8 +145,12 @@ class ServerManager
/** /**
* @throws ReflectionException * @param $configs
* @param int $daemon
* @throws ConfigException * @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/ */
public function initBaseServer($configs, int $daemon = 0): void public function initBaseServer($configs, int $daemon = 0): void
{ {
@@ -142,23 +162,6 @@ class ServerManager
} }
/**
* @return bool
* @throws ConfigException
* @throws Exception
*/
public function isRunner(): bool
{
$configs = Config::get('server', [], true);
foreach ($this->sortService($configs['ports']) as $config) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
}
/** /**
* @param string|OnProcessInterface|BaseProcess $customProcess * @param string|OnProcessInterface|BaseProcess $customProcess
* @throws Exception * @throws Exception
@@ -168,19 +171,14 @@ class ServerManager
if (is_string($customProcess)) { if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess); $customProcess = Kiri::getDi()->get($customProcess);
} }
$process = new Process(function (Process $soloProcess) use ($customProcess) { $system = sprintf('[%s].process', Config::get('id', 'system-service'));
$system = sprintf('[%s].process', Config::get('id', 'system-service')); $process = new Process(function (Process $process) use ($customProcess, $system) {
if (Kiri::getPlatform()->isLinux()) { if (Kiri::getPlatform()->isLinux()) {
$soloProcess->name($system . '(' . $customProcess->getName() . ')'); $process->name($system . '(' . $customProcess->getName() . ')');
} }
$this->logger->debug($system . $customProcess->getName() . ' start.'); $customProcess->process($process);
$customProcess->signListen($soloProcess); }, $customProcess->getRedirectStdinAndStdout(), $customProcess->getPipeType(), $customProcess->isEnableCoroutine());
$customProcess->onHandler($soloProcess); $this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
},
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
$this->container->setBindings($customProcess->getName(), $process); $this->container->setBindings($customProcess->getName(), $process);
$this->server->addProcess($process); $this->server->addProcess($process);
} }
@@ -195,21 +193,13 @@ class ServerManager
} }
/**
* @param string $key
* @param string|int $value
*/
public static function setEnv(string $key, string|int $value): void
{
putenv(sprintf('%s=%s', $key, (string)$value));
}
/** /**
* @param ServerManager $context * @param ServerManager $context
* @param array $config * @param array $config
* @param int $daemon * @param int $daemon
* @throws ConfigException * @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException * @throws ReflectionException
* @throws Exception * @throws Exception
*/ */
@@ -238,6 +228,7 @@ class ServerManager
$config['settings']['log_file'] = storage('system.log'); $config['settings']['log_file'] = storage('system.log');
} }
$config['settings']['pid_file'] = storage('.swoole.pid'); $config['settings']['pid_file'] = storage('.swoole.pid');
$config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true;
$config['events'] = $config['events'] ?? []; $config['events'] = $config['events'] ?? [];
return $config; return $config;
} }
@@ -249,13 +240,16 @@ class ServerManager
* @param int $port * @param int $port
* @param int $mode * @param int $mode
* @param array $settings * @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception * @throws Exception
*/ */
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{ {
$id = Config::get('id', 'system-service'); $id = Config::get('id', 'system-service');
$this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port)); $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
/** @var Server\Port $service */ /** @var Server\Port $service */
$this->ports[$port] = $this->server->addlistener($host, $port, $mode); $this->ports[$port] = $this->server->addlistener($host, $port, $mode);
@@ -282,8 +276,10 @@ class ServerManager
* @param int $port * @param int $port
* @param int $mode * @param int $mode
* @param array $settings * @param array $settings
* @throws ReflectionException
* @throws ConfigException * @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception * @throws Exception
*/ */
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
@@ -305,26 +301,11 @@ class ServerManager
} }
/**
* @param int $port
* @throws Exception
*/
public function stopServer(int $port)
{
if (!($pid = checkPortIsAlready($port))) {
return;
}
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
/** /**
* @param array $settings * @param array $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException * @throws ReflectionException
* @throws Exception
*/ */
private function addDefaultListener(array $settings): void private function addDefaultListener(array $settings): void
{ {
@@ -342,6 +323,9 @@ class ServerManager
/** /**
* @param array $events * @param array $events
* @param Server|Port $server * @param Server|Port $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/ */
private function addServiceEvents(array $events, Server|Port $server) private function addServiceEvents(array $events, Server|Port $server)
{ {
@@ -363,40 +347,6 @@ class ServerManager
} }
/**
* @param string $class
* @return object
*/
private function getNewInstance(string $class): object
{
return $this->container->create($class);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws ReflectionException
* @throws Exception
*/
public function task(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerId = random_int($this->server->setting['worker_num'] + 1,
$this->server->setting['worker_num'] + 1 + $this->server->setting['task_worker_num']);
}
if (is_string($handler)) {
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
$handler = $implements->newInstanceArgs($params);
}
$this->server->task(serialize($handler), $workerId);
}
/** /**
* @param mixed $message * @param mixed $message
* @param int $workerId * @param int $workerId
@@ -410,12 +360,14 @@ class ServerManager
/** /**
* @param array $events * @param array $events
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException * @throws ReflectionException
*/ */
private function addTaskListener(array $events = []): void private function addTaskListener(array $events = []): void
{ {
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->getReflect(OnServerTask::class)?->newInstance(); $reflect = $this->container->get(OnServerTask::class);
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) { if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
@@ -427,10 +379,12 @@ class ServerManager
/** /**
* @param array|null $settings * @param array|null $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/ */
public function bindCallback(?array $settings = []) public function bindCallback(?array $settings = [])
{ {
// TODO: Implement bindCallback() method.
if (count($settings) < 1) { if (count($settings) < 1) {
return; return;
} }
+56
View File
@@ -0,0 +1,56 @@
<?php
namespace Server;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Abstracts\Config;
use Swoole\Process;
class State extends BaseObject
{
use TraitServer;
public array $servers = [];
public function init()
{
$this->servers = Config::get('server.ports');
}
/**
* @return bool
* @throws Exception
*/
public function isRunner(): bool
{
$ports = $this->sortService($this->servers);
foreach ($ports as $config) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
}
/**
* @param $port
* @throws Exception
*/
public function exit($port)
{
if (!($pid = checkPortIsAlready($port))) {
return;
}
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
}
+102
View File
@@ -0,0 +1,102 @@
<?php
namespace Server\Tasker;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Core\HashMap;
use Kiri\Di\Container;
use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerInterface;
use ReflectionException;
use Server\Contract\OnTaskInterface;
use Server\SwooleServerInterface;
/**
*
*/
class AsyncTaskExecute extends BaseObject
{
/**
* @var SwooleServerInterface|null
*/
public ?SwooleServerInterface $server = null;
/**
* @var Container
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function reg(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if (!$this->server) {
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
}
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerNum = $this->server->setting['worker_num'];
$taskerNum = $workerNum + $this->server->setting['task_worker_num'];
$workerId = random_int($workerNum, $taskerNum - 1);
}
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
@@ -1,13 +1,12 @@
<?php <?php
namespace Server\Handler; namespace Server\Tasker;
use Annotation\Inject; use Note\Inject;
use Kiri\Abstracts\Logger; use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException; use ReflectionException;
use Server\Contract\OnTaskInterface; use Server\Contract\OnTaskInterface;
use Swoole\Server; use Swoole\Server;
@@ -68,19 +67,14 @@ class OnServerTask
/** /**
* @param $data * @param $data
* @return null * @return null
* @throws ReflectionException
*/ */
private function resolve($data) private function resolve($data)
{ {
[$class, $params] = json_encode($data, true); $execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
$reflect = Kiri::getDi()->getReflect($class); return $execute->execute();
if (!$reflect->isInstantiable()) {
return null;
} }
$class = $reflect->newInstanceArgs($params); return null;
return $class->execute();
} }