This commit is contained in:
2021-12-02 14:06:57 +08:00
parent 7fef63fa39
commit 19c9b692d4
4 changed files with 102 additions and 64 deletions
+1
View File
@@ -55,6 +55,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
/** /**
* @param array $data * @param array $data
* @return ServerRequestInterface * @return ServerRequestInterface
* @throws \ReflectionException
*/ */
private function requestBody(array $data): ServerRequestInterface private function requestBody(array $data): ServerRequestInterface
{ {
+39 -29
View File
@@ -2,32 +2,31 @@
namespace Kiri\Rpc\Note; namespace Kiri\Rpc\Note;
use Kiri\Consul\Catalog\Catalog;
use Note\Attribute;
use Kiri\Abstracts\Config; use Kiri\Abstracts\Config;
use Kiri\Consul\Agent; use Kiri\Core\Network;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Kiri\Kiri; use Kiri\Kiri;
use Kiri\Rpc\RpcManager; use Kiri\Rpc\RpcManager;
use Note\Attribute;
use ReflectionException; use ReflectionException;
#[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute #[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute
{ {
private string $uniqueId = '';
/** /**
* @param string $service * @param string $service
* @param string $driver * @param string $driver
* @param array $tags
* @param array $meta
* @param array $checkOptions * @param array $checkOptions
*/ */
public function __construct(public string $service, public string $driver, public array $checkOptions = [ public function __construct(public string $service, public string $driver, public array $tags = [], public array $meta = [], public array $checkOptions = [])
"DeregisterCriticalServiceAfter" => "1m",
"Http" => "http://127.0.0.1:9527",
"Interval" => "1s",
"Timeout" => "1s"
])
{ {
$this->uniqueId = preg_replace('/(\w{11})(\w{4})(\w{3})(\w{8})(\w{6})/', '$1-$2-$3-$4-$5', md5(__DIR__ . '.' . md5(Network::local())));
} }
@@ -40,13 +39,7 @@ use ReflectionException;
*/ */
public function execute(mixed $class, mixed $method = ''): bool public function execute(mixed $class, mixed $method = ''): bool
{ {
$default = $this->create(); return Kiri::getDi()->get(RpcManager::class)->add($this->service, $class, $this->create());
$agent = Kiri::getDi()->get(Catalog::class);
$data = $agent->register($default);
if ($data->getStatusCode() != 200) {
exit($data->getBody()->getContents());
}
return RpcManager::add($this->service, $class, $default['id']);
} }
@@ -55,19 +48,36 @@ use ReflectionException;
*/ */
protected function create(): array protected function create(): array
{ {
$content = current(swoole_get_local_ip()); $rpcPort = Config::get('rpc.port');
return [ return [
"id" => "rpc.json.{$this->service}." . md5(__DIR__ . '.' . md5($content)), "ID" => "rpc.json.{$this->service}." . $this->uniqueId,
"name" => $this->service, "Service" => $this->service,
"address" => $content, "Address" => Network::local(),
"port" => 9526, "EnableTagOverride" => true,
"enableTagOverride" => true, "TaggedAddresses" => [
"check" => [ "lan" => [
"DeregisterCriticalServiceAfter" => "1m", "address" => "127.0.0.1",
"TCP" => $content . ":" . Config::get('rpc.port'), "port" => $rpcPort
"Interval" => "1s", ],
"Timeout" => "1s" "wan" => [
] "address" => Network::local(),
"port" => $rpcPort
]
],
"Meta" => $this->meta,
"Port" => $rpcPort,
"Check" => [
"CheckId" => "service:rpc.json.{$this->service}." . $this->uniqueId,
"Name" => "service " . $this->service . ' health check',
"Notes" => "Script based health check",
"ServiceID" => $this->service,
"Definition" => [
"TCP" => Network::local() . ":" . Config::get('rpc.port'),
"Interval" => "5s",
"Timeout" => "1s",
"DeregisterCriticalServiceAfter" => "30s"
]
],
]; ];
} }
+36 -25
View File
@@ -6,12 +6,9 @@ use Http\Constrict\RequestInterface;
use Http\Handler\Router; use Http\Handler\Router;
use Http\Message\ServerRequest; use Http\Message\ServerRequest;
use Kiri\Abstracts\Component; use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Consul\Agent; use Kiri\Consul\Agent;
use Kiri\Consul\Catalog\Catalog;
use Kiri\Context; use Kiri\Context;
use Kiri\Events\EventProvider; use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Kiri; use Kiri\Kiri;
use Note\Inject; use Note\Inject;
use Note\Note; use Note\Note;
@@ -19,14 +16,18 @@ use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface; use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
use ReflectionException;
use Server\Contract\OnCloseInterface; use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface; use Server\Contract\OnConnectInterface;
use Server\Contract\OnReceiveInterface; use Server\Contract\OnReceiveInterface;
use Server\Events\OnBeforeShutdown; use Server\Events\OnBeforeShutdown;
use Server\Events\OnServerBeforeStart;
use Server\Events\OnTaskerStart;
use Server\Events\OnWorkerStart; use Server\Events\OnWorkerStart;
use Swoole\Coroutine; use Swoole\Coroutine;
use Swoole\Coroutine\Channel; use Swoole\Coroutine\Channel;
use Swoole\Server; use Swoole\Server;
use Swoole\Timer;
/** /**
@@ -51,6 +52,10 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
#[Inject(ContainerInterface::class)] #[Inject(ContainerInterface::class)]
public ContainerInterface $container; public ContainerInterface $container;
private RpcManager $manager;
/** /**
* *
* @throws \Exception * @throws \Exception
@@ -61,7 +66,11 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
scan_directory(APP_PATH . 'rpc', 'Rpc'); scan_directory(APP_PATH . 'rpc', 'Rpc');
$this->eventProvider->on(OnWorkerStart::class, [$this, 'register']); $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnTaskerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
$this->manager = Kiri::getDi()->get(RpcManager::class);
} }
@@ -72,38 +81,40 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
*/ */
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown) public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
{ {
$doneList = RpcManager::doneList(); $doneList = $this->manager->doneList();
$agent = $this->container->get(Agent::class); $agent = $this->container->get(Agent::class);
foreach ($doneList as $value) { foreach ($doneList as $value) {
$agent->service->deregister($value); $agent->service->deregister($value['config']['ID']);
$agent->checks->deregister($value['config']['Check']['CheckId']);
} }
} }
/** /**
* @param OnWorkerStart $server * @param OnWorkerStart|OnTaskerStart $server
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/ */
public function register(OnWorkerStart $server) public function consulWatches(OnWorkerStart|OnTaskerStart $server)
{ {
if ($server->workerId != 0) { Timer::tick(1000, static function () {
return; $lists = Kiri::getDi()->get(RpcManager::class)->doneList();
} $health = Kiri::getDi()->get(Agent::class)->checks;
foreach ($lists as $list) {
$config = Config::get('rpc'); $health->checks();
$catalog = $this->container->get(Catalog::class);
$catalog->register([
]); }
});
}
$agent = $this->container->get(Agent::class);
$data = $agent->service->register($config['registry']['config']); /**
if ($data->getStatusCode() != 200) { * @param OnServerBeforeStart $server
$server->server->shutdown(); * @throws ReflectionException
} */
public function register(OnServerBeforeStart $server)
{
$this->manager->register();
} }
@@ -193,7 +204,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
private function dispatch($data): array private function dispatch($data): array
{ {
try { try {
[$handler, $params, $_] = RpcManager::get($data['service'], $data['method']); [$handler, $params, $_] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']);
if (is_null($handler)) { if (is_null($handler)) {
throw new \Exception('Method not found', -32601); throw new \Exception('Method not found', -32601);
} else { } else {
@@ -223,7 +234,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
* @param array $handler * @param array $handler
* @param $request * @param $request
* @return array * @return array
* @throws \ReflectionException * @throws ReflectionException
*/ */
private function handler(array $handler, $request): array private function handler(array $handler, $request): array
{ {
+26 -10
View File
@@ -2,6 +2,7 @@
namespace Kiri\Rpc; namespace Kiri\Rpc;
use Kiri\Consul\Agent;
use Kiri\Kiri; use Kiri\Kiri;
use ReflectionException; use ReflectionException;
@@ -12,27 +13,27 @@ class RpcManager
/** /**
* @var array * @var array
*/ */
private static array $_rpc = []; private array $_rpc = [];
/** /**
* @param string $name * @param string $name
* @param string $class * @param string $class
* @param string $serviceId * @param array $serviceConfig
* @return bool * @return bool
* @throws ReflectionException * @throws ReflectionException
*/ */
public static function add(string $name, string $class, string $serviceId): bool public function add(string $name, string $class, array $serviceConfig): bool
{ {
$methods = Kiri::getDi()->getReflect($class); $methods = Kiri::getDi()->getReflect($class);
$lists = $methods->getMethods(\ReflectionMethod::IS_PUBLIC); $lists = $methods->getMethods(\ReflectionMethod::IS_PUBLIC);
if (!isset(static::$_rpc[$name])) static::$_rpc[$name] = ['methods' => [], 'id' => $serviceId]; if (!isset($this->_rpc[$name])) $this->_rpc[$name] = ['methods' => [], 'id' => $serviceConfig['id'], 'config' => $serviceConfig];
foreach ($lists as $reflection) { foreach ($lists as $reflection) {
$methodName = $reflection->getName(); $methodName = $reflection->getName();
static::$_rpc[$name]['methods'][$methodName] = [[$class, $methodName], null]; $this->_rpc[$name]['methods'][$methodName] = [[$class, $methodName], null];
} }
return true; return true;
} }
@@ -41,24 +42,39 @@ class RpcManager
/** /**
* @return array * @return array
*/ */
public static function doneList(): array public function doneList(): array
{ {
$array = []; $array = [];
foreach (static::$_rpc as $list) { foreach ($this->_rpc as $list) {
$array[] = $list['id']; $array[] = $list;
} }
return $array; return $array;
} }
/**
* @throws ReflectionException
*/
public function register()
{
$agent = Kiri::getDi()->get(Agent::class);
foreach ($this->_rpc as $list) {
$data = $agent->service->register($list['config']);
if ($data->getStatusCode() != 200) {
exit($data->getBody()->getContents());
}
}
}
/** /**
* @param string $name * @param string $name
* @param string $method * @param string $method
* @return mixed * @return mixed
*/ */
public static function get(string $name, string $method): array public function get(string $name, string $method): array
{ {
return static::$_rpc[$name]['methods'][$method] ?? [null, null]; return $this->_rpc[$name]['methods'][$method] ?? [null, null];
} }
} }