Files
kiri-core/http-server/ServerManager.php
T

473 lines
12 KiB
PHP
Raw Normal View History

2021-07-19 19:04:13 +08:00
<?php
namespace Server;
use Closure;
2021-07-21 11:25:31 +08:00
use Exception;
2021-08-12 12:40:06 +08:00
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Exception\NotFindClassException;
use Kiri\Kiri;
2021-07-19 19:04:13 +08:00
use ReflectionException;
2021-08-05 16:56:42 +08:00
use Server\Manager\OnPipeMessage;
2021-07-19 19:04:13 +08:00
use Server\SInterface\CustomProcess;
2021-07-21 11:25:31 +08:00
use Server\SInterface\TaskExecute;
2021-08-05 16:15:42 +08:00
use Server\Task\OnServerTask;
2021-07-19 19:04:13 +08:00
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
2021-08-03 14:13:39 +08:00
use Swoole\Server\Port;
2021-07-19 19:04:13 +08:00
use Swoole\WebSocket\Server as WServer;
/**
2021-08-05 16:15:42 +08:00
* Class ServerManager
2021-08-17 16:43:50 +08:00
* @package Server
2021-07-19 19:04:13 +08:00
*/
2021-08-12 14:00:44 +08:00
class ServerManager
2021-07-19 19:04:13 +08:00
{
2021-08-06 13:49:28 +08:00
/** @var string Presumably the host of the primary listener; no method visible in this class writes it. */
public string $host = '';

/** @var int Presumably the port of the primary listener; no method visible in this class writes it. */
public int $port = 0;

/** @var array<int,Port> Secondary listeners registered by addNewListener(), keyed by port number. */
public array $ports = [];

/** @var int Swoole socket type, SWOOLE_TCP by default. */
public int $mode = SWOOLE_TCP;

/**
 * Base Swoole server; stays null until createBaseServer() has instantiated it.
 * Typed with the same union that getServer() already promises to callers.
 *
 * @var Server|WServer|HServer|null
 */
private Server|WServer|HServer|null $server = null;
2021-08-01 11:18:09 +08:00
/**
 * Returns the base Swoole server held by this manager.
 *
 * @return Server|WServer|HServer|null The stored server instance, or null while none has been created.
 */
public function getServer(): Server|WServer|HServer|null
{
return $this->server;
}
/**
 * Registers a listener, creating the base server first when none exists yet.
 *
 * If checkPortIsAlready() reports a process on $port, stopServer() is invoked
 * before anything is bound.
 *
 * @param string $type     Server type (compared against Constant::SERVER_TYPE_* downstream).
 * @param string $host     Address to bind.
 * @param int    $port     Port to bind.
 * @param int    $mode     Swoole socket type.
 * @param array  $settings Listener configuration; its 'settings' and 'events' keys are read downstream.
 * @throws ReflectionException
 * @throws ConfigException
 * @throws Exception
 */
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
    if ($this->checkPortIsAlready($port)) {
        $this->stopServer($port);
    }

    // The very first listener becomes the base server and receives $settings untouched.
    if (!$this->server) {
        $this->createBaseServer($type, $host, $port, $mode, $settings);
        return;
    }

    // Secondary listeners always carry a 'settings' entry, even if it is empty.
    $settings['settings'] ??= [];
    $this->addNewListener($type, $host, $port, $mode, $settings);
}
/**
 * Builds every configured listener and then attaches the server-level callbacks.
 *
 * Port configs are ordered by sortService() before being handed to
 * startListenerHandler(). Afterwards the pipe-message handler and the
 * system events selected by getSystemEvents() are bound via bindCallback().
 *
 * @param mixed $configs Server configuration; 'ports' is read here, 'events' by getSystemEvents().
 * @param int   $daemon  Daemonize flag forwarded to startListenerHandler().
 * @throws NotFindClassException
 * @throws ReflectionException
 * @throws ConfigException
 */
