This commit is contained in:
2023-04-16 01:45:34 +08:00
parent 46baac8bbd
commit 11c21f01a2
35 changed files with 36 additions and 1377 deletions
+1
View File
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
+1
View File
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
+1
View File
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
+1
View File
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
-82
View File
@@ -1,82 +0,0 @@
<?php
namespace Kiri\Actor;
use Exception;
use Kiri\Di\ContainerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Process;
class ActorProcess extends BaseProcess
{
/**
* @var bool
*/
protected bool $enable_coroutine = true;
/**
* @var string
*/
public string $name = 'actor-process';
/**
* @param ContainerInterface $container
*/
public function __construct(public ContainerInterface $container)
{
}
/**
* @param Process $process
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
$actorManager = $this->container->get(ActorManager::class);
while (!$this->isStop()) {
$read = json_decode($process->read(), true);
if (is_null($read) || !isset($read['category'])) {
continue;
}
$message = new ActorMessage($read['userId'], $read['event'], $read['body']);
switch ($read['category']) {
case ActorState::MESSAGE:
$actorManager->write($read['name'], $message);
break;
case ActorState::CREATE:
/** @var ActorInterface $actor */
$actor = $this->container->create($read['class'], $read['constrict'], $read['config']);
$actorManager->addActor($actor);
break;
case ActorState::SHUTDOWN:
$actorManager->closeActor($read['name']);
break;
}
}
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->onProcessStop();
});
return $this;
}
}
-12
View File
@@ -1,12 +0,0 @@
<?php
namespace Kiri\Abstracts;
abstract class AbstractServer extends Component
{
public string $name = 'http';
}
+1 -13
View File
@@ -13,10 +13,9 @@ namespace Kiri\Abstracts;
use Exception;
use Kiri;
use Kiri\Di\LocalService;
use Kiri\Error\{ErrorHandler, StdoutLogger, StdoutLoggerInterface};
use Kiri\Error\{StdoutLogger, StdoutLoggerInterface};
use Kiri\Exception\{InitException};
use Kiri\Di\ContainerInterface;
use Kiri\Server\{Server};
use Psr\Log\LoggerInterface;
use Kiri\Events\EventProvider;
@@ -225,17 +224,6 @@ abstract class BaseMain extends Component
}
/**
*
* @return Server
* @throws
*/
public function getServer(): Server
{
return Kiri::getDi()->get(Server::class);
}
/**
* @param string $name
* @return mixed|null
+2
View File
@@ -1,4 +1,6 @@
<?php
declare(strict_types=1);
namespace Kiri;
+3
View File
@@ -1,6 +1,9 @@
<?php
declare(strict_types=1);
namespace Kiri;
+6 -31
View File
@@ -14,14 +14,11 @@ use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Core\Json;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Message\Events\OnAfterRequest;
use Kiri\Message\Handler\Formatter\IFormatter;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Kiri\Server\Events\OnShutdown;
use ReflectionException;
use Kiri\Di\Inject\Container;
/**
* Class ErrorHandler
@@ -32,10 +29,6 @@ use ReflectionException;
class ErrorHandler extends Component implements ErrorInterface
{
/** @var ?IFormatter $message */
private ?IFormatter $message = NULL;
/**
* @var string
*/
@@ -45,7 +38,7 @@ class ErrorHandler extends Component implements ErrorInterface
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
#[Container(EventDispatch::class)]
public EventDispatch $dispatch;
@@ -114,7 +107,7 @@ class ErrorHandler extends Component implements ErrorInterface
$message = array_shift($messages);
$this->dispatch->dispatch(new OnShutdown());
$this->dispatch->dispatch(new Kiri\Events\OnSystemError());
$this->sendError($message, $lastError['file'], $lastError['line']);
}
@@ -125,14 +118,13 @@ class ErrorHandler extends Component implements ErrorInterface
*
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
public function exceptionHandler(\Throwable $exception)
{
$this->category = 'exception';
$this->dispatch->dispatch(new OnAfterRequest());
$this->dispatch->dispatch(new Kiri\Events\OnSystemError());
$this->sendError($exception->getMessage(), $exception->getFile(), $exception->getLine());
}
@@ -158,7 +150,7 @@ class ErrorHandler extends Component implements ErrorInterface
$this->logger->error('On error handler', [$data]);
$this->dispatch->dispatch(new OnAfterRequest());
$this->dispatch->dispatch(new Kiri\Events\OnSystemError());
throw new \ErrorException($error[1], $error[0], 1, $error[2], $error[3]);
}
@@ -181,24 +173,7 @@ class ErrorHandler extends Component implements ErrorInterface
return $data;
}
/**
* @return mixed
*/
public function getErrorMessage(): mixed
{
$message = $this->message;
$this->message = NULL;
return $message->getData();
}
/**
* @return bool
*/
public function getAsError(): bool
{
return $this->message !== NULL;
}
/**
* @param $message
-53
View File
@@ -1,53 +0,0 @@
<?php
namespace Kiri\Error;
use Exception;
use Kiri\Message\Aspect\OnAspectInterface;
use Kiri\Message\Aspect\OnJoinPointInterface;
use Kiri\Message\Constrict\RequestInterface;
use Kiri;
use Psr\Log\LoggerInterface;
/**
* Class LoggerAspect
* @package Kiri\Error
*/
class LoggerAspect implements OnAspectInterface
{
/**
* @param OnJoinPointInterface $joinPoint
* @return mixed
* @throws Exception
*/
public function process(OnJoinPointInterface $joinPoint): mixed
{
$time = microtime(true);
$response = $joinPoint->process();
$this->print_runtime($time);
return $response;
}
/**
* @param $startTime
* @throws Exception
*/
private function print_runtime($startTime)
{
$request = Kiri::getDi()->get(RequestInterface::class);
$runTime = round(microtime(true) - $startTime, 6);
$logger = Kiri::getDi()->get(LoggerInterface::class);
$logger->debug(sprintf('run %s use time %6f', $request->getUri()->__toString(), $runTime));
}
}
+2
View File
@@ -1,5 +1,7 @@
<?php
declare(strict_types=1);
namespace Kiri\Error;
use Kiri\Abstracts\Logger;
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Kiri\Events;
class OnSystemError
{
}
-11
View File
@@ -7,12 +7,7 @@ namespace Kiri\Pool;
use Database\Mysql\PDO;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Di\ContainerInterface;
use Kiri\Exception\ConfigException;
use Kiri\Server\Abstracts\StatusEnum;
use Kiri\Server\WorkerStatus;
/**
@@ -25,12 +20,6 @@ class Pool extends Component
/** @var array<PoolItem> */
private array $_connections = [];
/**
* @var WorkerStatus
*/
#[Inject(WorkerStatus::class)]
public WorkerStatus $status;
/**
* @param $name
+1 -1
View File
@@ -1,9 +1,9 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Closure;
use Kiri\Annotation\Inject;
use Kiri\Di\Context;
use Swoole\Coroutine\Channel;
+2
View File
@@ -1,5 +1,7 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Kiri\Di\Context;
+3
View File
@@ -1,5 +1,8 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
interface QueueInterface
+2
View File
@@ -1,5 +1,7 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
+2
View File
@@ -1,5 +1,7 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
interface StopHeartbeatCheck
-122
View File
@@ -1,122 +0,0 @@
<?php
namespace Kiri\Redis;
use Exception;
use Kiri;
use Kiri\Abstracts\Logger;
use Kiri\Exception\RedisConnectException;
use Kiri\Pool\StopHeartbeatCheck;
use Kiri\Server\Events\OnWorkerExit;
use RedisException;
use Swoole\Timer;
use function error;
/**
*
*/
class Helper implements StopHeartbeatCheck
{
private ?\Redis $pdo = null;
public string $host;
public int $port;
public int $database = 0;
public string $auth = '';
public string $prefix = '';
public int $timeout = 30;
public int $read_timeout = 30;
public array $pool = [];
private int $_timer = -1;
/**
* @param array $config
*/
public function __construct(array $config)
{
$this->host = $config['host'];
$this->port = $config['port'];
$this->database = $config['databases'];
$this->auth = $config['auth'];
$this->prefix = $config['prefix'];
$this->timeout = $config['timeout'];
$this->read_timeout = $config['read_timeout'];
$this->pool = $config['pool'];
}
/**
* clear client heartbeat
*/
public function stopHeartbeatCheck(): void
{
$this->_timer = -1;
}
/**
* @param string $name
* @param array $arguments
* @return mixed
* @throws RedisConnectException|RedisException
*/
public function __call(string $name, array $arguments)
{
if (!method_exists($this, $name)) {
return $this->_pdo()->{$name}(...$arguments);
}
return $this->{$name}(...$arguments);
}
/**
* @return \Redis
* @throws Exception
* @throws RedisException
*/
public function _pdo(): \Redis
{
if (!($this->pdo instanceof \Redis) || !$this->pdo->ping('isOk')) {
$this->pdo = $this->newClient();
}
return $this->pdo;
}
/**
* @return \Redis
* @throws Exception
*/
private function newClient(): \Redis
{
$redis = new \Redis();
if (!$redis->connect($this->host, $this->port, $this->timeout)) {
throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $this->host, $this->port));
}
if (!empty($this->auth) && !$redis->auth($this->auth)) {
throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth));
}
if ($this->read_timeout < 0) {
$this->read_timeout = 0;
}
$redis->select($this->database);
if ($this->read_timeout > 0) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
}
$redis->setOption(\Redis::OPT_PREFIX, $this->prefix);
return $redis;
}
}
-212
View File
@@ -1,212 +0,0 @@
<?php
namespace Kiri\Reload;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Error\StdoutLoggerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\ServerInterface;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
/**
*
*/
class Inotify extends BaseProcess
{
private mixed $inotify = null;
private mixed $events;
private array $watchFiles = [];
public bool $isReloading = FALSE;
public string $name = 'inotify listen';
public array $dirs = [];
protected int $cid = -1;
const IG_DIR = [APP_PATH . 'commands', APP_PATH . '.git', APP_PATH . '.gitee'];
#[Inject(StdoutLoggerInterface::class)]
public StdoutLoggerInterface $logger;
/**
* @param Process $process
* @return void
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
set_error_handler([$this, 'error']);
set_exception_handler([$this, 'error']);
if (!extension_loaded('inotify')) {
while (true) {
if ($this->isStop()) {
break;
}
sleep(1);
}
return;
}
$this->dirs = Config::get('reload.inotify', []);
$this->start();
}
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->isStop = true;
});
return $this;
}
/**
* @return void
*/
public function error(): void
{
}
/**
* @throws Exception
*/
public function start()
{
$this->inotify = inotify_init();
$this->events = IN_MODIFY | IN_DELETE | IN_CREATE | IN_MOVE;
foreach ($this->dirs as $dir) {
if (!is_dir($dir)) continue;
$this->watch($dir);
}
Event::add($this->inotify, [$this, 'check']);
Event::cycle(function () {
if ($this->isStop()) {
Event::del($this->inotify);
}
}, true);
Event::wait();
}
/**
* 开始监听
* @throws Exception
*/
public function check()
{
if (!($events = inotify_read($this->inotify))) {
return;
}
if ($this->isReloading) {
return;
}
$LISTEN_TYPE = [IN_CREATE, IN_DELETE, IN_MODIFY, IN_MOVED_TO, IN_MOVED_FROM];
foreach ($events as $ev) {
if (!in_array($ev['mask'], $LISTEN_TYPE)) {
continue;
}
//非重启类型
if (str_ends_with($ev['name'], '.php')) {
if ($this->isReloading) {
break;
}
$this->isReloading = TRUE;
Timer::after(3000, fn() => $this->reload());
}
}
}
/**
* @throws Exception
*/
public function reload()
{
$swollen = \Kiri::getDi()->get(ServerInterface::class);
$swollen->reload();
$this->clearWatch();
foreach ($this->dirs as $root) {
$this->watch($root);
}
$this->isReloading = FALSE;
}
/**
* @throws Exception
*/
public function clearWatch()
{
foreach ($this->watchFiles as $wd) {
@inotify_rm_watch($this->inotify, $wd);
}
$this->watchFiles = [];
}
/**
* @param $dir
* @return bool
* @throws Exception
*/
public function watch($dir): bool
{
//目录不存在
if (!is_dir($dir)) {
return $this->logger->addError("[$dir] is not a directory.");
}
//避免重复监听
if (isset($this->watchFiles[$dir])) {
return FALSE;
}
if (in_array($dir, self::IG_DIR)) {
return FALSE;
}
$wd = @inotify_add_watch($this->inotify, $dir, $this->events);
$this->watchFiles[$dir] = $wd;
$files = scandir($dir);
foreach ($files as $f) {
if ($f == '.' || $f == '..') {
continue;
}
$path = $dir . '/' . $f;
//递归目录
if (is_dir($path)) {
$this->watch($path);
} else if (!str_ends_with($f, '.php')) {
continue;
}
//检测文件类型
if (strstr($f, '.') == '.php') {
$wd = @inotify_add_watch($this->inotify, $path, $this->events);
$this->watchFiles[$path] = $wd;
}
}
return TRUE;
}
}
-166
View File
@@ -1,166 +0,0 @@
<?php
namespace Kiri\Reload;
use DirectoryIterator;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\ServerInterface;
use Psr\Log\LoggerInterface;
use Swoole\Process;
use Swoole\Timer;
class Scanner extends BaseProcess
{
private array $md5Map = [];
public bool $isReloading = FALSE;
public string $name = 'hot reload';
protected bool $enable_coroutine = true;
private array $dirs = [];
/**
* @var LoggerInterface
*/
#[Inject(LoggerInterface::class)]
public LoggerInterface $logger;
/**
* @throws Exception
*/
public function process(Process $process): void
{
$this->dirs = Config::get('reload.inotify', []);
$this->loadDirs();
Timer::tick(3000, fn() => $this->loadDirs(true));
}
/**
* @param bool $isReload
* @throws Exception
*/
private function loadDirs(bool $isReload = FALSE)
{
echo 'file tick ' . date('Y-m-d H:i:s') . PHP_EOL;
try {
if ($this->isReloading) {
return;
}
foreach ($this->dirs as $value) {
if ($this->isReloading) {
break;
}
$value = new DirectoryIterator($value);
if ($value->isDir()) {
$this->loadByDir($value, $isReload);
}
}
} catch (\Throwable $throwable) {
$this->logger->error($throwable->getMessage(), [$throwable]);
}
}
/**
* @param DirectoryIterator $iterator
* @param bool $isReload
* @return void
* @throws Exception
*/
private function loadByDir(DirectoryIterator $iterator, bool $isReload = FALSE): void
{
foreach ($iterator as $path) {
if ($this->isReloading) {
return;
}
if (!$this->isNeedCheck($path)) {
continue;
}
if ($path->isDir()) {
$this->loadByDir(new DirectoryIterator($path->getRealPath()), $isReload);
}
if ($this->checkFile($path, $isReload)) {
$this->isReloading = TRUE;
$this->timerReload();
break;
}
}
}
private function isNeedCheck(DirectoryIterator $path): bool
{
if ($path->isDot() || str_starts_with($path->getFilename(), '.')) {
return false;
}
if ($path->getExtension() !== 'php') {
return false;
}
return true;
}
/**
* @param DirectoryIterator $value
* @param $isReload
* @return bool
*/
private function checkFile(DirectoryIterator $value, $isReload): bool
{
$md5 = md5($value->getRealPath());
$mTime = filectime($value->getRealPath());
if (!isset($this->md5Map[$md5])) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
} else {
if ($this->md5Map[$md5] != $mTime) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
}
}
return FALSE;
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
Timer::clearAll();
$this->onProcessStop();
});
return $this;
}
/**
* @throws Exception
*/
public function timerReload()
{
$this->logger->warning('file change');
$swow = \Kiri::getDi()->get(ServerInterface::class);
$swow->reload();
$this->loadDirs();
$this->isReloading = FALSE;
}
}
-33
View File
@@ -1,33 +0,0 @@
<?php
namespace Kiri\Task\Annotation;
use Kiri\Annotation\AbstractAttribute;
use Kiri\Task\TaskContainer;
#[\Attribute(\Attribute::TARGET_CLASS)] class AsynchronousTask extends AbstractAttribute
{
/**
* @param string $name
*/
public function __construct(public string $name)
{
}
/**
* @param mixed $class
* @param mixed $method
* @return mixed
*/
public function execute(mixed $class, mixed $method = ''): mixed
{
// TODO: Change the autogenerated stub
di(TaskContainer::class)->add($this->name, $class::class);
return parent::execute($class, $method);
}
}
-94
View File
@@ -1,94 +0,0 @@
<?php
namespace Kiri\Task;
use Kiri\Annotation\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Task\OnTaskInterface;
use Swoole\Server;
/**
* Class OnServerTask
* @package Server\Task
*/
class OnServerTask
{
#[Inject(Logger::class)]
public Logger $logger;
/**
* @param Server $server
* @param int $task_id
* @param int $src_worker_id
* @param mixed $data
* @throws ConfigException
*/
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): void
{
try {
$data = $this->resolve($data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [throwable($exception)]);
} finally {
$server->finish($data);
}
}
/**
* @param Server|null $server
* @param Server\Task $task
* @throws ConfigException
*/
public function onCoroutineTask(?Server $server, Server\Task $task): void
{
try {
$data = $this->resolve($task->data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [throwable($exception)]);
} finally {
$task->finish($data);
}
}
/**
* @param $data
* @return null
*/
private function resolve($data)
{
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
return null;
}
/**
* @param Server $server
* @param int $task_id
* @param mixed $data
*/
public function onFinish(Server $server, int $task_id, mixed $data): void
{
if (!($data instanceof OnTaskInterface)) {
return;
}
$data->finish($server, $task_id);
}
}
-17
View File
@@ -1,17 +0,0 @@
<?php
namespace Kiri\Task;
use Swoole\Server;
interface OnTaskInterface
{
public function execute();
public function finish(?Server $server, int $task_id);
}
-72
View File
@@ -1,72 +0,0 @@
<?php
namespace Kiri\Task;
use Exception;
use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Psr\Container\ContainerExceptionInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Server;
class TaskContainer extends Component
{
private HashMap $hashMap;
/**
* @param ContainerInterface $container
* @param array $config
* @throws Exception
*/
public function __construct(public ContainerInterface $container, array $config = [])
{
parent::__construct($config);
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function add(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param string $key
* @return bool
*/
#[Pure] public function has(string $key): bool
{
return $this->hashMap->has($key);
}
/**
* @param string $key
* @return OnTaskInterface
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function get(string $key): OnTaskInterface
{
$task = $this->hashMap->get($key);
if (is_string($task)) {
$task = $this->container->get($task);
if (!empty($task)) {
$this->add($key, $task);
}
}
return $task;
}
}
-108
View File
@@ -1,108 +0,0 @@
<?php
namespace Kiri\Task;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Server\ServerInterface;
use Psr\Container\ContainerExceptionInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Coroutine;
use Swoole\Server;
/**
*
*/
class TaskExecute extends Component
{
/**
* @param TaskContainer $hashMap
* @param ContainerInterface $container
* @param array $config
* @throws Exception
*/
public function __construct(public TaskContainer $hashMap, public ContainerInterface $container, array $config = [])
{
parent::__construct($config);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
if ($this->container->has(ServerInterface::class)) {
$this->onAsyncExec($handler, $workerId);
} else {
Coroutine::create(fn() => $this->onCoronExec($handler));
}
}
/**
* @param OnTaskInterface|string $handler
* @param int $workerId
* @return bool
* @throws Exception
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
protected function onAsyncExec(OnTaskInterface|string $handler, int $workerId = -1): bool
{
/** @var Server $server */
$server = $this->container->get(ServerInterface::class);
if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) {
$workerId = random_int(0, $server->setting['task_worker_num'] - 1);
}
return (bool)$server->task(serialize($handler), $workerId);
}
/**
* @param OnTaskInterface|string $handler
* @return bool
*/
protected function onCoronExec(OnTaskInterface|string $handler): bool
{
$handler->execute();
$handler->finish(null, Coroutine::getCid());
return true;
}
/**
* @param $handler
* @param $params
* @return object
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
@@ -1,17 +0,0 @@
<?php
namespace Kiri\Websocket\Contract;
interface OnCloseInterface
{
/**
* @param int $fd
* @return void
*/
public function OnClose(int $fd): void;
}
@@ -1,15 +0,0 @@
<?php
namespace Kiri\Websocket\Contract;
interface OnDisconnectInterface
{
/**
* @param int $fd
*/
public function OnDisconnect(int $fd): void;
}
@@ -1,21 +0,0 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\Http\Request;
use Swoole\Http\Response;
interface OnHandshakeInterface
{
/**
* @param Request $request
* @param Response $response
* @return void
*/
public function OnHandshake(Request $request, Response $response): void;
}
@@ -1,19 +0,0 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\WebSocket\Frame;
interface OnMessageInterface
{
/**
* @param Frame $frame
* @return void
*/
public function OnMessage(Frame $frame): void;
}
@@ -1,19 +0,0 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\Http\Request;
interface OnOpenInterface
{
/**
* @param Request $request
* @return void
*/
public function onOpen(Request $request): void;
}
-8
View File
@@ -1,8 +0,0 @@
<?php
namespace Kiri\Websocket;
class Dispatcher
{
}
-213
View File
@@ -1,213 +0,0 @@
<?php
namespace Kiri\Websocket;
use Closure;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Di\ContainerInterface;
use Kiri\Di\Context;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Message\Abstracts\ExceptionHandlerInterface;
use Kiri\Message\Constrict\Response as CResponse;
use Kiri\Message\Constrict\ResponseInterface;
use Kiri\Message\ExceptionHandlerDispatcher;
use Kiri\Message\Handler\DataGrip;
use Kiri\Message\Handler\Dispatcher;
use Kiri\Message\Handler\RouterCollector;
use Kiri\Message\ResponseEmitter;
use Kiri\Websocket\Contract\OnCloseInterface;
use Kiri\Websocket\Contract\OnDisconnectInterface;
use Kiri\Websocket\Contract\OnHandshakeInterface;
use Kiri\Websocket\Contract\OnMessageInterface;
use Kiri\Websocket\Contract\OnOpenInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Coroutine;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
/**
*
*/
class Server extends Component implements OnHandshakeInterface, OnMessageInterface, OnCloseInterface, OnDisconnectInterface, OnOpenInterface
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $dispatch;
/**
* @var string
*/
public string $host = '0.0.0.0';
/**
* @var int
*/
public int $port = 9527;
/**
* @var RouterCollector
*/
public RouterCollector $router;
/**
* @var Coroutine\Http\Server
*/
public Coroutine\Http\Server $server;
/**
* @var ExceptionHandlerInterface
*/
public ExceptionHandlerInterface $exception;
/**
* @var array|Closure|null
*/
public array|null|Closure $message;
/**
* @var array|Closure|null
*/
public array|null|Closure $close;
/**
* @param ContainerInterface $container
* @param DataGrip $collector
* @param Dispatcher $dispatcher
* @param ResponseEmitter $emitter
* @param array $config
* @throws Exception
*/
public function __construct(
public ContainerInterface $container,
public DataGrip $collector,
public Dispatcher $dispatcher,
public ResponseEmitter $emitter,
array $config = [])
{
parent::__construct($config);
$this->router = $this->collector->get('wss');
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ConfigException
*/
public function init(): void
{
$exception = Config::get('exception.websocket', ExceptionHandlerDispatcher::class);
if (!in_array(ExceptionHandlerInterface::class, class_implements($exception))) {
$exception = ExceptionHandlerDispatcher::class;
}
$this->exception = $this->container->get($exception);
}
/**
* @param Request $request
* @param Response $response
* @return void
* @throws Exception
*/
public function OnHandshake(Request $request, Response $response): void
{
try {
/** @var \Kiri\Message\Response $psrResponse */
$psrResponse = Context::set(ResponseInterface::class, new \Kiri\Message\Response());
$handler = $this->router->find('/', 'GET');
if (is_integer($handler)) {
$psrResponse->withContent('Allow Method[' . $request->getMethod() . '].')->withStatus(405);
} else if (is_null($handler)) {
$psrResponse->withContent('Page not found.')->withStatus(404);
} else {
$executor = $handler->dispatch->handler;
if (Context::inCoroutine()) {
$response->upgrade();
if ($handler instanceof OnHandshakeInterface) {
$handler->OnHandshake($request, $response);
}
if ($executor instanceof OnOpenInterface) {
$executor->OnOpen($request);
}
while (true) {
$frame = $response->recv();
if ($frame === false || $frame instanceof CloseFrame || $frame === '') {
$handler->OnClose($response->fd);
break;
}
$handler->OnMessage($frame);
}
} else {
$this->OnOpen($request);
}
}
} catch (\Throwable $throwable) {
$psrResponse = $this->exception->emit($throwable, di(CResponse::class));
} finally {
$this->emitter->sender($response, $psrResponse);
}
}
/**
* @param Frame $frame
* @return void
*/
public function OnMessage(Frame $frame): void
{
}
/**
* @param int $fd
* @return void
*/
public function OnClose(int $fd): void
{
}
/**
* @param int $fd
* @return void
*/
public function OnDisconnect(int $fd): void
{
// TODO: Implement OnDisconnect() method.
}
/**
* @param Request $request
* @return void
*/
public function OnOpen(Request $request): void
{
}
}
@@ -1,38 +0,0 @@
<?php
class TestSocketServer
{
public function onHandshake($request, $response): int
{
return 200;
}
public function onMessage($server, $frame): void
{
// TODO: Implement onMessage() method.
}
public function onClose($server, int $fd): void
{
// TODO: Implement onClose() method.
}
public function onOpen($server, $request): void
{
// TODO: Implement onOpen() method.
}
}
var_dump(is_callable(new TestSocketServer(), true));
//
//Router::addServer('ws', function () {
// Router::get('/', 'TestSocketServer');
//});