modify plugin name

This commit is contained in:
2022-06-16 17:38:22 +08:00
parent 0233acb279
commit 0051f1f15c
25 changed files with 928 additions and 1055 deletions
+195
View File
@@ -0,0 +1,195 @@
<?php
namespace Kiri\Server;
use Exception;
use Kiri;
use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface;
use Kiri\Exception\ConfigException;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Server;
use Kiri\Events\EventDispatch;
use Kiri\Exception\NotFindClassException;
use Kiri\Server\Events\OnServerBeforeStart;
/**
*
*/
class AsyncServer
{
use TraitServer;
/**
* @var Server|null
*/
private Server|null $server = null;
/**
* @param Config $config
* @param ContainerInterface $container
* @param EventDispatch $dispatch
* @param ProcessManager $processManager
*/
public function __construct(public Config $config,
public ContainerInterface $container,
public EventDispatch $dispatch,
public ProcessManager $processManager)
{
}
/**
* @param array $service
* @param int $daemon
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFindClassException
* @throws NotFoundExceptionInterface
*/
public function initCoreServers(array $service, int $daemon = 0): void
{
$service = $this->genConfigService($service);
$this->createBaseServer(array_shift($service), $daemon);
foreach ($service as $value) {
$this->addListener($value);
}
$this->processManager->batch(Config::get('processes', []), $this->server);
$this->processManager->batch($this->getProcess(), $this->server);
}
/**
* @param string $name
* @return Server|null
*/
public function getServer(string $name = ''): Server|null
{
return $this->server;
}
/**
* @param \Kiri\Server\Config $config
* @param int $daemon
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFindClassException
* @throws NotFoundExceptionInterface
*/
private function createBaseServer(\Kiri\Server\Config $config, int $daemon = 0): void
{
$match = $this->getServerClass($config->type);
if (is_null($match)) {
throw new NotFindClassException('Unknown server type ' . $config->type);
}
$this->server = new $match($config->host, $config->port, SWOOLE_PROCESS, $config->mode);
$this->server->set($this->systemConfig($config, $daemon));
$this->onEventListen($this->server, Config::get('server.events', []));
$this->container->setBindings(ServerInterface::class, $this->server);
}
/**
* @param \Kiri\Server\Config $config
* @param int $daemon
* @return array
* @throws Exception
* @throws ConfigException
*/
protected function systemConfig(\Kiri\Server\Config $config, int $daemon): array
{
$settings = array_merge(Config::get('server.settings', []), $config->settings);
$settings[Constant::OPTION_DAEMONIZE] = (bool)$daemon;
$settings[Constant::OPTION_ENABLE_REUSE_PORT] = true;
$settings[Constant::OPTION_PID_FILE] = storage('.swoole.pid');
if (!isset($settings[Constant::OPTION_PID_FILE])) {
$settings[Constant::OPTION_LOG_FILE] = storage('system.log');
}
return $settings;
}
/**
* @param \Kiri\Server\Config $config
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function addListener(\Kiri\Server\Config $config): void
{
$port = $this->server->addlistener($config->host, $config->port, $config->mode);
if ($port === false) {
throw new Exception('Listen port fail.' . swoole_last_error());
}
$port->set($this->resetSettings($config->type, $config->settings));
$this->onEventListen($port, $config->getEvents());
Kiri::app()->set($config->getName(), $port);
}
/**
* @param string $type
* @param array $settings
* @return array
*/
private function resetSettings(string $type, array $settings): array
{
if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['open_http_protocol'])) {
$settings['open_http_protocol'] = true;
if (in_array($this->server->setting['dispatch_mode'], [2, 4])) {
$settings['open_http2_protocol'] = true;
}
}
if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['open_websocket_protocol'])) {
$settings['open_websocket_protocol'] = true;
}
return $settings;
}
/**
* @param Server\Port|Server $base
* @param array $events
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function onEventListen(Server\Port|Server $base, array $events): void
{
foreach ($events as $name => $event) {
if (is_array($event) && is_string($event[0])) {
$event[0] = $this->container->get($event[0]);
}
$base->on($name, $event);
}
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function start(): void
{
$this->dispatch->dispatch(new OnServerBeforeStart());
$this->server->start();
}
}
+36
View File
@@ -0,0 +1,36 @@
<?php
namespace Kiri\Server\Abstracts;
use Kiri\Context;
class DoWhile
{
private bool $isStop = false;
/**
* @param array|\Closure $handler
* @return void
*/
public static function waite(array|\Closure $handler): void
{
if (Context::hasContext('stop')) {
return;
}
$handler();
self::waite($handler);
}
/**
* @return void
*/
public static function stop(): void
{
Context::setContext('stop', 1);
}
}
@@ -2,6 +2,7 @@
namespace Kiri\Server;
use Closure;
use Kiri;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
@@ -15,6 +16,9 @@ use Kiri\Server\Events\OnProcessStart;
use Psr\Log\LoggerInterface;
use Swoole\Coroutine;
use Swoole\Process;
use Kiri\Server\Events\OnProcessStop;
use Kiri\Di\ContainerInterface;
use Swoole\Timer;
class ProcessManager
{
@@ -24,20 +28,21 @@ class ProcessManager
private array $_process = [];
/** @var array<string, Process> */
private array $_taskProcess = [];
/**
* @param ContainerInterface $container
* @param LoggerInterface $logger
*/
public function __construct(public ContainerInterface $container, public LoggerInterface $logger)
{
}
#[Inject(LoggerInterface::class)]
public LoggerInterface $logger;
/**
* @param string|OnProcessInterface|BaseProcess $customProcess
* @param string $tag
* @return void
* @return array
* @throws ConfigException
*/
public function add(string|OnProcessInterface|BaseProcess $customProcess, string $tag = 'default')
public function add(string|OnProcessInterface|BaseProcess $customProcess): array
{
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
@@ -46,15 +51,41 @@ class ProcessManager
$system = sprintf('[%s].Custom Process', Config::get('id', 'system-service'));
$this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
$process = $this->parse($customProcess, $system);
if (!Kiri::getDi()->has(SwooleServerInterface::class)) {
$process->start();
} else {
$server = Kiri::getDi()->get(SwooleServerInterface::class);
$server->addProcess($process = $this->parse($customProcess, $system));
if (Context::inCoroutine()) {
return [$customProcess, $this->resolve($customProcess, $system)];
}
$this->_process[$tag][$customProcess->getName()] = $process;
$process = new Process($this->resolve($customProcess, $system),
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
return [$customProcess, $process];
}
/**
* @param $customProcess
* @param $system
* @return Closure
*/
public function resolve($customProcess, $system): Closure
{
return static function () use ($customProcess, $system) {
$process = func_get_arg(0);
if ($process instanceof Process\Pool) {
$process = $process->getProcess(func_get_arg(1));
}
set_env('environmental', Kiri::PROCESS);
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
$dispatcher = Kiri::getDi()->get(EventDispatch::class);
$dispatcher->dispatch(new OnProcessStart());
$customProcess->onSigterm()->process($process);
$dispatcher->dispatch(new OnProcessStop($process));
};
}
@@ -82,66 +113,56 @@ class ProcessManager
/**
* @return void
*/
public function stop()
public function stop(): void
{
foreach ($this->_process as $process) {
$process->exit(0);
}
foreach ($this->_taskProcess as $process) {
$process->exit(0);
}
}
/**
* @param $customProcess
* @param $system
* @return Process
*/
private function parse($customProcess, $system): Process
{
return new Process(static function (Process $process) use ($customProcess, $system) {
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart());
set_env('environmental', Kiri::PROCESS);
// $channel = Coroutine::create(function () use ($process, $customProcess) {
// while (!$customProcess->isStop()) {
// $message = $process->read();
// if (!empty($message)) {
// $message = unserialize($message);
// }
// if (is_null($message)) {
// continue;
// }
// $customProcess->onBroadcast($message);
// }
// });
// Context::setContext('waite:process:message', $channel);
$customProcess->onSigterm()->process($process);
},
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
}
/**
* @param array $processes
* @param string $tag
* @param \Swoole\Server|null $server
* @return void
* @throws ConfigException
*/
public function batch(array $processes, string $tag = 'default')
public function batch(array $processes, ?\Swoole\Server $server = null): void
{
foreach ($processes as $process) {
$this->add($process, $tag);
$processes = array_merge($processes, Config::get('processes', []));
if (Context::inCoroutine()) {
$this->poolManager($processes);
return;
}
foreach ($processes as $process) {
[$customProcess, $sProcess] = $this->add($process);
$this->_process[$customProcess->getName()] = $customProcess;
$server->addProcess($sProcess);
}
}
/**
* @param array $processes
* @return void
* @throws ConfigException
*/
protected function poolManager(array $processes): void
{
$manager = new Process\Manager();
foreach ($processes as $process) {
/** @var BaseProcess $customProcess */
[$customProcess, $sProcess] = $this->add($process);
$this->_process[$customProcess->getName()] = $customProcess;
$manager->add($sProcess, $customProcess->isEnableCoroutine());
}
$manager->start();
}
@@ -151,7 +172,7 @@ class ProcessManager
* @param string $tag
* @return void
*/
public function push(string $message, string $name = '', string $tag = 'default')
public function push(string $message, string $name = '', string $tag = 'default'): void
{
$processes = $this->_process;
if (!empty($this->_process[$name])) {
+1 -18
View File
@@ -21,29 +21,12 @@ abstract class Server
/**
* @var LoggerInterface
* @var LoggerInterface
*/
#[Inject(LoggerInterface::class)]
public LoggerInterface $logger;
/**
* @param $prefix
* @throws ConfigException
*/
protected function setProcessName($prefix)
{
if (Kiri::getPlatform()->isMac()) {
return;
}
$name = '[' . Config::get('id', 'system-service') . ']';
if (!empty($prefix)) {
$name .= '.' . $prefix;
}
swoole_set_process_name($name);
}
/**
* Server constructor.
* @throws Exception
+2 -2
View File
@@ -4,7 +4,7 @@ namespace Kiri\Server\Broadcast;
use Kiri;
use Kiri\Server\ProcessManager;
use Kiri\Server\SwooleServerInterface;
use Kiri\Server\ServerInterface;
class Broadcast
{
@@ -19,7 +19,7 @@ class Broadcast
$di = Kiri::getDi();
$di->get(ProcessManager::class)->push($message);
$server = $di->get(SwooleServerInterface::class);
$server = $di->get(ServerInterface::class);
$total = $server->setting['worker_num'] + $server->setting['task_worker_num'];
for ($i = 0; $i < $total; $i++) {
+143
View File
@@ -0,0 +1,143 @@
<?php
namespace Kiri\Server;
/**
*
*/
class Config
{
public string $type;
public string $host;
public int $port;
public string $name;
public int $mode;
public array $settings;
public array $events;
/**
* @return string
*/
public function getType(): string
{
return $this->type;
}
/**
* @param string $type
*/
public function setType(string $type): void
{
$this->type = $type;
}
/**
* @return string
*/
public function getHost(): string
{
return $this->host;
}
/**
* @param string $host
*/
public function setHost(string $host): void
{
$this->host = $host;
}
/**
* @return int
*/
public function getPort(): int
{
return $this->port;
}
/**
* @param int $port
*/
public function setPort(int $port): void
{
$this->port = $port;
}
/**
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @param string $name
*/
public function setName(string $name): void
{
$this->name = $name;
}
/**
* @return int
*/
public function getMode(): int
{
return $this->mode;
}
/**
* @param int $mode
*/
public function setMode(int $mode): void
{
$this->mode = $mode;
}
/**
* @return array
*/
public function getSettings(): array
{
return $this->settings;
}
/**
* @param array $settings
*/
public function setSettings(array $settings): void
{
$this->settings = $settings;
}
/**
* @return array
*/
public function getEvents(): array
{
return $this->events;
}
/**
* @param array $events
*/
public function setEvents(array $events): void
{
$this->events = $events;
}
}
+2 -2
View File
@@ -13,11 +13,11 @@ interface OnCloseInterface
/**
* @param Server $server
* @param Server|\Swoole\Coroutine\Http\Server $server
* @param int $fd
* @return void
*/
public function onClose(Server $server, int $fd): void;
public function onClose(Server|\Swoole\Coroutine\Http\Server $server, int $fd): void;
}
+2 -2
View File
@@ -10,10 +10,10 @@ interface OnMessageInterface
/**
* @param Server $server
* @param Server|\Swoole\Coroutine\Http\Server $server
* @param Frame $frame
* @return void
*/
public function onMessage(Server $server, Frame $frame): void;
public function onMessage(Server|\Swoole\Coroutine\Http\Server $server, Frame $frame): void;
}
+3 -2
View File
@@ -4,16 +4,17 @@ namespace Kiri\Server\Contract;
use Swoole\Http\Request;
use Swoole\WebSocket\Server;
use Swoole\Coroutine\Http\Server as HServer;
interface OnOpenInterface
{
/**
* @param Server $server
* @param Server|HServer $server
* @param Request $request
* @return void
*/
public function onOpen(Server $server, Request $request): void;
public function onOpen(Server|HServer $server, Request $request): void;
}
+1 -1
View File
@@ -12,7 +12,7 @@ interface OnPacketInterface
* @param Server $server
* @param string $data
* @param array $clientInfo
* @return mixed
* @return void
*/
public function onPacket(Server $server, string $data, array $clientInfo): void;
+17
View File
@@ -0,0 +1,17 @@
<?php
namespace Kiri\Server\Events;
use Swoole\Process;
class OnProcessStop
{
/**
* @param Process $process
*/
public function __construct(Process $process)
{
}
}
+9 -7
View File
@@ -2,7 +2,9 @@
namespace Kiri\Server\Events;
use Kiri\Exception\ConfigException;
use Swoole\Server;
use Kiri;
/**
*
@@ -11,13 +13,13 @@ class OnTaskerStart
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(public Server $server, public int $workerId)
{
}
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(public Server $server, public int $workerId)
{
}
}
+7 -7
View File
@@ -11,13 +11,13 @@ class OnWorkerStart
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(public Server $server, public int $workerId)
{
}
/**
* @param Server|null $server
* @param int $workerId
*/
public function __construct(public ?Server $server, public int $workerId)
{
}
}
+57 -12
View File
@@ -2,14 +2,18 @@
namespace Kiri\Server\Handler;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Server\Events\OnAfterReload;
use Kiri\Server\Events\OnBeforeReload;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Kiri\Server\Abstracts\Server;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnShutdown;
use Kiri\Server\Events\OnStart;
use Swoole\Server as SServer;
/**
@@ -18,39 +22,80 @@ use Kiri\Server\Events\OnStart;
*/
class OnServer extends Server
{
/**
* @param \Swoole\Server $server
* @param EventDispatch $dispatch
* @throws \Exception
*/
public function __construct(public EventDispatch $dispatch)
{
parent::__construct();
}
/**
* @param SServer $server
* @throws ConfigException
* @throws ReflectionException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onStart(\Swoole\Server $server)
public function onStart(SServer $server)
{
$this->setProcessName(sprintf('start[%d].server', $server->master_pid));
\Kiri::setProcessName(sprintf('start[%d].server', $server->master_pid));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnStart($server));
$this->dispatch->dispatch(new OnStart($server));
}
/**
* @param \Swoole\Server $server
* @param SServer $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function onBeforeShutdown(\Swoole\Server $server)
public function onBeforeShutdown(SServer $server)
{
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeShutdown($server));
$this->dispatch->dispatch(new OnBeforeShutdown($server));
}
/**
* @param \Swoole\Server $server
* @param SServer $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function onShutdown(\Swoole\Server $server)
public function onShutdown(SServer $server)
{
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnShutdown($server));
$this->dispatch->dispatch(new OnShutdown($server));
}
/**
* @param SServer $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function onBeforeReload(SServer $server)
{
$this->dispatch->dispatch(new OnBeforeReload($server));
}
/**
* @param SServer $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function onAfterReload(SServer $server)
{
$this->dispatch->dispatch(new OnAfterReload($server));
}
}
+21 -5
View File
@@ -2,8 +2,11 @@
namespace Kiri\Server\Handler;
use Kiri;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Kiri\Server\Abstracts\Server;
use Kiri\Exception\ConfigException;
@@ -18,26 +21,39 @@ use Kiri\Server\Events\OnManagerStop;
class OnServerManager extends Server
{
/**
* @param EventDispatch $dispatch
* @throws \Exception
*/
public function __construct(public EventDispatch $dispatch)
{
parent::__construct();
}
/**
* @param \Swoole\Server $server
* @throws ConfigException|ReflectionException
* @param \Swoole\Server $server
* @throws ConfigException
* @throws ReflectionException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onManagerStart(\Swoole\Server $server)
{
$this->setProcessName(sprintf('manger[%d].0', $server->manager_pid));
Kiri::setProcessName(sprintf('manger[%d].0', $server->manager_pid));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStart($server));
$this->dispatch->dispatch(new OnManagerStart($server));
}
/**
* @param \Swoole\Server $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function onManagerStop(\Swoole\Server $server)
{
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStop($server));
$this->dispatch->dispatch(new OnManagerStop($server));
}
-38
View File
@@ -1,38 +0,0 @@
<?php
namespace Kiri\Server\Handler;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Server\Events\OnAfterReload;
use Kiri\Server\Events\OnBeforeReload;
use Swoole\Server;
/**
*
*/
class OnServerReload
{
/**
* @param Server $server
* @throws \ReflectionException
*/
public function onBeforeReload(Server $server)
{
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeReload($server));
}
/**
* @param Server $server
* @throws \ReflectionException
*/
public function onAfterReload(Server $server)
{
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnAfterReload($server));
}
}
+85 -96
View File
@@ -30,111 +30,100 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
{
public Router $router;
/**
* @param EventDispatch $dispatch
* @param Router $router
* @throws Exception
*/
public function __construct(public EventDispatch $dispatch, public Router $router)
{
parent::__construct();
}
public EventDispatch $dispatch;
/**
* @param Server $server
* @param int $workerId
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function onWorkerStart(Server $server, int $workerId): void
{
$this->dispatch->dispatch(new OnBeforeWorkerStart($workerId));
set_env('environmental_workerId', $workerId);
if ($workerId < $server->setting['worker_num']) {
$this->dispatch->dispatch(new OnWorkerStart($server, $workerId));
} else {
$this->dispatch->dispatch(new OnTaskStart($server, $workerId));
}
$this->dispatch->dispatch(new OnAfterWorkerStart());
}
/**
* @return void
*/
public function init()
{
$this->dispatch = Kiri::getDi()->get(EventDispatch::class);
$this->router = Kiri::getDi()->get(Router::class);
}
/**
* @param Server $server
* @param int $workerId
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
public function onWorkerStop(Server $server, int $workerId)
{
$this->dispatch->dispatch(new OnWorkerStop($server, $workerId));
}
/**
* @param Server $server
* @param int $workerId
* @return void
* @throws Kiri\Exception\ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function onWorkerStart(Server $server, int $workerId)
{
$this->dispatch->dispatch(new OnBeforeWorkerStart($workerId));
set_env('environmental_workerId', $workerId);
if ($workerId < $server->setting['worker_num']) {
set_env('environmental', Kiri::WORKER);
$this->setProcessName(sprintf('Worker Process[%d].%d', $server->worker_pid, $workerId));
$this->dispatch->dispatch(new OnWorkerStart($server, $workerId));
} else {
set_env('environmental', Kiri::TASK);
$this->setProcessName(sprintf('Tasker Process[%d].%d', $server->worker_pid, $workerId));
$this->dispatch->dispatch(new OnTaskStart($server, $workerId));
}
$this->dispatch->dispatch(new OnAfterWorkerStart());
}
/**
* @param Server $server
* @param int $workerId
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
public function onWorkerExit(Server $server, int $workerId)
{
$this->dispatch->dispatch(new OnWorkerExit($server, $workerId));
}
/**
* @param Server $server
* @param int $workerId
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
public function onWorkerStop(Server $server, int $workerId)
{
Timer::clearAll();
$this->dispatch->dispatch(new OnWorkerStop($server, $workerId));
}
/**
* @param Server $server
* @param int $worker_id
* @param int $worker_pid
* @param int $exit_code
* @param int $signal
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal)
{
$this->dispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
);
$this->logger->error($message);
$this->system_mail($message);
}
/**
* @param Server $server
* @param int $workerId
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
public function onWorkerExit(Server $server, int $workerId)
{
$this->dispatch->dispatch(new OnWorkerExit($server, $workerId));
}
/**
* @param Server $server
* @param int $worker_id
* @param int $worker_pid
* @param int $exit_code
* @param int $signal
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal)
{
$this->dispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
);
$this->logger->error($message);
$this->system_mail($message);
}
/**
* @param $messageContent
* @throws Exception
*/
protected function system_mail($messageContent)
{
try {
$email = Config::get('email', ['enable' => false]);
if (!empty($email) && ($email['enable'] ?? false) == true) {
Help::sendEmail($email, 'Service Error', $messageContent);
}
} catch (\Throwable $e) {
error($e, 'email');
}
}
/**
* @param $messageContent
* @throws Exception
*/
protected function system_mail($messageContent)
{
try {
$email = Config::get('email', ['enable' => false]);
if (!empty($email) && ($email['enable'] ?? false)) {
Help::sendEmail($email, 'Service Error', $messageContent);
}
} catch (\Throwable $e) {
error($e, 'email');
}
}
}
-212
View File
@@ -1,212 +0,0 @@
<?php
namespace Kiri\Server;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Error\StdoutLoggerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
/**
*
*/
class Inotify extends BaseProcess
{
private mixed $inotify = null;
private mixed $events;
private array $watchFiles = [];
public bool $isReloading = FALSE;
public string $name = 'inotify listen';
public array $dirs = [];
protected int $cid = -1;
const IG_DIR = [APP_PATH . 'commands', APP_PATH . '.git', APP_PATH . '.gitee'];
#[Inject(StdoutLoggerInterface::class)]
public StdoutLoggerInterface $logger;
/**
* @param Process $process
* @return void
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
set_error_handler([$this, 'error']);
set_exception_handler([$this, 'error']);
if (!extension_loaded('inotify')) {
while (true) {
if ($this->isStop()) {
break;
}
sleep(1);
}
return;
}
$this->dirs = Config::get('reload.inotify', []);
$this->start();
}
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->isStop = true;
});
return $this;
}
/**
* @return void
*/
public function error(): void
{
}
/**
* @throws Exception
*/
public function start()
{
$this->inotify = inotify_init();
$this->events = IN_MODIFY | IN_DELETE | IN_CREATE | IN_MOVE;
foreach ($this->dirs as $dir) {
if (!is_dir($dir)) continue;
$this->watch($dir);
}
Event::add($this->inotify, [$this, 'check']);
Event::cycle(function () {
if ($this->isStop()) {
Event::del($this->inotify);
Event::exit();
}
}, true);
Event::wait();
}
/**
* 开始监听
* @throws Exception
*/
public function check()
{
if (!($events = inotify_read($this->inotify))) {
return;
}
if ($this->isReloading) {
return;
}
$LISTEN_TYPE = [IN_CREATE, IN_DELETE, IN_MODIFY, IN_MOVED_TO, IN_MOVED_FROM];
foreach ($events as $ev) {
if (!in_array($ev['mask'], $LISTEN_TYPE)) {
continue;
}
//非重启类型
if (str_ends_with($ev['name'], '.php')) {
if ($this->isReloading) {
break;
}
$this->isReloading = TRUE;
Timer::after(3000, fn() => $this->reload());
}
}
}
/**
* @throws Exception
*/
public function reload()
{
$swollen = \Kiri::getDi()->get(SwooleServerInterface::class);
$swollen->reload();
$this->clearWatch();
foreach ($this->dirs as $root) {
$this->watch($root);
}
$this->isReloading = FALSE;
}
/**
* @throws Exception
*/
public function clearWatch()
{
foreach ($this->watchFiles as $wd) {
@inotify_rm_watch($this->inotify, $wd);
}
$this->watchFiles = [];
}
/**
* @param $dir
* @return bool
* @throws Exception
*/
public function watch($dir): bool
{
//目录不存在
if (!is_dir($dir)) {
return $this->logger->addError("[$dir] is not a directory.");
}
//避免重复监听
if (isset($this->watchFiles[$dir])) {
return FALSE;
}
if (in_array($dir, self::IG_DIR)) {
return FALSE;
}
$wd = @inotify_add_watch($this->inotify, $dir, $this->events);
$this->watchFiles[$dir] = $wd;
$files = scandir($dir);
foreach ($files as $f) {
if ($f == '.' || $f == '..') {
continue;
}
$path = $dir . '/' . $f;
//递归目录
if (is_dir($path)) {
$this->watch($path);
} else if (!str_ends_with($f, '.php')) {
continue;
}
//检测文件类型
if (strstr($f, '.') == '.php') {
$wd = @inotify_add_watch($this->inotify, $path, $this->events);
$this->watchFiles[$path] = $wd;
}
}
return TRUE;
}
}
-170
View File
@@ -1,170 +0,0 @@
<?php
namespace Kiri\Server;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Error\StdoutLoggerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\SwooleServerInterface;
use Psr\Log\LoggerInterface;
use Swoole\Process;
class Scaner extends BaseProcess
{
private array $md5Map = [];
public bool $isReloading = FALSE;
private array $dirs = [];
/**
* @var LoggerInterface
*/
#[Inject(LoggerInterface::class)]
public LoggerInterface $logger;
/**
* @throws Exception
*/
public function process(Process $process): void
{
$this->dirs = Config::get('reload.inotify', []);
$this->loadDirs();
$this->tick();
}
/**
* @param bool $isReload
* @throws Exception
*/
private function loadDirs(bool $isReload = FALSE)
{
foreach ($this->dirs as $value) {
if (is_bool($path = realpath($value))) {
continue;
}
if (!is_dir($path)) continue;
$this->loadByDir($path, $isReload);
}
}
/**
* @param $path
* @param bool $isReload
* @return void
* @throws Exception
*/
private function loadByDir($path, bool $isReload = FALSE): void
{
if (!is_string($path)) {
return;
}
$path = rtrim($path, '/');
foreach (glob(realpath($path) . '/*') as $value) {
if (is_dir($value)) {
$this->loadByDir($value, $isReload);
}
if (is_file($value)) {
if ($this->checkFile($value, $isReload)) {
if ($this->isReloading) {
break;
}
$this->isReloading = TRUE;
sleep(2);
$this->timerReload();
break;
}
}
}
}
/**
* @param $value
* @param $isReload
* @return bool
*/
private function checkFile($value, $isReload): bool
{
$md5 = md5($value);
$mTime = filectime($value);
if (!isset($this->md5Map[$md5])) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
} else {
if ($this->md5Map[$md5] != $mTime) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
}
}
return FALSE;
}
/**
* @throws Exception
*/
public function timerReload()
{
$this->isReloading = TRUE;
$this->logger->warning('file change');
$swow = \Kiri::getDi()->get(SwooleServerInterface::class);
$swow->reload();
$this->loadDirs();
$this->isReloading = FALSE;
$this->tick();
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->onProcessStop();
});
return $this;
}
/**
* @throws Exception
*/
public function tick()
{
if ($this->isStop) {
return;
}
$this->loadDirs(TRUE);
sleep(2);
$this->tick();
}
}
+70 -16
View File
@@ -20,10 +20,13 @@ use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnServerBeforeStart;
use Kiri\Server\Events\OnShutdown;
use Kiri\Server\Events\OnWorkerStart;
use Kiri\Server\Events\OnTaskerStart;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Kiri\Server\Events\OnWorkerStop;
use ReflectionException;
use Kiri\Reload\Scaner;
use Swoole\WebSocket\Server as WsServer;
use Swoole\Server as SServer;
use Swoole\Http\Server as HServer;
@@ -47,7 +50,7 @@ class Server extends HttpService
/**
* @param State $state
* @param ServerManager $manager
* @param AsyncServer $manager
* @param ContainerInterface $container
* @param ProcessManager $processManager
* @param EventDispatch $dispatch
@@ -57,7 +60,7 @@ class Server extends HttpService
* @throws Exception
*/
public function __construct(public State $state,
public ServerManager $manager,
public AsyncServer $manager,
public ContainerInterface $container,
public ProcessManager $processManager,
public EventDispatch $dispatch,
@@ -72,14 +75,9 @@ class Server extends HttpService
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function init(): void
{
$this->container->mapping(ResponseInterface::class, Response::class);
$this->container->mapping(RequestInterface::class, Request::class);
$enable_coroutine = Config::get('servers.settings.enable_coroutine', false);
if (!$enable_coroutine) {
return;
@@ -112,23 +110,78 @@ class Server extends HttpService
*/
public function start(): void
{
$this->manager->initBaseServer(Config::get('server', [], true), $this->daemon);
$this->manager->initCoreServers(Config::get('server', [], true), $this->daemon);
$rpcService = Config::get('rpc', []);
if (!empty($rpcService)) {
$this->manager->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], $rpcService['mode'], $rpcService);
/** @var \Kiri\Server\Config $create */
$create = $this->container->create(\Kiri\Server\Config::class, null, $rpcService);
$this->manager->addListener($create);
}
pcntl_signal(SIGINT, [$this, 'onSigint']);
$this->onHotReload();
pcntl_signal(SIGINT, [$this, 'onSigint']);
$processes = array_merge($this->process, Config::get('processes', []));
$this->processManager->batch($processes);
$this->processManager->batch($this->process, $this->manager->getServer());
$this->dispatch->dispatch(new OnServerBeforeStart());
$this->manager->start();
}
/**
* @return void
*/
protected function onWorkerListener(): void
{
$this->provider->on(OnWorkerStop::class, '\Swoole\Timer::clearAll');
$this->provider->on(OnWorkerStart::class, [$this, 'setWorkerName']);
$this->provider->on(OnTaskerStart::class, [$this, 'setTaskerName']);
}
/**
* @param OnWorkerStart $onWorkerStart
* @throws ConfigException
*/
protected function setWorkerName(OnWorkerStart $onWorkerStart): void
{
$prefix = sprintf('Worker Process[%d].%d', $onWorkerStart->server->worker_pid, $onWorkerStart->workerId);
set_env('environmental', Kiri::WORKER);
$this->setProcessName($prefix);
}
/**
* @param OnWorkerStart $onWorkerStart
* @throws ConfigException
*/
protected function setTaskerName(OnWorkerStart $onWorkerStart): void
{
$prefix = sprintf('Worker Process[%d].%d', $onWorkerStart->server->worker_pid, $onWorkerStart->workerId);
set_env('environmental', Kiri::WORKER);
$this->setProcessName($prefix);
}
/**
* @param $prefix
* @throws ConfigException
*/
protected function setProcessName($prefix): void
{
if (Kiri::getPlatform()->isMac()) {
return;
}
$name = '[' . Config::get('id', 'system-service') . ']';
if (!empty($prefix)) {
$name .= '.' . $prefix;
}
swoole_set_process_name($name);
}
/**
* @return void
* @throws ConfigException
@@ -136,13 +189,14 @@ class Server extends HttpService
*/
public function onHotReload(): void
{
$this->onWorkerListener();
$reload = Config::get('reload.hot', false);
if ($reload !== false) {
$this->provider->on(OnWorkerStart::class, [$this, 'onWorkerStart']);
$this->provider->on(OnWorkerStart::class, [$this, 'LoadRoutingList']);
$this->process[] = Scaner::class;
} else {
$this->onWorkerStart();
$this->LoadRoutingList();
}
}
@@ -167,7 +221,7 @@ class Server extends HttpService
* @throws ReflectionException
* @throws Exception
*/
public function onWorkerStart(): void
public function LoadRoutingList(): void
{
scan_directory(MODEL_PATH, 'app\Model');
+90 -56
View File
@@ -10,6 +10,7 @@ use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -24,67 +25,100 @@ class ServerCommand extends Command
{
const ACTIONS = ['start', 'stop', 'restart'];
const ACTIONS = ['start', 'stop', 'restart'];
/**
*
*/
protected function configure()
{
$this->setName('sw:server')
->setDescription('server start|stop|reload|restart')
->addArgument('action', InputArgument::OPTIONAL, 'run action', 'start')
->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL, 'is run daemonize');
}
/**
* @return void
*/
protected function configure(): void
{
$this->setName('sw:server')
->setDescription('server start|stop|reload|restart')
->addArgument('action', InputArgument::OPTIONAL, 'run action', 'start')
->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL, 'is run daemonize');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws Exception
*/
public function execute(InputInterface $input, OutputInterface $output): int
{
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
$this->scan_file();
$action = $input->getArgument('action');
if (is_null($action)) {
throw new Exception('I don\'t know what I want to do.');
}
if (!in_array($action, self::ACTIONS)) {
throw new Exception('I don\'t know what I want to do.');
}
if ($action == 'restart' || $action == 'stop') {
$manager->shutdown();
if ($action == 'stop') {
return 1;
}
}
$manager->start();
return 1;
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
public function execute(InputInterface $input, OutputInterface $output): int
{
return match ($input->getArgument('action')) {
'restart' => $this->restart($input),
'stop' => $this->stop(),
'start' => $this->start($input),
default =>
throw new Exception('I don\'t know what I want to do.')
};
}
/**
* @return void
* @throws ConfigException
* @throws \ReflectionException
*/
protected function scan_file(): void
{
$config = Config::get('reload.scanner', []);
if (is_array($config)) foreach ($config as $key => $value) {
scan_directory($value, $key);
}
}
/**
* @param InputInterface $input
* @return int
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
protected function restart(InputInterface $input): int
{
$this->stop();
$this->start($input);
return 1;
}
/**
* @return int
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
protected function stop(): int
{
$manager = Kiri::app()->getServer();
$manager->shutdown();
return 1;
}
/**
* @param InputInterface $input
* @return int
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface|ReflectionException
*/
protected function start(InputInterface $input): int
{
$this->scan_file();
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
$manager->start();
return 1;
}
/**
* @return void
* @throws ConfigException
* @throws ReflectionException
*/
protected function scan_file(): void
{
$config = Config::get('reload.scanner', []);
if (is_array($config)) foreach ($config as $key => $value) {
scan_directory($value, $key);
}
}
}
+21
View File
@@ -0,0 +1,21 @@
<?php
namespace Kiri\Server;
use Swoole\Server;
/**
* @mixin Server
*/
interface ServerInterface
{
/**
* @param string $name
* @return Server|\Swoole\Coroutine\Server|\Swoole\Coroutine\Http\Server
*/
public function getServer(string $name): Server|\Swoole\Coroutine\Server|\Swoole\Coroutine\Http\Server;
}
-332
View File
@@ -1,332 +0,0 @@
<?php
namespace Kiri\Server;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Message\Emitter;
use Kiri\Message\ResponseEmitter;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnDisconnectInterface;
use Kiri\Server\Contract\OnHandshakeInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Kiri\Server\Contract\OnPacketInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Handler\OnPipeMessage;
use Kiri\Server\Handler\OnServer;
use Kiri\Server\Handler\OnServerManager;
use Kiri\Server\Handler\OnServerWorker;
use Kiri\Task\TaskManager;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Http\Server as HServer;
use Swoole\Server;
use Swoole\Server\Port;
use Swoole\WebSocket\Server as WServer;
/**
* Class OnServerManager
* @package Http\Service
*/
class ServerManager extends Component
{
use TraitServer;
/** @var string */
public string $host = '';
public int $port = 0;
/** @var array<string,Port> */
public array $ports = [];
public int $mode = SWOOLE_TCP;
private Server|null $server = null;
const DEFAULT_EVENT = [
Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'],
Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'],
Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'],
Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'],
Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'],
Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'],
Constant::START => [OnServer::class, 'onStart'],
Constant::SHUTDOWN => [OnServer::class, 'onShutdown'],
];
/**
* @var array|string[]
*/
private array $eventInterface = [
OnReceiveInterface::class => 'receive',
OnPacketInterface::class => 'packet',
OnHandshakeInterface::class => 'handshake',
OnMessageInterface::class => 'message',
OnConnectInterface::class => 'connect',
OnCloseInterface::class => 'close',
OnDisconnectInterface::class => 'disconnect'
];
/**
* @return Server|WServer|HServer|null
*/
public function getServer(): Server|WServer|HServer|null
{
return $this->server;
}
/**
* @param ContainerInterface $container
* @param TaskManager $manager
* @param array $config
* @throws Exception
*/
public function __construct(public ContainerInterface $container,
public TaskManager $manager, array $config = [])
{
parent::__construct($config);
}
/**
* @param string $type
* @param string $host
* @param int $port
* @param int $mode
* @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
if (!$this->server) {
$this->createBaseServer($type, $host, $port, $mode, $settings);
} else {
if (!isset($settings['settings'])) {
$settings['settings'] = [];
}
$this->addNewListener($type, $host, $port, $mode, $settings);
}
}
/**
* @param $configs
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function initBaseServer($configs, int $daemon = 0): void
{
$context = di(ServerManager::class);
foreach ($this->sortService($configs['ports']) as $config) {
$this->startListenerHandler($context, $config, $daemon);
}
$this->bindPipeMessage();
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function bindPipeMessage(): void
{
$pipeMessage = $this->container->get(OnPipeMessage::class);
$this->server->on(Constant::PIPE_MESSAGE, [$pipeMessage, 'onPipeMessage']);
if (!isset($this->server->setting['task_worker_num']) || $this->server->setting['task_worker_num'] < 1) {
return;
}
$this->manager->taskListener($this->server);
}
/**
* @return array
*/
public function getSetting(): array
{
return $this->server->setting;
}
/**
* @param ServerManager $context
* @param array $config
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0)
{
if (!$this->server) {
$config = $this->mergeConfig($config, $daemon);
}
$context->addListener(
$config['type'], $config['host'], $config['port'], $config['mode'],
$config);
}
/**
* @param $config
* @param $daemon
* @return array
* @throws Exception
*/
private function mergeConfig($config, $daemon): array
{
$config['settings'] = $config['settings'] ?? [];
$config['settings']['daemonize'] = $daemon;
if (!isset($config['settings']['log_file'])) {
$config['settings']['log_file'] = storage('system.log');
}
$config['settings']['pid_file'] = storage('.swoole.pid');
$config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true;
$config['events'] = $config['events'] ?? [];
return $config;
}
/**
* @param string $type
* @param string $host
* @param int $port
* @param int $mode
* @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
$id = Config::get('id', 'system-service');
$this->logger->alert(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
/** @var Server\Port $service */
$this->ports[$port] = $this->server->addlistener($host, $port, $mode);
if ($this->ports[$port] === false) {
throw new Exception("The port is already in use[$host::$port]");
}
$this->addServiceEvents($settings['events'] ?? [], $this->ports[$port]);
$this->ports[$port]->set($this->resetSettings($type, $settings));
}
/**
* @param string $type
* @param array $settings
* @return array
*/
private function resetSettings(string $type, array $settings): array
{
if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['settings']['open_http_protocol'])) {
$settings['settings']['open_http_protocol'] = true;
if (in_array($this->server->setting['dispatch_mode'], [2, 4])) {
$settings['settings']['open_http2_protocol'] = true;
}
}
if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['settings']['open_websocket_protocol'])) {
$settings['settings']['open_websocket_protocol'] = true;
}
return $settings['settings'] ?? [];
}
/**
* @param string $type
* @param string $host
* @param int $port
* @param int $mode
* @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
$match = match ($type) {
Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP,
Constant::SERVER_TYPE_UDP => Server::class,
Constant::SERVER_TYPE_HTTP => HServer::class,
Constant::SERVER_TYPE_WEBSOCKET => WServer::class
};
$this->server = new $match($host, $port, SWOOLE_PROCESS, $mode);
$this->server->set(array_merge(Config::get('server.settings', []), $settings['settings']));
$this->container->setBindings(SwooleServerInterface::class, $this->server);
$this->container->setBindings(Emitter::class, ResponseEmitter::class);
$id = Config::get('id', 'system-service');
$this->logger->alert(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
$this->addDefaultListener($settings);
}
/**
* @param array $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function addDefaultListener(array $settings): void
{
$this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server);
if (!empty($settings['events']) && is_array($settings['events'])) {
$this->addServiceEvents($settings['events'], $this->server);
}
}
/**
* @param array $events
* @param Server|Port $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function addServiceEvents(array $events, Server|Port $server)
{
foreach ($events as $name => $event) {
if (is_array($event) && is_string($event[0])) {
$event[0] = $this->container->get($event[0]);
}
$server->on($name, $event);
}
}
/**
*
*/
public function start()
{
$this->server->start();
}
}
-14
View File
@@ -1,14 +0,0 @@
<?php
namespace Kiri\Server;
use Swoole\Server;
/**
* @mixin Server
*/
interface SwooleServerInterface
{
}
+82
View File
@@ -2,10 +2,36 @@
namespace Kiri\Server;
use Swoole\Http\Server as HServer;
use Swoole\Server;
use Swoole\WebSocket\Server as WServer;
trait TraitServer
{
private array $_process = [];
/**
* @param $class
* @return void
*/
public function addProcess($class): void
{
$this->_process[] = $class;
}
/**
* @return array
*/
public function getProcess(): array
{
return $this->_process;
}
/**
* @param array $ports
* @return array
@@ -29,4 +55,60 @@ trait TraitServer
return $array;
}
/**
* @param array $ports
* @return array<Config>
*/
public function genConfigService(array $ports): array
{
$array = [];
foreach ($ports as $port) {
$config = \Kiri::getDi()->create(Config::class, [], $port);
if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
array_unshift($array, $config);
} else if ($port['type'] == Constant::SERVER_TYPE_HTTP) {
if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$array[] = $config;
} else {
array_unshift($array, $config);
}
} else {
$array[] = $config;
}
}
return $array;
}
/**
* @param $type
* @return string|null
*/
public function getServerClass($type): ?string
{
return match ($type) {
Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP,
Constant::SERVER_TYPE_UDP => Server::class,
Constant::SERVER_TYPE_HTTP => HServer::class,
Constant::SERVER_TYPE_WEBSOCKET => WServer::class,
default => null
};
}
/**
* @param $type
* @return string|null
*/
public function getCoroutineServerClass($type): ?string
{
return match ($type) {
Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, Constant::SERVER_TYPE_UDP => Server::class,
Constant::SERVER_TYPE_HTTP, Constant::SERVER_TYPE_WEBSOCKET => \Swoole\Coroutine\Http\Server::class,
default => null
};
}
}