public function initBaseServer($configs, int $daemon = 0): void
{
    $manager = di(ServerManager::class);

    $orderedPorts = $this->sortService($configs['ports']);
    foreach ($orderedPorts as $portConfig) {
        $this->startListenerHandler($manager, $portConfig, $daemon);
    }

    $pipeMessageCallback = [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']];
    $this->bindCallback($this->server, $pipeMessageCallback);
    $this->bindCallback($this->server, $this->getSystemEvents($configs));
}
/**
 * Reports whether any port from the 'server' configuration is already being listened on.
 *
 * Each configured port is probed with checkPortIsAlready(); the first hit ends the scan.
 *
 * @return bool True as soon as one configured port has a listener, false otherwise.
 * @throws ConfigException
 * @throws Exception
 */
public function isRunner(): bool
{
    $serverConfig = Config::get('server', [], true);
    $portConfigs = $this->sortService($serverConfig['ports']);

    foreach ($portConfigs as $portConfig) {
        if (!$this->checkPortIsAlready($portConfig['port'])) {
            continue;
        }
        return true;
    }

    return false;
}
/**
 * Wraps a custom process in a Swoole Process, attaches it to the server and
 * registers it with the Kiri application under its class name.
 *
 * @param string|CustomProcess $customProcess             Class name or ready-made instance.
 * @param mixed                $redirect_stdin_and_stdout Passed through to the Process constructor.
 * @param int|null             $pipe_type                 Passed through to the Process constructor.
 * @param bool                 $enable_coroutine          Passed through to the Process constructor.
 * @throws Exception
 */
public function addProcess(string|CustomProcess $customProcess, $redirect_stdin_and_stdout = null, ?int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = true)
{
    $process = $this->initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine);
    $this->server->addProcess($process);

    // Instances are registered under their class name, strings are used verbatim.
    $registryKey = $customProcess instanceof CustomProcess ? $customProcess::class : $customProcess;
    Kiri::app()->addProcess($registryKey, $process);
}
/**
 * Creates the Swoole Process whose entry callback boots the given custom process.
 *
 * Inside the callback: a class name is turned into an object via
 * Kiri::createObject(), the start is logged, the 'app' directory is scanned,
 * the process is renamed on Linux, and finally signListen() and onHandler()
 * are called on the custom process.
 *
 * @param string|CustomProcess $customProcess             Class name or instance to run.
 * @param mixed                $redirect_stdin_and_stdout Forwarded to the Process constructor.
 * @param mixed                $pipe_type                 Forwarded to the Process constructor.
 * @param mixed                $enable_coroutine          Forwarded to the Process constructor.
 * @return Process
 */
private function initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine): Process
{
    $server = $this->server;

    $entryPoint = function (Process $soloProcess) use ($customProcess, $server) {
        // Class names are instantiated lazily, inside the callback.
        $worker = is_string($customProcess)
            ? Kiri::createObject($customProcess, [$server])
            : $customProcess;

        $name = $worker->getProcessName($soloProcess);
        info(sprintf("Process %s start.", $name));

        scan_directory(directory('app'), 'App');

        $system = sprintf('%s.process[%d]', Config::get('id', 'system-service'), $soloProcess->pid);
        if (Kiri::getPlatform()->isLinux()) {
            // The process title is only set when the platform reports Linux.
            $soloProcess->name($system . '.' . $name . ' start.');
        }

        $worker->signListen($soloProcess);
        $worker->onHandler($soloProcess);
    };

    return new Process($entryPoint, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine);
}
/**
 * Orders port configurations so the entry that should become the base server comes first.
 *
 * Rules, applied entry by entry in input order:
 *  - websocket entries are pushed to the front;
 *  - http entries are pushed to the front unless the current head is a websocket
 *    entry, in which case they are appended;
 *  - every other type is appended.
 *
 * @param array $ports List of port configs, each with a 'type' key.
 * @return array The reordered list.
 */
