Compare commits

...

21 Commits

Author SHA1 Message Date
as2252258 9ce0cb96bf eee 2025-07-11 16:27:14 +08:00
as2252258 c7b5d5fb59 eee 2025-01-18 20:45:37 +08:00
as2252258 daa89f3794 eee 2024-11-28 14:35:20 +08:00
as2252258 60670f7f97 eee 2024-11-18 11:24:08 +08:00
as2252258 3a0c5f6d70 eee 2024-11-15 14:30:18 +08:00
as2252258 ef312c2aa4 eee 2024-11-15 14:24:19 +08:00
as2252258 a0eb46cf4e eee 2024-09-23 16:16:01 +08:00
as2252258 fbd3799081 eee 2024-09-12 09:06:02 +08:00
as2252258 97a3335217 eee 2024-09-11 15:22:25 +08:00
as2252258 1790d01730 eee 2024-09-11 11:10:15 +08:00
as2252258 8a4b62b2d8 eee 2024-09-11 11:09:26 +08:00
as2252258 526256302d eee 2024-09-04 10:14:31 +08:00
as2252258 95254ac300 eee 2024-09-04 10:07:59 +08:00
as2252258 fdf7757b6a eee 2024-09-03 15:05:19 +08:00
as2252258 6e4a045c7d eee 2024-09-03 14:47:30 +08:00
as2252258 03d16d8157 eee 2024-08-29 18:06:58 +08:00
as2252258 c8041cc09e eee 2024-08-29 17:01:09 +08:00
as2252258 976f67a838 eee 2024-05-01 02:06:14 +08:00
as2252258 c7e0cd4948 eee 2024-05-01 02:02:58 +08:00
as2252258 3fc1b16f33 eee 2024-04-15 15:39:31 +08:00
as2252258 103b757a05 eee 2024-04-15 15:39:03 +08:00
16 changed files with 204 additions and 571 deletions
+77 -9
View File
@@ -4,6 +4,7 @@ defined('APP_PATH') or define('APP_PATH', realpath(__DIR__ . '/../../'));
use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Kernel;
use Kiri\Application;
use Kiri\Config\ConfigProvider;
use Kiri\Core\ArrayAccess;
@@ -12,6 +13,7 @@ use Kiri\Events\EventDispatch;
use Kiri\Events\EventProvider;
use Kiri\Router\Request;
use Kiri\Router\Response;
use Kiri\Server\Task\TaskExecute;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Swoole\Process;
@@ -28,16 +30,16 @@ interface Arrayable
}
if (!function_exists('json_validator')) {
if (!function_exists('json_validate')) {
/**
* @param string $data
* @return bool
*/
function json_validator(string $data): bool
function json_validate(string $data): bool
{
return is_null(json_decode($data));
return is_array(json_decode($data, true));
}
}
@@ -46,17 +48,80 @@ if (!function_exists('json_validator')) {
if (!function_exists('application')) {
/**
* @param Kernel $Kernel
* @return Application
* @throws
*/
function application(): Application
function application(Kernel $Kernel): Application
{
return Kiri::getDi()->get(Application::class);
$application = Kiri::getDi()->get(Application::class);
$application->commands($Kernel);
return $application;
}
}
if (!function_exists('task')) {
/**
* @param string $handler
* @param mixed $data
* @param int $dstWorkerId
* @param callable|null $finishFinishCallback
* @return void
* @throws Exception
*/
function task(string $handler, mixed $data, int $dstWorkerId = -1, ?callable $finishFinishCallback = null): void
{
$execute = Kiri::getDi()->get(TaskExecute::class);
$execute->task($handler, $data, $dstWorkerId, $finishFinishCallback);
}
}
if (!function_exists('taskWait')) {
/**
* @param string $handler
* @param mixed $data
* @param float $timeout
* @param int $dstWorkerId
* @return mixed
* @throws Exception
*/
function taskWait(string $handler, mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed
{
$execute = Kiri::getDi()->get(TaskExecute::class);
return $execute->taskWait($handler, $data, $timeout, $dstWorkerId);
}
}
if (!function_exists('taskCo')) {
/**
* @param array $tasks
* @param float $timeout
* @return false|array
*/
function taskCo(array $tasks, float $timeout = 0.5): false|array
{
$execute = Kiri::getDi()->get(TaskExecute::class);
return $execute->taskCo($tasks, $timeout);
}
}
if (!function_exists('taskWaitMulti')) {
/**
* @param array $tasks
* @param float $timeout
* @return false|array
*/
function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array
{
$execute = Kiri::getDi()->get(TaskExecute::class);
return $execute->taskWaitMulti($tasks, $timeout);
}
}
if (!function_exists('make')) {
@@ -82,7 +147,8 @@ if (!function_exists('isJson')) {
function isJson(?string $string): bool
{
if (is_null($string)) return false;
if (is_null($string))
return false;
return (str_starts_with($string, '{') && str_ends_with($string, '}'))
|| (str_ends_with($string, '[') && str_starts_with($string, ']'));
}
@@ -142,7 +208,8 @@ if (!function_exists('checkPortIsAlready')) {
{
if (!Kiri::getPlatform()->isLinux()) {
exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output);
if (empty($output)) return FALSE;
if (empty($output))
return FALSE;
$output = explode(PHP_EOL, $output[0]);
return $output[0];
}
@@ -842,7 +909,8 @@ if (!function_exists('jTraceEx')) {
{
$starter = $seen ? 'Caused by: ' : '';
$result = [];
if (!$seen) $seen = [];
if (!$seen)
$seen = [];
$trace = $e->getTrace();
$prev = $e->getPrevious();
$result[] = sprintf('%s%s: %s', $starter, $e::class, $e->getMessage());
-238
View File
@@ -1,238 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use Exception;
use JsonSerializable;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
/**
* @Actor
*/
abstract class Actor implements ActorInterface, JsonSerializable
{
/**
* @var Channel
*/
private Channel $channel;
/**
* @var bool
*/
private bool $isShutdown = false;
/**
* @var int
*/
private int $messageId = -1;
/**
* @var int
*/
private int $coroutineId = -1;
/**
* @var ActorState
*/
private ActorState $state;
/**
* @var float
*/
private float $startTime = 0;
/**
* @var int
*/
private int $refreshInterval = 0;
/**
* @return ActorState
*/
public function getState(): ActorState
{
return $this->state;
}
/**
* @param ActorState $state
*/
public function setState(ActorState $state): void
{
$this->state = $state;
}
/**
* @return float
*/
public function getRunTime(): float
{
return microtime(true) - $this->startTime;
}
/**
* @param string $uniqueId
*/
private function __construct(readonly public string $uniqueId)
{
$this->channel = new Channel(99);
$this->startTime = microtime(true);
}
/**
* @return void
*/
public function init(): void
{
}
/**
* @return bool
*/
public function isShutdown(): bool
{
return $this->isShutdown;
}
/**
* @param $id
* @return static
*/
public static function newActor($id): static
{
$actor = new static($id);
$actor->listen();
return $actor;
}
/**
* @return void
*/
private function listen(): void
{
Coroutine::create(function (Actor $actor) {
$actor->coroutineId = Coroutine::getCid();
$this->run();
}, $this);
}
/**
* @return string
*/
public function getName(): string
{
return $this->uniqueId;
}
/**
* @param mixed $response
* @return bool
*/
public function write(mixed $response): bool
{
return $this->channel->push($response);
}
/**
* @return void
*/
public function shutdown(): void
{
$this->isShutdown = true;
Coroutine::cancel($this->coroutineId);
if ($this->messageId > -1) {
Coroutine::cancel($this->messageId);
}
$this->channel->close();
}
/**
* @return void
*/
public function onUpdate(): void
{
}
/**
* @return void
* @throws Exception
*/
public function run(): void
{
if ($this->refreshInterval < 1) {
throw new Exception('Refresh interval must be greater than 1');
}
$this->setState(ActorState::BUSY);
$this->init();
$this->messageId = Coroutine::create(fn() => $this->loop());
$this->interval();
$this->setState(ActorState::IDLE);
}
/**
* @return void
*/
private function interval(): void
{
if ($this->isShutdown()) {
return;
}
try {
$this->onUpdate();
} catch (\Throwable $exception) {
error($exception);
}
Coroutine::sleep($this->refreshInterval / 1000);
$this->interval();
}
/**
* @return bool
*/
private function loop(): bool
{
if ($this->messageId == -1) {
$this->messageId = Coroutine::getCid();
}
if ($this->channel->errCode == SWOOLE_CHANNEL_CLOSED) {
$this->channel = new Channel(99);
}
$message = $this->channel->pop();
$this->process($message);
if ($this->isShutdown()) {
return true;
}
return $this->loop();
}
}
-15
View File
@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
interface ActorInterface
{
/**
* @param ActorMessage $message
* @return void
*/
public function process(ActorMessage $message): void;
}
-110
View File
@@ -1,110 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use Swoole\Coroutine;
class ActorManager
{
/** @var array<string, ActorInterface> */
private array $nodes = [];
/**
* @param Actor $actor
* @return void
*/
public function addActor(ActorInterface $actor): void
{
$this->nodes[$actor->uniqueId] = $actor;
Coroutine::create(function (Actor $actor) {
$actor->run();
}, $actor);
}
/**
* @param $name
* @return void
*/
public function closeActor($name): void
{
$node = $this->nodes[$name] ?? null;
if (is_null($node)) {
return;
}
foreach ($node as $actor) {
$actor->shutdown();
}
}
/**
* @param $name
* @param $message
* @return bool
*/
public function write($name, $message): bool
{
$actor = $this->nodes[$name] ?? null;
if (is_null($actor)) {
return false;
}
return $actor->write($message);
}
/**
* @param $name
* @return array
*/
public function lists($name): array
{
$array = [];
foreach ($this->nodes[$name] as $actor) {
$array[] = [
'id' => $actor->getName(),
'state' => $actor->getState()->name,
'runTime' => $actor->getRunTime()
];
}
return $array;
}
/**
* @param string $uniqueId
* @return bool
*/
public function hasActor(string $uniqueId): bool
{
return isset($this->nodes[$uniqueId]) && $this->nodes[$uniqueId] instanceof ActorInterface;
}
/**
* @param array|null $data
* @return void
*/
public static function exec(?array $data): void
{
if (is_null($data)) {
return;
}
}
/**
* @return void
*/
public function clean(): void
{
foreach ($this->nodes as $actor) {
$actor->shutdown();
}
$this->nodes = [];
}
}
-78
View File
@@ -1,78 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use JetBrains\PhpStorm\ArrayShape;
class ActorMessage implements \JsonSerializable
{
/**
* @var int
*/
private int $userId;
/**
* @var string
*/
private string $event;
/**
* @var array
*/
private array $body;
/**
* @param int $userId
* @param string $event
* @param array $body
*/
public function __construct(int $userId, string $event, array $body)
{
$this->userId = $userId;
$this->event = $event;
$this->body = $body;
}
/**
* @return int
*/
public function getUserId(): int
{
return $this->userId;
}
/**
* @return string
*/
public function getEvent(): string
{
return $this->event;
}
/**
* @return array
*/
public function getBody(): array
{
return $this->body;
}
/**
* @return array
*/
#[ArrayShape(['userId' => "int", 'event' => "string", 'body' => "array"])]
public function jsonSerialize(): array
{
return [
'userId' => $this->userId,
'event' => $this->event,
'body' => $this->body
];
}
}
-55
View File
@@ -1,55 +0,0 @@
<?php
namespace Kiri\Actor;
use Kiri\Server\Abstracts\BaseProcess;
use Swoole\Coroutine;
use Swoole\Process;
class ActorProcess extends BaseProcess
{
/**
* @var bool
*/
protected bool $enable_coroutine = true;
/**
* @return string
*/
public function getName(): string
{
// TODO: Change the autogenerated stub
return '[' . \config('id', 'system-service') . '].Actor Manager';
}
public function process(?Process $process): void
{
// TODO: Implement process() method.
while ($this->isStop() === false) {
$read = $process->read();
ActorManager::exec(json_decode($read, true));
Coroutine::sleep(1000 / 120);
}
}
/**
* @return $this
*/
public function onSigterm(): static
{
// TODO: Implement onSigterm() method.
Coroutine::create(function () {
$sign = Coroutine::waitSignal(SIGINT | SIGTERM);
if ($sign) {
$this->onShutdown(true);
}
});
return $this;
}
}
-18
View File
@@ -1,18 +0,0 @@
<?php
namespace Kiri\Actor;
enum ActorState
{
case IDLE;
case BUSY;
/**
*
*/
case CREATE;
case MESSAGE;
case SHUTDOWN;
}
+31 -1
View File
@@ -14,11 +14,16 @@ use Exception;
use JetBrains\PhpStorm\Pure;
use Kiri;
use Kiri\Error\StdoutLogger;
use ReflectionException;
use Kiri\Events\EventDispatch;
use Kiri\Events\EventProvider;
use Psr\Container\ContainerInterface;
/**
* Class Component
* @package Kiri\Base
* @property ContainerInterface $container
* @property EventDispatch $dispatch
* @property EventProvider $provider
*/
class Component implements Configure
{
@@ -96,4 +101,29 @@ class Component implements Configure
}
/**
* @return EventDispatch
*/
public function getDispatch(): EventDispatch
{
return Kiri::getDi()->get(EventDispatch::class);
}
/**
* @return EventProvider
*/
public function getProvider(): EventProvider
{
return Kiri::getDi()->get(EventProvider::class);
}
/**
* @return ContainerInterface
*/
public function getContainer(): ContainerInterface
{
return Kiri::getDi();
}
}
-13
View File
@@ -4,24 +4,11 @@ declare(strict_types=1);
namespace Kiri\Abstracts;
use Kiri;
use Psr\Container\ContainerInterface;
/**
* Class Providers
* @package Kiri\Abstracts
* @property-read ContainerInterface $container
*/
abstract class Providers extends Component implements Provider
{
/**
* @return ContainerInterface
*/
public function getContainer(): ContainerInterface
{
return Kiri::getDi();
}
}
-5
View File
@@ -10,17 +10,12 @@ declare(strict_types=1);
namespace Kiri;
use Exception;
use Kiri;
use Kiri\Abstracts\{BaseApplication, Kernel};
use Kiri\Di\Scanner;
use Kiri\Error\ErrorHandler;
use Kiri\Events\{OnAfterCommandExecute, OnBeforeCommandExecute};
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Symfony\Component\Console\{Application as ConsoleApplication,
Exception\ExceptionInterface,
Input\ArgvInput,
Output\ConsoleOutput,
Output\OutputInterface
+4 -5
View File
@@ -9,7 +9,7 @@ class Coordinator
const string WORKER_START = 'worker:start';
private bool $waite = false;
private bool $waite = true;
/**
@@ -17,10 +17,9 @@ class Coordinator
*/
public function yield(): void
{
if ($this->waite === false) {
return;
}
$this->yield();
while ($this->waite) {
usleep(1000);
}
}
+3 -15
View File
@@ -11,17 +11,12 @@ namespace Kiri\Error;
use Closure;
use ErrorException;
use Exception;
use Kiri\Abstracts\Component;
use Psr\Container\ContainerInterface;
use Kiri\Di\Inject\Container;
use ReflectionException;
use Kiri\Events\OnSystemError;
use Throwable;
/**
* Class ErrorHandler
* hahahah
* @package Kiri\Base
* @property-read $asError
*/
@@ -34,13 +29,6 @@ class ErrorHandler extends Component implements ErrorInterface
public string $category = 'app';
/**
* @var ContainerInterface
*/
#[Container(ContainerInterface::class)]
public ContainerInterface $container;
/**
* @param array|Closure|null $callback
* @return void
@@ -51,7 +39,7 @@ class ErrorHandler extends Component implements ErrorInterface
if (empty($callback)) {
$callback = [$this, 'exceptionHandler'];
} else if (is_array($callback) && is_string($callback[0])) {
$callback[0] = $this->container->get($callback[0]);
$callback[0] = \Kiri::getDi()->get($callback[0]);
}
set_exception_handler($callback);
}
@@ -67,7 +55,7 @@ class ErrorHandler extends Component implements ErrorInterface
if (empty($callback)) {
$callback = [$this, 'errorHandler'];
} else if (is_array($callback) && is_string($callback[0])) {
$callback[0] = $this->container->get($callback[0]);
$callback[0] = \Kiri::getDi()->get($callback[0]);
}
set_error_handler($callback);
}
@@ -83,7 +71,7 @@ class ErrorHandler extends Component implements ErrorInterface
if (empty($callback)) {
$callback = [$this, 'shutdown'];
} else if (is_array($callback) && is_string($callback[0])) {
$callback[0] = $this->container->get($callback[0]);
$callback[0] = \Kiri::getDi()->get($callback[0]);
}
register_shutdown_function($callback);
}
-1
View File
@@ -9,7 +9,6 @@ use Monolog\Formatter\LineFormatter;
use Monolog\Handler\RotatingFileHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;
use ReflectionException;
/**
+14 -7
View File
@@ -9,11 +9,9 @@ declare(strict_types=1);
namespace Kiri\Redis;
use Exception;
use Kiri;
use Kiri\Exception\RedisConnectException;
use Kiri\Pool\Pool;
use RedisException;
use function config;
/**
@@ -133,7 +131,7 @@ SCRIPT;
*/
public function destroy(): void
{
$this->pool()->close($this->host);
$this->pool()->close($this->getName());
}
@@ -152,7 +150,7 @@ SCRIPT;
return trigger_print_error(throwable($throwable));
} finally {
if ($client->ping('h') == 'h') {
$this->pool()->push($this->host, $client);
$this->pool()->push($this->getName(), $client);
}
}
}
@@ -164,7 +162,7 @@ SCRIPT;
*/
private function getClient(): \Redis
{
return $this->pool()->get($this->host);
return $this->pool()->get($this->getName());
}
@@ -175,13 +173,22 @@ SCRIPT;
protected function pool(): Pool
{
$pool = Kiri::getPool();
if (!$pool->hasChannel($this->host)) {
$pool->created($this->host, $this->pool['max'], [$this, 'connect']);
if (!$pool->hasChannel($this->getName())) {
$pool->created($this->getName(), $this->pool['max'], [$this, 'connect']);
}
return $pool;
}
/**
* @return string
*/
private function getName(): string
{
return 'redis.' . $this->host;
}
/**
* @return \Redis
* @throws
+24
View File
@@ -1 +1,25 @@
<?php
use Swoole\Timer;
use Swoole\WebSocket\Server;
$client = new Server("0.0.0.0", 9511);
$client->addProcess(new Swoole\Process(function (\Swoole\Process $server) use ($client) {
while (true) {
echo json_encode($client->stats(), JSON_UNESCAPED_UNICODE) . PHP_EOL;
sleep(1);
}
}));
$client->on('open', function (Server $server) {
});
$client->on('message', function (Server $server, $frame) {
});
$client->on('close', function (Server $server, $fd) {
});
$client->start();
+50
View File
@@ -1 +1,51 @@
<?php
ini_set('memory_limit','64GB');
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use function Swoole\Coroutine\run;
function faker($page = 0)
{
$client = new Client('openapi.stupideyes.com', 443, true);
$client->get('/faker?offset=' . $page);
$client->close();
return json_decode($client->getBody(), true);
}
run(function () {
$offset = 1;
$success = 0;
for ($i = 1; $i <= 10000; $i++) {
$faker = faker($offset);
$offset++;
go(function () use ($faker, $offset, &$success) {
$socket = new Swoole\Coroutine\Http\Client('43.248.128.57', 14101);
$socket->upgrade("/ws?access_token=" . $faker['params']['token']);
if ($socket->connected) {
$success += 1;
while (true) {
$socket->recv();
// $socket->push('hello');
// var_dump($socket->recv());
Coroutine::sleep(0.1);
}
} else {
$success -= 1;
echo 'websocket fail: ' . socket_strerror($socket->errCode) . PHP_EOL;
}
});
Coroutine::sleep(0.1);
var_dump($success);
}
});