Files
kiri-http-server/ServerManager.php
T

457 lines
12 KiB
PHP
Raw Normal View History

2021-11-03 15:17:52 +08:00
<?php
namespace Server;
2021-11-30 15:10:00 +08:00
use Note\Inject;
2021-11-03 15:17:52 +08:00
use Exception;
use Kiri\Abstracts\Config;
2021-11-30 14:32:56 +08:00
use Kiri\Di\Container;
2021-11-27 17:43:28 +08:00
use Psr\Container\ContainerInterface;
2021-11-17 16:39:12 +08:00
use Kiri\Error\Logger;
2021-11-03 15:17:52 +08:00
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
2021-11-27 17:43:28 +08:00
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
2021-11-03 15:17:52 +08:00
use ReflectionException;
2021-11-03 17:39:08 +08:00
use Server\Abstracts\BaseProcess;
2021-11-18 11:37:12 +08:00
use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface;
use Server\Contract\OnDisconnectInterface;
use Server\Contract\OnHandshakeInterface;
use Server\Contract\OnMessageInterface;
use Server\Contract\OnPacketInterface;
use Server\Contract\OnProcessInterface;
use Server\Contract\OnReceiveInterface;
use Server\Contract\OnTaskInterface;
2021-11-18 15:37:10 +08:00
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
2021-11-30 14:32:56 +08:00
use Server\Tasker\OnServerTask;
2021-11-18 15:37:10 +08:00
use Server\Handler\OnServerWorker;
2021-11-03 15:17:52 +08:00
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
use Swoole\Server\Port;
use Swoole\WebSocket\Server as WServer;
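/**
* Class ServerManager
*
* Creates the base Swoole server, attaches additional port listeners,
* registers event callbacks and custom processes, and dispatches tasks.
*
* @package Server
*/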
class ServerManager
{
2021-11-17 16:39:12 +08:00
use TraitServer;
2021-11-03 15:17:52 +08:00
/** @var string Host value; defaults to an empty string. No method in this class body assigns it (the TraitServer trait may — verify). */
public string $host = '';
/** @var int Port value; defaults to 0. No method in this class body assigns it (the TraitServer trait may — verify). */
public int $port = 0;
2021-11-17 16:39:12 +08:00
/** @var Logger Logger supplied through the Inject attribute; this class uses it for debug messages when listeners and processes are registered. */
#[Inject(Logger::class)]
public Logger $logger;
2021-11-03 15:17:52 +08:00
/** @var array<int,Port> Additional listeners returned by Server::addlistener(), keyed by their integer port number. */
public array $ports = [];
/** @var int Defaults to SWOOLE_TCP. Not read by any method in this class body (the TraitServer trait may use it — verify). */
public int $mode = SWOOLE_TCP;
/** @var Server|null The base server; null until createBaseServer() runs on the first addListener() call. */
private Server|null $server = null;
/**
 * Dependency-injection container supplied through the Inject attribute.
 *
 * The property is declared as the PSR ContainerInterface, but this class calls
 * setBindings(), getReflect() and create() on it, which PSR-11 does not define.
 * NOTE(review): presumably the concrete Kiri\Di\Container is always injected — confirm.
 *
 * @var Container
 */
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
/**
 * Callbacks that addDefaultListener() registers on the base server.
 *
 * Shape: event name => [handler class name, method name]. The class name is
 * resolved to an instance through the container by addServiceEvents() before
 * the callback is handed to the server.
 */
const DEFAULT_EVENT = [
Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'],
Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'],
Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'],
Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'],
Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'],
Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'],
Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'],
Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'],
Constant::START => [OnServer::class, 'onStart'],
Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'],
Constant::SHUTDOWN => [OnServer::class, 'onShutdown'],
];
/**
 * Maps listener contract interfaces to short event names.
 *
 * Not referenced by any method in this class body.
 * NOTE(review): presumably consumed by the TraitServer trait — confirm.
 *
 * @var array<class-string,string>
 */
private array $eventInterface = [
OnReceiveInterface::class => 'receive',
OnPacketInterface::class => 'packet',
OnHandshakeInterface::class => 'handshake',
OnMessageInterface::class => 'message',
OnConnectInterface::class => 'connect',
OnCloseInterface::class => 'close',
OnDisconnectInterface::class => 'disconnect'
];
/**
 * Returns the base server instance held by this manager.
 *
 * @return Server|WServer|HServer|null the server, or null when no listener
 *                                     has been added yet
 */
public function getServer(): Server|WServer|HServer|null
{
return $this->server;
}
/**
 * Registers a listener on the managed server.
 *
 * The first call creates the base server itself; every later call attaches an
 * additional port to that server.
 *
 * @param string $type     server type, one of the Constant::SERVER_TYPE_* values
 * @param string $host     address to bind
 * @param int    $port     port to bind
 * @param int    $mode     socket type passed through to Swoole
 * @param array  $settings listener configuration; the keys read are
 *                         'settings' (Swoole options) and 'events' (callbacks)
 * @throws ReflectionException
 * @throws ConfigException
 * @throws Exception
 */
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
    // Both code paths read $settings['settings']; normalise it once up front so
    // the first listener does not hit an undefined key inside createBaseServer().
    $settings['settings'] ??= [];

    if (!$this->server) {
        $this->createBaseServer($type, $host, $port, $mode, $settings);
        return;
    }

    $this->addNewListener($type, $host, $port, $mode, $settings);
}
/**
 * Registers every configured port and then binds the pipe-message callback.
 *
 * The port list is taken from $configs['ports'] and passed through
 * sortService() before the listeners are added one by one.
 *
 * @param mixed $configs server configuration; must expose a 'ports' entry
 * @param int   $daemon  value forwarded to the listener set-up
 * @throws ReflectionException
 * @throws ConfigException
 */
public function initBaseServer($configs, int $daemon = 0): void
{
    $context = di(ServerManager::class);

    $listeners = $this->sortService($configs['ports']);
    foreach ($listeners as $listener) {
        $this->startListenerHandler($context, $listener, $daemon);
    }

    $pipeMessageHandler = [OnPipeMessage::class, 'onPipeMessage'];
    $this->bindCallback([Constant::PIPE_MESSAGE => $pipeMessageHandler]);
}
/**
 * Reports whether any configured port is already occupied.
 *
 * Reads the 'server' configuration, walks its sorted port list and stops at
 * the first port for which checkPortIsAlready() returns a truthy value.
 *
 * @return bool true when at least one configured port is in use
 * @throws ConfigException
 * @throws Exception
 */
public function isRunner(): bool
{
    $serverConfig = Config::get('server', [], true);

    foreach ($this->sortService($serverConfig['ports']) as $listener) {
        $occupied = checkPortIsAlready($listener['port']);
        if ($occupied) {
            return true;
        }
    }

    return false;
}
/**
 * Wraps a custom process definition in a Swoole process and attaches it to
 * the server.
 *
 * A class-name string is first resolved through Kiri's DI container. The
 * created Process is also bound in this manager's container under the
 * custom process's name.
 *
 * @param string|OnProcessInterface|BaseProcess $customProcess process instance or class name
 * @throws Exception
 */
public function addProcess(string|OnProcessInterface|BaseProcess $customProcess)
{
    $customProcess = is_string($customProcess)
        ? Kiri::getDi()->get($customProcess)
        : $customProcess;

    $system = sprintf('[%s].process', Config::get('id', 'system-service'));

    // Entry point executed inside the child process.
    $entry = function (Process $process) use ($customProcess, $system) {
        // The process title is only set when the platform reports Linux.
        if (Kiri::getPlatform()->isLinux()) {
            $title = $system . '(' . $customProcess->getName() . ')';
            $process->name($title);
        }
        $customProcess->process($process);
    };

    $process = new Process(
        $entry,
        $customProcess->getRedirectStdinAndStdout(),
        $customProcess->getPipeType(),
        $customProcess->isEnableCoroutine()
    );

    $this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
    $this->container->setBindings($customProcess->getName(), $process);
    $this->server->addProcess($process);
}
/**
 * Returns the option array currently held by the base server.
 *
 * Uses the null-safe operator (as sendMessage() does) so that calling this
 * before a server exists, or before any options were applied, yields an empty
 * array instead of failing on a null dereference / violating the array
 * return type.
 *
 * @return array the server's setting array, or [] when unavailable
 */
public function getSetting(): array
{
    return $this->server?->setting ?? [];
}
/**
 * Stores a value in the process environment as "KEY=value" via putenv().
 *
 * @param string     $key   environment variable name
 * @param string|int $value value to store; integers are written in decimal form
 */
public static function setEnv(string $key, string|int $value): void
{
    $assignment = "{$key}={$value}";
    putenv($assignment);
}
/**
 * Adds one configured listener through the given manager instance.
 *
 * While this manager has no server yet, the configuration is first passed
 * through mergeConfig() so the base server receives the process-wide options.
 *
 * @param ServerManager $context manager on which addListener() is invoked
 * @param array         $config  listener configuration with 'type', 'host', 'port' and 'mode'
 * @param int           $daemon  daemonize flag forwarded to mergeConfig()
 * @throws ConfigException
 * @throws ReflectionException
 * @throws Exception
 */
private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0)
{
    $isFirstListener = !$this->server;
    if ($isFirstListener) {
        $config = $this->mergeConfig($config, $daemon);
    }

    $type = $config['type'];
    $host = $config['host'];
    $port = $config['port'];
    $mode = $config['mode'];

    $context->addListener($type, $host, $port, $mode, $config);
}
/**
 * Completes a listener configuration with the options applied to the base server.
 *
 * Always overwrites 'daemonize', 'pid_file' and the reuse-port option; fills
 * 'log_file' only when it is not already set; guarantees that the 'settings'
 * and 'events' keys exist.
 *
 * @param mixed $config listener configuration
 * @param mixed $daemon value stored as the 'daemonize' option
 * @return array the completed configuration
 * @throws Exception
 */
private function mergeConfig($config, $daemon): array
{
    $config['settings'] ??= [];

    $config['settings']['daemonize'] = $daemon;
    // Keep a caller-supplied log file; otherwise fall back to the storage default.
    $config['settings']['log_file'] ??= storage('system.log');
    $config['settings']['pid_file'] = storage('.swoole.pid');
    $config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true;

    $config['events'] ??= [];

    return $config;
}
/**
 * Attaches an additional port to the already-created base server.
 *
 * For HTTP and WebSocket listeners the matching protocol option is switched on
 * unless the caller configured it explicitly. HTTP/2 is additionally enabled
 * for HTTP listeners when the base server's dispatch_mode is 2 or 4.
 *
 * @param string $type     server type, one of the Constant::SERVER_TYPE_* values
 * @param string $host     address to bind
 * @param int    $port     port to bind
 * @param int    $mode     socket type passed through to Swoole
 * @param array  $settings listener configuration ('settings' and 'events' keys)
 * @throws ConfigException
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 * @throws Exception when the server refuses to add the listener
 */
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
    $id = Config::get('id', 'system-service');
    // $type is passed as an argument rather than spliced into the format
    // string, so a '%' inside it cannot corrupt the sprintf() call.
    $this->logger->debug(sprintf('[%s]%s service %s::%d start', $id, $type, $host, $port));

    // Validate the result before storing it, so a failed attempt never leaves
    // `false` behind in the ports map.
    $listener = $this->server->addlistener($host, $port, $mode);
    if ($listener === false) {
        throw new Exception("The port is already in use[$host::$port]");
    }
    $this->ports[$port] = $listener;

    $options = $settings['settings'] ?? [];
    if ($type == Constant::SERVER_TYPE_HTTP && !isset($options['open_http_protocol'])) {
        $options['open_http_protocol'] = true;
        // dispatch_mode may be absent from the server settings; default to
        // null so the lookup does not raise an undefined-key warning.
        if (in_array($this->server->setting['dispatch_mode'] ?? null, [2, 4])) {
            $options['open_http2_protocol'] = true;
        }
    }
    if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($options['open_websocket_protocol'])) {
        $options['open_websocket_protocol'] = true;
    }

    $listener->set($options);
    $this->addServiceEvents($settings['events'] ?? [], $listener);
}
/**
 * Creates the base server for the first listener and wires its default callbacks.
 *
 * The server class is chosen from $type; the global 'server.settings'
 * configuration is merged with the listener's own 'settings' (the listener's
 * values win on key collisions).
 *
 * @param string $type     server type, one of the Constant::SERVER_TYPE_* values
 * @param string $host     address to bind
 * @param int    $port     port to bind
 * @param int    $mode     socket type passed through to Swoole
 * @param array  $settings listener configuration ('settings' and 'events' keys)
 * @throws ConfigException
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 * @throws ReflectionException
 * @throws Exception when $type is not a supported server type
 */
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
    $serverClass = match ($type) {
        Constant::SERVER_TYPE_BASE,
        Constant::SERVER_TYPE_TCP,
        Constant::SERVER_TYPE_UDP => Server::class,
        Constant::SERVER_TYPE_HTTP => HServer::class,
        Constant::SERVER_TYPE_WEBSOCKET => WServer::class,
        // Report a misconfigured type as a descriptive Exception instead of
        // letting a bare UnhandledMatchError escape.
        default => throw new Exception(sprintf('Unsupported server type "%s"', $type)),
    };

    $this->server = new $serverClass($host, $port, SWOOLE_PROCESS, $mode);

    // The 'settings' key is optional; a missing key must not reach array_merge() as null.
    $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'] ?? []));

    $id = Config::get('id', 'system-service');
    // $type is passed as an argument so a '%' inside it cannot corrupt the format string.
    $this->logger->debug(sprintf('[%s]%s service %s::%d start', $id, $type, $host, $port));

    $this->addDefaultListener($settings);
}
/**
 * Sends SIGTERM to whatever process occupies the given port until the port is free.
 *
 * checkPortIsAlready() is used both as the "still occupied" test and as the
 * source of the PID to signal. The PID is re-read on every round so that the
 * signal always goes to the current owner rather than to a PID captured once
 * before the loop, which may already have exited.
 *
 * @param int $port port whose owning process should be terminated
 * @throws Exception
 */
public function stopServer(int $port)
{
    while ($pid = checkPortIsAlready($port)) {
        Process::kill($pid, SIGTERM);
        // usleep() takes microseconds: this pauses 0.3 ms between attempts.
        usleep(300);
    }
}
/**
 * Registers the task callbacks, the built-in lifecycle callbacks and any
 * user-configured callbacks on the freshly created base server.
 *
 * Also binds the server instance in the container under SwooleServerInterface.
 *
 * @param array $settings listener configuration; only the optional 'events' key is read
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 * @throws ReflectionException
 */
private function addDefaultListener(array $settings): void
{
    // 'events' is optional. Normalise it once so the task-listener path gets
    // the same guard as the user-callback path below and never receives null.
    $events = $settings['events'] ?? [];
    if (!is_array($events)) {
        $events = [];
    }

    if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
        $this->addTaskListener($events);
    }

    $this->container->setBindings(SwooleServerInterface::class, $this->server);
    $this->addServiceEvents(self::DEFAULT_EVENT, $this->server);

    // User callbacks are registered last, so they replace a default for the same event.
    if (!empty($events)) {
        $this->addServiceEvents($events, $this->server);
    }
}
2021-11-30 14:32:56 +08:00
/**
 * Registers a set of callbacks on a server or on one of its ports.
 *
 * A callback given as [class name, method] has its class name replaced by an
 * instance fetched from the container; every other callback is passed on as is.
 *
 * @param array       $events event name => callback
 * @param Server|Port $server target that receives the callbacks
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 * @throws ReflectionException
 */
private function addServiceEvents(array $events, Server|Port $server)
{
    foreach ($events as $name => $handler) {
        $needsInstance = is_array($handler) && is_string($handler[0]);
        if ($needsInstance) {
            $handler[0] = $this->container->get($handler[0]);
        }

        $server->on($name, $handler);
    }
}
/**
 * Starts the base server by delegating to its start() method.
 *
 * The server must already have been created through addListener();
 * no null check is performed here.
 */
public function start()
{
$this->server->start();
}
/**
 * Builds an object of the given class through the container's create() method.
 *
 * Not called by any method in this class body.
 *
 * @param string $class class name handed to the container
 * @return object whatever the container's create() returns
 */
private function getNewInstance(string $class): object
{
return $this->container->create($class);
}
/**
 * Dispatches a task handler to a task worker.
 *
 * Swoole addresses task workers by a zero-based index in the range
 * [0, task_worker_num - 1], with -1 meaning "let the server pick a worker".
 * A missing or out-of-range $workerId is therefore replaced by -1 rather than
 * by an index derived from worker_num, which falls outside the valid range.
 *
 * A handler given as a class name must implement OnTaskInterface and is
 * instantiated with $params as constructor arguments. The handler object is
 * serialized before it is handed to the server.
 *
 * @param OnTaskInterface|string $handler  task object or its class name
 * @param array                  $params   constructor arguments, used only for a class name
 * @param int|null               $workerId zero-based task worker index, or null for automatic selection
 * @throws ReflectionException
 * @throws Exception when the class cannot be reflected or does not implement OnTaskInterface
 */
public function task(OnTaskInterface|string $handler, array $params = [], ?int $workerId = null)
{
    $taskWorkerNum = (int)($this->server->setting['task_worker_num'] ?? 0);
    if ($workerId === null || $workerId < 0 || $workerId >= $taskWorkerNum) {
        $workerId = -1;
    }

    if (is_string($handler)) {
        $reflection = $this->container->getReflect($handler);
        // A null reflection is treated like a class that does not satisfy the contract.
        $interfaces = $reflection?->getInterfaceNames() ?? [];
        if (!in_array(OnTaskInterface::class, $interfaces, true)) {
            throw new Exception('Task must instance ' . OnTaskInterface::class);
        }
        $handler = $reflection->newInstanceArgs($params);
    }

    $this->server->task(serialize($handler), $workerId);
}
/**
 * Forwards a message to another worker through the server's sendMessage().
 *
 * The call is null-safe: when no server has been created the method
 * returns null without sending anything.
 *
 * @param mixed $message  payload handed to the server
 * @param int   $workerId id of the receiving worker
 * @return mixed the server's return value, or null when there is no server
 */
public function sendMessage(mixed $message, int $workerId): mixed
{
return $this->server?->sendMessage($message, $workerId);
}
/**
 * Registers the 'finish' and 'task' callbacks on the base server.
 *
 * Callbacks supplied in $events take precedence. Otherwise an OnServerTask
 * instance is used, with onCoroutineTask() chosen when the server is
 * configured for task objects or task coroutines and onTask() otherwise.
 *
 * @param array $events user-configured callbacks keyed by event name
 * @throws ReflectionException
 */
private function addTaskListener(array $events = []): void
{
    $setting = $this->server->setting ?? [];

    $useTaskObject = $setting['task_object'] ?? $setting['task_use_object'] ?? false;
    // The option is optional; default to false instead of reading an undefined key.
    $taskCoroutine = $setting['task_enable_coroutine'] ?? false;

    $defaultHandler = $this->container->getReflect(OnServerTask::class)?->newInstance();

    $this->server->on('finish', $events[Constant::FINISH] ?? [$defaultHandler, 'onFinish']);

    $taskMethod = ($useTaskObject || $taskCoroutine) ? 'onCoroutineTask' : 'onTask';
    $this->server->on('task', $events[Constant::TASK] ?? [$defaultHandler, $taskMethod]);
}
/**
 * Registers callbacks on the base server for events that have none yet.
 *
 * An event that already has a callback is left untouched. A callback given as
 * [class name, method] has its class name replaced by an instance fetched from
 * the container before registration.
 *
 * @param array|null $settings event name => callback; null or an empty array is a no-op
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 * @throws ReflectionException
 */
public function bindCallback(?array $settings = [])
{
    // The parameter is nullable, and count(null) throws a TypeError on PHP 8;
    // empty() covers both null and [].
    if (empty($settings)) {
        return;
    }

    foreach ($settings as $event_type => $callback) {
        if ($this->server->getCallback($event_type) !== null) {
            continue;
        }
        if (is_array($callback) && !is_object($callback[0])) {
            $callback[0] = $this->container->get($callback[0]);
        }
        $this->server->on($event_type, $callback);
    }
}
}