public function sortService(array $ports): array
{
    $ordered = [];
    foreach ($ports as $portConfig) {
        $type = $portConfig['type'];

        $goesFirst = match (true) {
            $type == Constant::SERVER_TYPE_WEBSOCKET => true,
            $type == Constant::SERVER_TYPE_HTTP => empty($ordered)
                || $ordered[0]['type'] != Constant::SERVER_TYPE_WEBSOCKET,
            default => false,
        };

        if ($goesFirst) {
            array_unshift($ordered, $portConfig);
        } else {
            $ordered[] = $portConfig;
        }
    }
    return $ordered;
}
/**
 * Extracts the server-level lifecycle callbacks from the configuration.
 *
 * Only entries of $configs['events'] whose key is one of the whitelisted
 * Constant::* event names are kept; everything else is dropped.
 *
 * @param array $configs Server configuration; 'events' is optional.
 * @return array Subset of $configs['events'] restricted to the whitelisted keys.
 */
private function getSystemEvents(array $configs): array
{
    // Only the keys matter here; the values are placeholders.
    $whitelist = [
        Constant::SHUTDOWN      => '',
        Constant::WORKER_START  => '',
        Constant::WORKER_ERROR  => '',
        Constant::WORKER_EXIT   => '',
        Constant::WORKER_STOP   => '',
        Constant::MANAGER_START => '',
        Constant::MANAGER_STOP  => '',
        Constant::BEFORE_RELOAD => '',
        Constant::AFTER_RELOAD  => '',
        Constant::START         => '',
    ];
    $configured = $configs['events'] ?? [];

    return array_intersect_key($configured, $whitelist);
}
/**
 * Hands one port configuration to the given manager's addListener().
 *
 * While this manager has no base server yet, the config is first completed
 * by mergeConfig() (daemonize, log file, pid file, events).
 *
 * @param ServerManager $context Manager that receives the addListener() call.
 * @param array         $config  Port config with 'type', 'host', 'port' and 'mode' keys.
 * @param int           $daemon  Daemonize flag forwarded to mergeConfig().
 * @throws ConfigException
 * @throws ReflectionException
 * @throws Exception
 */
private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0)
{
    $isBaseListener = !$this->server;
    if ($isBaseListener) {
        $config = $this->mergeConfig($config, $daemon);
    }

    ['type' => $type, 'host' => $host, 'port' => $port, 'mode' => $mode] = $config;
    $context->addListener($type, $host, $port, $mode, $config);
}
/**
 * Completes the configuration of the base listener with process-level settings.
 *
 * - 'settings' and 'events' default to empty arrays.
 * - 'settings.daemonize' is set to $daemon when missing or when it differs from $daemon.
 * - 'settings.log_file' defaults to storage('system.log').
 * - 'settings.pid_file' is always overwritten with storage('.swoole.pid').
 *
 * @param array $config Port configuration to complete.
 * @param int   $daemon Requested daemonize flag.
 * @return array The completed configuration.
 * @throws Exception
 */
