This commit is contained in:
2023-07-31 23:08:58 +08:00
parent 2b2a779b94
commit ba3df62cb5
11 changed files with 70 additions and 137 deletions
+6 -7
View File
@@ -3,6 +3,7 @@
namespace Kiri\Server\Abstracts; namespace Kiri\Server\Abstracts;
use Kiri\Abstracts\Logger;
use Kiri\Di\Context; use Kiri\Di\Context;
use Kiri\Server\Contract\OnProcessInterface; use Kiri\Server\Contract\OnProcessInterface;
use Swoole\Coroutine; use Swoole\Coroutine;
@@ -88,17 +89,15 @@ abstract class BaseProcess implements OnProcessInterface
abstract public function onSigterm(): static; abstract public function onSigterm(): static;
/** /**
* @param $data * @param $data
* @throws \ReflectionException * @return void
*/ */
protected function onShutdown($data): void protected function onShutdown($data): void
{ {
$this->isStop = true; $this->isStop = true;
$value = Context::get('waite:process:message'); $value = Context::get('waite:process:message');
Logger::_alert('Process ' . $this->getName() . ' stop');
\Kiri::getLogger()->alert('Process ' . $this->getName() . ' stop');
if (!is_null($value) && Coroutine::exists((int)$value)) { if (!is_null($value) && Coroutine::exists((int)$value)) {
Coroutine::cancel((int)$value); Coroutine::cancel((int)$value);
} }
+31 -43
View File
@@ -7,15 +7,10 @@ use Exception;
use Kiri; use Kiri;
use Kiri\Abstracts\Component; use Kiri\Abstracts\Component;
use Kiri\Server\Contract\OnProcessInterface; use Kiri\Server\Contract\OnProcessInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException; use ReflectionException;
use Swoole\Coroutine;
use Swoole\Process; use Swoole\Process;
use Psr\Container\ContainerInterface; use Kiri\Abstracts\Logger;
use Kiri\Events\EventProvider;
use Kiri\Server\ServerInterface; use Kiri\Server\ServerInterface;
use Kiri\Di\Inject\Container;
use Kiri\Server\Events\OnServerBeforeStart; use Kiri\Server\Events\OnServerBeforeStart;
class ProcessManager extends Component class ProcessManager extends Component
@@ -32,8 +27,7 @@ class ProcessManager extends Component
*/ */
public function init(): void public function init(): void
{ {
$provider = Kiri::getDi()->get(EventProvider::class); on(OnServerBeforeStart::class, [$this, 'OnServerBeforeStart']);
$provider->on(OnServerBeforeStart::class, [$this, 'OnServerBeforeStart']);
} }
@@ -46,19 +40,13 @@ class ProcessManager extends Component
{ {
$server = Kiri::getDi()->get(ServerInterface::class); $server = Kiri::getDi()->get(ServerInterface::class);
foreach ($this->_process as $custom) { foreach ($this->_process as $custom) {
if (Kiri\Di\Context::inCoroutine()) { $server->addProcess(new Process(function (Process $process) use ($custom) {
Coroutine::create(function () use ($custom) { $this->extracted($custom, $process);
$custom->onSigterm()->process(null); },
}); $custom->getRedirectStdinAndStdout(),
} else { $custom->getPipeType(),
$server->addProcess(new Process(function (Process $process) use ($custom) { $custom->isEnableCoroutine()
$this->extracted($custom, $process); ));
},
$custom->getRedirectStdinAndStdout(),
$custom->getPipeType(),
$custom->isEnableCoroutine()
));
}
} }
} }
@@ -95,9 +83,9 @@ class ProcessManager extends Component
*/ */
public function shutdown(): void public function shutdown(): void
{ {
// foreach ($this->_process as $process) { foreach ($this->_process as $process) {
// Process::kill($process->pid, 0) && Process::kill($process->pid, 15); Process::kill($process->pid, 0) && Process::kill($process->pid, 15);
// } }
} }
@@ -120,16 +108,16 @@ class ProcessManager extends Component
*/ */
public function get(?string $name = null, string $tag = 'default'): array|Process|null public function get(?string $name = null, string $tag = 'default'): array|Process|null
{ {
// $process = $this->_process[$tag] ?? null; $process = $this->_process[$tag] ?? null;
// if (empty($process)) { if (empty($process)) {
// return null; return null;
// } }
// if (!empty($name)) { if (!empty($name)) {
// if (!isset($process[$name])) { if (!isset($process[$name])) {
// return null; return null;
// } }
// return $process[$name]; return $process[$name];
// } }
return null; return null;
} }
@@ -139,9 +127,9 @@ class ProcessManager extends Component
*/ */
public function stop(): void public function stop(): void
{ {
// foreach ($this->_process as $process) { foreach ($this->_process as $process) {
// Process::kill($process->pid, 0) && Process::kill($process->pid, 15); Process::kill($process->pid, 0) && Process::kill($process->pid, 15);
// } }
} }
@@ -168,11 +156,11 @@ class ProcessManager extends Component
*/ */
public function push(string $name, string $message): void public function push(string $name, string $message): void
{ {
// if (!isset($this->_process[$name])) { if (!isset($this->_process[$name])) {
// return; return;
// } }
// $process = $this->_process[$name]; $process = $this->_process[$name];
// $process->write($message); $process->write($message);
} }
/** /**
@@ -186,7 +174,7 @@ class ProcessManager extends Component
{ {
set_env('environmental', Kiri::PROCESS); set_env('environmental', Kiri::PROCESS);
$system = sprintf('[%s].Custom Process', \config('id', 'system-service')); $system = sprintf('[%s].Custom Process', \config('id', 'system-service'));
Kiri::getLogger()->alert($system . ' ' . $custom->getName() . ' start.'); Logger::_alert($system . ' ' . $custom->getName() . ' start.');
if (Kiri::getPlatform()->isLinux()) { if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '[' . $process->pid . '].' . $custom->getName()); $process->name($system . '[' . $process->pid . '].' . $custom->getName());
} }
-2
View File
@@ -6,8 +6,6 @@ namespace Kiri\Server\Abstracts;
use Exception; use Exception;
use Kiri; use Kiri;
use Psr\Log\LoggerInterface;
use Kiri\Di\Inject\Container;
/** /**
-12
View File
@@ -1,12 +0,0 @@
<?php
namespace Kiri\Server\Abstracts;
enum StatusEnum
{
case START;
case STOP;
case EXIT;
case ERROR;
}
-15
View File
@@ -165,19 +165,4 @@ trait TraitServer
}; };
} }
/**
* @param $type
* @return string|null
*/
public function getCoroutineServerClass($type): ?string
{
return match ($type) {
Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, Constant::SERVER_TYPE_UDP => Coroutine\Server::class,
Constant::SERVER_TYPE_HTTP, Constant::SERVER_TYPE_WEBSOCKET => Coroutine\Http\Server::class,
default => null
};
}
} }
-1
View File
@@ -27,7 +27,6 @@ class OnServer extends Server
/** /**
* @param SServer $server * @param SServer $server
* @throws ConfigException
* @throws ReflectionException * @throws ReflectionException
* @throws ContainerExceptionInterface * @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface * @throws NotFoundExceptionInterface
+1 -2
View File
@@ -87,8 +87,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
*/ */
public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal): void public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal): void
{ {
$dispatch = \Kiri::getDi()->get(EventDispatch::class); event(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$dispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
+2 -2
View File
@@ -305,7 +305,7 @@ class HotReload extends BaseProcess
try { try {
inotify_rm_watch($this->inotify, $wd); inotify_rm_watch($this->inotify, $wd);
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
logger()->addError($exception, 'throwable'); trigger_print_error($exception, 'throwable');
} }
} }
$this->watchFiles = []; $this->watchFiles = [];
@@ -321,7 +321,7 @@ class HotReload extends BaseProcess
{ {
//目录不存在 //目录不存在
if (!is_dir($dir)) { if (!is_dir($dir)) {
return logger()->addError("[$dir] is not a directory."); return trigger_print_error("[$dir] is not a directory.");
} }
//避免重复监听 //避免重复监听
if (isset($this->watchFiles[$dir])) { if (isset($this->watchFiles[$dir])) {
+16 -42
View File
@@ -28,11 +28,6 @@ defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid');
class Server class Server
{ {
/**
* @var string|mixed
*/
private string $class;
/** /**
* @var int * @var int
@@ -41,20 +36,16 @@ class Server
/** /**
* * @param AsyncServer $manager
* @param State $state
* @param EventDispatch $dispatch
* @param Router $router
*/ */
public function __construct() public function __construct(public AsyncServer $manager,
public State $state,
public EventDispatch $dispatch,
public Router $router)
{ {
$this->class = \config('server.type', AsyncServer::class);
}
/**
* @throws ReflectionException
*/
private function manager(): AsyncServer
{
return Kiri::getDi()->get($this->class);
} }
@@ -64,7 +55,7 @@ class Server
*/ */
public function addProcess($process): void public function addProcess($process): void
{ {
$this->manager()->addProcess($process); $this->manager->addProcess($process);
} }
@@ -79,26 +70,14 @@ class Server
on(OnTaskerStart::class, [$this, 'setTaskerName']); on(OnTaskerStart::class, [$this, 'setTaskerName']);
if (\config('reload.hot') === false) { if (\config('reload.hot') === false) {
$this->hotLoad(); $this->router->scan_build_route();
} else { } else {
on(OnWorkerStart::class, [$this, 'hotLoad']); on(OnWorkerStart::class, [$this, 'hotLoad']);
$this->addProcess(HotReload::class); $this->addProcess(HotReload::class);
} }
$manager = $this->manager(); $this->manager->initCoreServers(\config('server', []), $this->daemon);
$manager->initCoreServers(\config('server', []), $this->daemon); $this->manager->start();
$manager->start();
}
/**
* @return void
* @throws ReflectionException
*/
public function hotLoad(): void
{
$manager = Kiri::getDi()->get(Router::class);
$manager->scan_build_route();
} }
@@ -141,15 +120,11 @@ class Server
public function shutdown(): void public function shutdown(): void
{ {
$configs = \config('server', []); $configs = \config('server', []);
$instances = $this->manager->sortService($configs['ports'] ?? []);
$state = Kiri::getDi()->get(State::class);
$instances = $this->manager()->sortService($configs['ports'] ?? []);
foreach ($instances as $config) { foreach ($instances as $config) {
$state->exit($config->port); $this->state->exit($config->port);
} }
$this->dispatch->dispatch(new OnShutdown());
$manager = Kiri::getDi()->get(EventDispatch::class);
$manager->dispatch(new OnShutdown());
} }
@@ -159,8 +134,7 @@ class Server
*/ */
public function isRunner(): bool public function isRunner(): bool
{ {
$state = Kiri::getDi()->get(State::class); return $this->state->isRunner();
return $state->isRunner();
} }
-3
View File
@@ -26,9 +26,6 @@ class ServerCommand extends Command
{ {
const ACTIONS = ['start', 'stop', 'restart'];
private Server $server; private Server $server;
+14 -8
View File
@@ -3,6 +3,7 @@
namespace Kiri\Server\Task; namespace Kiri\Server\Task;
use Kiri;
use Kiri\Server\Constant; use Kiri\Server\Constant;
use Kiri\Server\ServerInterface; use Kiri\Server\ServerInterface;
use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerExceptionInterface;
@@ -13,8 +14,15 @@ use Swoole\Server;
class Task implements TaskInterface class Task implements TaskInterface
{ {
/**
* @param ServerInterface $server
*/
public function __construct(public ServerInterface $server)
{
}
/**
/**
* @param Server $server * @param Server $server
* @return void * @return void
*/ */
@@ -60,7 +68,7 @@ class Task implements TaskInterface
if (is_null($data)) { if (is_null($data)) {
return null; return null;
} }
$data[0] = \Kiri::getDi()->get($data[0]); $data[0] = Kiri::getDi()->get($data[0]);
return call_user_func($data, $task_id, $src_worker_id); return call_user_func($data, $task_id, $src_worker_id);
} }
@@ -73,20 +81,18 @@ class Task implements TaskInterface
*/ */
public function dispatch(array|string|object $handler, ?int $workerId = null): void public function dispatch(array|string|object $handler, ?int $workerId = null): void
{ {
/** @var Server $server */
$server = \Kiri::getDi()->get(ServerInterface::class);
if (is_null($workerId)) { if (is_null($workerId)) {
$workerId = rand(0, $server->setting[Constant::OPTION_TASK_WORKER_NUM] - 1); $workerId = rand(0, $this->server->setting[Constant::OPTION_TASK_WORKER_NUM] - 1);
} }
if (is_string($handler)) { if (is_string($handler)) {
$server->task(serialize([di($handler), 'handle']), $workerId); $this->server->task(serialize([di($handler), 'handle']), $workerId);
} else if (is_array($handler)) { } else if (is_array($handler)) {
if (is_string($handler[0])) { if (is_string($handler[0])) {
$handler[0] = di($handler[0]); $handler[0] = di($handler[0]);
} }
$server->task(serialize($handler), $workerId); $this->server->task(serialize($handler), $workerId);
} else { } else {
$server->task(serialize([$handler, 'handle']), $workerId); $this->server->task(serialize([$handler, 'handle']), $workerId);
} }
} }