modify plugin name

This commit is contained in:
2022-06-16 17:38:23 +08:00
parent 10de6b5246
commit 4daad7d111
22 changed files with 1206 additions and 70 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ namespace PHPSTORM_META {
// Reflect
use Kiri\Di\Container;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
override(ContainerInterface::get(0), map('@'));
override(Container::get(0), map('@'));
+20 -3
View File
@@ -14,7 +14,8 @@ use Kiri\Application;
use Kiri\Core\Json;
use Kiri\Di\Container;
use Kiri\Environmental;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
use Kiri\Exception\ConfigException;
use Swoole\Coroutine;
use Swoole\Process;
use Swoole\WebSocket\Server;
@@ -149,7 +150,6 @@ class Kiri
}
/**
* @param $className
* @param array $construct
@@ -172,6 +172,24 @@ class Kiri
}
/**
* @param $prefix
* @return void
* @throws ConfigException
*/
public static function setProcessName($prefix): void
{
if (Kiri::getPlatform()->isMac()) {
return;
}
$name = '[' . Config::get('id', 'system-service') . ']';
if (!empty($prefix)) {
$name .= '.' . $prefix;
}
swoole_set_process_name($name);
}
/**
* @return string
* @throws Exception
@@ -311,7 +329,6 @@ class Kiri
}
const PROCESS = 'process';
const TASK = 'task';
const WORKER = 'worker';
+35 -14
View File
@@ -29,27 +29,48 @@ if (!function_exists('make')) {
*/
function make($name, $default = NULL): mixed
{
if (class_exists($name)) {
return Kiri::createObject($name);
if (is_string($name)) {
if (Kiri::has($name)) {
return Kiri::app()->get($name);
}
if (empty($default)) {
throw new Exception("Unknown component ID: $name");
}
if (Kiri::has($default)) {
return Kiri::app()->get($default);
}
return null;
}
if (Kiri::has($name)) {
return Kiri::app()->get($name);
}
if (empty($default)) {
throw new Exception("Unknown component ID: $name");
}
if (Kiri::has($default)) {
return Kiri::app()->get($default);
}
$class = Kiri::createObject($default);
class_alias($name, $default, TRUE);
return $class;
return Kiri::createObject($default);
}
}
if (!function_exists('call')) {
/**
* @param $handler
* @param mixed ...$params
* @return mixed
* @throws Exception
*/
function call($handler, ...$params): mixed
{
if (is_array($handler) && is_string($handler[0])) {
$handler[0] = di($handler[0]);
}
if (!is_callable($handler, true)) {
throw new Exception('Call function not exists.');
}
return call_user_func($handler, ...$params);
}
}
if (!function_exists('map')) {
+75
View File
@@ -0,0 +1,75 @@
<?php
namespace Kiri\CoroutineServer;
use Swoole\Coroutine;
use function Co\run;
class Command
{
public array $arrays = [];
/** @var array<Server> */
private array $servers = [];
/**
* @return void
* @throws \Exception
*/
public function init(): void
{
$this->getServers();
run(function () {
$this->sig();
$waite = new Coroutine\WaitGroup();
foreach ($this->servers as $server) {
$waite->add();
$server->run($waite);
}
$waite->wait();
});
}
/**
* @return void
*/
public function sig(): void
{
Coroutine::create(function () {
$data = Coroutine::waitSignal(SIGTERM | SIGINT, -1);
if ($data) {
foreach ($this->servers as $server) {
$server->stop();
}
}
});
}
/**
* @return void
* @throws \Exception
*/
public function getServers(): void
{
foreach ($this->arrays as $array) {
if (isset($this->servers[$array['name']])) {
throw new \Exception('');
}
$server = new Server(\Kiri::getDi(), []);
$server->setReusePort($array['reuse_port']);
$server->setHost($array['host']);
$server->setPort($array['port']);
$server->setIsSsl($array['isSsl']);
$this->servers[$array['name']] = $server;
}
}
}
+271
View File
@@ -0,0 +1,271 @@
<?php
namespace Kiri\CoroutineServer;
use Closure;
use Co\WaitGroup;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Annotation\Inject;
use Kiri\Context;
use Kiri\Di\ContainerInterface;
use Kiri\Events\EventDispatch;
use Kiri\Message\Constrict\RequestInterface;
use Kiri\Message\Constrict\ResponseInterface;
use Kiri\Message\ServerRequest;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnHandshakeInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Kiri\Server\Contract\OnOpenInterface;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnWorkerStart;
use Kiri\ToArray;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
class Server extends Component
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $dispatch;
public string $host = '0.0.0.0';
public int $port = 9527;
public bool $isSsl = false;
public array $router = [];
public bool $reuse_port = true;
private bool $isShutdown = false;
public Coroutine\Http\Server $server;
/**
* @param ContainerInterface $container
* @param array $config
* @throws Exception
*/
public function __construct(public ContainerInterface $container, array $config = [])
{
parent::__construct($config);
}
/**
* @return void
*/
public function init(): void
{
$this->server = new Coroutine\Http\Server($this->host, $this->port, $this->isSsl, $this->reuse_port);
$this->server->set(['max_coroutine' => 500000]);
$this->server->handle('/', [$this, 'actor']);
}
/**
* @param Coroutine\WaitGroup $group
* @return void
*/
public function run(Coroutine\WaitGroup $group): void
{
Coroutine::create(function () use ($group) {
$this->dispatch->dispatch(new OnWorkerStart(null, 0));
$this->start($group);
});
}
/**
* @param WaitGroup $group
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/
public function start(WaitGroup $group): void
{
$this->server->start();
$this->dispatch->dispatch(new OnBeforeShutdown());
if ($this->isShutdown === false) {
$this->start($group);
} else {
$group->done();
}
}
public function stop()
{
$this->isShutdown = true;
$this->server->shutdown();
}
/**
* @param $path
* @param $method
* @param array|Closure $closure
* @return void
*/
public function handler($path, $method, array|Closure $closure): void
{
$this->router[$path] = [strtolower($method), $closure];
}
/**
* @param Request $request
* @param Response $response
* @return mixed
* @throws Exception
*/
public function actor(Request $request, Response $response): mixed
{
Context::setContext(ResponseInterface::class, new \Kiri\Message\Response());
Context::setContext(RequestInterface::class, ServerRequest::createServerRequest($request));
if ($request['request_method'] === 'HEAD') {
return $this->write('', $response, 200);
}
[$method, $handler] = $this->router[$request['request_uri']] ?? [null, null];
if (is_null($handler)) {
return $this->write('Page not found.', $response, 404);
}
if (!is_callable($handler, true)) {
return $this->write('Page not found.', $response, 404);
}
if ($method !== $request['request_method']) {
return $this->write('Page allow.', $response, 405);
}
if (isset($request->header['upgrade']) && $request->header['upgrade'] == 'websocket') {
defer(function () use ($handler, $request) {
if (!$handler instanceof OnOpenInterface) {
return;
}
$handler->onOpen($this->server, $request);
});
if ($handler instanceof OnHandshakeInterface) {
$handler->onHandshake($request, $response);
} else {
$response->upgrade();
}
while (true) {
$read = $response->recv();
if ($read === '' || $read === null || $read instanceof CloseFrame) {
break;
}
if ($handler instanceof OnMessageInterface) {
$handler->onMessage($this->server, $read);
}
}
if ($handler instanceof OnCloseInterface) {
$handler->onClose($this->server, $response->fd);
}
return null;
}
$params = $this->container->getArgs($handler[0], $handler[1] ?? null);
$result = call_user_func($handler, ...$params);
if (is_null($result)) {
return $this->write("", $response);
} else {
return $this->write($result, $response);
}
}
/**
* @param mixed $message
* @param Response $response
* @param int $statusCode
* @return mixed
*/
private function write(mixed $message, Response $response, int $statusCode = 200): mixed
{
$result = $message;
if ($message instanceof ResponseInterface) {
$result = $result->getBody()->getContents();
$response->setStatusCode($message->getStatusCode());
} else {
$message = Context::getContext(ResponseInterface::class);
$response->setStatusCode($statusCode);
}
$headers = $message->getHeaders();
if (is_array($headers)) foreach ($headers as $key => $header) {
$response->setHeader($key, $header);
}
if (!isset($response->header['content-type'])) {
$response->header('content-type', 'application/json');
} else if (!isset($response->header['Content-Type'])) {
$response->header('content-type', 'application/json');
}
$headers = $message->getCookieParams();
if (is_array($headers)) foreach ($headers as $key => $header) {
$response->cookie($key, ...$header);
}
if (is_object($result)) {
$result = $result instanceof ToArray ? $result->toArray() : get_object_vars($result);
}
if (is_array($result)) {
$result = json_encode($result, JSON_UNESCAPED_UNICODE);
}
return $response->end($result);
}
/**
* @param string $host
*/
public function setHost(string $host): void
{
$this->host = $host;
}
/**
* @param int $port
*/
public function setPort(int $port): void
{
$this->port = $port;
}
/**
* @param bool $isSsl
*/
public function setIsSsl(bool $isSsl): void
{
$this->isSsl = $isSsl;
}
/**
* @param bool $reuse_port
*/
public function setReusePort(bool $reuse_port): void
{
$this->reuse_port = $reuse_port;
}
}
+1 -1
View File
@@ -15,7 +15,7 @@ use Kiri;
use Kiri\Di\LocalService;
use Kiri\Error\{ErrorHandler, StdoutLogger, StdoutLoggerInterface};
use Kiri\Exception\{InitException};
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
use Kiri\Server\{Server};
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
+5 -5
View File
@@ -28,11 +28,9 @@ class Logger implements LoggerInterface
const DEBUG = 'debug';
const LOGGER_LEVELS = [Logger::EMERGENCY, Logger::ALERT, Logger::CRITICAL, Logger::ERROR, Logger::WARNING, Logger::NOTICE, Logger::INFO, Logger::DEBUG];
/**
* @param string $message
* @param array $context
@@ -158,12 +156,11 @@ class Logger implements LoggerInterface
}
/**
* @return void
* @throws Exception
*/
public function flush()
public function flush(): void
{
$this->removeFile(storage());
}
@@ -173,7 +170,7 @@ class Logger implements LoggerInterface
* @param string $dirname
* @return void
*/
private function removeFile(string $dirname)
private function removeFile(string $dirname): void
{
$paths = new DirectoryIterator($dirname);
/** @var DirectoryIterator $path */
@@ -197,6 +194,9 @@ class Logger implements LoggerInterface
*/
private function _string($message, $context): string
{
if ($context instanceof \Throwable) {
$context = ['file' => $context->getFile(), 'line' => $context->getLine()];
}
if (!empty($context)) {
return $message . ' ' . PHP_EOL . print_r($context, true) . PHP_EOL;
}
+212
View File
@@ -0,0 +1,212 @@
<?php
namespace Kiri\Reload;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Error\StdoutLoggerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
/**
*
*/
class Inotify extends BaseProcess
{
private mixed $inotify = null;
private mixed $events;
private array $watchFiles = [];
public bool $isReloading = FALSE;
public string $name = 'inotify listen';
public array $dirs = [];
protected int $cid = -1;
const IG_DIR = [APP_PATH . 'commands', APP_PATH . '.git', APP_PATH . '.gitee'];
#[Inject(StdoutLoggerInterface::class)]
public StdoutLoggerInterface $logger;
/**
* @param Process $process
* @return void
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
set_error_handler([$this, 'error']);
set_exception_handler([$this, 'error']);
if (!extension_loaded('inotify')) {
while (true) {
if ($this->isStop()) {
break;
}
sleep(1);
}
return;
}
$this->dirs = Config::get('reload.inotify', []);
$this->start();
}
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->isStop = true;
});
return $this;
}
/**
* @return void
*/
public function error(): void
{
}
/**
* @throws Exception
*/
public function start()
{
$this->inotify = inotify_init();
$this->events = IN_MODIFY | IN_DELETE | IN_CREATE | IN_MOVE;
foreach ($this->dirs as $dir) {
if (!is_dir($dir)) continue;
$this->watch($dir);
}
Event::add($this->inotify, [$this, 'check']);
Event::cycle(function () {
if ($this->isStop()) {
Event::del($this->inotify);
Event::exit();
}
}, true);
Event::wait();
}
/**
* 开始监听
* @throws Exception
*/
public function check()
{
if (!($events = inotify_read($this->inotify))) {
return;
}
if ($this->isReloading) {
return;
}
$LISTEN_TYPE = [IN_CREATE, IN_DELETE, IN_MODIFY, IN_MOVED_TO, IN_MOVED_FROM];
foreach ($events as $ev) {
if (!in_array($ev['mask'], $LISTEN_TYPE)) {
continue;
}
//非重启类型
if (str_ends_with($ev['name'], '.php')) {
if ($this->isReloading) {
break;
}
$this->isReloading = TRUE;
Timer::after(3000, fn() => $this->reload());
}
}
}
/**
* @throws Exception
*/
public function reload()
{
$swollen = \Kiri::getDi()->get(SwooleServerInterface::class);
$swollen->reload();
$this->clearWatch();
foreach ($this->dirs as $root) {
$this->watch($root);
}
$this->isReloading = FALSE;
}
/**
* @throws Exception
*/
public function clearWatch()
{
foreach ($this->watchFiles as $wd) {
@inotify_rm_watch($this->inotify, $wd);
}
$this->watchFiles = [];
}
/**
* @param $dir
* @return bool
* @throws Exception
*/
public function watch($dir): bool
{
//目录不存在
if (!is_dir($dir)) {
return $this->logger->addError("[$dir] is not a directory.");
}
//避免重复监听
if (isset($this->watchFiles[$dir])) {
return FALSE;
}
if (in_array($dir, self::IG_DIR)) {
return FALSE;
}
$wd = @inotify_add_watch($this->inotify, $dir, $this->events);
$this->watchFiles[$dir] = $wd;
$files = scandir($dir);
foreach ($files as $f) {
if ($f == '.' || $f == '..') {
continue;
}
$path = $dir . '/' . $f;
//递归目录
if (is_dir($path)) {
$this->watch($path);
} else if (!str_ends_with($f, '.php')) {
continue;
}
//检测文件类型
if (strstr($f, '.') == '.php') {
$wd = @inotify_add_watch($this->inotify, $path, $this->events);
$this->watchFiles[$path] = $wd;
}
}
return TRUE;
}
}
+168
View File
@@ -0,0 +1,168 @@
<?php
namespace Kiri\Reload;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Server\Abstracts\BaseProcess;
use Psr\Log\LoggerInterface;
use Swoole\Process;
class Scaner extends BaseProcess
{
private array $md5Map = [];
public bool $isReloading = FALSE;
private array $dirs = [];
/**
* @var LoggerInterface
*/
#[Inject(LoggerInterface::class)]
public LoggerInterface $logger;
/**
* @throws Exception
*/
public function process(Process $process): void
{
$this->dirs = Config::get('reload.inotify', []);
$this->loadDirs();
$this->tick();
}
/**
* @param bool $isReload
* @throws Exception
*/
private function loadDirs(bool $isReload = FALSE)
{
foreach ($this->dirs as $value) {
if (is_bool($path = realpath($value))) {
continue;
}
if (!is_dir($path)) continue;
$this->loadByDir($path, $isReload);
}
}
/**
* @param $path
* @param bool $isReload
* @return void
* @throws Exception
*/
private function loadByDir($path, bool $isReload = FALSE): void
{
if (!is_string($path)) {
return;
}
$path = rtrim($path, '/');
foreach (glob(realpath($path) . '/*') as $value) {
if (is_dir($value)) {
$this->loadByDir($value, $isReload);
}
if (is_file($value)) {
if ($this->checkFile($value, $isReload)) {
if ($this->isReloading) {
break;
}
$this->isReloading = TRUE;
sleep(2);
$this->timerReload();
break;
}
}
}
}
/**
* @param $value
* @param $isReload
* @return bool
*/
private function checkFile($value, $isReload): bool
{
$md5 = md5($value);
$mTime = filectime($value);
if (!isset($this->md5Map[$md5])) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
} else {
if ($this->md5Map[$md5] != $mTime) {
if ($isReload) {
return TRUE;
}
$this->md5Map[$md5] = $mTime;
}
}
return FALSE;
}
/**
* @throws Exception
*/
public function timerReload()
{
$this->isReloading = TRUE;
$this->logger->warning('file change');
$swow = \Kiri::getDi()->get(SwooleServerInterface::class);
$swow->reload();
$this->loadDirs();
$this->isReloading = FALSE;
$this->tick();
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->onProcessStop();
});
return $this;
}
/**
* @throws Exception
*/
public function tick()
{
if ($this->isStop) {
return;
}
$this->loadDirs(TRUE);
sleep(2);
$this->tick();
}
}
+4 -4
View File
@@ -4,7 +4,7 @@ namespace Kiri\Task\Annotation;
use Kiri\Annotation\AbstractAttribute;
use Kiri\Task\TaskManager;
use Kiri\Task\TaskContainer;
#[\Attribute(\Attribute::TARGET_CLASS)] class AsynchronousTask extends AbstractAttribute
{
@@ -25,9 +25,9 @@ use Kiri\Task\TaskManager;
*/
public function execute(mixed $class, mixed $method = ''): mixed
{
$AsyncTaskExecute = \Kiri::getDi()->get(TaskManager::class);
$AsyncTaskExecute->add($this->name, $class::class);
return parent::execute($class, $method); // TODO: Change the autogenerated stub
// TODO: Change the autogenerated stub
di(TaskContainer::class)->add($this->name, $class::class);
return parent::execute($class, $method);
}
}
+3 -3
View File
@@ -30,7 +30,7 @@ class OnServerTask
* @param mixed $data
* @throws ConfigException
*/
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): void
{
try {
$data = $this->resolve($data);
@@ -49,7 +49,7 @@ class OnServerTask
* @param Server\Task $task
* @throws ConfigException
*/
public function onCoroutineTask(?Server $server, Server\Task $task)
public function onCoroutineTask(?Server $server, Server\Task $task): void
{
try {
$data = $this->resolve($task->data);
@@ -82,7 +82,7 @@ class OnServerTask
* @param int $task_id
* @param mixed $data
*/
public function onFinish(Server $server, int $task_id, mixed $data)
public function onFinish(Server $server, int $task_id, mixed $data): void
{
if (!($data instanceof OnTaskInterface)) {
return;
+1 -1
View File
@@ -12,6 +12,6 @@ interface OnTaskInterface
public function execute();
public function finish(Server $server, int $task_id);
public function finish(?Server $server, int $task_id);
}
@@ -7,11 +7,11 @@ use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Server;
class TaskManager extends Component
class TaskContainer extends Component
{
@@ -30,30 +30,6 @@ class TaskManager extends Component
}
/**
* @param Server $swollen
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function taskListener(Server $swollen)
{
if (!isset($swollen->setting['task_worker_num']) || $swollen->setting['task_worker_num'] < 1) {
return;
}
$task_use_object = $swollen->setting['task_object'] ?? $swollen->setting['task_use_object'] ?? false;
$reflect = $this->container->get(OnServerTask::class);
$swollen->on('finish', [$reflect, 'onFinish']);
if ($task_use_object || $swollen->setting['task_enable_coroutine']) {
$swollen->on('task', [$reflect, 'onCoroutineTask']);
} else {
$swollen->on('task', [$reflect, 'onTask']);
}
}
/**
* @param string $key
* @param $handler
@@ -5,26 +5,28 @@ namespace Kiri\Task;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Server\SwooleServerInterface;
use Kiri\Server\ServerInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Coroutine;
use Swoole\Server;
/**
*
*/
class AsyncTaskExecute extends Component
class TaskExecute extends Component
{
/**
* @param TaskManager $hashMap
* @param TaskContainer $hashMap
* @param ContainerInterface $container
* @param array $config
* @throws Exception
*/
public function __construct(public TaskManager $hashMap, public ContainerInterface $container, array $config = [])
public function __construct(public TaskContainer $hashMap, public ContainerInterface $container, array $config = [])
{
parent::__construct($config);
}
@@ -43,16 +45,45 @@ class AsyncTaskExecute extends Component
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
if ($this->container->has(SwooleServerInterface::class)) {
$server = $this->container->get(SwooleServerInterface::class);
if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) {
$workerId = random_int(0, $server->setting['task_worker_num'] - 1);
}
$server->task(serialize($handler), $workerId);
if ($this->container->has(ServerInterface::class)) {
$this->onAsyncExec($handler, $workerId);
} else {
Coroutine::create(fn() => $this->onCoronExec($handler));
}
}
/**
* @param OnTaskInterface|string $handler
* @param int $workerId
* @return bool
* @throws Exception
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
protected function onAsyncExec(OnTaskInterface|string $handler, int $workerId = -1): bool
{
/** @var Server $server */
$server = $this->container->get(ServerInterface::class);
if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) {
$workerId = random_int(0, $server->setting['task_worker_num'] - 1);
}
return (bool)$server->task(serialize($handler), $workerId);
}
/**
* @param OnTaskInterface|string $handler
* @return bool
*/
protected function onCoronExec(OnTaskInterface|string $handler): bool
{
$handler->execute();
$handler->finish(null, Coroutine::getCid());
return true;
}
/**
* @param $handler
* @param $params
@@ -0,0 +1,22 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Swoole\Coroutine\Http\Server as CoroutineServer;
interface OnCloseInterface
{
/**
* @param Server|CoroutineServer $server
* @param int $fd
* @return void
*/
public function onClose(Server|CoroutineServer $server, int $fd): void;
}
@@ -0,0 +1,21 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\Http\Request;
use Swoole\Http\Response;
interface OnHandshakeInterface
{
/**
* @param Request $request
* @param Response $response
* @return int
*/
public function onHandshake(Request $request, Response $response): int;
}
@@ -0,0 +1,22 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Swoole\Coroutine\Http\Server as CoroutineServer;
interface OnMessageInterface
{
/**
* @param Server|CoroutineServer $server
* @param Frame $frame
* @return void
*/
public function onMessage(Server|CoroutineServer $server, Frame $frame): void;
}
@@ -0,0 +1,23 @@
<?php
namespace Kiri\Websocket\Contract;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Swoole\Coroutine\Http\Server as CoroutineServer;
interface OnOpenInterface
{
/**
* @param Server|CoroutineServer $server
* @param Request $request
* @return void
*/
public function onOpen(Server|CoroutineServer $server, Request $request): void;
}
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Kiri\Websocket;
class Dispatcher
{
}
+219
View File
@@ -0,0 +1,219 @@
<?php
namespace Kiri\Websocket;
use Closure;
use Co\WaitGroup;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Context;
use Kiri\Di\ContainerInterface;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Message\Abstracts\ExceptionHandlerInterface;
use Kiri\Message\Constrict\RequestInterface;
use Kiri\Message\Constrict\ResponseInterface;
use Kiri\Message\ExceptionHandlerDispatcher;
use Kiri\Message\Handler\DataGrip;
use Kiri\Message\Handler\Dispatcher;
use Kiri\Message\Handler\Handler;
use Kiri\Message\Handler\RouterCollector;
use Kiri\Message\ResponseEmitter;
use Kiri\Message\ServerRequest;
use Kiri\Message\Constrict\Response as CResponse;
use Kiri\Server\Events\OnShutdown;
use Kiri\Websocket\Contract\OnCloseInterface;
use Kiri\Websocket\Contract\OnHandshakeInterface;
use Kiri\Websocket\Contract\OnMessageInterface;
use Kiri\Websocket\Contract\OnOpenInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server as WsServer;
use Swoole\Coroutine\Http\Server as WhServer;
class Server extends Component
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $dispatch;
public string $host = '0.0.0.0';
public int $port = 9527;
public bool $isSsl = false;
public RouterCollector $router;
public bool $reuse_port = true;
private bool $isShutdown = false;
public Coroutine\Http\Server $server;
public ExceptionHandlerInterface $exception;
private int|Handler|null $handler;
public array|null|Closure $handshake;
public array|null|Closure $message;
public array|null|Closure $close;
public array|null|Closure $disconnect;
/**
* @param ContainerInterface $container
* @param DataGrip $collector
* @param Dispatcher $dispatcher
* @param ResponseEmitter $emitter
* @param array $config
* @throws Exception
*/
public function __construct(
public ContainerInterface $container,
public DataGrip $collector,
public Dispatcher $dispatcher,
public ResponseEmitter $emitter,
array $config = [])
{
parent::__construct($config);
$this->router = $this->collector->get('wss');
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ConfigException
*/
public function init(): void
{
$exception = Config::get('exception.http', ExceptionHandlerDispatcher::class);
if (!in_array(ExceptionHandlerInterface::class, class_implements($exception))) {
$exception = ExceptionHandlerDispatcher::class;
}
$this->exception = $this->container->get($exception);
$this->handler = $this->router->find('/', 'GET');
}
/**
* @param Request $request
* @param Response $response
* @return void
* @throws Exception
*/
public function onHandshake(Request $request, Response $response): void
{
try {
/** @var \Kiri\Message\Response $psrResponse */
$psrResponse = Context::setContext(ResponseInterface::class, new \Kiri\Message\Response());
/** @var ServerRequest $psrRequest */
$psrRequest = Context::setContext(RequestInterface::class, ServerRequest::createServerRequest($request));
$handler = $this->router->find('/', 'GET');
if (is_integer($handler)) {
$psrResponse->withContent('Allow Method[' . $request->getMethod() . '].')->withStatus(405);
} else if (is_null($handler)) {
$psrResponse->withContent('Page not found.')->withStatus(404);
} else {
$psrResponse = $this->dispatcher->with($handler)->handle($psrRequest);
$executor = $handler->callback;
$response->upgrade();
if ($handler instanceof OnHandshakeInterface) {
$statusCode = $handler->onHandshake($request, $response);
$response->setStatusCode($statusCode);
$response->write("connect ok.");
}
if ($executor instanceof OnOpenInterface) {
$executor->onOpen($this->server, $request);
}
while (true) {
$frame = $response->recv();
if ($frame === false || $frame instanceof CloseFrame || $frame === '') {
$handler->onClose($this->server, $response->fd);
break;
}
$handler->onMessage($this->server, $frame);
}
}
} catch (\Throwable $throwable) {
$psrResponse = $this->exception->emit($throwable, di(CResponse::class));
} finally {
$this->emitter->sender($response, $psrResponse);
}
}
/**
* @param WsServer|Coroutine\Http\Server $server
* @param Frame $frame
* @return void
*/
public function onMessage(WsServer|WhServer $server, Frame $frame): void
{
$handler = $this->handler->callback;
if ($handler instanceof OnMessageInterface) {
$handler->onMessage($server, $frame);
}
}
/**
* @param WsServer|Coroutine\Http\Server $server
* @param int $fd
* @return void
*/
public function onClose(WsServer|WhServer $server, int $fd): void
{
$handler = $this->handler->callback;
if ($handler instanceof OnCloseInterface) {
$handler->onClose($server, $fd);
}
}
/**
* @param WsServer|WhServer $server
* @param Request $request
* @return void
*/
public function onOpen(WsServer|WhServer $server, Request $request): void
{
$handler = $this->handler->callback;
if ($handler instanceof OnOpenInterface) {
$handler->onOpen($server, $request);
}
}
}
@@ -0,0 +1,38 @@
<?php
class TestSocketServer
{
public function onHandshake($request, $response): int
{
return 200;
}
public function onMessage($server, $frame): void
{
// TODO: Implement onMessage() method.
}
public function onClose($server, int $fd): void
{
// TODO: Implement onClose() method.
}
public function onOpen($server, $request): void
{
// TODO: Implement onOpen() method.
}
}
var_dump(is_callable(new TestSocketServer(), true));
//
//Router::addServer('ws', function () {
// Router::get('/', 'TestSocketServer');
//});
+13 -1
View File
@@ -1,5 +1,17 @@
<?php
use Swoole\Runtime;
use Swoole\Coroutine\Http\Server;
var_dump(30 / 60);
Runtime::enableCoroutine(true);
\Co\run(function () {
Swoole\Coroutine::create(function () {
var_dump(1);
});
$server = new Server('0.0.0.0',9501);
$server->handle('/', function () {
});
$server->start();
});