private function mergeConfig($config, $daemon): array
{
    $config['settings'] ??= [];

    // The previous test was `!$config['settings']['daemonize'] != $daemon`. Because `!`
    // binds tighter than `!=`, it compared the NEGATED configured value with $daemon,
    // so an existing value was never replaced by a different $daemon.
    if (!isset($config['settings']['daemonize']) || $config['settings']['daemonize'] != $daemon) {
        $config['settings']['daemonize'] = $daemon;
    }

    $config['settings']['log_file'] ??= storage('system.log');

    // The pid file location is fixed: checkPortIsAlready() reads the same path.
    $config['settings']['pid_file'] = storage('.swoole.pid');

    $config['events'] ??= [];

    return $config;
}
/**
 * Adds a secondary listener to the existing base server and wires its events.
 *
 * The new port is only stored in $this->ports after addlistener() succeeded,
 * so a failed bind never leaves a `false` entry behind for getPortCallback().
 *
 * @param string $type     Server type, used for the console message only.
 * @param string $host     Address to bind.
 * @param int    $port     Port to bind; also the key in $this->ports.
 * @param int    $mode     Swoole socket type.
 * @param array  $settings Listener config; optional 'settings' and 'events' keys are applied to the port.
 * @throws ReflectionException
 * @throws Exception When addlistener() returns false.
 */
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
    echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m $type service %s::%d start.", $host, $port) . PHP_EOL;

    /** @var Port|false $listener */
    $listener = $this->server->addlistener($host, $port, $mode);
    if ($listener === false) {
        throw new Exception("The port is already in use[$host::$port]");
    }

    $this->ports[$port] = $listener;
    $listener->set($settings['settings'] ?? []);
    $this->addServiceEvents($settings['events'] ?? [], $listener);
}
/**
 * Looks up the callback registered for an event on one of the secondary listeners.
 *
 * @param int    $port  Port number used as key in $this->ports.
 * @param string $event Event name passed to the port's getCallback().
 * @return Closure|array|null Null when no listener is stored for $port, otherwise whatever getCallback() returns.
 */
public function getPortCallback(int $port, string $event): Closure|array|null
{
    // isset() is false for both a missing key and a stored null.
    if (!isset($this->ports[$port])) {
        return null;
    }

    return $this->ports[$port]->getCallback($event);
}
/**
 * Instantiates the base Swoole server for the first listener and applies its settings.
 *
 * The class is chosen from $type: base/tcp/udp map to Swoole\Server, http to
 * Swoole\Http\Server and websocket to Swoole\WebSocket\Server. The server is
 * configured with Config 'server.settings' merged with $settings['settings']
 * (the latter wins on duplicate keys), then the default listeners are attached.
 *
 * @param string $type     One of the Constant::SERVER_TYPE_* values; any other value makes
 *                         the match expression throw \UnhandledMatchError.
 * @param string $host     Address to bind.
 * @param int    $port     Port to bind.
 * @param int    $mode     Swoole socket type.
 * @param array  $settings Listener config; 'settings' is optional and treated as empty when missing.
 * @throws ReflectionException
 * @throws ConfigException
 */
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
    $serverClass = match ($type) {
        Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP,
        Constant::SERVER_TYPE_UDP => Server::class,
        Constant::SERVER_TYPE_HTTP => HServer::class,
        Constant::SERVER_TYPE_WEBSOCKET => WServer::class
    };
    $this->server = new $serverClass($host, $port, SWOOLE_PROCESS, $mode);

    // addListener() may be called with its default $settings = []; without the fallback
    // array_merge() would receive null and throw a TypeError.
    $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'] ?? []));

    echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m $type service %s::%d start.", $host, $port) . PHP_EOL;
    $this->addDefaultListener($type, $settings);
}
/**
 * Sends SIGTERM to whatever process listens on $port until the port is free.
 *
 * The listening PID is re-read on every iteration, so a listener that changes
 * between checks is still the one being signalled.
 *
 * @param int $port Port whose listener should be terminated.
 * @throws Exception When the value reported for the listener is not a numeric PID.
 */
public function stopServer(int $port)
{
    while ($pid = $this->checkPortIsAlready($port)) {
        // checkPortIsAlready() returns raw command output. A non-numeric value would
        // make Process::kill() fail with a TypeError, so report it explicitly instead.
        if (!ctype_digit((string)$pid)) {
            throw new Exception("Unable to resolve the PID listening on port $port (got '$pid').");
        }

        Process::kill((int)$pid, SIGTERM);

        // usleep() takes microseconds: this pauses 0.3 ms between attempts.
        usleep(300);
    }
}
/**
 * Finds the PID of the process listening on the given port.
 *
 * Non-Linux platforms query `lsof`. On Linux the method first sends SIGTERM to
 * the PID stored in storage('.swoole.pid') if that PID shows up in `ps`, and
 * then queries `netstat` for the listener.
 *
 * NOTE(review): despite its name this method has a side effect on Linux (the
 * SIGTERM above), and isRunner() reaches this path as well — confirm that is intended.
 *
 * @param int|string $port Port number to probe.
 * @return bool|string The first matching PID as reported by the command, or false when nothing listens.
 * @throws Exception
 */
