Compare commits

..

10 Commits

Author SHA1 Message Date
as2252258 555d653288 Revert "改名"
This reverts commit fdf58326
2022-01-18 10:18:13 +08:00
as2252258 45cf88e52c Revert "改名"
This reverts commit fdf58326
2022-01-17 19:04:26 +08:00
as2252258 a8f840bfb2 Revert "改名"
This reverts commit fdf58326
2022-01-17 18:48:57 +08:00
as2252258 d8222366b1 Revert "改名"
This reverts commit fdf58326
2022-01-17 18:45:00 +08:00
as2252258 fbe13eaa7e Revert "改名"
This reverts commit fdf58326
2022-01-17 14:04:37 +08:00
as2252258 466df3387f Revert "改名"
This reverts commit fdf58326
2022-01-17 10:59:55 +08:00
as2252258 fa76b5170a Revert "改名"
This reverts commit fdf58326
2022-01-15 10:23:57 +08:00
as2252258 b0c66c9c6a Revert "改名"
This reverts commit fdf58326
2022-01-14 16:50:01 +08:00
as2252258 2ffdf83645 Revert "改名"
This reverts commit fdf58326
2022-01-14 16:05:12 +08:00
as2252258 c3a3551ba3 Revert "改名"
This reverts commit fdf58326
2022-01-14 15:52:38 +08:00
19 changed files with 544 additions and 349 deletions
+1 -274
View File
@@ -14,7 +14,6 @@ use Kiri\Application;
use Kiri\Core\Json;
use Kiri\Di\Container;
use Kiri\Environmental;
use Kiri\Server\Tasker\AsyncTaskExecute;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine;
use Swoole\Process;
@@ -139,25 +138,6 @@ class Kiri
}
/**
* @param $port
* @return bool
* @throws Exception
*/
public static function port_already($port): bool
{
if (empty($port)) {
return false;
}
if (Kiri::getPlatform()->isLinux()) {
exec('netstat -tunlp | grep ' . $port, $output);
} else {
exec('lsof -i :' . $port . ' | grep -i "LISTEN"', $output);
}
return !empty($output);
}
/**
* @return Annotation
* @throws Exception
@@ -168,15 +148,6 @@ class Kiri
}
/**
* @param $service
* @return string
*/
#[Pure] public static function listen($service): string
{
return sprintf('Check listen %s::%d -> ok', $service['host'], $service['port']);
}
/**
* @param $className
@@ -215,15 +186,6 @@ class Kiri
}
/**
* @return bool
*/
public static function inCoroutine(): bool
{
return Coroutine::getCid() > 0;
}
/**
* @return Container
*/
@@ -242,40 +204,6 @@ class Kiri
}
/**
* @param $workerId
* @return mixed
* @throws Exception
*/
public static function setManagerId($workerId): mixed
{
if (empty($workerId) || static::isDocker()) {
return $workerId;
}
$tmpFile = storage($workerId . '.sock', 'pid/manager');
return self::writeFile($tmpFile, $workerId);
}
/**
* @param $workerId
* @return mixed
* @throws Exception
*/
public static function setProcessId($workerId): mixed
{
if (empty($workerId) || static::isDocker()) {
return $workerId;
}
$tmpFile = storage($workerId . '.sock', 'pid/process');
return self::writeFile($tmpFile, $workerId);
}
/**
* @return bool
*/
@@ -289,39 +217,6 @@ class Kiri
}
/**
* @param $workerId
* @return mixed
* @throws Exception
*/
public static function setWorkerId($workerId): mixed
{
if (empty($workerId) || static::isDocker()) {
return $workerId;
}
$tmpFile = storage($workerId . '.sock', 'pid/worker');
return self::writeFile($tmpFile, $workerId);
}
/**
* @param $workerId
* @return mixed
* @throws Exception
*/
public static function setTaskId($workerId): mixed
{
if (empty($workerId) || static::isDocker()) {
return $workerId;
}
$tmpFile = storage($workerId . '.sock', 'pid/task');
return self::writeFile($tmpFile, $workerId);
}
/**
* @param $fileName
* @param $content
@@ -334,7 +229,7 @@ class Kiri
if ($is_append !== null) {
$params[] = $is_append;
}
return !self::inCoroutine() ? file_put_contents(...$params) : Coroutine::writeFile(...$params);
return !(Coroutine::getCid() > 0) ? file_put_contents(...$params) : Coroutine::writeFile(...$params);
}
@@ -355,95 +250,6 @@ class Kiri
}
/**
* @param $workerId
* @param bool $isWorker
* @throws Exception
*/
public static function clearProcessId($workerId, bool $isWorker = false)
{
clearstatcache();
$directory = $isWorker === true ? 'pid/worker' : 'pid/task';
if (!file_exists($file = storage($workerId, $directory))) {
return;
}
shell_exec('rm -rf ' . $file);
}
/**
* @param string|null $taskPid
* @throws Exception
*/
public static function clearTaskPid(string $taskPid = null)
{
if (empty($taskPid)) {
exec('rm -rf ' . storage(null, 'pid/task'));
} else {
static::clearProcessId($taskPid);
}
}
/**
* @param $taskPid
* @throws Exception
*/
public static function clearWorkerPid($taskPid = null)
{
if (empty($taskPid)) {
exec('rm -rf ' . storage(null, 'pid/worker'));
} else {
static::clearProcessId($taskPid, true);
}
}
/**
* @return Server|null
* @throws
*/
public static function getWebSocket(): ?\Swoole\Server
{
$server = static::app()->getSwoole();
if (!($server instanceof \Swoole\Server)) {
return null;
}
return $server;
}
/**
* @return false|string
* @throws Exception
*/
public static function getMasterPid(): bool|string
{
$pid = Kiri::app()->getSwoole()->setting['pid_file'];
return file_get_contents($pid);
}
/**
* @param int $fd
* @param $data
* @return mixed
* @throws Exception
*/
public static function push(int $fd, $data): mixed
{
$server = static::getWebSocket();
if (empty($server) || !$server->isEstablished($fd)) {
return false;
}
if (!is_string($data)) {
$data = Json::encode($data);
}
return $server->push($fd, $data);
}
/**
* @return mixed
*/
@@ -453,19 +259,6 @@ class Kiri
}
/**
* @param string $class
* @param array $params
* @throws ReflectionException
* @throws Exception
*/
public static function async(string $class, array $params = [])
{
$manager = di(AsyncTaskExecute::class);
$manager->execute(new $class(...$params));
}
/**
* @param array $v1
* @param array $v2
@@ -490,19 +283,6 @@ class Kiri
}
/**
* @param $process
* @throws Exception
*/
public static function shutdown($process): void
{
static::app()->getSwoole()->shutdown();
if ($process instanceof Process) {
$process->exit(0);
}
}
/**
* @param $tmp_name
* @return string
@@ -540,34 +320,10 @@ class Kiri
}
private static array $_autoload = [];
const PROCESS = 'process';
const TASK = 'task';
const WORKER = 'worker';
/**
* @param string $event
* @param null $data
* @return false|string
* @throws Exception
*/
public static function param(string $event, $data = NULL): bool|string
{
if (is_object($data)) {
if ($data instanceof ModelInterface || $data instanceof Collection) {
$data = $data->getAttributes();
} else {
$data = get_object_vars($data);
}
}
if (!is_array($data)) $data = ['data' => $data];
return json_encode(array_merge(['callback' => $event], $data));
}
/**
* @return string|null
*/
@@ -603,35 +359,6 @@ class Kiri
return static::getEnvironmental() == static::PROCESS;
}
/**
* @param $class
* @param $file
*/
public static function setAutoload($class, $file)
{
if (isset(static::$_autoload[$class])) {
return;
}
static::$_autoload[$class] = $file;
include_once "Kiri.php";
}
/**
* @param $className
*/
public static function autoload($className)
{
if (!isset(static::$_autoload[$className])) {
return;
}
$file = static::$_autoload[$className];
require_once "Kiri.php";
}
}
//spl_autoload_register([Kiri::class, 'autoload'], true, true);
Kiri::setContainer(new Container());
+1
View File
@@ -36,6 +36,7 @@
"Kiri\\Websocket\\": "kiri-websocket-server/",
"Gii\\": "kiri-gii/",
"Kiri\\Annotation\\": "kiri-annotation/",
"Kiri\\Server\\": "kiri-server/",
"Kiri\\Task\\": "kiri-task/"
},
"files": [
-16
View File
@@ -297,22 +297,6 @@ if (!function_exists('injectRuntime')) {
}
if (!function_exists('swoole')) {
/**
* @return Server|null
* @throws Exception
*/
function swoole(): ?Server
{
return Kiri::getWebSocket();
}
}
if (!function_exists('directory')) {
/**
-44
View File
@@ -1,44 +0,0 @@
<?php
namespace Kiri\Annotation;
use Exception;
use Kiri;
use Kiri\Server\Tasker\AsyncTaskExecute;
/**
* Class Task
* @package Annotation
* Task任务
*/
#[\Attribute(\Attribute::TARGET_CLASS)] class Task extends Attribute
{
/**
* Task constructor.
* @param string $name
*/
public function __construct(public string $name)
{
}
/**
* @param mixed $class
* @param mixed|null $method
* @return bool
* @throws Exception
*/
public function execute(mixed $class, mixed $method = null): bool
{
$task = Kiri::getDi()->get(AsyncTaskExecute::class);
$task->reg($this->name, $class);
return true;
}
}
+4 -2
View File
@@ -22,7 +22,9 @@ use Kiri\Di\LocalService;
use Kiri\Error\{ErrorHandler, Logger};
use Kiri\Exception\{InitException, NotFindClassException};
use ReflectionException;
use Kiri\Server\{Contract\OnTaskInterface, Server, ServerManager, Tasker\AsyncTaskExecute};
use Kiri\Server\{Server, ServerManager};
use Kiri\Task\AsyncTaskExecute;
use Kiri\Task\OnTaskInterface;
use Swoole\Table;
/**
@@ -200,7 +202,7 @@ abstract class BaseApplication extends Component
/**
* @param OnTaskInterface $execute
* @throws ReflectionException
* @throws ReflectionException|Exception
*/
public function task(OnTaskInterface $execute): void
{
+1 -1
View File
@@ -7,7 +7,7 @@ namespace Kiri;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Server\ServerManager;
use Kiri\Server\Tasker\AsyncTaskExecute;
use Kiri\Task\AsyncTaskExecute;
use Kiri;
/**
* Class Async
+2 -3
View File
@@ -86,13 +86,12 @@ class Inotify
continue;
}
var_dump($ev);
$search = array_search($ev['wd'], $this->watchFiles);
//非重启类型
if (str_ends_with($ev['name'], '.php')) {
Timer::after(3000, fn() => $this->reload($ev['name']));
Timer::after(3000, fn() => $this->reload($search));
$this->isReloading = TRUE;
}
}
+21 -5
View File
@@ -6,10 +6,10 @@ namespace Kiri\Pool;
use Closure;
use Database\Mysql\PDO;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Context;
use Kiri;
use Swoole\Error;
use Throwable;
@@ -118,6 +118,19 @@ class Connection extends Component
}
/**
* @param string $name
* @param PDO $PDO
* @return void
* @throws Kiri\Exception\ConfigException
* @throws Exception
*/
public function addItem(string $name, PDO $PDO)
{
$this->getPool()->push($name, $PDO);
}
/**
* @param $name
* @param $isMaster
@@ -126,21 +139,24 @@ class Connection extends Component
*/
public function initConnections($name, $isMaster, $max)
{
$this->getPool()->initConnections($name, $isMaster, $max);
$pool = $this->getPool();
$pool->initConnections($name, $isMaster, $max);
}
/**
* @param $coroutineName
* @param $isMaster
* @param array $config
* @throws Kiri\Exception\ConfigException
* @throws Exception
*/
public function release($coroutineName, $isMaster)
public function release($coroutineName, $isMaster, array $config)
{
$coroutineName = $this->name('Mysql:' . $coroutineName, $isMaster);
/** @var PDO $client */
if (!($client = Context::getContext($coroutineName)) instanceof PDO) {
return;
$client = call_user_func($this->create($coroutineName, $config));
}
if ($client->inTransaction()) {
return;
@@ -184,7 +200,7 @@ class Connection extends Component
} else {
$result = true;
}
} catch (Error | Throwable $exception) {
} catch (Error|Throwable $exception) {
$result = $this->addError($exception, 'mysql');
} finally {
return $result;
+1 -1
View File
@@ -31,7 +31,7 @@ class GiiTask extends GiiBase
namespace App\Async;
use Kiri\Server\Contract\OnTaskInterface;
use Kiri\Task\OnTaskInterface;
';
+115
View File
@@ -0,0 +1,115 @@
<?php
namespace Kiri\Server;
use Kiri;
use Kiri\Context;
use Kiri\Events\EventDispatch;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Contract\OnProcessInterface;
use Kiri\Server\Events\OnProcessStart;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Process\Manager;
use Swoole\Process\Pool;
class CoroutineServer
{
private Manager $manager;
/**
* @param string|OnProcessInterface|BaseProcess $customProcess
* @param string $system
* @return void
*/
public function addProcess(string|OnProcessInterface|BaseProcess $customProcess, string $system)
{
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
}
$this->manager->add(function (Pool $pool, int $workerId) use ($customProcess, $system) {
$process = $pool->getProcess($workerId);
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart());
set_env('environmental', Kiri::PROCESS);
$channel = Coroutine::create(function () use ($process, $customProcess) {
while (!$customProcess->isStop()) {
$message = $process->read();
if (!empty($message)) {
$message = unserialize($message);
}
if (is_null($message)) {
continue;
}
$customProcess->onBroadcast($message);
}
});
Context::setContext('waite:process:message', $channel);
$customProcess->onSigterm()->process($process);
}, $customProcess->isEnableCoroutine());
}
/**
* @param array $settings
* @return void
*/
public function httpServer(array $settings = []): void
{
$this->manager->add(function (Pool $pool, int $workerId) use ($settings) {
$host = $settings['host'];
$port = $settings['port'];
$server = new Server($host, $port, false, true);
$server->set($settings);
$callback = $settings['events'][Constant::REQUEST] ?? null;
if (is_null($callback)) {
$pool->shutdown();
return;
}
if (is_string($callback[0])) {
$callback[0] = Kiri::getDi()->get($callback[0]);
}
$server->handle('/', $callback);
$server->start();
}, true);
}
/**
* @param array $settings
* @return void
*/
public function websocketServer(array $settings)
{
$this->manager->add(function () use ($settings) {
$host = $settings['host'];
$port = $settings['port'];
$server = new Server($host, $port, false, true);
$server->set($settings);
$hServer = \Kiri::getDi()->get(\Kiri\Message\Server::class);
$server->handle('/', function (Request $request, Response $response) use ($hServer) {
call_user_func([$hServer, 'onRequest'], $request, $response);
});
$server->start();
}, true);
}
}
+33
View File
@@ -0,0 +1,33 @@
<?php
namespace Kiri\Task\Annotation;
use Kiri\Annotation\Attribute;
use Kiri\Task\TaskManager;
#[\Attribute(\Attribute::TARGET_CLASS)] class AsynchronousTask extends Attribute
{
/**
* @param string $name
*/
public function __construct(public string $name)
{
}
/**
* @param mixed $class
* @param mixed $method
* @return mixed
*/
public function execute(mixed $class, mixed $method = ''): mixed
{
$AsyncTaskExecute = \Kiri::getDi()->get(TaskManager::class);
$AsyncTaskExecute->add($this->name, $class::class);
return parent::execute($class, $method); // TODO: Change the autogenerated stub
}
}
+42
View File
@@ -0,0 +1,42 @@
<?php
namespace Kiri\Task;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Server\SwooleServerInterface;
use Swoole\Coroutine;
/**
*
*/
class AsyncTaskExecute extends Component
{
use TaskResolve;
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$server = Kiri::getDi()->get(SwooleServerInterface::class);
if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) {
$workerId = random_int(0, $server->setting['task_worker_num'] - 1);
}
$server->task(serialize($handler), $workerId);
}
}
+85
View File
@@ -0,0 +1,85 @@
<?php
namespace Kiri\Task;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Kiri\Exception\ConfigException;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Server\Task;
class CoroutineTaskExecute extends Component
{
use TaskResolve;
private HashMap $hashMap;
private Coroutine\Channel $channel;
private OnServerTask $taskServer;
private int $total = 50;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
$this->channel = new Coroutine\Channel($this->total);
$this->taskServer = \Kiri::getDi()->get(OnServerTask::class);
Coroutine::create(function () {
$barrier = Coroutine\Barrier::make();
for ($i = 0; $i < 50; $i++) {
Coroutine::create(function () {
$this->handler();
});
}
Coroutine\Barrier::wait($barrier);
});
}
/**
* @return void
* @throws ConfigException
*/
protected function handler()
{
$data = $this->channel->pop(-1);
$task = new Task();
$task->data = $data;
$this->taskServer->onCoroutineTask(null, $task);
$this->handler();
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @return void
* @throws ReflectionException
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->channel->push(serialize($handler));
}
}
+94
View File
@@ -0,0 +1,94 @@
<?php
namespace Kiri\Task;
use Kiri\Annotation\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Task\OnTaskInterface;
use Swoole\Server;
/**
* Class OnServerTask
* @package Server\Task
*/
class OnServerTask
{
#[Inject(Logger::class)]
public Logger $logger;
/**
* @param Server $server
* @param int $task_id
* @param int $src_worker_id
* @param mixed $data
* @throws ConfigException
*/
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
{
try {
$data = $this->resolve($data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$server->finish($data);
}
}
/**
* @param Server|null $server
* @param Server\Task $task
* @throws ConfigException
*/
public function onCoroutineTask(?Server $server, Server\Task $task)
{
try {
$data = $this->resolve($task->data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$task->finish($data);
}
}
/**
* @param $data
* @return null
*/
private function resolve($data)
{
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
return null;
}
/**
* @param Server $server
* @param int $task_id
* @param mixed $data
*/
public function onFinish(Server $server, int $task_id, mixed $data)
{
if (!($data instanceof OnTaskInterface)) {
return;
}
$data->finish($server, $task_id);
}
}
+17
View File
@@ -0,0 +1,17 @@
<?php
namespace Kiri\Task;
use Swoole\Server;
interface OnTaskInterface
{
public function execute();
public function finish(Server $server, int $task_id);
}
+77
View File
@@ -0,0 +1,77 @@
<?php
namespace Kiri\Task;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Server;
class TaskManager extends Component
{
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param Server $swollen
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function taskListener(Server $swollen)
{
if (!isset($swollen->setting['task_worker_num']) || $swollen->setting['task_worker_num'] < 1) {
return;
}
$task_use_object = $swollen->setting['task_object'] ?? $swollen->setting['task_use_object'] ?? false;
$reflect = $this->container->get(OnServerTask::class);
$swollen->on('finish', [$reflect, 'onFinish']);
if ($task_use_object || $swollen->setting['task_enable_coroutine']) {
$swollen->on('task', [$reflect, 'onCoroutineTask']);
} else {
$swollen->on('task', [$reflect, 'onTask']);
}
}
/**
* @param string $key
* @param $handler
*/
public function add(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param string $key
* @return OnTaskInterface
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function get(string $key): OnTaskInterface
{
$task = $this->hashMap->get($key);
if (is_string($task)) {
$task = $this->getContainer()->get($task);
}
return $task;
}
}
+31
View File
@@ -0,0 +1,31 @@
<?php
namespace Kiri\Task;
use Exception;
use ReflectionException;
trait TaskResolve
{
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
+10
View File
@@ -28,6 +28,16 @@ class Sender implements WebSocketInterface
}
/**
* @param AliasServer|Server $server
*/
public function setServer(mixed $server): void
{
$this->server = $server;
}
/**
* @param int $fd
* @param mixed $data
+9 -3
View File
@@ -3,15 +3,15 @@
namespace Kiri\Websocket;
use Exception;
use Kiri\Message\Handler\DataGrip;
use Kiri\Abstracts\AbstractServer;
use Kiri\Message\Handler\DataGrip;
use Kiri\Message\Handler\RouterCollector;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnHandshakeInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Kiri\Server\Contract\OnOpenInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\Frame;
@@ -32,6 +32,9 @@ class Server extends AbstractServer
public mixed $callback = null;
public Sender $sender;
/**
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
@@ -44,6 +47,9 @@ class Server extends AbstractServer
return;
}
$this->callback = $handler->callback[0];
$this->sender = $this->container->get(Sender::class);
$this->sender->setServer($this->server);
}