94 Commits

Author SHA1 Message Date
as2252258 6926a636b4 eee 2026-06-12 23:57:23 +08:00
as2252258 fe09da4a97 eee 2025-12-31 00:19:31 +08:00
as2252258 cab1ed999a update composer.json.
Signed-off-by: 向林 <as2252258@163.com>
2025-12-23 02:27:26 +00:00
as2252258 349e07b12c eee 2025-12-17 20:51:16 +08:00
as2252258 5fb1f0fc99 eee 2025-07-14 15:36:22 +08:00
as2252258 524d2b8664 eee 2024-09-04 10:14:30 +08:00
as2252258 f4a5a4dbee eee 2024-09-03 15:05:19 +08:00
as2252258 af613784fd eee 2024-09-03 14:47:29 +08:00
as2252258 6418b3288b eee 2024-04-24 14:31:06 +08:00
as2252258 a761bb3428 eee 2023-11-30 17:03:17 +08:00
as2252258 2ecb13c453 eee 2023-11-30 17:02:20 +08:00
as2252258 0a705b27bc eee 2023-10-24 17:22:31 +08:00
as2252258 10a64ee646 qqq 2023-07-31 23:09:00 +08:00
as2252258 493a58281f qqq 2023-05-26 09:20:30 +08:00
as2252258 18a9b29bbd qqq 2023-05-25 16:59:19 +08:00
as2252258 d72ff2f82c qqq 2023-05-20 23:05:38 +08:00
as2252258 de2698ee12 qqq 2023-05-16 15:21:50 +08:00
as2252258 b1bc0ff23c 变更 2022-12-12 17:31:12 +08:00
as2252258 7f28443d99 变更 2022-10-11 18:41:58 +08:00
as2252258 7ea28f47e1 变更 2022-09-23 19:02:26 +08:00
as2252258 c01dc5f41a 变更 2022-09-23 18:55:46 +08:00
as2252258 c190c749f4 modify plugin name 2022-06-17 13:44:40 +08:00
as2252258 45315dcbb2 modify plugin name 2022-06-17 13:44:24 +08:00
as2252258 8d20f56660 modify plugin name 2022-06-16 18:49:33 +08:00
as2252258 348f0129e8 modify plugin name 2022-06-16 18:07:27 +08:00
as2252258 6371d5271e modify plugin name 2022-06-16 17:38:23 +08:00
as2252258 8f9de7a508 modify plugin name 2022-06-08 17:19:32 +08:00
as2252258 2a6b2a1979 modify plugin name 2022-06-08 15:08:07 +08:00
as2252258 b79513757b modify plugin name 2022-06-08 14:32:20 +08:00
as2252258 b495927099 modify plugin name 2022-06-08 14:27:49 +08:00
as2252258 188742360e 变更 2022-05-31 20:41:31 +08:00
as2252258 6715743843 变更 2022-05-31 11:54:06 +08:00
as2252258 f94a8a3792 变更 2022-05-31 11:43:06 +08:00
as2252258 0ad17ae4c5 变更 2022-05-31 11:11:12 +08:00
as2252258 95772e794b 变更 2022-05-31 11:03:52 +08:00
as2252258 9473384c18 变更 2022-05-31 11:02:15 +08:00
as2252258 152049fdae 变更 2022-05-31 10:59:22 +08:00
as2252258 4c6739a83b 变更 2022-05-31 10:53:48 +08:00
as2252258 5f81fd5ff9 变更 2022-05-31 10:53:33 +08:00
as2252258 cf8208bd12 变更 2022-05-31 10:02:47 +08:00
as2252258 0d2989171e 变更 2022-05-04 03:53:37 +08:00
as2252258 4b50c3fc18 变更 2022-05-04 03:32:10 +08:00
as2252258 ad346d01d1 变更 2022-05-04 03:29:14 +08:00
as2252258 c108de48af 变更 2022-05-04 03:25:37 +08:00
as2252258 3192b5940e 变更 2022-05-04 03:22:31 +08:00
as2252258 1d3b71b023 变更 2022-05-04 03:19:28 +08:00
as2252258 abaa2547f7 变更 2022-05-04 03:16:52 +08:00
as2252258 c982e44631 变更 2022-05-04 03:12:41 +08:00
as2252258 87d47e0971 变更 2022-05-04 03:11:10 +08:00
as2252258 072db59b9f 变更 2022-05-04 03:08:46 +08:00
as2252258 2620e88447 变更 2022-05-04 02:55:14 +08:00
as2252258 f3cd9b4b7c modify plugin name 2022-03-03 18:30:59 +08:00
as2252258 e270f9c588 modify plugin name 2022-03-03 16:39:46 +08:00
as2252258 f2cf841573 modify plugin name 2022-03-02 16:47:02 +08:00
as2252258 7a3299cb10 modify plugin name 2022-03-02 16:45:12 +08:00
as2252258 2ad3809736 modify plugin name 2022-03-02 16:27:15 +08:00
as2252258 219d246d4a modify plugin name 2022-03-02 15:15:36 +08:00
as2252258 a3dfcf02e6 modify plugin name 2022-02-28 14:59:34 +08:00
as2252258 688b2381da modify plugin name 2022-02-23 16:32:08 +08:00
as2252258 8824f711bb Revert "改名"
This reverts commit fdf58326
2022-01-20 19:04:16 +08:00
as2252258 f2d97832b5 Revert "改名"
This reverts commit fdf58326
2022-01-19 17:27:47 +08:00
as2252258 855e57a651 Revert "改名"
This reverts commit fdf58326
2022-01-19 16:39:29 +08:00
as2252258 0b3310b82e Revert "改名"
This reverts commit fdf58326
2022-01-14 11:29:16 +08:00
as2252258 e8cf7f29f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:02:07 +08:00
as2252258 62e3b001f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:00:01 +08:00
as2252258 f655cebdf3 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:58:40 +08:00
as2252258 ee492d97ad Revert "改名"
This reverts commit fdf58326
2022-01-13 18:57:53 +08:00
as2252258 a16d841d98 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:56:41 +08:00
as2252258 f015662fd6 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:52:42 +08:00
as2252258 021dcae59b Revert "改名"
This reverts commit fdf58326
2022-01-13 18:50:19 +08:00
as2252258 c6dc34c821 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:47:34 +08:00
as2252258 88c3958aba Revert "改名"
This reverts commit fdf58326
2022-01-13 18:42:42 +08:00
as2252258 24ebfac360 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:41:16 +08:00
as2252258 b3c200b443 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:34:23 +08:00
as2252258 3e2b50bf60 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:33:56 +08:00
as2252258 89c142930e Revert "改名"
This reverts commit fdf58326
2022-01-13 18:29:51 +08:00
as2252258 1515d9695c Revert "改名"
This reverts commit fdf58326
2022-01-13 18:28:22 +08:00
as2252258 f1f7b1081d Revert "改名"
This reverts commit fdf58326
2022-01-13 18:27:36 +08:00
as2252258 07533e29f9 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:25:29 +08:00
as2252258 4d60a24fac Revert "改名"
This reverts commit fdf58326
2022-01-12 14:10:33 +08:00
as2252258 4dc6bc661a Revert "改名"
This reverts commit fdf58326
2022-01-11 17:56:16 +08:00
as2252258 0d7fd5e356 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:53:41 +08:00
as2252258 da16b0685d Revert "改名"
This reverts commit fdf58326
2022-01-11 14:51:50 +08:00
as2252258 febbdea8c8 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:44:02 +08:00
as2252258 a45c3d875c Revert "改名"
This reverts commit fdf58326
2022-01-11 14:42:04 +08:00
as2252258 64d01c0a80 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:41:06 +08:00
as2252258 bf5fe594e4 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:30:58 +08:00
as2252258 5cfe3c6d6d Revert "改名"
This reverts commit fdf58326
2022-01-11 14:30:05 +08:00
as2252258 773e3c0f57 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:26:57 +08:00
as2252258 b1c91343ef Revert "改名"
This reverts commit fdf58326
2022-01-11 14:25:24 +08:00
as2252258 3f821ca9d0 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:23:48 +08:00
as2252258 d296b3107e Revert "改名"
This reverts commit fdf58326
2022-01-11 14:15:51 +08:00
as2252258 f606c58204 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:13:51 +08:00
as2252258 6c1d986814 Revert "改名"
This reverts commit fdf58326
2022-01-10 11:39:56 +08:00
20 changed files with 853 additions and 910 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ namespace PHPSTORM_META {
// Reflect
use Kiri\Di\Container;
use Psr\Container\ContainerInterface;
use Kiri\Di\ContainerInterface;
override(ContainerInterface::get(0), map('@'));
override(Container::get(0), map('@'));
+68
View File
@@ -0,0 +1,68 @@
<?php
namespace Kiri\Rpc;
use Kiri\Core\Json;
use Kiri\Core\Number;
use Kiri\Di\Inject\Container;
abstract class AbstractRpcClient
{
public string $service = '';
public string $version = '';
/**
* @var JsonRpcTransporterInterface
*/
#[Container(JsonRpcTransporterInterface::class)]
private JsonRpcTransporterInterface $transporter;
/**
* @param JsonRpcTransporterInterface $transporter
*/
public function setTransporter(JsonRpcTransporterInterface $transporter): void
{
$this->transporter = $transporter;
}
/**
* @return string
*/
public function getService(): string
{
if (empty($this->service)) {
return get_called_class();
}
return $this->service;
}
/**
* @param string $method
* @param ...$args
* @return string|bool
*/
protected function send(string $method, ...$args): string|bool
{
$result = $this->transporter->push(Json::encode([
'jsonrpc' => $this->version,
'service' => $this->getService(),
'method' => $method,
'params' => $args,
'id' => Number::create(time())
]), $this->getService());
if (is_string($result)) {
return json_decode($result, true);
} else {
return $result;
}
}
}
-91
View File
@@ -1,91 +0,0 @@
<?php
namespace Kiri\Rpc\Annotation;
use Kiri\Abstracts\Config;
use Kiri\Core\Network;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Rpc\RpcManager;
use Kiri\Annotation\Attribute;
use ReflectionException;
#[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute
{
private string $uniqueId = '';
/**
* @param string $service
* @param string $driver
* @param array $tags
* @param array $meta
* @param array $checkOptions
* @param string $checkUrl
*/
public function __construct(public string $service, public string $driver, public array $tags = [], public array $meta = [], public array $checkOptions = [], public string $checkUrl = '')
{
$this->uniqueId = preg_replace('/(\w{11})(\w{4})(\w{3})(\w{8})(\w{6})/', '$1-$2-$3-$4-$5', md5(__DIR__ . 'Annotation' . md5(Network::local())));
}
/**
* @param mixed $class
* @param mixed|string $method
* @return mixed
* @throws ReflectionException
* @throws ConfigException
*/
public function execute(mixed $class, mixed $method = ''): bool
{
return Kiri::getDi()->get(RpcManager::class)->add($this->service, $class, $this->create());
}
/**
* @throws ConfigException
*/
protected function create(): array
{
$rpcPort = Config::get('rpc.port');
if (empty($this->checkUrl)) {
$this->checkUrl = Network::local() . ":" . Config::get('rpc.port');
}
$defaultConfig = [
"ID" => "rpc.json.{$this->service}." . $this->uniqueId,
"Name" => $this->service,
"EnableTagOverride" => false,
"TaggedAddresses" => [
"lan_ipv4" => [
"address" => "127.0.0.1",
"port" => $rpcPort
],
"wan_ipv4" => [
"address" => Network::local(),
"port" => $rpcPort
]
],
"Check" => [
"CheckId" => "service:rpc.json.{$this->service}." . $this->uniqueId,
"Name" => "service " . $this->service . ' health check',
"Annotations" => "Script based health check",
"ServiceID" => $this->service,
"TCP" => $this->checkUrl,
"Interval" => "5s",
"Timeout" => "1s",
"DeregisterCriticalServiceAfter" => "30s"
],
];
if (!empty($this->meta)) {
$defaultConfig["Meta"] = $this->meta;
}
if (!empty($this->tags)) {
$defaultConfig["tags"] = $this->tags;
}
return $defaultConfig;
}
}
+84 -46
View File
@@ -3,13 +3,13 @@
namespace Kiri\Rpc;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Pool\Alias;
use Kiri\Pool\Pool;
use Swoole\Client;
use Kiri\Server\Events\OnBeforeShutdown;
use ReflectionException;
use Swoole\Coroutine\Client as CoroutineClient;
use Swoole\Client as AsyncClient;
/**
@@ -18,59 +18,97 @@ use Swoole\Client;
class ClientPool extends Component
{
const POOL_NAME = 'rpc.client.pool';
use Alias;
public int $max;
public int $max;
public int $min;
public int $min;
private array $names = [];
public int $waite;
/**
* @return void
*/
public function init(): void
{
on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
}
/**
* @param $config
* @param callable $callback
* @return mixed
* @throws ConfigException
* @throws Exception
*/
public function get($config, callable $callback): mixed
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $config['Address'] . '::' . $config['Port'], true);
$pool = $config['pool'] ?? ['min' => 1, 'max' => 100];
return $this->getPool()->get($coroutineName, $callback, $pool['min'] ?? 1);
}
/**
* @return void
* @throws Exception
*/
public function onBeforeShutdown(): void
{
$pool = Kiri::getDi()->get(Pool::class);
foreach ($this->names as $name) {
$pool->clean($name);
}
}
/**
* @param \Swoole\Coroutine\Client|Client $client
* @param $host
* @param $port
* @throws ConfigException
* @throws Exception
*/
public function push(\Swoole\Coroutine\Client|Client $client, $host, $port)
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $host . '::' . $port, true);
/**
* @param $config
* @return resource
* @throws Exception
*/
public function get($config): mixed
{
$coroutineName = $config['Address'] . '::' . $config['Port'];
$this->getPool()->push($coroutineName, $client);
}
if (!in_array($coroutineName, $this->names)) {
$this->names[] = $coroutineName;
}
return $this->getPool($config['Address'], $config['Port'])->get($coroutineName);
}
/**
* @return Pool
* @throws Exception
*/
public function getPool(): Pool
{
return Kiri::getDi()->get(Pool::class);
}
/**
* @param CoroutineClient|AsyncClient $client
* @param $host
* @param $port
* @throws Exception
*/
public function push(CoroutineClient|AsyncClient $client, $host, $port)
{
$this->getPool($host, $port)->push($host . '::' . $port, $client);
}
/**
* @param string $host
* @param int $port
* @return Pool
* @throws ReflectionException
*/
public function getPool(string $host, int $port): Pool
{
$pool = Kiri::getDi()->get(Pool::class);
$pool->created($host . '::' . $port, 10, [$this, 'connect']);
return $pool;
}
/**
* @param $host
* @param $port
* @return CoroutineClient|AsyncClient
* @throws Exception
*/
public function connect($host, $port): CoroutineClient|AsyncClient
{
if (Kiri\Di\Context::inCoroutine()) {
$client = new CoroutineClient(SWOOLE_SOCK_TCP);
} else {
$client = new AsyncClient(SWOOLE_SOCK_TCP);
}
if (!$client->connect($host, $port, 3)) {
throw new Exception('Connect ' . $host . '::' . $port . ' fail');
}
return $client;
}
}
+90
View File
@@ -0,0 +1,90 @@
<?php
use Etcd\Client;
use GuzzleHttp\Exception\BadResponseException;
use Kiri\Abstracts\Component;
use Kiri\Core\Str;
/**
*
*/
class Etcd extends Component
{
private Client $client;
private bool $isEnd = false;
private array $config = [];
private array|BadResponseException $grant;
/**
* @return void
* @throws Exception
*/
public function init(): void
{
$this->client = new Client('47.92.194.207:' . 2379, 'v3');
$this->grant = $this->client->grant(60);
if ($this->grant instanceof BadResponseException) {
throw new Exception($this->grant->getMessage());
}
$key = 'center.service.' . gethostbyname(gethostname());
\pcntl_signal(SIGINT, function () use ($key) {
$this->isEnd = true;
$this->client->del($key);
});
$this->client->put($key, json_encode([
'address' => gethostbyname(gethostname()) . ':10240',
'nodeId' => Str::rand(32)
]), ['lease' => (int)$this->grant["ID"]]);
}
/**
* @param string $key
* @return mixed
*/
public function get(string $key): mixed
{
return $this->config[$key] ?? null;
}
/**
* @param string $key
* @param mixed $value
* @return void
* @throws Exception
*/
public function put(string $key, mixed $value): void
{
$result = $this->client->put($key, $value);
if ($result instanceof BadResponseException) {
throw new Exception($result->getMessage());
}
$this->config[$key] = $value;
}
/**
* @return void
*/
public function waite(): void
{
while ($this->isEnd == false) {
$this->client->keepAlive((int)$this->grant["ID"]);
sleep(1);
}
}
}
+2 -1
View File
@@ -4,6 +4,7 @@ namespace Kiri\Rpc;
use JetBrains\PhpStorm\Pure;
use Throwable;
class InvalidRpcParamsException extends \Exception
@@ -15,7 +16,7 @@ class InvalidRpcParamsException extends \Exception
* @param int $code
* @param Throwable|null $previous
*/
public function __construct($message = "", $code = 0, Throwable $previous = null)
#[Pure] public function __construct(string $message = "", int $code = 0, Throwable $previous = null)
{
parent::__construct($message, -32602, $previous);
}
-150
View File
@@ -1,150 +0,0 @@
<?php
namespace Kiri\Rpc;
use Exception;
use Http\Message\ServerRequest;
use Http\Message\Stream;
use Kiri\Core\Number;
use Kiri\Kiri;
use Kiri\Pool\Pool;
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
/**
*
*/
abstract class JsonRpcConsumers implements OnRpcConsumerInterface
{
/**
* @var Pool
*/
public Pool $pool;
protected string $name = '';
/**
* @param string $method
* @param mixed $data
* @param string $version
* @throws Exception
* @throws ClientExceptionInterface
*/
public function notify(string $method, mixed $data, string $version = '2.0'): void
{
$config = $this->get_consul($this->name);
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
$transporter->withConfig($config)->sendRequest(
$this->requestBody([
'jsonrpc' => $version,
'service' => $this->name,
'method' => $method,
'params' => $data,
])
);
}
/**
* @param array $data
* @return ServerRequestInterface
* @throws \ReflectionException
*/
private function requestBody(array $data): ServerRequestInterface
{
$server = Kiri::getDi()->get(ServerRequest::class);
return $server->withBody(new Stream(json_encode($data)));
}
/**
* @param string $method
* @param mixed $data
* @param string $version
* @param string $id
* @return mixed
* @throws Exception
* @throws ClientExceptionInterface
*/
public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): ResponseInterface
{
if (empty($id)) $id = Number::create(time());
$config = $this->get_consul($this->name);
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
return $transporter->withConfig($config)->sendRequest(
$this->requestBody([
'jsonrpc' => $version,
'service' => $this->name,
'method' => $method,
'params' => $data,
'id' => $id
])
);
}
/**
* @param array $data
* @return mixed
* @throws ClientExceptionInterface
* @throws Exception
*/
public function batch(array $data): mixed
{
$config = $this->get_consul($this->name);
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
return $transporter->withConfig($config)->sendRequest(
$this->requestBody($data)
);
}
/**
* @param $service
* @return array
* @throws RpcServiceException|\ReflectionException
* @throws Exception
*/
private function get_consul($service): array
{
if (empty($service)) {
throw new RpcServiceException('You need set rpc service name if used.');
}
$sf = Kiri::getDi()->get(RpcManager::class)->getServices($service);
if (empty($sf) || !is_array($sf)) {
throw new RpcServiceException('You need set rpc service name if used.');
}
return $this->_loadRand($sf);
}
/**
* @param $services
* @return array
*/
private function _loadRand($services): array
{
$array = [];
foreach ($services as $value) {
$value['Weight'] = $value['Weights']['Passing'];
$array[] = $value;
}
if (count($array) < 2) {
$luck = $array[0];
} else {
$luck = Luckdraw::luck($array, 'Weight');
}
return [
'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'],
'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port']
];
}
}
+18 -37
View File
@@ -2,66 +2,47 @@
namespace Kiri\Rpc;
use Kiri\Annotation\Inject;
use Exception;
use Http\Message\Response;
use Http\Message\Stream;
use Kiri\Abstracts\Config;
use Kiri\Di\Inject\Container;
use Kiri\Exception\ConfigException;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Swoole\Coroutine\Client;
use ReflectionException;
class JsonRpcPoolTransporter implements ClientInterface
class JsonRpcPoolTransporter implements JsonRpcTransporterInterface
{
use TraitTransporter;
#[Inject(ClientPool::class)]
#[Container(ClientPool::class)]
public ClientPool $pool;
const POOL_NAME = 'rpc.client.pool';
/**
*/
public function init()
/**
* @param string $content
* @param string $service
* @return string|bool
* @throws RpcServiceException
* @throws ReflectionException
*/
public function push(string $content, string $service): string|bool
{
}
$client = $this->get_consul($service)->getClient();
/**
* @param RequestInterface $request
* @return ResponseInterface
* @throws Exception
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$content = $request->getBody()->getContents();
$response = $this->request($client = $this->getClient(), $content, false);
$response = $this->request($client, $content);
$this->pool->push($client, $this->config['Address'], $this->config['Port']);
return (new Response())->withBody(new Stream($response));
return $response;
}
/**
* @return Client|\Swoole\Client
* @throws ConfigException
* @throws Exception
* @throws Exception
*/
private function getClient(): Client|\Swoole\Client
private function getClient()
{
$this->config['pool'] = Config::get('rpc.pool', ['max' => 10, 'min' => 1, 'waite' => 60]);
return $this->pool->get($this->config, function () {
return $this->newClient();
});
return $this->pool->get($this->config);
}
+13 -13
View File
@@ -2,17 +2,13 @@
namespace Kiri\Rpc;
use Http\Message\Response;
use Http\Message\Stream;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Exception;
/**
*
*/
class JsonRpcTransporter implements ClientInterface
class JsonRpcTransporter implements JsonRpcTransporterInterface
{
@@ -20,17 +16,21 @@ class JsonRpcTransporter implements ClientInterface
/**
* @param RequestInterface $request
* @return ResponseInterface
* @throws \Exception
* @param string $content
* @param string $service
* @return string|bool
* @throws RpcServiceException
* @throws Exception
*/
public function sendRequest(RequestInterface $request): ResponseInterface
public function push(string $content, string $service): string|bool
{
$content = $request->getBody()->getContents();
$client = $this->get_consul($service)->newClient();
$response = $this->request($this->newClient(), $content, true);
$body = $this->request($client, $content);
return (new Response())->withBody(new Stream($response));
$client->close();
return $body;
}
+15
View File
@@ -0,0 +1,15 @@
<?php
namespace Kiri\Rpc;
interface JsonRpcTransporterInterface
{
/**
* @param string $content
* @param string $service
* @return string|bool
*/
public function push(string $content, string $service): string|bool;
}
+185
View File
@@ -0,0 +1,185 @@
<?php
namespace Kiri\Rpc;
class LotteryDraw
{
/**
* @param array $goods
* @param string $wight
* @return mixed
* @uses $reward = Lucked::luck($data);
*/
public static function luck(array $goods, string $wight = 'probability'): mixed
{
static $class = null;
if ($class === null) $class = new LotteryDraw();
if (count($goods) < 1) return null;
$prob = $alias = [];
$class->ket($array = $class->init($goods, $wight), $prob, $alias);
$result = $class->generation($array, $prob, $alias);
if (!isset($goods[$result])) {
return null;
}
return $goods[$result];
}
/**
* @param array $goods
* @param string $wight
* @return array
*/
protected function init(array $goods, string $wight): array
{
$defaultIndex = -1;
$array = [];
foreach ($goods as $key => $val) {
if ($val[$wight] == 0) $defaultIndex = $key;
$array[] = (float)$val[$wight];
}
$total = array_sum($array);
if ($total > 1) {
$array = $this->reset($array, $total);
$total = array_sum($array);
}
if (1 - $total !== floatval(0)) {
if ($defaultIndex == -1) {
$defaultIndex = count($goods);
}
$array[$defaultIndex] = 1 - array_sum($array);
}
return $array;
}
/**
* @param array $data
* @param float $total
* @return array
*/
protected function reset(array $data, float $total): array
{
$length = $this->getMin($data);
foreach ($data as $key => $val) {
$data[$key] = ($val * $length - ($val * $length) / ($total * $length)) / $length;
}
return $data;
}
/**
* @param $data
* @param $prob
* @param $alias
* @return mixed
*/
private function generation($data, $prob, $alias): mixed
{
$nums = count($prob) - 1;
$MAX_P = $this->getMin($data); // 假设最小的几率是万分之一
$coin_toss = mt_rand(1, $MAX_P) / $MAX_P; // 抛出硬币
$col = mt_rand(0, $nums); // 随机落在一列
$b_head = $coin_toss > $prob[$col]; // 判断是否落在原色
return $b_head ? $alias[$col] : $col;
}
/**
* @param $num
* @return int
*/
private function getMin($num): int
{
$length = $this->getFloatLength($this->mini($num));
return (int)sprintf('1%0' . $length . 'd', 0);
}
/**
* @param $num
* @return int|float
*/
protected function mini($num): int|float
{
$def = current($num);
foreach ($num as $val) {
if ($val < $def) {
$def = $val;
}
}
return $def;
}
/**
* @param $float
* @return int
*/
private function getFloatLength($float): int
{
$ex = explode('.', $float);
return strlen(end($ex));
}
/**
* @param array $data
* @param array $prob
* @param array $alias
*/
private function ket(array $data, array &$prob, array &$alias): void
{
$nums = count($data);
$small = $large = [];
for ($i = 0; $i < $nums; ++$i) {
$data[$i] = $data[$i] * $nums; // 扩大倍数,使每列高度可为1
/** 分到两个数组,便于组合 */
if ($data[$i] < 1) {
$small[] = $i;
} else {
$large[] = $i;
}
}
/** 将超过1的色块与原色拼凑成1 */
while (!empty($small) && !empty($large)) {
$n_index = array_shift($small);
$a_index = array_shift($large);
$prob[$n_index] = $data[$n_index];
$alias[$n_index] = $a_index;
// 重新调整大色块
$data[$a_index] = ($data[$a_index] + $data[$n_index]) - 1;
if ($data[$a_index] < 1) {
$small[] = $a_index;
} else {
$large[] = $a_index;
}
}
/** 剩下大色块都设为1 */
while (!empty($large)) {
$n_index = array_shift($large);
$prob[$n_index] = 1;
}
/** 一般是精度问题才会执行这一步 */
while (!empty($small)) {
$n_index = array_shift($small);
$prob[$n_index] = 1;
}
}
}
-151
View File
@@ -1,151 +0,0 @@
<?php
namespace Kiri\Rpc;
use Exception;
class Luckdraw
{
/**
* @param array $goods
* @param string $wight
* @return mixed
* @array = [
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ['name'=> '商品名称', 'probability'=> '概率', 'total'=> '库存'],
* ]
*
* @uses $reward = Lucked::luck($data);
*/
public static function luck(array $goods, string $wight = 'probability'): mixed
{
static $class = null;
if ($class === null) $class = new Luckdraw();
if (empty($goods)) return null;
$array = $prob = $alias = [];
$defaultIndex = 0;
foreach ($goods as $key => $val) {
if ($val[$wight] == 0) $defaultIndex = $key;
$array[] = (float)$val[$wight];
}
$array[$defaultIndex] = 1 - array_sum($array);
$class->ket($array, $prob, $alias);
$result = $class->generation($array, $prob, $alias);
if (!isset($goods[$result])) {
return null;
}
return $goods[$result];
}
/**
* @param $data
* @param $prob
* @param $alias
* @return mixed
*/
private function generation($data, $prob, $alias): mixed
{
$nums = count($prob) - 1;
$MAX_P = $this->getMin($data); // 假设最小的几率是万分之一
$coin_toss = rand(1, $MAX_P) / $MAX_P; // 抛出硬币
$col = rand(0, $nums); // 随机落在一列
$b_head = $coin_toss < $prob[$col]; // 判断是否落在原色
return $b_head ? $col : @$alias[$col];
}
/**
* @param $num
* @return string
*/
private function getMin($num): string
{
$def = current($num);
foreach ($num as $val) {
if ($val < $def) {
$def = $val;
}
}
$length = $this->getFloatLength($def) + 1;
return sprintf('1%0' . $length . 'd', 0);
}
/**
* @param $float
* @return int
*/
private function getFloatLength($float): int
{
$ex = explode('.', 1 - $float);
return strlen(end($ex));
}
/**
* @param array $data
* @param array $prob
* @param array $alias
*/
private function ket(array $data, array &$prob, array &$alias)
{
$nums = count($data);
$small = $large = [];
for ($i = 0; $i < $nums; ++$i) {
$data[$i] = $data[$i] * $nums; // 扩大倍数,使每列高度可为1
/** 分到两个数组,便于组合 */
if ($data[$i] < 1) {
$small[] = $i;
} else {
$large[] = $i;
}
}
/** 将超过1的色块与原色拼凑成1 */
while (!empty($small) && !empty($large)) {
$n_index = array_shift($small);
$a_index = array_shift($large);
$prob[$n_index] = $data[$n_index];
$alias[$n_index] = $a_index;
// 重新调整大色块
$data[$a_index] = ($data[$a_index] + $data[$n_index]) - 1;
if ($data[$a_index] < 1) {
$small[] = $a_index;
} else {
$large[] = $a_index;
}
}
/** 剩下大色块都设为1 */
while (!empty($large)) {
$n_index = array_shift($large);
$prob[$n_index] = 1;
}
/** 一般是精度问题才会执行这一步 */
while (!empty($small)) {
$n_index = array_shift($small);
$prob[$n_index] = 1;
}
}
}
-14
View File
@@ -1,14 +0,0 @@
<?php
namespace Kiri\Rpc;
use Psr\Http\Client\ClientInterface;
/**
* @mixin JsonRpcTransporter
*/
interface RpcClientInterface extends ClientInterface
{
}
+208 -227
View File
@@ -2,35 +2,26 @@
namespace Kiri\Rpc;
use Http\Constrict\RequestInterface;
use Http\Handler\Handler;
use Http\Handler\Router;
use Http\Message\ServerRequest;
use Exception;
use JetBrains\PhpStorm\ArrayShape;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Consul\Agent;
use Kiri\Context;
use Kiri\Core\Json;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Annotation\Inject;
use Kiri\Annotation\Annotation;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnServerBeforeStart;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface;
use ReflectionException;
use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface;
use Server\Contract\OnReceiveInterface;
use Server\Events\OnBeforeShutdown;
use Server\Events\OnServerBeforeStart;
use Server\Events\OnTaskerStart;
use Server\Events\OnWorkerStart;
use Swoole\Coroutine;
use Kiri\Router\RouterCollector;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel;
use Swoole\Server;
use Swoole\Timer;
use Kiri\Router\DataGrip;
/**
*
@@ -39,235 +30,225 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
{
#[Inject(Router::class)]
public Router $router;
public RouterCollector $collector;
#[Inject(Annotation::class)]
public Annotation $annotation;
/**
* @param ContainerInterface $container
* @param DataGrip $dataGrip
* @param RpcManager $manager
* @param EventProvider $eventProvider
* @throws Exception
*/
public function __construct(public ContainerInterface $container,
public DataGrip $dataGrip,
public RpcManager $manager,
public EventProvider $eventProvider)
{
parent::__construct();
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
public function init(): void
{
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
private RpcManager $manager;
/**
*
* @throws \Exception
*/
public function init(): void
{
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
scan_directory(APP_PATH . 'rpc', 'Rpc');
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
$this->manager = Kiri::getDi()->get(RpcManager::class);
}
$this->collector = $this->dataGrip->get('rpc');
}
/**
* @param OnBeforeShutdown $beforeShutdown
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
{
$doneList = $this->manager->doneList();
$agent = $this->container->get(Agent::class);
foreach ($doneList as $value) {
$agent->service->deregister($value['config']['ID']);
$agent->checks->deregister($value['config']['Check']['CheckId']);
}
}
/**
* @param OnBeforeShutdown $beforeShutdown
* @return void
*/
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void
{
if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) {
return;
}
}
/**
* @param OnWorkerStart|OnTaskerStart $server
* @throws ConfigException
*/
public function consulWatches(OnWorkerStart|OnTaskerStart $server)
{
if ($server->workerId != 0) {
return;
}
$async_time = (int)Config::get('consul.async_time', 1000);
Timer::tick($async_time, static function ($timeId) {
if (env('state', 'start') == 'exit') {
Timer::clear($timeId);
return;
}
Kiri::getDi()->get(RpcManager::class)->tick();
});
}
/**
* @param OnServerBeforeStart $server
*/
public function register(OnServerBeforeStart $server)
{
}
/**
* @param OnServerBeforeStart $server
*/
public function register(OnServerBeforeStart $server)
{
$this->manager->register();
}
/**
* @param Server $server
* @param int $fd
*/
public function onConnect(Server $server, int $fd): void
{
// TODO: Implement onConnect() method.
}
/**
* @param Server $server
* @param int $fd
*/
public function onConnect(Server $server, int $fd): void
{
// TODO: Implement onConnect() method.
}
/**
* @param Server $server
* @param int $fd
* @param int $reactor_id
* @param string $data
* @return bool
*/
public function onReceive(Server $server, int $fd, int $reactor_id, string $data): bool
{
try {
$data = json_decode($data, true);
if (is_null($data)) return $server->send($fd, 'success', $reactor_id);
$data = json_decode($data, true);
if (!is_array($data)) {
throw new Exception('Parse error语法解析错误', -32700);
}
if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') {
throw new Exception('Invalid Request无效请求', -32600);
}
return $server->send($fd, $this->batchDispatch($data), $reactor_id);
} catch (\Throwable $throwable) {
$response = Json::encode($this->failure(-32700, $throwable->getMessage()));
$this->getLogger()->json_log($throwable);
return $server->send($fd, $response, $reactor_id);
}
}
/**
* @param Server $server
* @param int $fd
* @param int $reactor_id
* @param string $data
*/
public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void
{
$data = json_decode($data, true);
if (is_null($data)) {
$this->failure(-32700, 'Parse error语法解析错误');
} else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') {
$this->failure(-32600, 'Invalid Request无效请求');
} else {
$this->batchDispatch($server, $fd, $data);
}
}
/**
* @param array $data
* @return string|bool
*/
private function batchDispatch(array $data): string|bool
{
if (isset($data['jsonrpc'])) {
$result = $this->dispatch($data);
if (!isset($data['id'])) {
$result = [1];
}
} else {
$channel = new Channel($total = count($data));
foreach ($data as $datum) {
$this->_execute($channel, $datum);
}
$result = [];
for ($i = 0; $i < $total; $i++) {
$result[] = $channel->pop();
}
}
return json_encode($result, JSON_UNESCAPED_UNICODE);
}
/**
* @param Server $server
* @param int $fd
* @param array $data
* @return void
*/
private function batchDispatch(Server $server, int $fd, array $data): void
{
if (isset($data['jsonrpc'])) {
$dispatch = $this->dispatch($data);
if (!isset($data['id'])) {
$dispatch = [1];
}
$result = json_encode($dispatch, JSON_UNESCAPED_UNICODE);
} else {
$channel = new Channel($total = count($data));
foreach ($data as $datum) {
$this->_execute($channel, $datum);
}
$result = [];
for ($i = 0; $i < $total; $i++) {
$result[] = $channel->pop();
}
}
$server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE));
}
/**
* @param $channel
* @param $datum
*/
private function _execute($channel, $datum)
{
Coroutine::create(function () use ($channel, $datum) {
if (empty($datum) || !isset($datum['jsonrpc'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else if (!isset($datum['method'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else {
$dispatch = $this->dispatch($datum);
if (!isset($dispatch['id'])) {
$dispatch = [1];
}
$channel->push($dispatch);
}
});
}
/**
* @param $channel
* @param $datum
*/
private function _execute($channel, $datum)
{
Coroutine::create(function () use ($channel, $datum) {
if (empty($datum) || !isset($datum['jsonrpc'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else if (!isset($datum['method'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else {
$dispatch = $this->dispatch($datum);
if (!isset($dispatch['id'])) {
$dispatch = [1];
}
$channel->push($dispatch);
}
});
}
/**
* @param $data
* @return array
*/
private function dispatch($data): array
{
// try {
// $class = $this->collector->query($data['service'], 'tcp');
// if (!$this->container->has($class)) {
// throw new Exception('Handler not found', -32601);
// }
// $controller = $this->container->get($class);
// if (!method_exists($controller, $data['method'])) {
// throw new Exception('Method not found', -32601);
// }
// if (!isset($data['params']) || !is_array($data['params'])) {
// $data['params'] = [];
// }
// return $this->handler($controller, $data['method'], $data['params']);
// } catch (\Throwable $throwable) {
// $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();
// return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null);
// }
return $this->failure(404, 'Not found.');
}
/**
* @param $data
* @return array
*/
private function dispatch($data): array
{
try {
[$handler, $params] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']);
if (is_null($handler)) {
throw new \Exception('Method not found', -32601);
} else {
Context::setContext(RequestInterface::class, $this->createServerRequest($params));
return $this->handler($handler);
}
} catch (\Throwable $throwable) {
$code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();
return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null);
}
}
/**
* @param $controller
* @param string $method
* @param $params
* @return array
*/
#[ArrayShape([])]
private function handler($controller, string $method, $params): array
{
$result = call_user_func([$controller, $method], ...$params);
return [
'jsonrpc' => '2.0',
'result' => $result,
'id' => $data['id'] ?? null
];
}
/**
* @param $params
* @return ServerRequestInterface
* @throws \Exception
*/
private function createServerRequest($params): ServerRequestInterface
{
return (new ServerRequest())->withParsedBody($params);
}
/**
* @param $code
* @param $message
* @param array $data
* @param null $id
* @return array
*/
#[ArrayShape([])]
protected function failure($code, $message, array $data = [], $id = null): array
{
$error = [
'jsonrpc' => '2.0',
'error' => [
'code' => $code,
'message' => $message,
'data' => $data
]
];
if (!is_null($id)) {
$error['id'] = $id;
}
return $error;
}
/**
* @param Handler $handler
* @return array
*/
private function handler(Handler $handler): array
{
return [
'jsonrpc' => '2.0',
'result' => call_user_func($handler->callback, ...$handler->params),
'id' => $data['id'] ?? null
];
}
/**
* @param $code
* @param $message
* @param array $data
* @param null $id
* @return array
*/
protected function failure($code, $message, array $data = [], $id = null): array
{
$error = [
'jsonrpc' => '2.0',
'error' => [
'code' => $code,
'message' => $message,
'data' => $data
]
];
if (!is_null($id)) {
$error['id'] = $id;
}
return $error;
}
/**
* @param int $fd
*/
public function onClose(int $fd): void
{
// TODO: Implement onClose() method.
}
/**
* @param Server $server
* @param int $fd
* @return void
*/
public function OnClose(Server $server, int $fd): void
{
// TODO: Implement onClose() method.
}
}
+23 -122
View File
@@ -2,137 +2,38 @@
namespace Kiri\Rpc;
use Http\Handler\Handler;
use Etcd\Client;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Consul\Agent;
use Kiri\Consul\Health;
use Kiri\Kiri;
use ReflectionException;
use PhpParser\Node\Stmt\Return_;
/**
* class RpcManager
*/
class RpcManager extends Component
{
/**
* @var array
*/
private array $_rpc = [];
protected array $services = [];
/**
* @param $serviceName
* @return void
* @throws ReflectionException
* @throws \Exception
*/
public function async($serviceName): void
{
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
if ($lists->getStatusCode() != 200) {
return;
}
$body = json_decode($lists->getBody(), true);
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
if (!empty($body) && is_array($body)) {
file_put_contents($file, json_encode(array_column($body, 'Service')), LOCK_EX);
} else {
file_put_contents($file, json_encode([]), LOCK_EX);
}
}
/**
* @return void
*/
public function watch(): void
{
$data = new Client('','');
}
/**
* @throws ReflectionException
*/
public function tick(): void
{
foreach ($this->_rpc as $name => $list) {
$this->async($name);
}
}
/**
* @param $serviceName
* @return array
* @throws \Exception
*/
public function getServices($serviceName): array
{
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
if (!file_exists($file) || filesize($file) < 10) {
$this->async($serviceName);
}
$content = json_decode(file_get_contents($file), true);
if (empty($content) || !is_array($content)) {
return [];
}
return $content;
}
/**
* @param string $name
* @param string $class
* @param array $serviceConfig
* @return bool
* @throws ReflectionException
*/
public function add(string $name, string $class, array $serviceConfig): bool
{
$methods = Kiri::getDi()->getReflect($class);
$lists = $methods->getMethods(\ReflectionMethod::IS_PUBLIC);
if (!isset($this->_rpc[$name])) {
$this->_rpc[$name] = ['methods' => [], 'id' => $serviceConfig['ID'], 'config' => $serviceConfig];
}
foreach ($lists as $reflection) {
if ($reflection->getDeclaringClass() != $class) {
continue;
}
$methodName = $reflection->getName();
$this->_rpc[$name]['methods'][$methodName] = [new Handler('/', [$class, $methodName]), null];
}
return true;
}
/**
* @return array
*/
public function doneList(): array
{
$array = [];
foreach ($this->_rpc as $list) {
$array[] = $list;
}
return $array;
}
/**
*/
public function register()
{
$agent = Kiri::getDi()->get(Agent::class);
foreach ($this->_rpc as $list) {
$data = $agent->service->register($list['config']);
if ($data->getStatusCode() != 200) {
return;
}
}
}
/**
* @param string $name
* @param string $method
* @return mixed
*/
public function get(string $name, string $method): array
{
return $this->_rpc[$name]['methods'][$method] ?? [null, null];
}
/**
* @param $service
* @return mixed
*/
public function getServices($service): mixed
{
return $this->services[$service] ?? null;
}
}
+40
View File
@@ -0,0 +1,40 @@
<?php
namespace Kiri\Rpc;
use Kiri\Di\Context;
use Kiri\Server\Processes\AbstractProcess;
use Swoole\Coroutine;
use Swoole\Process;
use function pcntl_signal;
class RpcProcess extends AbstractProcess
{
/**
* @return string
*/
public function getName(): string
{
return "Rpc Manager";
}
/**
* @param Process|null $process
* @return void
*/
public function process(?Process $process): void
{
}
/**
* @return void
*/
public function onSigterm(): void
{
}
}
+25
View File
@@ -0,0 +1,25 @@
<?php
use Kiri\Rpc\AbstractRpcClient;
class TestRpc extends AbstractRpcClient
{
public string $service = '';
/**
* @param $data
* @param $nba
* @return mixed
*/
public function test($data, $nba): mixed
{
$resp = $this->send(__FUNCTION__, $data, $nba);
return json_decode($resp, true);
}
}
+58 -33
View File
@@ -3,7 +3,9 @@
namespace Kiri\Rpc;
use Exception;
use Kiri\Context;
use JetBrains\PhpStorm\ArrayShape;
use Kiri\Di\Context;
use Kiri\Di\Inject\Container;
use Swoole\Client;
use Swoole\Coroutine;
@@ -11,52 +13,86 @@ trait TraitTransporter
{
/**
* @var RpcManager
*/
#[Container(RpcManager::class)]
public RpcManager $manager;
protected array $config;
protected array $clients = [];
/**
* @param resource $client
* @param $content
* @return string|bool
*/
protected function request(mixed $client, $content): string|bool
{
socket_write($client, \msgpack_pack($content), mb_strlen($content));
socket_read($client, 1024);
return \msgpack_unpack($client->recv());
}
/**
* @param $config
* @param string $service
* @return $this
* @throws RpcServiceException
*/
public function withConfig($config): static
protected function get_consul(string $service): static
{
$this->config = $config;
if (empty($service)) {
throw new RpcServiceException('You need set rpc service name if used.');
}
$sf = $this->manager->getServices($service);
if (empty($sf) || !is_array($sf)) {
throw new RpcServiceException('You need set rpc service name if used.');
}
$this->config = $this->_loadRand($sf);
return $this;
}
/**
* @param Client|Coroutine\Client $client
* @param $content
* @param bool $isClose
* @return mixed
* @param $services
* @return array
*/
private function request(Client|Coroutine\Client $client, $content, bool $isClose): mixed
#[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])]
protected function _loadRand($services): array
{
$client->send($content);
$read = $client->recv();
if ($isClose) {
$client->close();
$array = [];
foreach ($services as $value) {
$value['Weight'] = $value['Weights']['Passing'];
$array[] = $value;
}
return $read;
if (count($array) < 2) {
$luck = $array[0];
} else {
$luck = LotteryDraw::luck($array, 'Weight');
}
return [
'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'],
'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port']
];
}
/**
* @return Client|Coroutine\Client
* @throws Exception
*/
private function newClient(): Coroutine\Client|Client
protected function newClient(): Coroutine\Client|Client
{
$alias = $this->alias($this->config);
$client = $this->clients[$alias] ?? null;
if (is_null($client)) {
$client = Context::inCoroutine() ? new Coroutine\Client(SWOOLE_SOCK_TCP) : new Client(SWOOLE_SOCK_TCP);
$this->clients[$alias] = $client;
}
if (Context::inCoroutine()) {
$client = new Coroutine\Client(SWOOLE_SOCK_TCP);
} else {
$client = new Client(SWOOLE_SOCK_TCP);
}
[$host, $port] = [$this->config['Address'], $this->config['Port']];
if (!$client->isConnected() && !$client->connect($host, $port, 60)) {
throw new Exception('connect fail.');
@@ -64,15 +100,4 @@ trait TraitTransporter
return $client;
}
/**
* @param array $config
* @return string
*/
private function alias(array $config): string
{
return $config['Address'] . '::' . $config['Port'];
}
}
+14 -5
View File
@@ -9,16 +9,25 @@
],
"license": "MIT",
"require": {
"php": ">=8.0",
"php": ">=8.5",
"ext-json": "*",
"psr/http-client": "^1.0",
"psr/http-message": "^1.0"
"ext-msgpack": "*",
"start-point/etcd-php": "^1.1",
"game-worker/kiri-pool": "^v1.0",
"linkorb/etcd-php": "^1.6"
},
"replace": {
"symfony/polyfill-apcu": "*",
"symfony/polyfill-php80": "*",
"symfony/polyfill-mbstring": "*",
"symfony/polyfill-ctype": "*",
"symfony/polyfill-php73": "*",
"symfony/polyfill-php72": "*",
"symfony/polyfill-php81": "*"
},
"autoload": {
"psr-4": {
"Kiri\\Rpc\\": "./"
}
},
"require-dev": {
}
}
+9 -19
View File
@@ -3,7 +3,7 @@
use Kiri\Rpc\RpcJsonp;
use Kiri\Rpc\TestRpcService;
use Server\Constant;
use Kiri\Server\Constant;
return [
'rpc' => [
@@ -17,17 +17,6 @@ return [
],
'events' => [
Constant::RECEIVE => [RpcJsonp::class, 'onReceive']
],
'consumers' => [
'class' => TestRpcService::class,
'name' => 'test-rpc',
'package' => 'test',
'register' => [
'host' => '',
'port' => ''
]
]
],
@@ -37,6 +26,7 @@ return [
"datacenter" => "dc1",
"id" => "40e4a748-2192-161a-0510-9bf59fe950b5",
"node" => "FriendRpcService",
'class' => TestRpc::class,
"skipNodeUpdate" => false,
"service" => [
"id" => "redis1",
@@ -58,13 +48,13 @@ return [
"port" => 8000
],
"check" => [
"node" => "t2.320",
"checkId" => "service:redis1",
"name" => "Redis health check",
"Annotations" => "Script based health check",
"status" => "passing",
"serviceID" => "redis1",
"definition" => [
"node" => "t2.320",
"checkId" => "service:redis1",
"name" => "NoSql health check",
"Annotations" => "Script based health check",
"status" => "passing",
"serviceID" => "redis1",
"definition" => [
"http" => "172.26.221.211:9527",
"interval" => "5s",
"timeout" => "1s",