This commit is contained in:
2026-02-26 14:39:04 +08:00
parent c0a61aebdd
commit d90d2eb844
6 changed files with 463 additions and 0 deletions
+24
View File
@@ -0,0 +1,24 @@
{
"name": "game-worker/kiri-coroutine-server",
"description": "kiri coroutine server",
"authors": [
{
"name": "XiangLin",
"email": "as2252258@163.com"
}
],
"license": "MIT",
"require": {
"php": ">=8.4",
"ext-json": "*",
"ext-pdo": "*",
"game-worker/kiri-container": "^v1.6"
},
"autoload": {
"psr-4": {
"Coroutine\\Server\\": "./core"
}
},
"require-dev": {
}
}
+47
View File
@@ -0,0 +1,47 @@
<?php
namespace Coroutine\Server;
use Kiri\Di\Inject\Container;
use Kiri\NoSql\Redis;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
class QueueLoop
{
#[Container(Transport::class)]
public Transport $transport;
/**
* @return void
*/
public function loop(): void
{
$this->cleanOnline();
$redis = \Kiri::getDi()->get(Redis::class);
if ($redis instanceof Redis) {
$data = $redis->blPop(\config('redis.key'), 10);
if (!empty($data)) {
[$key, $data] = $data;
$json = json_decode($data, true);
$this->transport->send($json['userId'], json_encode($json['data']));
}
} else {
Coroutine::sleep(0.05);
}
$this->loop();
}
/**
* @return void
*/
public function cleanOnline(): void
{
$client = new Client(config('notice.host'), config('notice.port'));
$client->post('/cleanOnline', []);
$client->close();
}
}
+18
View File
@@ -0,0 +1,18 @@
<?php
namespace Coroutine\Server;
use Swoole\Http\Response;
class Struct
{
public mixed $user;
public int $fd;
public Response $ws;
public function __construct(mixed $user, int $fd, Response $ws)
{
$this->fd = $fd;
$this->user = $user;
$this->ws = $ws;
}
}
+88
View File
@@ -0,0 +1,88 @@
<?php
namespace Coroutine\Server;
class Transport
{
/**
* @var array<Struct>
*/
private array $clients = [];
/**
* @param int $fd
* @param Struct $data
* @return void
*/
public function add(int $fd, Struct $data): void
{
if (isset($this->clients[$fd])) {
$this->clients[$fd]->ws->close();
}
$this->clients[$fd] = $data;
}
/**
* @param int $fd
* @param mixed $data
* @return void
*/
public function send(int $fd, mixed $data): void
{
if (isset($this->clients[$fd])) {
$this->clients[$fd]->ws->push($data);
}
}
/**
* @param int $fd
* @return void
*/
public function close(int $fd): void
{
if (isset($this->clients[$fd])) {
$this->clients[$fd]->ws->close();
}
}
/**
* @param int $fd
* @return void
*/
public function remove(int $fd): void
{
if ($this->clients[$fd]) {
$this->clients[$fd]->ws->close();
}
unset($this->clients[$fd]);
}
/**
* @return array
*/
public function getLists(): array
{
$array = [];
foreach ($this->clients as $fd => $client) {
$array[] = [
'userId' => $fd,
'nickname' => $client->user['nickname'],
];
}
return $array;
}
/**
* @return int
*/
public function size(): int
{
return count($this->clients);
}
}
+155
View File
@@ -0,0 +1,155 @@
<?php
namespace Coroutine\Server;
use Closure;
use Kiri\Di\Context;
use Kiri\Di\Inject\Container;
use Kiri\Router\Constrict\ConstrictRequest;
use Kiri\Router\Constrict\ConstrictResponse;
use Kiri\Server\Contract\OnDisconnectInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Psr\Http\Message\ResponseInterface;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use Throwable;
abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
{
#[Container(Transport::class)]
public Transport $transport;
private Server $server;
/**
* @param string $host
* @param int $port
* @return void
*/
public function start(string $host, int $port): void
{
$this->clearAllOnlineUsers();
$this->queueLoop();
$this->server = new Server($host, $port, false);
$this->server->handle('/websocket', function (Request $request, Response $ws) {
$this->WebsocketHandler($request, $ws);
});
$this->server->handle('/online/lists', function (Request $request, Response $ws) {
$data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE);
$ws->end($data);
});
echo 'websocket server start at ' . $host . ':' . $port . PHP_EOL;
$this->server->start();
}
/**
* @return void
*/
abstract public function clearAllOnlineUsers(): void;
/**
* @return void
*/
private function queueLoop(): void
{
Coroutine::create(function () {
$QueueLoop = \Kiri::getDi()->get(QueueLoop::class);
$QueueLoop->loop();
});
}
/**
* @param Request $request
* @param Response $ws
* @return void
*/
public function WebsocketHandler(Request $request, Response $ws): void
{
try {
$psr7Request = ConstrictRequest::builder($request);
if (!$psr7Request->hasQuery('auth')) {
throw new \Exception('Auth fail.', 401);
}
if (!$this->onAuthority($psr7Request, $ws)) {
throw new \Exception('Auth fail.', 401);
}
if ($psr7Request->getAuthority() == null || !$ws->upgrade()) {
throw new \Exception('Auth fail.', 401);
}
$userId = $psr7Request->getAuthority()->getUniqueId();
$ws->push($userId);
$user = [
'nickname' => $psr7Request->getAuthority()->getNickname(),
'avatar' => $psr7Request->getAuthority()->getAvatar(),
];
if (!$this->onConnected($psr7Request)) {
throw new \Exception('Auth fail.', 401);
}
$this->transport->add($userId, new Struct($user, $request->fd, $ws));
while (true) {
$frame = $ws->recv();
if ($frame === '' || $frame === false || $frame->data == 'close' || get_class($frame) === CloseFrame::class) {
$ws->close();
break;
}
$this->onMessage($this->server, $frame);
}
$this->onDisconnect($this->server, $userId);
} catch (Throwable $throwable) {
echo sprintf("Message: %s \n
File: %s \n
Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine());
$ws->setStatusCode(500);
$ws->end($throwable->getMessage());
$ws->close();
} finally {
if (isset($userId)) {
$this->transport->remove($userId);
}
}
}
/**
* @param ConstrictRequest $request
* @return bool
*/
abstract public function onConnected(ConstrictRequest $request): bool;
/**
* @param ConstrictRequest $request
* @param Response $ws
* @return bool
*/
abstract public function onAuthority(ConstrictRequest $request, Response $ws): bool;
/**
* @param \Swoole\Server|Server $server
* @param Frame $frame
* @return void
*/
abstract public function onMessage(\Swoole\Server|Server $server, Frame $frame): void;
/**
* @param \Swoole\WebSocket\Server|Server $server
* @param int $fd
* @return void
*/
abstract public function onDisconnect(\Swoole\WebSocket\Server|Server $server, int $fd): void;
}
+131
View File
@@ -0,0 +1,131 @@
<?php
use Coroutine\Server\Transport;
use Coroutine\Server\Websocket;
use Kiri\Router\Constrict\ConstrictRequest;
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\Frame;
use function Co\run;
class Test extends Websocket
{
/**
* @param ConstrictRequest $request
* @return bool
*/
public function onConnected(ConstrictRequest $request): bool
{
// TODO: Implement onConnected() method.
$this->stateChange($request->getAuthority()->getUniqueId(), 1);
return true;
}
/**
* @param ConstrictRequest $request
* @param Response $ws
* @return bool
*/
public function onAuthority(ConstrictRequest $request, Response $ws): bool
{
// TODO: Implement onAuthority() method.
$auth = $request->get['auth'] ?? '';
$client = new Client(config('user.host'), config('user.port'), true);
$client->post('/handshake', [
'token' => $auth,
'fd' => $request->fd,
'ip' => current(swoole_get_local_ip()),
]);
$client->close();
if ($client->getStatusCode() != 200) {
return false;
}
$data = json_decode($client->body, true);
$request->withAuthority($data);
return true;
}
/**
* @param \Swoole\Server|Server $server
* @param Frame $frame
* @return void
*/
public function onMessage(\Swoole\Server|Server $server, Frame $frame): void
{
// TODO: Implement onMessage() method.
}
/**
* @param Server|\Swoole\WebSocket\Server $server
* @param int $fd
* @return void
*/
public function onDisconnect(Server|\Swoole\WebSocket\Server $server, int $fd): void
{
// TODO: Implement onDisconnect() method.
$this->stateChange($fd, 0);
}
/**
* @return void
*/
public function clearAllOnlineUsers(): void
{
$request = [
'query' => [
'bool' => [
'filter' => [['term' => ['is_online' => 1]]],
],
],
'script' => [
'source' => "ctx._source['is_online'] = 0",
'lang' => 'painless',
],
];
$client = new Client(config('es_host'), +config('es_port'));
$client->setData(json_encode($request, JSON_NUMERIC_CHECK));
$client->setHeaders(['Content-Type' => 'application/json']);
$client->setMethod('POST');
$client->execute('/' . config('es_key') . '/_update_by_query');
$client->close();
}
/**
* @param int $userId
* @param int $status
* @return void
*/
private function stateChange(int $userId, int $status): void
{
$data = [
'doc' => [
'is_online' => $status,
],
];
$client = new Client(config('es_host'), +config('es_port'));
$client->setData(json_encode($data));
$client->setHeaders(['Content-Type' => 'application/json']);
$client->setMethod('POST');
$client->execute('/' . config('es_key') . '/_doc/' . $userId . '/_update');
$client->close();
}
}
run(function () {
$websocket = Kiri::getDi()->get(Test::class);
$websocket->start('0.0.0.0', 9504);
});