127 lines
2.8 KiB
PHP
127 lines
2.8 KiB
PHP
<?php
|
|
|
|
|
|
use Coroutine\Server\Websocket;
|
|
use Kiri\Router\Constrict\ConstrictRequest;
|
|
use Swoole\Coroutine\Http\Client;
|
|
use Swoole\Coroutine\Http\Server;
|
|
use Swoole\Http\Response;
|
|
use Swoole\WebSocket\Frame;
|
|
use function Co\run;
|
|
|
|
|
|
class Test extends Websocket
|
|
{
|
|
|
|
|
|
/**
|
|
* @param ConstrictRequest $request
|
|
* @return bool
|
|
*/
|
|
public function onConnected(ConstrictRequest $request): bool
|
|
{
|
|
// TODO: Implement onConnected() method.
|
|
$this->stateChange($request->getAuthority()->getUniqueId(), 1);
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* @param ConstrictRequest $request
|
|
* @param Response $ws
|
|
* @return bool
|
|
*/
|
|
public function onAuthority(ConstrictRequest $request, Response $ws): bool
|
|
{
|
|
// TODO: Implement onAuthority() method.
|
|
$auth = $request->get['auth'] ?? '';
|
|
$client = new Client(config('user.host'), config('user.port'), true);
|
|
$client->post('/handshake', [
|
|
'token' => $auth,
|
|
'fd' => $request->fd,
|
|
'ip' => current(swoole_get_local_ip()),
|
|
]);
|
|
$client->close();
|
|
if ($client->getStatusCode() != 200) {
|
|
return false;
|
|
} else {
|
|
$request->withAuthority(json_decode($client->body));
|
|
return true;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param \Swoole\Server|Server $server
|
|
* @param Frame $frame
|
|
* @return void
|
|
*/
|
|
public function onMessage(\Swoole\Server|Server $server, Frame $frame): void
|
|
{
|
|
// TODO: Implement onMessage() method.
|
|
}
|
|
|
|
|
|
/**
|
|
* @param Server|\Swoole\WebSocket\Server $server
|
|
* @param int $fd
|
|
* @return void
|
|
*/
|
|
public function onDisconnect(Server|\Swoole\WebSocket\Server $server, int $fd): void
|
|
{
|
|
// TODO: Implement onDisconnect() method.
|
|
$this->stateChange($fd, 0);
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
* @return void
|
|
*/
|
|
public function clearAllOnlineUsers(): void
|
|
{
|
|
$request = [
|
|
'query' => [
|
|
'bool' => [
|
|
'filter' => [['term' => ['is_online' => 1]]],
|
|
],
|
|
],
|
|
'script' => [
|
|
'source' => "ctx._source['is_online'] = 0",
|
|
'lang' => 'painless',
|
|
],
|
|
];
|
|
$client = new Client(config('es_host'), +config('es_port'));
|
|
$client->setData(json_encode($request, JSON_NUMERIC_CHECK));
|
|
$client->setHeaders(['Content-Type' => 'application/json']);
|
|
$client->setMethod('POST');
|
|
$client->execute('/' . config('es_key') . '/_update_by_query');
|
|
$client->close();
|
|
}
|
|
|
|
|
|
/**
|
|
* @param int $userId
|
|
* @param int $status
|
|
* @return void
|
|
*/
|
|
private function stateChange(int $userId, int $status): void
|
|
{
|
|
$data = [
|
|
'doc' => [
|
|
'is_online' => $status,
|
|
],
|
|
];
|
|
$client = new Client(config('es_host'), +config('es_port'));
|
|
$client->setData(json_encode($data));
|
|
$client->setHeaders(['Content-Type' => 'application/json']);
|
|
$client->setMethod('POST');
|
|
$client->execute('/' . config('es_key') . '/_doc/' . $userId . '/_update');
|
|
$client->close();
|
|
}
|
|
}
|
|
|
|
run(function () {
|
|
$websocket = Kiri::getDi()->get(Test::class);
|
|
$websocket->start('0.0.0.0', 9504);
|
|
});
|