private function checkPortIsAlready($port): bool|string
{
    // Cast first so nothing but digits can reach the shell command lines below.
    $port = (int)$port;

    if (!Kiri::getPlatform()->isLinux()) {
        exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output);
        if (empty($output)) {
            return false;
        }
        return explode(PHP_EOL, $output[0])[0];
    }

    // Read the pid file defensively: it may not exist yet and may carry whitespace.
    $pidFile = storage('.swoole.pid');
    $serverPid = is_file($pidFile) ? (int)trim((string)file_get_contents($pidFile)) : 0;

    // `grep -w` matches the PID as a whole word, so PID 12 no longer matches 1234.
    // PIDs <= 0 are skipped; a stored "0" was already skipped by the previous !empty() test.
    if ($serverPid > 0 && shell_exec('ps -ef | grep -w ' . $serverPid . ' | grep -v grep')) {
        Process::kill($serverPid, SIGTERM);
    }

    // Match ":<port> " so that port 80 does not also match 8080 or a PID containing "80".
    exec('netstat -lnp | grep ":' . $port . ' " | grep "LISTEN" | awk \'{print $7}\'', $output);
    if (empty($output)) {
        return false;
    }

    // The selected netstat column looks like "PID/program"; keep the PID part only.
    return explode('/', $output[0])[0];
}
/**
 * Attaches task handlers and configured events to the base server and publishes
 * the server in the DI container under SwooleServerInterface.
 *
 * Task handlers are only registered when the server setting 'task_worker_num' is above zero.
 *
 * @param string $type     Server type; not used inside this method.
 * @param array  $settings Listener config; 'events' is optional and treated as empty when missing.
 * @throws ReflectionException
 * @throws Exception
 */
private function addDefaultListener(string $type, array $settings): void
{
    // Resolve once with a default: addTaskListener() requires an array, and passing a
    // missing key (null) would be a TypeError.
    $events = $settings['events'] ?? [];

    if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
        $this->addTaskListener($events);
    }
    $this->addServiceEvents($events, $this->server);

    Kiri::getDi()->setBindings(SwooleServerInterface::class, $this->server);
}
/**
 * Registers every configured event handler on a server or port.
 *
 * A handler given as [className, method] has its class name replaced by an
 * object obtained from the DI container (constructed with the target server
 * as argument) before it is registered.
 *
 * @param array       $events Map of event name => handler.
 * @param Server|Port $server Target that receives the on() calls.
 * @throws ReflectionException
 */
private function addServiceEvents(array $events, Server|Port $server)
{
    foreach ($events as $eventName => $handler) {
        $needsResolving = is_array($handler) && is_string($handler[0]);
        if ($needsResolving) {
            $handler[0] = Kiri::getDi()->get($handler[0], [$server]);
        }

        $server->on($eventName, $handler);
    }
}
/**
 * Starts the base Swoole server by calling start() on it.
 *
 * NOTE(review): $this->server is dereferenced without a null check, so a base
 * server must already exist when this is called — confirm callers guarantee this.
 */
public function start()
{
$this->server->start();
}
/**
 * Asks the DI container for a new object of the given class.
 *
 * @param string $class Class name handed to the container's newObject().
 * @return object
 * @throws NotFindClassException
 * @throws ReflectionException
 */
