Compare commits

...

31 Commits

Author SHA1 Message Date
as2252258 14db451313 改名 2021-12-02 18:24:23 +08:00
as2252258 ced92a10e1 改名 2021-12-02 14:16:57 +08:00
as2252258 0d12baf49f 改名 2021-12-01 19:04:59 +08:00
as2252258 b798270bb7 改名 2021-12-01 17:21:15 +08:00
as2252258 955f358577 改名 2021-12-01 16:09:49 +08:00
as2252258 97a5deda23 改名 2021-12-01 15:28:59 +08:00
as2252258 8ae395e9cd 改名 2021-12-01 15:18:34 +08:00
as2252258 44a9a507f6 改名 2021-12-01 15:16:08 +08:00
as2252258 903ee0d6ce 改名 2021-11-30 16:00:39 +08:00
as2252258 a36943cd2d 改名 2021-11-30 15:10:00 +08:00
as2252258 26e0ce7778 改名 2021-11-30 14:59:51 +08:00
as2252258 5a9f9da347 改名 2021-11-30 14:43:55 +08:00
as2252258 a1bf157408 改名 2021-11-30 14:34:15 +08:00
as2252258 27834d37ed 改名 2021-11-30 14:32:56 +08:00
as2252258 c260ac6df4 1 2021-11-27 17:43:28 +08:00
as2252258 7479bed4e4 改名 2021-11-19 17:37:50 +08:00
as2252258 39394f76d5 改名 2021-11-19 14:46:07 +08:00
as2252258 065555a80e 改名 2021-11-18 18:06:45 +08:00
as2252258 13a8ca73bd 改名 2021-11-18 18:01:03 +08:00
as2252258 8456e1b7d5 改名 2021-11-18 17:05:00 +08:00
as2252258 82b5650c8a 改名 2021-11-18 16:59:45 +08:00
as2252258 03601f2db6 改名 2021-11-18 16:54:15 +08:00
as2252258 a7082b76fd 改名 2021-11-18 16:40:57 +08:00
as2252258 e872e04da1 改名 2021-11-18 15:59:46 +08:00
as2252258 370b255862 改名 2021-11-18 15:57:37 +08:00
as2252258 d4b5bed960 改名 2021-11-18 15:37:10 +08:00
as2252258 740dc7ebbd 改名 2021-11-18 11:37:12 +08:00
as2252258 adc63be3e4 改名 2021-11-17 16:39:12 +08:00
as2252258 34432225c4 改名 2021-11-17 10:45:35 +08:00
as2252258 fd854b28a9 1 2021-11-15 01:15:36 +08:00
as2252258 ba43c969cf 改名 2021-11-05 18:41:27 +08:00
33 changed files with 377 additions and 499 deletions
+1 -61
View File
@@ -4,7 +4,7 @@ namespace Server\Abstracts;
use JetBrains\PhpStorm\Pure;
use Server\SInterface\OnProcessInterface;
use Server\Contract\OnProcessInterface;
use Swoole\Coroutine;
use Swoole\Process;
@@ -80,64 +80,4 @@ abstract class BaseProcess implements OnProcessInterface
}
/**
* @return bool
*/
public function checkProcessIsStop(): bool
{
return $this->isStop === true;
}
/**
* @param Process $process
*/
public function signListen(Process $process): void
{
}
/**
*
*/
protected function exit(): void
{
putenv('process.status=idle');
}
/**
* @return bool
*/
#[Pure] public function isWorking(): bool
{
return env('process.status', 'working') == 'working';
}
/**
*
*/
private function waiteExit(Process $process): void
{
$this->onProcessStop();
while ($this->isWorking()) {
$this->sleep();
}
$process->exit(0);
}
/**
*
*/
private function sleep(): void
{
if ($this->enable_coroutine) {
Coroutine::sleep(0.1);
} else {
usleep(100);
}
}
}
-27
View File
@@ -1,27 +0,0 @@
<?php
namespace Server\Abstracts;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Psr\EventDispatcher\EventDispatcherInterface;
use ReflectionException;
use Server\ServerManager;
/**
*
*/
class OnTaskerStart extends WorkerStart implements EventDispatcherInterface
{
/**
*/
public function dispatch(object $event)
{
ServerManager::setEnv('environmental', Kiri::TASK);
}
}
-31
View File
@@ -1,31 +0,0 @@
<?php
namespace Server\Abstracts;
use Annotation\Annotation;
use Annotation\Inject;
use Exception;
use Http\Handler\Router;
use Kiri\Abstracts\Config;
use Kiri\Di\NoteManager;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Psr\EventDispatcher\EventDispatcherInterface;
use ReflectionException;
use Server\ServerManager;
class OnWorkerStart extends WorkerStart implements EventDispatcherInterface
{
/**
* @param object $event
* @return void
* @throws Exception
*/
public function dispatch(object $event)
{
ServerManager::setEnv('environmental', Kiri::WORKER);
}
}
+1 -1
View File
@@ -4,7 +4,7 @@
namespace Server\Abstracts;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
-47
View File
@@ -1,47 +0,0 @@
<?php
namespace Server\Abstracts;
use Annotation\Annotation;
use Annotation\Inject;
use Http\Handler\Router;
use Kiri\Abstracts\Config;
use Kiri\Di\NoteManager;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
class WorkerStart
{
/**
* @var Annotation
*/
#[Inject(Annotation::class)]
public Annotation $annotation;
/**
* @var Router
*/
#[Inject(Router::class)]
public Router $router;
/**
* @param $prefix
* @throws ConfigException
*/
protected function setProcessName($prefix)
{
if (Kiri::getPlatform()->isMac()) {
return;
}
$name = Config::get('id', 'system-service');
if (!empty($prefix)) {
$name .= '.' . $prefix;
}
swoole_set_process_name($name);
}
}
+1
View File
@@ -19,6 +19,7 @@ class Constant
const WORKER_EXIT = 'WorkerExit';
const CONNECT = 'Connect';
const HANDSHAKE = 'handshake';
const OPEN = 'open';
const DISCONNECT = 'disconnect';
const MESSAGE = 'message';
const RECEIVE = 'Receive';
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
interface OnBeforeShutdown
{
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Http\Request;
use Swoole\Http\Response;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
use Swoole\WebSocket\Frame;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Http\Request;
use Swoole\WebSocket\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Server\Abstracts\Server;
@@ -1,12 +1,12 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
/**
* Interface OnPipeMessageInterface
* @package Server\SInterface
* @package Server\Contract
*/
interface OnPipeMessageInterface
{
+25
View File
@@ -0,0 +1,25 @@
<?php
namespace Server\Contract;
use Swoole\Process;
/**
* Interface BaseProcess
* @package Contract
*/
interface OnProcessInterface
{
/**
* @param Process $process
*/
public function process(Process $process): void;
}
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,7 +1,7 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Server\Events;
class OnServerBeforeStart
{
}
+2 -2
View File
@@ -2,10 +2,10 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Server\Abstracts\Server;
use Exception;
use Server\SInterface\OnPipeMessageInterface;
use Server\Contract\OnPipeMessageInterface;
use Kiri\Events\EventDispatch;
/**
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Server\Abstracts\Server;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Server\Abstracts\Server;
use Kiri\Exception\ConfigException;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Server\Events\OnAfterReload;
use Server\Events\OnBeforeReload;
+5 -18
View File
@@ -2,12 +2,12 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Core\Help;
use Kiri\Events\EventDispatch;
use Kiri\Runtime;
use Kiri\Kiri;
use Server\Events\OnAfterWorkerStart;
use Server\Events\OnBeforeWorkerStart;
use Server\Events\OnTaskerStart as OnTaskStart;
@@ -15,7 +15,6 @@ use Server\Events\OnWorkerError;
use Server\Events\OnWorkerExit;
use Server\Events\OnWorkerStart;
use Server\Events\OnWorkerStop;
use Server\ServerManager;
use Swoole\Server;
use Swoole\Timer;
@@ -46,28 +45,16 @@ class OnServerWorker extends \Server\Abstracts\Server
if ($workerId < $server->setting['worker_num']) {
$this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId));
$this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId));
set_env('environmental', Kiri::WORKER);
} else {
$this->eventDispatch->dispatch(new OnTaskStart($server, $workerId));
$this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId));
set_env('environmental', Kiri::TASK);
}
$this->eventDispatch->dispatch(new OnAfterWorkerStart());
}
/**
* @param OnBeforeWorkerStart $worker
* @throws Exception
*/
public function setConfigure(OnBeforeWorkerStart $worker)
{
ServerManager::setEnv('worker', $worker->workerId);
$serialize = file_get_contents(storage(Runtime::CONFIG_NAME));
if (!empty($serialize)) {
Config::sets(unserialize($serialize));
}
}
/**
* @param Server $server
* @param int $workerId
@@ -87,7 +74,7 @@ class OnServerWorker extends \Server\Abstracts\Server
*/
public function onWorkerExit(Server $server, int $workerId)
{
ServerManager::setEnv('state', 'exit');
set_env('state', 'exit');
$this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId));
}
-12
View File
@@ -1,12 +0,0 @@
<?php
namespace Server\SInterface;
use Swoole\Http\Response;
interface OnDownloadInterface
{
public function dispatch(Response $response);
}
-43
View File
@@ -1,43 +0,0 @@
<?php
namespace Server\SInterface;
use Swoole\Process;
/**
* Interface BaseProcess
* @package SInterface
*/
interface OnProcessInterface
{
/**
* @param Process $process
* @return string
*/
public function getProcessName(Process $process): string;
/**
* @param Process $process
*/
public function signListen(Process $process): void;
/**
* @param Process $process
*/
public function onHandler(Process $process): void;
/**
*
*/
public function onProcessStop(): void;
}
+17 -7
View File
@@ -3,14 +3,15 @@
namespace Server;
use Annotation\Inject;
use Exception;
use Http\Handler\Abstracts\HttpService;
use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Config;
use Kiri\Error\LoggerProcess;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Server\Events\OnShutdown;
@@ -42,6 +43,13 @@ class Server extends HttpService
public EventDispatch $eventDispatch;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/**
*
*/
@@ -60,10 +68,11 @@ class Server extends HttpService
/**
* @return string start server
*
* start server
* @return string
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws Exception
*/
public function start(): string
@@ -95,7 +104,7 @@ class Server extends HttpService
{
$configs = Config::get('server', [], true);
foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) {
$this->manager->stopServer($config['port']);
$this->state->exit($config['port']);
}
$this->eventDispatch->dispatch(new OnShutdown());
}
@@ -104,10 +113,11 @@ class Server extends HttpService
/**
* @return bool
* @throws ConfigException
* @throws Exception
*/
public function isRunner(): bool
{
return $this->manager->isRunner();
return $this->state->isRunner();
}
+26 -37
View File
@@ -4,18 +4,13 @@ declare(strict_types=1);
namespace Server;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Events\EventProvider;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Server\Abstracts\OnTaskerStart as TaskerDispatch;
use Server\Abstracts\OnWorkerStart as WorkerDispatch;
use Server\Events\OnBeforeWorkerStart;
use Server\Events\OnTaskerStart;
use Server\Events\OnWorkerStart;
use Server\Handler\OnServerWorker;
use Server\Events\OnServerBeforeStart;
use Swoole\Coroutine;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
@@ -35,10 +30,10 @@ class ServerCommand extends Command
/**
* @var EventProvider
* @var EventDispatch
*/
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
#[Inject(EventDispatch::class)]
public EventDispatch $eventProvider;
/**
@@ -61,28 +56,22 @@ class ServerCommand extends Command
*/
public function execute(InputInterface $input, OutputInterface $output): int
{
try {
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
if (is_null($input->getArgument('action'))) {
$input->setArgument('action', 'restart');
}
if (!in_array($input->getArgument('action'), self::ACTIONS)) {
throw new Exception('I don\'t know what I want to do.');
}
if ($manager->isRunner() && $input->getArgument('action') == 'start') {
throw new Exception('Service is running. Please use restart.');
}
$manager->shutdown();
if ($input->getArgument('action') == 'stop') {
throw new Exception('shutdown success');
}
$this->generate_runtime_builder($manager);
} catch (\Throwable $throwable) {
$output->write(jTraceEx($throwable));
} finally {
return 1;
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
if (is_null($input->getArgument('action'))) {
$input->setArgument('action', 'restart');
}
if (!in_array($input->getArgument('action'), self::ACTIONS)) {
throw new Exception('I don\'t know what I want to do.');
}
if ($manager->isRunner() && $input->getArgument('action') == 'start') {
throw new Exception('Service is running. Please use restart.');
}
$manager->shutdown();
if ($input->getArgument('action') == 'stop') {
return 0;
}
return $this->generate_runtime_builder($manager);
}
@@ -107,19 +96,19 @@ class ServerCommand extends Command
/**
* @param $manager
* @return int
* @throws ConfigException
* @throws Exception
*/
private function generate_runtime_builder($manager): void
private function generate_runtime_builder($manager): int
{
$this->configure_set();
Kiri::app()->getRouter()->read_files();
$this->eventProvider->on(OnBeforeWorkerStart::class, [di(OnServerWorker::class), 'setConfigure']);
$this->eventProvider->on(OnWorkerStart::class, [di(WorkerDispatch::class), 'dispatch']);
$this->eventProvider->on(OnTaskerStart::class, [di(TaskerDispatch::class), 'dispatch']);
$manager->start();
return 1;
}
}
+79 -185
View File
@@ -2,30 +2,34 @@
namespace Server;
use Annotation\Inject;
use Closure;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface;
use Kiri\Di\Container;
use Kiri\Error\Logger;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Server\Abstracts\BaseProcess;
use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface;
use Server\Contract\OnDisconnectInterface;
use Server\Contract\OnHandshakeInterface;
use Server\Contract\OnMessageInterface;
use Server\Contract\OnPacketInterface;
use Server\Contract\OnProcessInterface;
use Server\Contract\OnReceiveInterface;
use Server\Events\OnServerBeforeStart;
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerTask;
use Server\Handler\OnServerWorker;
use Server\SInterface\OnCloseInterface;
use Server\SInterface\OnConnectInterface;
use Server\SInterface\OnDisconnectInterface;
use Server\SInterface\OnHandshakeInterface;
use Server\SInterface\OnMessageInterface;
use Server\SInterface\OnPacketInterface;
use Server\SInterface\OnProcessInterface;
use Server\SInterface\OnReceiveInterface;
use Server\SInterface\OnTaskInterface;
use Server\Tasker\OnServerTask;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
@@ -40,12 +44,29 @@ use Swoole\WebSocket\Server as WServer;
class ServerManager
{
use TraitServer;
/** @var string */
public string $host = '';
public int $port = 0;
/**
* @var Logger
*/
#[Inject(Logger::class)]
public Logger $logger;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/** @var array<string,Port> */
public array $ports = [];
@@ -56,7 +77,7 @@ class ServerManager
/**
* @var ContainerInterface
* @var Container
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
@@ -90,9 +111,11 @@ class ServerManager
/**
* @return Server|WServer|HServer|null
* @throws ReflectionException
*/
public function getServer(): Server|WServer|HServer|null
{
di(EventDispatch::class)->dispatch(new OnServerBeforeStart());
return $this->server;
}
@@ -103,13 +126,13 @@ class ServerManager
* @param int $port
* @param int $mode
* @param array $settings
* @throws ReflectionException
* @throws ConfigException
* @throws Exception
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
if ($this->checkPortIsAlready($port)) $this->stopServer($port);
if (!$this->server) {
$this->createBaseServer($type, $host, $port, $mode, $settings);
} else {
@@ -122,8 +145,12 @@ class ServerManager
/**
* @throws ReflectionException
* @param $configs
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function initBaseServer($configs, int $daemon = 0): void
{
@@ -131,25 +158,7 @@ class ServerManager
foreach ($this->sortService($configs['ports']) as $config) {
$this->startListenerHandler($context, $config, $daemon);
}
$this->bindCallback($this->server, [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
// $this->bindCallback($this->server, $this->getSystemEvents($configs));
}
/**
* @return bool
* @throws ConfigException
* @throws Exception
*/
public function isRunner(): bool
{
$configs = Config::get('server', [], true);
foreach ($this->sortService($configs['ports']) as $config) {
if ($this->checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
$this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
}
@@ -162,20 +171,15 @@ class ServerManager
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
}
$process = new Process(function (Process $soloProcess) use ($customProcess) {
$system = sprintf('[%s].process[%d]', Config::get('id', 'system-service'), $soloProcess->pid);
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
$process = new Process(function (Process $process) use ($customProcess, $system) {
if (Kiri::getPlatform()->isLinux()) {
$soloProcess->name($system . '.' . $customProcess->getName() . ' start.');
$process->name($system . '(' . $customProcess->getName() . ')');
}
echo "\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m " . $system . $customProcess->getName() . ' start.' . PHP_EOL;
$customProcess->signListen($soloProcess);
$customProcess->onHandler($soloProcess);
},
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
// $this->container->setBindings($customProcess::class, $process);
$customProcess->process($process);
}, $customProcess->getRedirectStdinAndStdout(), $customProcess->getPipeType(), $customProcess->isEnableCoroutine());
$this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
$this->container->setBindings($customProcess->getName(), $process);
$this->server->addProcess($process);
}
@@ -189,45 +193,13 @@ class ServerManager
}
/**
* @param array $ports
* @return array
*/
public function sortService(array $ports): array
{
$array = [];
foreach ($ports as $port) {
if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
array_unshift($array, $port);
} else if ($port['type'] == Constant::SERVER_TYPE_HTTP) {
if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$array[] = $port;
} else {
array_unshift($array, $port);
}
} else {
$array[] = $port;
}
}
return $array;
}
/**
* @param string $key
* @param string|int $value
*/
public static function setEnv(string $key, string|int $value): void
{
putenv(sprintf('%s=%s', $key, (string)$value));
}
/**
* @param ServerManager $context
* @param array $config
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
@@ -256,6 +228,7 @@ class ServerManager
$config['settings']['log_file'] = storage('system.log');
}
$config['settings']['pid_file'] = storage('.swoole.pid');
$config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true;
$config['events'] = $config['events'] ?? [];
return $config;
}
@@ -267,13 +240,17 @@ class ServerManager
* @param int $port
* @param int $mode
* @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
$id = Config::get('id', 'system-service');
echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL;
$this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
/** @var Server\Port $service */
$this->ports[$port] = $this->server->addlistener($host, $port, $mode);
if ($this->ports[$port] === false) {
@@ -293,30 +270,17 @@ class ServerManager
}
/**
* @param int $port
* @param string $event
* @return Closure|array|null
*/
public function getPortCallback(int $port, string $event): Closure|array|null
{
/** @var Server\Port $_port */
$_port = $this->ports[$port] ?? null;
if (is_null($_port)) {
return null;
}
return $_port->getCallback($event);
}
/**
* @param string $type
* @param string $host
* @param int $port
* @param int $mode
* @param array $settings
* @throws ReflectionException
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
@@ -331,59 +295,17 @@ class ServerManager
$id = Config::get('id', 'system-service');
echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL;
$this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port));
$this->addDefaultListener($settings);
}
/**
* @param int $port
* @throws Exception
*/
public function stopServer(int $port)
{
if (!($pid = $this->checkPortIsAlready($port))) {
return;
}
while ($this->checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
/**
* @param $port
* @return bool|string
* @throws Exception
*/
private function checkPortIsAlready($port): bool|string
{
if (!Kiri::getPlatform()->isLinux()) {
exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output);
if (empty($output)) return false;
$output = explode(PHP_EOL, $output[0]);
return $output[0];
}
$serverPid = file_get_contents(storage('.swoole.pid'));
if (!empty($serverPid) && shell_exec('ps -ef | grep ' . $serverPid . ' | grep -v grep')) {
Process::kill($serverPid, SIGTERM);
}
exec('netstat -lnp | grep ' . $port . ' | grep "LISTEN" | awk \'{print $7}\'', $output);
if (empty($output)) {
return false;
}
return explode('/', $output[0])[0];
}
/**
* @param array $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
private function addDefaultListener(array $settings): void
{
@@ -401,6 +323,9 @@ class ServerManager
/**
* @param array $events
* @param Server|Port $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
private function addServiceEvents(array $events, Server|Port $server)
{
@@ -422,40 +347,6 @@ class ServerManager
}
/**
* @param string $class
* @return object
*/
private function getNewInstance(string $class): object
{
return $this->container->create($class);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws ReflectionException
* @throws Exception
*/
public function task(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerId = random_int($this->server->setting['worker_num'] + 1,
$this->server->setting['worker_num'] + 1 + $this->server->setting['task_worker_num']);
}
if (is_string($handler)) {
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
$handler = $implements->newInstanceArgs($params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param mixed $message
* @param int $workerId
@@ -469,28 +360,31 @@ class ServerManager
/**
* @param array $events
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
private function addTaskListener(array $events = []): void
{
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->getReflect(OnServerTask::class)?->newInstance();
$reflect = $this->container->get(OnServerTask::class);
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
} else {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']);
}
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
}
/**
* @param Port|Server $server
* @param array|null $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function bindCallback(Port|Server $server, ?array $settings = [])
public function bindCallback(?array $settings = [])
{
// TODO: Implement bindCallback() method.
if (count($settings) < 1) {
return;
}
+56
View File
@@ -0,0 +1,56 @@
<?php
namespace Server;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Abstracts\Config;
use Swoole\Process;
class State extends BaseObject
{
use TraitServer;
public array $servers = [];
public function init()
{
$this->servers = Config::get('server.ports');
}
/**
* @return bool
* @throws Exception
*/
public function isRunner(): bool
{
$ports = $this->sortService($this->servers);
foreach ($ports as $config) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
}
/**
* @param $port
* @throws Exception
*/
public function exit($port)
{
if (!($pid = checkPortIsAlready($port))) {
return;
}
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
}
+102
View File
@@ -0,0 +1,102 @@
<?php
namespace Server\Tasker;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Core\HashMap;
use Kiri\Di\Container;
use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerInterface;
use ReflectionException;
use Server\Contract\OnTaskInterface;
use Server\SwooleServerInterface;
/**
*
*/
class AsyncTaskExecute extends BaseObject
{
/**
* @var SwooleServerInterface|null
*/
public ?SwooleServerInterface $server = null;
/**
* @var Container
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function reg(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if (!$this->server) {
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
}
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerNum = $this->server->setting['worker_num'];
$taskerNum = $workerNum + $this->server->setting['task_worker_num'];
$workerId = random_int($workerNum, $taskerNum - 1);
}
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
@@ -1,15 +1,14 @@
<?php
namespace Server\Handler;
namespace Server\Tasker;
use Annotation\Inject;
use Note\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException;
use Server\SInterface\OnTaskInterface;
use Server\Contract\OnTaskInterface;
use Swoole\Server;
@@ -68,19 +67,14 @@ class OnServerTask
/**
* @param $data
* @return null
* @throws ReflectionException
*/
private function resolve($data)
{
[$class, $params] = json_encode($data, true);
$reflect = Kiri::getDi()->getReflect($class);
if (!$reflect->isInstantiable()) {
return null;
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
$class = $reflect->newInstanceArgs($params);
return $class->execute();
return null;
}
+32
View File
@@ -0,0 +1,32 @@
<?php
namespace Server;
trait TraitServer
{
/**
* @param array $ports
* @return array
*/
public function sortService(array $ports): array
{
$array = [];
foreach ($ports as $port) {
if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
array_unshift($array, $port);
} else if ($port['type'] == Constant::SERVER_TYPE_HTTP) {
if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$array[] = $port;
} else {
array_unshift($array, $port);
}
} else {
$array[] = $port;
}
}
return $array;
}
}