Compare commits

...

14 Commits

Author SHA1 Message Date
as2252258 14db451313 改名 2021-12-02 18:24:23 +08:00
as2252258 ced92a10e1 改名 2021-12-02 14:16:57 +08:00
as2252258 0d12baf49f 改名 2021-12-01 19:04:59 +08:00
as2252258 b798270bb7 改名 2021-12-01 17:21:15 +08:00
as2252258 955f358577 改名 2021-12-01 16:09:49 +08:00
as2252258 97a5deda23 改名 2021-12-01 15:28:59 +08:00
as2252258 8ae395e9cd 改名 2021-12-01 15:18:34 +08:00
as2252258 44a9a507f6 改名 2021-12-01 15:16:08 +08:00
as2252258 903ee0d6ce 改名 2021-11-30 16:00:39 +08:00
as2252258 a36943cd2d 改名 2021-11-30 15:10:00 +08:00
as2252258 26e0ce7778 改名 2021-11-30 14:59:51 +08:00
as2252258 5a9f9da347 改名 2021-11-30 14:43:55 +08:00
as2252258 a1bf157408 改名 2021-11-30 14:34:15 +08:00
as2252258 27834d37ed 改名 2021-11-30 14:32:56 +08:00
12 changed files with 240 additions and 124 deletions
+1 -1
View File
@@ -4,7 +4,7 @@
namespace Server\Abstracts;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Server\Abstracts\Server;
use Exception;
use Server\Contract\OnPipeMessageInterface;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Server\Abstracts\Server;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Server\Abstracts\Server;
use Kiri\Exception\ConfigException;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Kiri\Events\EventDispatch;
use Server\Events\OnAfterReload;
use Server\Events\OnBeforeReload;
+1 -1
View File
@@ -2,7 +2,7 @@
namespace Server\Handler;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Core\Help;
+17 -6
View File
@@ -3,13 +3,15 @@
namespace Server;
use Annotation\Inject;
use Exception;
use Http\Handler\Abstracts\HttpService;
use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Config;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Server\Events\OnShutdown;
@@ -41,6 +43,13 @@ class Server extends HttpService
public EventDispatch $eventDispatch;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/**
*
*/
@@ -59,10 +68,11 @@ class Server extends HttpService
/**
* @return string start server
*
* start server
* @return string
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws Exception
*/
public function start(): string
@@ -94,7 +104,7 @@ class Server extends HttpService
{
$configs = Config::get('server', [], true);
foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) {
$this->manager->stopServer($config['port']);
$this->state->exit($config['port']);
}
$this->eventDispatch->dispatch(new OnShutdown());
}
@@ -103,10 +113,11 @@ class Server extends HttpService
/**
* @return bool
* @throws ConfigException
* @throws Exception
*/
public function isRunner(): bool
{
return $this->manager->isRunner();
return $this->state->isRunner();
}
+2 -4
View File
@@ -4,7 +4,7 @@ declare(strict_types=1);
namespace Server;
use Annotation\Inject;
use Note\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Events\EventDispatch;
@@ -105,9 +105,7 @@ class ServerCommand extends Command
$this->configure_set();
Kiri::app()->getRouter()->read_files();
$this->eventProvider->dispatch(new OnServerBeforeStart());
$manager->start();
return 1;
+51 -96
View File
@@ -2,14 +2,16 @@
namespace Server;
use Annotation\Inject;
use Exception;
use Kiri\Abstracts\Config;
use Psr\Container\ContainerInterface;
use Kiri\Di\Container;
use Kiri\Error\Logger;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Server\Abstracts\BaseProcess;
@@ -21,13 +23,13 @@ use Server\Contract\OnMessageInterface;
use Server\Contract\OnPacketInterface;
use Server\Contract\OnProcessInterface;
use Server\Contract\OnReceiveInterface;
use Server\Contract\OnTaskInterface;
use Server\Events\OnServerBeforeStart;
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerTask;
use Server\Handler\OnServerWorker;
use Server\Tasker\OnServerTask;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
@@ -51,10 +53,20 @@ class ServerManager
public int $port = 0;
/**
* @var Logger
*/
#[Inject(Logger::class)]
public Logger $logger;
/**
* @var State
*/
#[Inject(State::class)]
public State $state;
/** @var array<string,Port> */
public array $ports = [];
@@ -65,7 +77,7 @@ class ServerManager
/**
* @var ContainerInterface
* @var Container
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
@@ -99,9 +111,11 @@ class ServerManager
/**
* @return Server|WServer|HServer|null
* @throws ReflectionException
*/
public function getServer(): Server|WServer|HServer|null
{
di(EventDispatch::class)->dispatch(new OnServerBeforeStart());
return $this->server;
}
@@ -112,9 +126,10 @@ class ServerManager
* @param int $port
* @param int $mode
* @param array $settings
* @throws ReflectionException
* @throws ConfigException
* @throws Exception
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function addListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
@@ -130,8 +145,12 @@ class ServerManager
/**
* @throws ReflectionException
* @param $configs
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function initBaseServer($configs, int $daemon = 0): void
{
@@ -143,23 +162,6 @@ class ServerManager
}
/**
* @return bool
* @throws ConfigException
* @throws Exception
*/
public function isRunner(): bool
{
$configs = Config::get('server', [], true);
foreach ($this->sortService($configs['ports']) as $config) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
}
/**
* @param string|OnProcessInterface|BaseProcess $customProcess
* @throws Exception
@@ -191,21 +193,13 @@ class ServerManager
}
/**
* @param string $key
* @param string|int $value
*/
public static function setEnv(string $key, string|int $value): void
{
putenv(sprintf('%s=%s', $key, (string)$value));
}
/**
* @param ServerManager $context
* @param array $config
* @param int $daemon
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
@@ -246,13 +240,16 @@ class ServerManager
* @param int $port
* @param int $mode
* @param array $settings
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = [])
{
$id = Config::get('id', 'system-service');
$this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port));
$this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port));
/** @var Server\Port $service */
$this->ports[$port] = $this->server->addlistener($host, $port, $mode);
@@ -279,8 +276,10 @@ class ServerManager
* @param int $port
* @param int $mode
* @param array $settings
* @throws ReflectionException
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
@@ -302,26 +301,11 @@ class ServerManager
}
/**
* @param int $port
* @throws Exception
*/
public function stopServer(int $port)
{
if (!($pid = checkPortIsAlready($port))) {
return;
}
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
/**
* @param array $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
private function addDefaultListener(array $settings): void
{
@@ -336,12 +320,13 @@ class ServerManager
}
/**
* @param array $events
* @param Server|Port $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
/**
* @param array $events
* @param Server|Port $server
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
private function addServiceEvents(array $events, Server|Port $server)
{
foreach ($events as $name => $event) {
@@ -362,40 +347,6 @@ class ServerManager
}
/**
* @param string $class
* @return object
*/
private function getNewInstance(string $class): object
{
return $this->container->create($class);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws ReflectionException
* @throws Exception
*/
public function task(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerId = random_int($this->server->setting['worker_num'] + 1,
$this->server->setting['worker_num'] + 1 + $this->server->setting['task_worker_num']);
}
if (is_string($handler)) {
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
$handler = $implements->newInstanceArgs($params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param mixed $message
* @param int $workerId
@@ -409,12 +360,14 @@ class ServerManager
/**
* @param array $events
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
private function addTaskListener(array $events = []): void
{
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->getReflect(OnServerTask::class)?->newInstance();
$reflect = $this->container->get(OnServerTask::class);
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
@@ -426,10 +379,12 @@ class ServerManager
/**
* @param array|null $settings
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function bindCallback(?array $settings = [])
{
// TODO: Implement bindCallback() method.
if (count($settings) < 1) {
return;
}
+56
View File
@@ -0,0 +1,56 @@
<?php
namespace Server;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Abstracts\Config;
use Swoole\Process;
class State extends BaseObject
{
use TraitServer;
public array $servers = [];
public function init()
{
$this->servers = Config::get('server.ports');
}
/**
* @return bool
* @throws Exception
*/
public function isRunner(): bool
{
$ports = $this->sortService($this->servers);
foreach ($ports as $config) {
if (checkPortIsAlready($config['port'])) {
return true;
}
}
return false;
}
/**
* @param $port
* @throws Exception
*/
public function exit($port)
{
if (!($pid = checkPortIsAlready($port))) {
return;
}
while (checkPortIsAlready($port)) {
Process::kill($pid, SIGTERM);
usleep(300);
}
}
}
+102
View File
@@ -0,0 +1,102 @@
<?php
namespace Server\Tasker;
use Exception;
use Kiri\Abstracts\BaseObject;
use Kiri\Core\HashMap;
use Kiri\Di\Container;
use Kiri\Kiri;
use Note\Inject;
use Psr\Container\ContainerInterface;
use ReflectionException;
use Server\Contract\OnTaskInterface;
use Server\SwooleServerInterface;
/**
*
*/
class AsyncTaskExecute extends BaseObject
{
/**
* @var SwooleServerInterface|null
*/
public ?SwooleServerInterface $server = null;
/**
* @var Container
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function reg(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int|null $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = null)
{
if (!$this->server) {
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
}
if ($workerId === null || $workerId <= $this->server->setting['worker_num']) {
$workerNum = $this->server->setting['worker_num'];
$taskerNum = $workerNum + $this->server->setting['task_worker_num'];
$workerId = random_int($workerNum, $taskerNum - 1);
}
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
@@ -1,13 +1,12 @@
<?php
namespace Server\Handler;
namespace Server\Tasker;
use Annotation\Inject;
use Note\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException;
use Server\Contract\OnTaskInterface;
use Swoole\Server;
@@ -68,19 +67,14 @@ class OnServerTask
/**
* @param $data
* @return null
* @throws ReflectionException
*/
private function resolve($data)
{
[$class, $params] = json_encode($data, true);
$reflect = Kiri::getDi()->getReflect($class);
if (!$reflect->isInstantiable()) {
return null;
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
$class = $reflect->newInstanceArgs($params);
return $class->execute();
return null;
}