private function getNewInstance(string $class): object
{
    $container = Kiri::getDi();

    return $container->newObject($class);
}
/**
 * Dispatches a task to a task worker.
 *
 * A handler given as class name must implement TaskExecute; it is instantiated
 * with $params as constructor arguments. The handler object is serialized and
 * passed to the server's task().
 *
 * Swoole's Server::task() expects the destination as a task-worker index in
 * [0, task_worker_num - 1], or -1 to let Swoole pick an idle task worker. The
 * previous code drew a random id from
 * [worker_num + 1, worker_num + 1 + task_worker_num], which lies outside that range.
 *
 * @param TaskExecute|string $handler  Task object or class name of one.
 * @param array              $params   Constructor arguments, used only when $handler is a class name.
 * @param int|null           $workerId Task-worker index; null or an out-of-range value means "any idle task worker".
 * @throws ReflectionException
 * @throws Exception When the class named by $handler does not implement TaskExecute.
 */
public function task(TaskExecute|string $handler, array $params = [], ?int $workerId = null)
{
    $taskWorkerNum = (int)($this->server->setting['task_worker_num'] ?? 0);
    if ($workerId === null || $workerId < 0 || $workerId >= $taskWorkerNum) {
        // -1 delegates the choice of task worker to Swoole.
        $workerId = -1;
    }

    if (is_string($handler)) {
        $implements = Kiri::getDi()->getReflect($handler);
        if (!in_array(TaskExecute::class, $implements->getInterfaceNames(), true)) {
            throw new Exception('Task must instance ' . TaskExecute::class);
        }
        $handler = $implements->newInstanceArgs($params);
    }

    $this->server->task(serialize($handler), $workerId);
}
/**
 * Registers the 'task' and 'finish' callbacks on the base server.
 *
 * Handlers from $events (keys Constant::TASK / Constant::FINISH) take precedence;
 * otherwise an OnServerTask instance is used. Its onCoroutineTask method is chosen
 * when any of the server settings 'task_object', 'task_use_object' or
 * 'task_enable_coroutine' is truthy, onTask otherwise.
 *
 * @param array $events Optional custom handlers keyed by event constant.
 * @throws ReflectionException
 */
private function addTaskListener(array $events = []): void
{
    $setting = $this->server->setting;

    // Every key is optional; 'task_enable_coroutine' used to be read without a default
    // and raised an "undefined array key" warning when it was not configured.
    $usesTaskObject = $setting['task_object'] ?? $setting['task_use_object'] ?? false;
    $usesCoroutine = $setting['task_enable_coroutine'] ?? false;

    $reflect = Kiri::getDi()->getReflect(OnServerTask::class)?->newInstance();

    $defaultTaskMethod = ($usesTaskObject || $usesCoroutine) ? 'onCoroutineTask' : 'onTask';
    $this->server->on('task', $events[Constant::TASK] ?? [$reflect, $defaultTaskMethod]);
    $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
}
2021-08-03 14:13:39 +08:00
/**
 * Binds callbacks on the given server or port, skipping events that already have one.
 *
 * A callback given as [className, method] has its class name resolved to an
 * object through the DI container before it is registered.
 *
 * The callbacks are bound on the $server argument. The previous implementation
 * ignored that argument and always used $this->server; every call visible in
 * this class passes $this->server, so those calls behave as before.
 *
 * @param Port|Server $server   Target that receives the callbacks.
 * @param array|null  $settings Map of event name => callback; null or empty means nothing to do.
 * @throws ReflectionException
 */
public function bindCallback(Port|Server $server, ?array $settings = [])
{
    // empty() also covers null, which the signature allows and count() would reject.
    if (empty($settings)) {
        return;
    }

    foreach ($settings as $event_type => $callback) {
        // Never replace a callback that has already been registered.
        if ($server->getCallback($event_type) !== null) {
            continue;
        }
        if (is_array($callback) && !is_object($callback[0])) {
            $callback[0] = Kiri::getDi()->get($callback[0]);
        }
        $server->on($event_type, $callback);
    }
}
2021-07-19 19:04:13 +08:00
}