Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c2fde2e6d7 | |||
| 5ddd0bc6d7 | |||
| d22db391f3 | |||
| bcd78aba56 | |||
| c899f51f35 | |||
| 1419e96c89 | |||
| 224b52db49 | |||
| 248f4d7100 | |||
| 122f37d6ce | |||
| 9fe40ff78a | |||
| ae4d75f310 | |||
| 707379c0fd | |||
| b861e0b17b | |||
| 7b576476a2 | |||
| 711a819ebb | |||
| 971b1f1fb0 | |||
| 4831bc67f5 | |||
| 1b8c6ecde0 | |||
| a4c78874e4 | |||
| b6c4693ef6 | |||
| adfa1cf3f2 |
@@ -1,17 +0,0 @@
|
||||
<?php
|
||||
|
||||
|
||||
namespace Kiri\Server\Contract;
|
||||
|
||||
|
||||
use Swoole\Server;
|
||||
|
||||
interface OnTaskInterface
|
||||
{
|
||||
|
||||
public function execute();
|
||||
|
||||
|
||||
public function finish(Server $server, int $task_id);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Server\Events;
|
||||
|
||||
class OnProcessStart
|
||||
{
|
||||
|
||||
}
|
||||
@@ -15,10 +15,6 @@ class OnPipeMessage extends Server
|
||||
{
|
||||
|
||||
|
||||
/** @var EventDispatch */
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventDispatch;
|
||||
|
||||
|
||||
/**
|
||||
* @param \Swoole\Server $server
|
||||
|
||||
@@ -19,12 +19,6 @@ use Kiri\Server\Events\OnStart;
|
||||
class OnServer extends Server
|
||||
{
|
||||
|
||||
/**
|
||||
* @var EventDispatch
|
||||
*/
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventDispatch;
|
||||
|
||||
|
||||
/**
|
||||
* @param \Swoole\Server $server
|
||||
@@ -35,7 +29,7 @@ class OnServer extends Server
|
||||
{
|
||||
$this->setProcessName(sprintf('start[%d].server', $server->master_pid));
|
||||
|
||||
$this->eventDispatch->dispatch(new OnStart($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnStart($server));
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +39,7 @@ class OnServer extends Server
|
||||
*/
|
||||
public function onBeforeShutdown(\Swoole\Server $server)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnBeforeShutdown($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeShutdown($server));
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +49,7 @@ class OnServer extends Server
|
||||
*/
|
||||
public function onShutdown(\Swoole\Server $server)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnShutdown($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnShutdown($server));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -18,12 +18,6 @@ use Kiri\Server\Events\OnManagerStop;
|
||||
class OnServerManager extends Server
|
||||
{
|
||||
|
||||
/**
|
||||
* @var EventDispatch
|
||||
*/
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventDispatch;
|
||||
|
||||
|
||||
/**
|
||||
* @param \Swoole\Server $server
|
||||
@@ -33,7 +27,7 @@ class OnServerManager extends Server
|
||||
{
|
||||
$this->setProcessName(sprintf('manger[%d].0', $server->manager_pid));
|
||||
|
||||
$this->eventDispatch->dispatch(new OnManagerStart($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStart($server));
|
||||
}
|
||||
|
||||
|
||||
@@ -43,7 +37,7 @@ class OnServerManager extends Server
|
||||
*/
|
||||
public function onManagerStop(\Swoole\Server $server)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnManagerStop($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStop($server));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -16,20 +16,13 @@ class OnServerReload
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @var EventDispatch
|
||||
*/
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventDispatch;
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @throws \ReflectionException
|
||||
*/
|
||||
public function onBeforeReload(Server $server)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnBeforeReload($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeReload($server));
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +32,7 @@ class OnServerReload
|
||||
*/
|
||||
public function onAfterReload(Server $server)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnAfterReload($server));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnAfterReload($server));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+17
-13
@@ -28,11 +28,16 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
|
||||
{
|
||||
|
||||
|
||||
public Router $collector;
|
||||
|
||||
|
||||
/**
|
||||
* @var EventDispatch
|
||||
* @return void
|
||||
*/
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventDispatch;
|
||||
public function init()
|
||||
{
|
||||
$this->collector = Kiri::getDi()->get(Router::class);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
@@ -42,19 +47,20 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
|
||||
*/
|
||||
public function onWorkerStart(Server $server, int $workerId)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnBeforeWorkerStart($workerId));
|
||||
$dispatch = \Kiri::getDi()->get(EventDispatch::class);
|
||||
$dispatch->dispatch(new OnBeforeWorkerStart($workerId));
|
||||
set_env('environmental_workerId', $workerId);
|
||||
if ($workerId < $server->setting['worker_num']) {
|
||||
$this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId));
|
||||
Kiri::getDi()->get(Router::class)->scan_build_route();
|
||||
$this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId));
|
||||
$this->collector->scan_build_route();
|
||||
$dispatch->dispatch(new OnWorkerStart($server, $workerId));
|
||||
set_env('environmental', Kiri::WORKER);
|
||||
} else {
|
||||
$this->eventDispatch->dispatch(new OnTaskStart($server, $workerId));
|
||||
$dispatch->dispatch(new OnTaskStart($server, $workerId));
|
||||
$this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId));
|
||||
set_env('environmental', Kiri::TASK);
|
||||
}
|
||||
$this->eventDispatch->dispatch(new OnAfterWorkerStart());
|
||||
$dispatch->dispatch(new OnAfterWorkerStart());
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +72,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
|
||||
public function onWorkerStop(Server $server, int $workerId)
|
||||
{
|
||||
Timer::clearAll();
|
||||
$this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerStop($server, $workerId));
|
||||
}
|
||||
|
||||
|
||||
@@ -77,9 +83,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
|
||||
*/
|
||||
public function onWorkerExit(Server $server, int $workerId)
|
||||
{
|
||||
set_env('state', 'exit');
|
||||
|
||||
$this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerExit($server, $workerId));
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +97,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
|
||||
*/
|
||||
public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal)
|
||||
{
|
||||
$this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
|
||||
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
|
||||
|
||||
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
|
||||
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
|
||||
|
||||
+14
-3
@@ -2,14 +2,16 @@
|
||||
|
||||
namespace Kiri\Server;
|
||||
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Context;
|
||||
use Kiri\Events\EventDispatch;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri;
|
||||
use Kiri\Server\Abstracts\BaseProcess;
|
||||
use Kiri\Server\Broadcast\Message;
|
||||
use Kiri\Server\Contract\OnProcessInterface;
|
||||
use Kiri\Server\Events\OnProcessStart;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Process;
|
||||
@@ -40,8 +42,14 @@ class ProcessManager
|
||||
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
|
||||
|
||||
$this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
|
||||
|
||||
$server->addProcess($process = $this->parse($customProcess, $system));
|
||||
$process = $this->parse($customProcess, $system);
|
||||
if (Context::inCoroutine()) {
|
||||
Coroutine::create(function () use ($process) {
|
||||
$process->start();
|
||||
});
|
||||
} else {
|
||||
$server->addProcess($process = $this->parse($customProcess, $system));
|
||||
}
|
||||
$this->_process[$customProcess->getName()] = $process;
|
||||
}
|
||||
|
||||
@@ -57,6 +65,9 @@ class ProcessManager
|
||||
if (Kiri::getPlatform()->isLinux()) {
|
||||
$process->name($system . '(' . $customProcess->getName() . ')');
|
||||
}
|
||||
|
||||
Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart());
|
||||
|
||||
set_env('environmental', Kiri::PROCESS);
|
||||
$channel = Coroutine::create(function () use ($process, $customProcess) {
|
||||
while (!$customProcess->isStop()) {
|
||||
|
||||
+10
-6
@@ -4,6 +4,7 @@
|
||||
namespace Kiri\Server;
|
||||
|
||||
use Exception;
|
||||
use JetBrains\PhpStorm\Pure;
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Annotation\Inject;
|
||||
@@ -12,7 +13,9 @@ use Kiri\Exception\ConfigException;
|
||||
use Kiri\Message\Handler\Abstracts\HttpService;
|
||||
use Kiri\Server\Events\OnShutdown;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Kiri\Server\Events\OnServerBeforeStart;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use ReflectionException;
|
||||
use Swoole\Coroutine;
|
||||
|
||||
|
||||
@@ -77,7 +80,7 @@ class Server extends HttpService
|
||||
* @throws ConfigException
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws \ReflectionException
|
||||
* @throws ReflectionException
|
||||
* @throws Exception
|
||||
*/
|
||||
public function start(): mixed
|
||||
@@ -92,7 +95,9 @@ class Server extends HttpService
|
||||
|
||||
$processes = array_merge($this->process, Config::get('processes', []));
|
||||
|
||||
$this->container->get(ProcessManager::class)->batch($processes);
|
||||
$this->getContainer()->get(ProcessManager::class)->batch($processes);
|
||||
|
||||
$this->getEventDispatch()->dispatch(new OnServerBeforeStart());
|
||||
|
||||
return $this->manager->getServer()->start();
|
||||
}
|
||||
@@ -103,7 +108,7 @@ class Server extends HttpService
|
||||
* @throws ConfigException
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws \ReflectionException
|
||||
* @throws ReflectionException
|
||||
* @throws Exception
|
||||
*/
|
||||
public function shutdown()
|
||||
@@ -112,7 +117,7 @@ class Server extends HttpService
|
||||
foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) {
|
||||
$this->state->exit($config['port']);
|
||||
}
|
||||
$this->container->get(EventDispatch::class)->dispatch(new OnShutdown());
|
||||
$this->getContainer()->get(EventDispatch::class)->dispatch(new OnShutdown());
|
||||
}
|
||||
|
||||
|
||||
@@ -143,9 +148,8 @@ class Server extends HttpService
|
||||
|
||||
/**
|
||||
* @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null
|
||||
* @throws \ReflectionException
|
||||
*/
|
||||
public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null
|
||||
#[Pure] public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null
|
||||
{
|
||||
return $this->manager->getServer();
|
||||
}
|
||||
|
||||
+1
-7
@@ -30,12 +30,6 @@ class ServerCommand extends Command
|
||||
const ACTIONS = ['start', 'stop', 'restart'];
|
||||
|
||||
|
||||
/**
|
||||
* @var EventDispatch
|
||||
*/
|
||||
#[Inject(EventDispatch::class)]
|
||||
public EventDispatch $eventProvider;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -77,7 +71,7 @@ class ServerCommand extends Command
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return $manager->start();
|
||||
return (int)$manager->start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+19
-94
@@ -7,7 +7,6 @@ use Kiri\Abstracts\Component;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Error\Logger;
|
||||
use Kiri\Events\EventDispatch;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Server\Contract\OnCloseInterface;
|
||||
use Kiri\Server\Contract\OnConnectInterface;
|
||||
@@ -16,18 +15,13 @@ use Kiri\Server\Contract\OnHandshakeInterface;
|
||||
use Kiri\Server\Contract\OnMessageInterface;
|
||||
use Kiri\Server\Contract\OnPacketInterface;
|
||||
use Kiri\Server\Contract\OnReceiveInterface;
|
||||
use Kiri\Server\Events\OnServerBeforeStart;
|
||||
use Kiri\Server\Handler\OnPipeMessage;
|
||||
use Kiri\Server\Handler\OnServer;
|
||||
use Kiri\Server\Handler\OnServerManager;
|
||||
use Kiri\Server\Handler\OnServerReload;
|
||||
use Kiri\Server\Handler\OnServerWorker;
|
||||
use Kiri\Server\Tasker\OnServerTask;
|
||||
use Kiri\Task\TaskManager;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use ReflectionException;
|
||||
use Swoole\Http\Server as HServer;
|
||||
use Swoole\Process;
|
||||
use Swoole\Server;
|
||||
use Swoole\Server\Port;
|
||||
use Swoole\WebSocket\Server as WServer;
|
||||
@@ -64,24 +58,19 @@ class ServerManager extends Component
|
||||
private Server|null $server = null;
|
||||
|
||||
|
||||
protected array $initProcesses = [];
|
||||
|
||||
|
||||
const DEFAULT_EVENT = [
|
||||
Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'],
|
||||
Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'],
|
||||
Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'],
|
||||
Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'],
|
||||
Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'],
|
||||
Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'],
|
||||
Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'],
|
||||
Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'],
|
||||
Constant::START => [OnServer::class, 'onStart'],
|
||||
Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'],
|
||||
Constant::SHUTDOWN => [OnServer::class, 'onShutdown'],
|
||||
];
|
||||
|
||||
|
||||
/**
|
||||
* @var array|string[]
|
||||
*/
|
||||
private array $eventInterface = [
|
||||
OnReceiveInterface::class => 'receive',
|
||||
OnPacketInterface::class => 'packet',
|
||||
@@ -95,11 +84,9 @@ class ServerManager extends Component
|
||||
|
||||
/**
|
||||
* @return Server|WServer|HServer|null
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function getServer(): Server|WServer|HServer|null
|
||||
{
|
||||
di(EventDispatch::class)->dispatch(new OnServerBeforeStart());
|
||||
return $this->server;
|
||||
}
|
||||
|
||||
@@ -140,16 +127,25 @@ class ServerManager extends Component
|
||||
foreach ($this->sortService($configs['ports']) as $config) {
|
||||
$this->startListenerHandler($context, $config, $daemon);
|
||||
}
|
||||
$this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
|
||||
$this->bindPipeMessage();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return array<string,Process>
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function getProcesses(): array
|
||||
public function bindPipeMessage(): void
|
||||
{
|
||||
return $this->initProcesses;
|
||||
$pipeMessage = $this->getContainer()->get(OnPipeMessage::class);
|
||||
$this->server->on(Constant::PIPE_MESSAGE, [$pipeMessage, 'onPipeMessage']);
|
||||
|
||||
if (!isset($this->server->setting['task_worker_num']) || $this->server->setting['task_worker_num'] < 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->getContainer()->get(TaskManager::class)->taskListener($this->server);
|
||||
}
|
||||
|
||||
|
||||
@@ -248,23 +244,6 @@ class ServerManager extends Component
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws Exception
|
||||
*
|
||||
*
|
||||
*
|
||||
* $data = new Table($this->container->get(OutputInterface::class));
|
||||
* $data->setHeaders(['key', 'value']);
|
||||
*
|
||||
* $array = [];
|
||||
* foreach ($this->server->setting as $key => $value) {
|
||||
* $array[] = [$key, $value];
|
||||
* $array[] = new TableSeparator();
|
||||
* }
|
||||
*
|
||||
* array_pop($array);
|
||||
*
|
||||
* $data->setStyle('box-double');
|
||||
* $data->setRows($array);
|
||||
* $data->render();
|
||||
*/
|
||||
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
|
||||
{
|
||||
@@ -277,7 +256,7 @@ class ServerManager extends Component
|
||||
$this->server = new $match($host, $port, SWOOLE_PROCESS, $mode);
|
||||
$this->server->set(array_merge(Config::get('server.settings', []), $settings['settings']));
|
||||
|
||||
$this->container->setBindings(SwooleServerInterface::class, $this->server);
|
||||
$this->getContainer()->setBindings(SwooleServerInterface::class, $this->server);
|
||||
|
||||
$id = Config::get('id', 'system-service');
|
||||
|
||||
@@ -294,9 +273,6 @@ class ServerManager extends Component
|
||||
*/
|
||||
private function addDefaultListener(array $settings): void
|
||||
{
|
||||
if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
|
||||
$this->addTaskListener($settings['events']);
|
||||
}
|
||||
$this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server);
|
||||
if (!empty($settings['events']) && is_array($settings['events'])) {
|
||||
$this->addServiceEvents($settings['events'], $this->server);
|
||||
@@ -314,7 +290,7 @@ class ServerManager extends Component
|
||||
{
|
||||
foreach ($events as $name => $event) {
|
||||
if (is_array($event) && is_string($event[0])) {
|
||||
$event[0] = $this->container->get($event[0]);
|
||||
$event[0] = $this->getContainer()->get($event[0]);
|
||||
}
|
||||
$server->on($name, $event);
|
||||
}
|
||||
@@ -330,55 +306,4 @@ class ServerManager extends Component
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param mixed $message
|
||||
* @param int $workerId
|
||||
* @return mixed
|
||||
*/
|
||||
public function sendMessage(mixed $message, int $workerId): mixed
|
||||
{
|
||||
return $this->server?->sendMessage($message, $workerId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array $events
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
private function addTaskListener(array $events = []): void
|
||||
{
|
||||
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
|
||||
$reflect = $this->container->get(OnServerTask::class);
|
||||
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
|
||||
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
|
||||
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
|
||||
} else {
|
||||
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array|null $settings
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function bindCallback(?array $settings = [])
|
||||
{
|
||||
if (count($settings) < 1) {
|
||||
return;
|
||||
}
|
||||
foreach ($settings as $event_type => $callback) {
|
||||
if ($this->server->getCallback($event_type) !== null) {
|
||||
continue;
|
||||
}
|
||||
if (is_array($callback) && !is_object($callback[0])) {
|
||||
$callback[0] = $this->container->get($callback[0]);
|
||||
}
|
||||
$this->server->on($event_type, $callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,6 @@ class ServerProviders extends Providers
|
||||
*/
|
||||
public function onImport(Application $application)
|
||||
{
|
||||
$application->set('server', ['class' => Server::class]);
|
||||
|
||||
$container = Kiri::getDi();
|
||||
|
||||
$console = $container->get(\Symfony\Component\Console\Application::class);
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Server\Tasker;
|
||||
|
||||
use Exception;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Core\HashMap;
|
||||
use Kiri;
|
||||
use ReflectionException;
|
||||
use Kiri\Server\Contract\OnTaskInterface;
|
||||
use Kiri\Server\SwooleServerInterface;
|
||||
use Swoole\Server;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
class AsyncTaskExecute extends Component
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @var Server|\Swoole\WebSocket\Server|\Swoole\Http\Server
|
||||
*/
|
||||
public mixed $server = null;
|
||||
|
||||
|
||||
private HashMap $hashMap;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function init()
|
||||
{
|
||||
$this->hashMap = new HashMap();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param $handler
|
||||
*/
|
||||
public function reg(string $key, $handler)
|
||||
{
|
||||
$this->hashMap->put($key, $handler);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnTaskInterface|string $handler
|
||||
* @param array $params
|
||||
* @param int $workerId
|
||||
* @throws Exception
|
||||
*/
|
||||
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
|
||||
{
|
||||
if (!$this->server) {
|
||||
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
|
||||
}
|
||||
if ($workerId < 0 || $workerId > $this->server->setting['task_worker_num']) {
|
||||
$workerId = random_int(0, $this->server->setting['task_worker_num'] - 1);
|
||||
}
|
||||
if (is_string($handler)) {
|
||||
$handler = $this->handle($handler, $params);
|
||||
}
|
||||
$this->server->task(serialize($handler), $workerId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $handler
|
||||
* @param $params
|
||||
* @return object
|
||||
* @throws ReflectionException
|
||||
* @throws Exception
|
||||
*/
|
||||
private function handle($handler, $params): object
|
||||
{
|
||||
if (!class_exists($handler) && $this->hashMap->has($handler)) {
|
||||
$handler = $this->hashMap->get($handler);
|
||||
}
|
||||
$implements = $this->container->getReflect($handler);
|
||||
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
|
||||
throw new Exception('Task must instance ' . OnTaskInterface::class);
|
||||
}
|
||||
return $implements->newInstanceArgs($params);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,94 +0,0 @@
|
||||
<?php
|
||||
|
||||
|
||||
namespace Kiri\Server\Tasker;
|
||||
|
||||
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Abstracts\Logger;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Server\Contract\OnTaskInterface;
|
||||
use Swoole\Server;
|
||||
|
||||
|
||||
/**
|
||||
* Class OnServerTask
|
||||
* @package Server\Task
|
||||
*/
|
||||
class OnServerTask
|
||||
{
|
||||
|
||||
|
||||
#[Inject(Logger::class)]
|
||||
public Logger $logger;
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param int $src_worker_id
|
||||
* @param mixed $data
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
|
||||
{
|
||||
try {
|
||||
$data = $this->resolve($data);
|
||||
} catch (\Throwable $exception) {
|
||||
$data = jTraceEx($exception);
|
||||
|
||||
$this->logger->error('task', [error_trigger_format($exception)]);
|
||||
} finally {
|
||||
$server->finish($data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param Server\Task $task
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function onCoroutineTask(Server $server, Server\Task $task)
|
||||
{
|
||||
try {
|
||||
$data = $this->resolve($task->data);
|
||||
} catch (\Throwable $exception) {
|
||||
$data = jTraceEx($exception);
|
||||
|
||||
$this->logger->error('task', [error_trigger_format($exception)]);
|
||||
} finally {
|
||||
$task->finish($data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $data
|
||||
* @return null
|
||||
*/
|
||||
private function resolve($data)
|
||||
{
|
||||
$execute = unserialize($data);
|
||||
if ($execute instanceof OnTaskInterface) {
|
||||
return $execute->execute();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param mixed $data
|
||||
*/
|
||||
public function onFinish(Server $server, int $task_id, mixed $data)
|
||||
{
|
||||
if (!($data instanceof OnTaskInterface)) {
|
||||
return;
|
||||
}
|
||||
$data->finish($server, $task_id);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user