Compare commits

...

5 Commits

Author SHA1 Message Date
as2252258 740dc7ebbd 改名 2021-11-18 11:37:12 +08:00
as2252258 adc63be3e4 改名 2021-11-17 16:39:12 +08:00
as2252258 34432225c4 改名 2021-11-17 10:45:35 +08:00
as2252258 fd854b28a9 1 2021-11-15 01:15:36 +08:00
as2252258 ba43c969cf 改名 2021-11-05 18:41:27 +08:00
24 changed files with 105 additions and 260 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ namespace Server\Abstracts;
use JetBrains\PhpStorm\Pure;
use Server\SInterface\OnProcessInterface;
use Server\Contract\OnProcessInterface;
use Swoole\Coroutine;
use Swoole\Process;
-27
View File
@@ -1,27 +0,0 @@
<?php
namespace Server\Abstracts;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Psr\EventDispatcher\EventDispatcherInterface;
use ReflectionException;
use Server\ServerManager;
/**
*
*/
class OnTaskerStart extends WorkerStart implements EventDispatcherInterface
{
/**
*/
public function dispatch(object $event)
{
ServerManager::setEnv('environmental', Kiri::TASK);
}
}
-31
View File
@@ -1,31 +0,0 @@
<?php
namespace Server\Abstracts;
use Annotation\Annotation;
use Annotation\Inject;
use Exception;
use Http\Handler\Router;
use Kiri\Abstracts\Config;
use Kiri\Di\NoteManager;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Psr\EventDispatcher\EventDispatcherInterface;
use ReflectionException;
use Server\ServerManager;
class OnWorkerStart extends WorkerStart implements EventDispatcherInterface
{
/**
* @param object $event
* @return void
* @throws Exception
*/
public function dispatch(object $event)
{
ServerManager::setEnv('environmental', Kiri::WORKER);
}
}
-47
View File
@@ -1,47 +0,0 @@
<?php
namespace Server\Abstracts;
use Annotation\Annotation;
use Annotation\Inject;
use Http\Handler\Router;
use Kiri\Abstracts\Config;
use Kiri\Di\NoteManager;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
class WorkerStart
{
/**
* @var Annotation
*/
#[Inject(Annotation::class)]
public Annotation $annotation;
/**
* @var Router
*/
#[Inject(Router::class)]
public Router $router;
/**
* @param $prefix
* @throws ConfigException
*/
protected function setProcessName($prefix)
{
if (Kiri::getPlatform()->isMac()) {
return;
}
$name = Config::get('id', 'system-service');
if (!empty($prefix)) {
$name .= '.' . $prefix;
}
swoole_set_process_name($name);
}
}
+1
View File
@@ -19,6 +19,7 @@ class Constant
const WORKER_EXIT = 'WorkerExit';
const CONNECT = 'Connect';
const HANDSHAKE = 'handshake';
const OPEN = 'open';
const DISCONNECT = 'disconnect';
const MESSAGE = 'message';
const RECEIVE = 'Receive';
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
interface OnBeforeShutdown
{
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Http\Response;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Http\Request;
use Swoole\Http\Response;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
use Swoole\WebSocket\Frame;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Http\Request;
use Swoole\WebSocket\Server;
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Server\Abstracts\Server;
@@ -1,12 +1,12 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
/**
* Interface OnPipeMessageInterface
* @package Server\SInterface
* @package Server\Contract
*/
interface OnPipeMessageInterface
{
@@ -1,7 +1,7 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Process;
@@ -9,7 +9,7 @@ use Swoole\Process;
/**
* Interface BaseProcess
* @package SInterface
* @package Contract
*/
interface OnProcessInterface
{
@@ -1,6 +1,6 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
@@ -1,7 +1,7 @@
<?php
namespace Server\SInterface;
namespace Server\Contract;
use Swoole\Server;
+1 -1
View File
@@ -5,7 +5,7 @@ namespace Server\Handler;
use Annotation\Inject;
use Server\Abstracts\Server;
use Exception;
use Server\SInterface\OnPipeMessageInterface;
use Server\Contract\OnPipeMessageInterface;
use Kiri\Events\EventDispatch;
/**
+1 -1
View File
@@ -9,7 +9,7 @@ use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException;
use Server\SInterface\OnTaskInterface;
use Server\Contract\OnTaskInterface;
use Swoole\Server;
+4 -15
View File
@@ -7,6 +7,7 @@ use Exception;
use Kiri\Abstracts\Config;
use Kiri\Core\Help;
use Kiri\Events\EventDispatch;
use Kiri\Kiri;
use Kiri\Runtime;
use Server\Events\OnAfterWorkerStart;
use Server\Events\OnBeforeWorkerStart;
@@ -46,28 +47,16 @@ class OnServerWorker extends \Server\Abstracts\Server
if ($workerId < $server->setting['worker_num']) {
$this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId));
$this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId));
set_env('environmental', Kiri::WORKER);
} else {
$this->eventDispatch->dispatch(new OnTaskStart($server, $workerId));
$this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId));
set_env('environmental', Kiri::TASK);
}
$this->eventDispatch->dispatch(new OnAfterWorkerStart());
}
/**
* @param OnBeforeWorkerStart $worker
* @throws Exception
*/
public function setConfigure(OnBeforeWorkerStart $worker)
{
ServerManager::setEnv('worker', $worker->workerId);
$serialize = file_get_contents(storage(Runtime::CONFIG_NAME));
if (!empty($serialize)) {
Config::sets(unserialize($serialize));
}
}
/**
* @param Server $server
* @param int $workerId
@@ -87,7 +76,7 @@ class OnServerWorker extends \Server\Abstracts\Server
*/
public function onWorkerExit(Server $server, int $workerId)
{
ServerManager::setEnv('state', 'exit');
set_env('state', 'exit');
$this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId));
}
+19 -31
View File
@@ -10,12 +10,6 @@ use Kiri\Abstracts\Config;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Server\Abstracts\OnTaskerStart as TaskerDispatch;
use Server\Abstracts\OnWorkerStart as WorkerDispatch;
use Server\Events\OnBeforeWorkerStart;
use Server\Events\OnTaskerStart;
use Server\Events\OnWorkerStart;
use Server\Handler\OnServerWorker;
use Swoole\Coroutine;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
@@ -61,28 +55,22 @@ class ServerCommand extends Command
*/
public function execute(InputInterface $input, OutputInterface $output): int
{
try {
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
if (is_null($input->getArgument('action'))) {
$input->setArgument('action', 'restart');
}
if (!in_array($input->getArgument('action'), self::ACTIONS)) {
throw new Exception('I don\'t know what I want to do.');
}
if ($manager->isRunner() && $input->getArgument('action') == 'start') {
throw new Exception('Service is running. Please use restart.');
}
$manager->shutdown();
if ($input->getArgument('action') == 'stop') {
throw new Exception('shutdown success');
}
$this->generate_runtime_builder($manager);
} catch (\Throwable $throwable) {
$output->write(jTraceEx($throwable));
} finally {
return 1;
$manager = Kiri::app()->getServer();
$manager->setDaemon((int)!is_null($input->getOption('daemon')));
if (is_null($input->getArgument('action'))) {
$input->setArgument('action', 'restart');
}
if (!in_array($input->getArgument('action'), self::ACTIONS)) {
throw new Exception('I don\'t know what I want to do.');
}
if ($manager->isRunner() && $input->getArgument('action') == 'start') {
throw new Exception('Service is running. Please use restart.');
}
$manager->shutdown();
if ($input->getArgument('action') == 'stop') {
throw new Exception('shutdown success');
}
return $this->generate_runtime_builder($manager);
}
@@ -107,19 +95,19 @@ class ServerCommand extends Command
/**
* @param $manager
* @return int
* @throws ConfigException
* @throws Exception
*/
private function generate_runtime_builder($manager): void
private function generate_runtime_builder($manager): int
{
$this->configure_set();
Kiri::app()->getRouter()->read_files();
$this->eventProvider->on(OnBeforeWorkerStart::class, [di(OnServerWorker::class), 'setConfigure']);
$this->eventProvider->on(OnWorkerStart::class, [di(WorkerDispatch::class), 'dispatch']);
$this->eventProvider->on(OnTaskerStart::class, [di(TaskerDispatch::class), 'dispatch']);
$manager->start();
return 1;
}
}
+31 -91
View File
@@ -7,6 +7,7 @@ use Closure;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface;
use Kiri\Error\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException;
@@ -17,15 +18,15 @@ use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerTask;
use Server\Handler\OnServerWorker;
use Server\SInterface\OnCloseInterface;
use Server\SInterface\OnConnectInterface;
use Server\SInterface\OnDisconnectInterface;
use Server\SInterface\OnHandshakeInterface;
use Server\SInterface\OnMessageInterface;
use Server\SInterface\OnPacketInterface;
use Server\SInterface\OnProcessInterface;
use Server\SInterface\OnReceiveInterface;
use Server\SInterface\OnTaskInterface;
use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface;
use Server\Contract\OnDisconnectInterface;
use Server\Contract\OnHandshakeInterface;
use Server\Contract\OnMessageInterface;
use Server\Contract\OnPacketInterface;
use Server\Contract\OnProcessInterface;
use Server\Contract\OnReceiveInterface;
use Server\Contract\OnTaskInterface;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
@@ -40,12 +41,19 @@ use Swoole\WebSocket\Server as WServer;
class ServerManager
{
use TraitServer;
/** @var string */
public string $host = '';
public int $port = 0;
#[Inject(Logger::class)]
public Logger $logger;
/** @var array<string,Port> */
public array $ports = [];
@@ -109,7 +117,6 @@ class ServerManager
*/
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
if ($this->checkPortIsAlready($port)) $this->stopServer($port);
if (!$this->server) {
$this->createBaseServer($type, $host, $port, $mode, $settings);
} else {
@@ -131,8 +138,7 @@ class ServerManager
foreach ($this->sortService($configs['ports']) as $config) {
$this->startListenerHandler($context, $config, $daemon);
}
$this->bindCallback($this->server, [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
// $this->bindCallback($this->server, $this->getSystemEvents($configs));
$this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
}
@@ -145,7 +151,7 @@ class ServerManager
{
$configs = Config::get('server', [], true);
foreach ($this->sortService($configs['ports']) as $config) {
if ($this->checkPortIsAlready($config['port'])) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
@@ -163,11 +169,11 @@ class ServerManager
$customProcess = Kiri::getDi()->get($customProcess);
}
$process = new Process(function (Process $soloProcess) use ($customProcess) {
$system = sprintf('[%s].process[%d]', Config::get('id', 'system-service'), $soloProcess->pid);
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
if (Kiri::getPlatform()->isLinux()) {
$soloProcess->name($system . '.' . $customProcess->getName() . ' start.');
$soloProcess->name($system . '(' . $customProcess->getName() . ')');
}
echo "\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m " . $system . $customProcess->getName() . ' start.' . PHP_EOL;
$this->logger->debug($system . $customProcess->getName() . ' start.');
$customProcess->signListen($soloProcess);
$customProcess->onHandler($soloProcess);
},
@@ -175,7 +181,7 @@ class ServerManager
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
// $this->container->setBindings($customProcess::class, $process);
$this->container->setBindings($customProcess->getName(), $process);
$this->server->addProcess($process);
}
@@ -189,30 +195,6 @@ class ServerManager
}
/**
* @param array $ports
* @return array
*/
public function sortService(array $ports): array
{
$array = [];
foreach ($ports as $port) {
if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
array_unshift($array, $port);
} else if ($port['type'] == Constant::SERVER_TYPE_HTTP) {
if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$array[] = $port;
} else {
array_unshift($array, $port);
}
} else {
$array[] = $port;
}
}
return $array;
}
/**
* @param string $key
* @param string|int $value
@@ -273,7 +255,8 @@ class ServerManager
{
$id = Config::get('id', 'system-service');
echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL;
$this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port));
/** @var Server\Port $service */
$this->ports[$port] = $this->server->addlistener($host, $port, $mode);
if ($this->ports[$port] === false) {
@@ -293,22 +276,6 @@ class ServerManager
}
/**
* @param int $port
* @param string $event
* @return Closure|array|null
*/
public function getPortCallback(int $port, string $event): Closure|array|null
{
/** @var Server\Port $_port */
$_port = $this->ports[$port] ?? null;
if (is_null($_port)) {
return null;
}
return $_port->getCallback($event);
}
/**
* @param string $type
* @param string $host
@@ -317,6 +284,7 @@ class ServerManager
* @param array $settings
* @throws ReflectionException
* @throws ConfigException
* @throws Exception
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
@@ -331,7 +299,7 @@ class ServerManager
$id = Config::get('id', 'system-service');
echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL;
$this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port));
$this->addDefaultListener($settings);
}
@@ -343,43 +311,16 @@ class ServerManager
*/
public function stopServer(int $port)
{
if (!($pid = $this->checkPortIsAlready($port))) {
if (!($pid = checkPortIsAlready($port))) {
return;
}
while ($this->checkPortIsAlready($port)) {
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
/**
* @param $port
* @return bool|string
* @throws Exception
*/
private function checkPortIsAlready($port): bool|string
{
if (!Kiri::getPlatform()->isLinux()) {
exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output);
if (empty($output)) return false;
$output = explode(PHP_EOL, $output[0]);
return $output[0];
}
$serverPid = file_get_contents(storage('.swoole.pid'));
if (!empty($serverPid) && shell_exec('ps -ef | grep ' . $serverPid . ' | grep -v grep')) {
Process::kill($serverPid, SIGTERM);
}
exec('netstat -lnp | grep ' . $port . ' | grep "LISTEN" | awk \'{print $7}\'', $output);
if (empty($output)) {
return false;
}
return explode('/', $output[0])[0];
}
/**
* @param array $settings
* @throws ReflectionException
@@ -475,20 +416,19 @@ class ServerManager
{
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->getReflect(OnServerTask::class)?->newInstance();
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
} else {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']);
}
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
}
/**
* @param Port|Server $server
* @param array|null $settings
*/
public function bindCallback(Port|Server $server, ?array $settings = [])
public function bindCallback(?array $settings = [])
{
// TODO: Implement bindCallback() method.
if (count($settings) < 1) {
+32
View File
@@ -0,0 +1,32 @@
<?php
namespace Server;
trait TraitServer
{
/**
* @param array $ports
* @return array
*/
public function sortService(array $ports): array
{
$array = [];
foreach ($ports as $port) {
if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
array_unshift($array, $port);
} else if ($port['type'] == Constant::SERVER_TYPE_HTTP) {
if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$array[] = $port;
} else {
array_unshift($array, $port);
}
} else {
$array[] = $port;
}
}
return $array;
}
}