This commit is contained in:
2021-08-12 15:45:30 +08:00
parent 37fc82d7e1
commit bb3fd6988e
12 changed files with 0 additions and 762 deletions
-26
View File
@@ -1,26 +0,0 @@
{
"name": "game-worker/rpc-server",
"description": "db",
"authors": [
{
"name": "XiangLin",
"email": "as2252258@163.com"
}
],
"license": "MIT",
"require": {
"php": ">=8.0",
"ext-json": "*",
"ext-pdo": "*",
"game-worker/snowflake": "dev-master",
"game-worker/validator": "dev-master"
},
"autoload": {
"psr-4": {
"Rpc\\": "src/"
}
},
"require-dev": {
"kwn/php-rdkafka-stubs": "^2.0"
}
}
@@ -1,48 +0,0 @@
<?php
namespace Annotation\Rpc;
use Annotation\Attribute;
use Exception;
use Kiri\Kiri;
/**
* Class RpcClient
* @package Annotation\Rpc
*/
#[\Attribute(\Attribute::TARGET_CLASS)] class RpcConsumer extends Attribute
{
/**
* RpcClient constructor.
* @param string $package
* @param string $method
* @param int $timeout
* @param int $mode
*/
public function __construct(
public string $package,
public string $method,
public int $timeout,
public int $mode
)
{
}
/**
* @param mixed $class
* @param mixed $method
* @return bool
* @throws Exception
*/
public function execute(mixed $class, mixed $method = ''): bool
{
return true;
}
}
-57
View File
@@ -1,57 +0,0 @@
<?php
namespace Rpc\Annotation;
use Annotation\Attribute;
use Exception;
use HttpServer\Route\Router;
use JetBrains\PhpStorm\Pure;
use Kiri\Kiri;
/**
* Class RpcProducer
* @package Annotation\Route
*/
#[\Attribute(\Attribute::TARGET_CLASS)] class RpcService extends Attribute
{
const PROTOCOL_JSON = 'json';
const PROTOCOL_SERIALIZE = 'serialize';
/**
* Route constructor.
* @param string $package
* @param string $protocol
* @param string $server
* @param string $version
*/
#[Pure] public function __construct(public string $package, public string $protocol = self::PROTOCOL_SERIALIZE, public string $server = 'json-rpc',
public string $version = 'v1.0')
{
}
/**
* @param mixed $class
* @param mixed|null $method
* @return Router
* @throws Exception
*/
public function execute(mixed $class, mixed $method = null): Router
{
// TODO: Implement setHandler() method.
$router = Kiri::app()->getRouter();
$methods = Kiri::getDi()->getMethods($class::class);
foreach ($methods as $method => $reflectionMethod) {
$router->addRoute(':rpc/' . $this->package . '/' . $method . '/' . $this->version, [$class, $method], $this->server);
}
return $router;
}
}
-111
View File
@@ -1,111 +0,0 @@
<?php
namespace Rpc;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Core\Json;
use Swoole\Coroutine\Client as CClient;
/**
* Class Client
* @package Rpc
*/
class Client extends Component
{
public array $config = [];
public string $service = '';
private ?CClient $client = null;
/**
* @param string $package
* @param string $method
* @param array $param
* @return mixed
* @throws Exception
*/
public function dispatch(string $package, string $method, array $param): mixed
{
if (empty($this->config)) {
return $this->addError('Related service not found(404)');
}
if (!($this->client instanceof CClient)) {
$this->client = $this->getClient();
}
if (!$this->client->isConnected() && !$this->connect()) {
return false;
}
$isSend = $this->client->send(Protocol::encode($package, $method, $param));
if ($isSend === false) {
return $this->addError($this->client->errMsg . '(' . $this->client->errCode . ')');
}
defer(fn() => $this->client->close());
if (is_bool($unpack = Json::decode($this->client->recv()))) {
$unpack = $this->addError('Service return data format error(500)');
}
return $unpack;
}
/**
* @return bool
* @throws Exception
*/
private function connect(): bool
{
$host = $this->config['host'] ?? '127.0.0.1';
if (!isset($this->config['port'])) {
return $this->addError('Related service not have port(404)');
}
$timeout = $this->config['timeout'] ?? 0.2;
if (!$this->client->connect($host, $this->config['port'], $timeout)) {
return $this->addError($this->client->errMsg . '(' . $this->client->errCode . ')');
}
return true;
}
/**
* 断开链接
*/
public function close()
{
if (!$this->client || !$this->client->isConnected()) {
return;
}
$this->client->close();
}
/**
* @return mixed
* @throws Exception
*/
public function getClient(): CClient
{
$port = $this->config['port'] ?? 0;
if ($port < 0) {
throw new Exception('Related service not have port(404)');
}
$client = new CClient(SWOOLE_SOCK_TCP);
$client->set([
'timeout' => 0.5,
'connect_timeout' => 1.0,
'write_timeout' => 10.0,
'read_timeout' => 0.5,
'open_tcp_keepalive' => true,
]);
return $client;
}
}
-53
View File
@@ -1,53 +0,0 @@
<?php
namespace Rpc;
use Annotation\Inject;
use Exception;
use Kiri\Kiri;
/**
* Class Consumer
* @package Rpc
*/
abstract class Consumer implements IProducer
{
protected ?Client $client = null;
#[Inject('rpc')]
public ?Producer $rpc = null;
/**
* @return Client|null
*/
public function initClient(): ?Client
{
return $this->client;
}
/**
* @param string $name
* @return mixed
* @throws Exception
*/
public function __get(string $name): mixed
{
if (property_exists($this, $name)) {
return $this->$name;
}
$method = 'get' . ucfirst($name);
if (method_exists($this, $method)) {
return $this->{$method}();
}
return Kiri::app()->get($name);
}
}
-29
View File
@@ -1,29 +0,0 @@
<?php
namespace Rpc;
use HttpServer\Controller;
use HttpServer\Exception\RequestException;
use Rpc\Annotation\RpcService;
use Server\RequestInterface;
/**
*
*/
#[RpcService(package: "default", protocol: RpcService::PROTOCOL_JSON, server: 'json-rpc')]
class DefaultRpcController extends Controller
{
/**
* @param RequestInterface $request
* @return int
* @throws RequestException
*/
public function getSystemConfig(RequestInterface $request): int
{
return $request->int('a') + $request->int('b');
}
}
-18
View File
@@ -1,18 +0,0 @@
<?php
namespace Rpc;
interface IProducer
{
/**
* @return null|Client
* 初始化一个客户端
*/
public function initClient(): ?Client;
}
-129
View File
@@ -1,129 +0,0 @@
<?php
namespace Rpc;
use Exception;
use JetBrains\PhpStorm\Pure;
use Rpc\Annotation\RpcService;
/**
*
*/
class Protocol
{
private string $version = 'v1.0';
private array $headers = [];
private mixed $data = [];
/**
* @return string
*/
public function getVersion(): string
{
return $this->version;
}
/**
* @return array
*/
public function getHeaders(): array
{
return $this->headers;
}
/**
* @return mixed
*/
public function getData(): mixed
{
return $this->data;
}
/**
* @throws Exception
*/
public static function parse($data): static
{
$protocol = new Protocol();
$protocol->parseHeaders(...explode("\r\n\r\n", $data));
return $protocol;
}
/**
* @param string $service
* @param string $cmd
* @param array $param
* @return string
*/
public static function encode(string $service, string $cmd, array $param = []): string
{
$proto = 'REQUEST tcp/other.protocol v1.0' . "\r\n";
$proto .= ':Source: ' . implode(',', swoole_get_local_ip()) . "\r\n";
$proto .= ':Package: ' . $service . "\r\n";
$proto .= ':Path: ' . $cmd . "\r\n";
$proto .= ':Content-Type: ' . $cmd . "\r\n";
$proto .= ':Method: json-rpc' . "\r\n\r\n";
return $proto . json_encode($param) . "\r\n\r\n";
}
/**
* @param string $body
* @return void
*/
private function parseBody(string $body): void
{
if ($this->headers['Content-Type'] == RpcService::PROTOCOL_JSON) {
$this->data = json_decode($body, true);
} else {
$this->data = unserialize($body);
}
}
/**
* @param string $headers
* @param string $body
* @return void
* @throws Exception
*/
private function parseHeaders(string $headers, string $body): void
{
$explode = explode("\r\n", $headers);
$this->headers = [];
foreach ($explode as $key => $value) {
if ($key == 0) {
if (!str_starts_with($value, 'REQUEST tcp/other.protocol')) {
throw new Exception('Protocol format error.');
}
$this->version = str_replace('REQUEST tcp/other.protocol ', '', $value);
continue;
}
[$name, $item] = explode(': ', $value);
$this->headers[str_replace(':', '', $name)] = $item;
}
if (count(array_diff_key(Service::A_DEFAULT, $this->headers)) > 0) {
throw new Exception('Protocol format error.');
}
$this->parseBody($body);
}
/**
* @return string
*/
#[Pure] public function parseUrl(): string
{
return ':rpc/' . $this->headers['Package'] . '/' . $this->headers['Path'] . '/' . $this->getVersion();
}
}
-10
View File
@@ -1,10 +0,0 @@
<?php
namespace Rpc;
class Registry
{
}
-190
View File
@@ -1,190 +0,0 @@
<?php
namespace Rpc;
use Annotation\Inject;
use Exception;
use HttpServer\Exception\RequestException;
use HttpServer\Http\Context;
use HttpServer\Http\Request;
use HttpServer\Route\Node;
use HttpServer\Route\Router;
use Kiri\Abstracts\Config;
use Kiri\Events\EventDispatch;
use Kiri\Events\EventProvider;
use Kiri\Exception\NotFindClassException;
use Kiri\Kiri;
use Server\Constant;
use Server\Constrict\Response;
use Server\Constrict\ResponseEmitter;
use Server\Events\OnAfterRequest;
use Server\ExceptionHandlerDispatcher;
use Server\ExceptionHandlerInterface;
use Server\RequestInterface;
use Server\ResponseInterface;
use Swoole\Server;
use Server\Constrict\Request as cRequest;
use function Swoole\Coroutine\defer;
/**
* Class Service
* @package Rpc
*/
class Service extends \Server\Abstracts\Server
{
const A_DEFAULT = [
'Source' => '',
'Package' => '',
'Path' => '',
'Content-Type' => '',
'Method' => ''
];
private Router $router;
/** @var EventProvider */
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
public Response $response;
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
#[Inject(ResponseEmitter::class)]
public ResponseEmitter $responseEmitter;
/**
* @var ExceptionHandlerInterface
*/
public ExceptionHandlerInterface $exceptionHandler;
/**
* @throws Exception
*/
public function init()
{
$this->router = Kiri::getApp('router');
$exceptionHandler = Config::get('exception.http', ExceptionHandlerDispatcher::class);
if (!in_array(ExceptionHandlerInterface::class, class_implements($exceptionHandler))) {
$exceptionHandler = ExceptionHandlerDispatcher::class;
}
$this->exceptionHandler = Kiri::getDi()->get($exceptionHandler);
}
/**
* @param Server $server
* @param int $fd
* @param int $reactorId
* @throws Exception
*/
public function onConnect(Server $server, int $fd, int $reactorId)
{
}
/**
* @param Server $server
* @param int $fd
* on tcp client close
* @throws Exception
*/
public function onClose(Server $server, int $fd)
{
}
/**
* @param Server $server
* @param int $fd
* on tcp client close
* @throws Exception
*/
public function onDisconnect(Server $server, int $fd)
{
}
/**
* @param Server $server
* @param int $fd
* @param int $reID
* @param string $data
* @throws Exception
*/
public function onReceive(Server $server, int $fd, int $reID, string $data)
{
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
try {
$node = $this->router->Branch_search($this->requestSpl($data, $fd));
if (!($node instanceof Node)) {
throw new RequestException('<h2>HTTP 404 Not Found</h2><hr><i>Powered by Swoole</i>', 404);
}
$this->response->setClientId($fd, $reID);
if (!(($responseData = $node->dispatch()) instanceof ResponseInterface)) {
$responseData = $this->response->setContent($responseData)->setStatusCode(200);
}
} catch (\Throwable $exception) {
$responseData = $this->exceptionHandler->emit($exception, $this->response);
} finally {
$this->responseEmitter->sender($server, $responseData);
}
}
/**
* @param Server $server
* @param string $data
* @param array $client
* @throws Exception
*/
public function onPacket(Server $server, string $data, array $client)
{
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
try {
$request = $this->requestSpl((int)$client['server_port'], $data);
$result = $this->router->find_path($request)?->dispatch();
$server->sendto($client['address'], $client['port'], $result);
} catch (\Throwable $exception) {
$server->sendto($client['address'], $client['port'], $exception->getMessage());
}
}
/**
* @param string $data
* @param int $fd
* @return mixed
* @throws NotFindClassException
* @throws \ReflectionException
* @throws Exception
*/
public function requestSpl(string $data, int $fd = 0): RequestInterface
{
$data = Protocol::parse($data);
$sRequest = new Request();
$sRequest->setClientId($fd);
$sRequest->setPosts($data->getData());
$sRequest->setHeaders(array_merge($data->getHeaders(), [
'request_uri' => $data->parseUrl(),
'request_method' => $data->getHeaders()['Method']
]));
Context::setContext(Request::class, $sRequest);
return di(cRequest::class);
}
}
-52
View File
@@ -1,52 +0,0 @@
<?php
namespace Rpc;
use Annotation\Rpc\RpcConsumer;
use Kiri\Exception\NotFindClassException;
use Rpc\Annotation\RpcService;
/**
*
*/
#[RpcConsumer(package: 'default', method: '', timeout: 10, mode: 'json-rpc')]
class TestRpcClient
{
public string $package = 'default';
public string $protocol = RpcService::PROTOCOL_JSON;
public int $timeout = 10;
public string $method = '';
/**
* @return array
*/
protected function getRegistry(): array
{
return ['127.0.0.1', 5537];
}
/**
* @throws NotFindClassException
* @throws \ReflectionException
* @throws \Exception
*/
public function dispatch(array $param)
{
[$host, $port] = $this->getRegistry();
$client = di(Client::class);
$client->config = ['host' => $host, 'port' => $port, 'timeout' => $this->timeout];
$client->dispatch($this->package, $this->method, $param);
}
}
-39
View File
@@ -1,39 +0,0 @@
<?php
use Server\Constant;
return [
'rpc' => [
'name' => 'json-rpc',
'type' => Constant::SERVER_TYPE_TCP,
'host' => '0.0.0.0',
'mode' => SWOOLE_SOCK_TCP,
'port' => 5377,
'setting' => [
'open_tcp_keepalive' => true,
'tcp_keepidle' => 30,
'tcp_keepinterval' => 10,
'tcp_keepcount' => 10,
'open_http_protocol' => false,
'open_websocket_protocol' => false,
],
'events' => [
Constant::CONNECT => [],
Constant::CLOSE => [],
],
'registry' => [
'protocol' => 'consul',
'address' => [
'host' => '47.14.25.45',
'port' => 5527,
'path' => ''
],
],
'consumers' => [
]
]
];