This commit is contained in:
2025-12-16 20:20:09 +08:00
parent d6c8380c64
commit 174d2f1659
3 changed files with 443 additions and 61 deletions
+59 -58
View File
@@ -1,58 +1,59 @@
{
"name": "game-worker/kiri-core",
"description": "test framework",
"authors": [
{
"name": "XiangLin",
"email": "as2252258@163.com"
}
],
"license": "MIT",
"require": {
"php": ">=8.4",
"ext-json": "*",
"ext-fileinfo": "*",
"ext-pdo": "*",
"ext-redis": "*",
"ext-simplexml": "*",
"ext-libxml": "*",
"ext-iconv": "*",
"ext-mbstring": "*",
"ext-xml": "*",
"ext-curl": "*",
"ext-openssl": "*",
"ext-swoole": "*",
"ext-msgpack": "*",
"symfony/console": "^v7.3.1",
"psr/log": "1.*",
"composer-runtime-api": "^2.0",
"psr/http-server-middleware": "1.0.1",
"ext-pcntl": "*",
"ext-sockets": "*",
"nikic/php-parser": "^v5.5.0",
"ext-inotify": "*",
"game-worker/kiri-pool": "~v1.0",
"monolog/monolog": "^2.9",
"psr/container": "^2.0",
"swiftmailer/swiftmailer": "^v6.3.0"
},
"replace": {
"symfony/polyfill-apcu": "*",
"symfony/polyfill-php80": "*",
"symfony/polyfill-mbstring": "*",
"symfony/polyfill-ctype": "*",
"symfony/polyfill-php73": "*",
"symfony/polyfill-php72": "*",
"symfony/polyfill-php81": "*"
},
"autoload": {
"psr-4": {
"Kiri\\": "kiri-engine/",
"Kiri\\Actor\\": "kiri-actor/"
},
"files": [
"Kiri.php",
"function.php"
]
}
}
{
"name": "game-worker/kiri-core",
"description": "test framework",
"authors": [
{
"name": "XiangLin",
"email": "as2252258@163.com"
}
],
"license": "MIT",
"require": {
"php": ">=8.4",
"ext-json": "*",
"ext-fileinfo": "*",
"ext-pdo": "*",
"ext-redis": "*",
"ext-simplexml": "*",
"ext-libxml": "*",
"ext-iconv": "*",
"ext-mbstring": "*",
"ext-xml": "*",
"ext-curl": "*",
"ext-openssl": "*",
"ext-swoole": "*",
"ext-msgpack": "*",
"symfony/console": "^v7.3.1",
"psr/log": "1.*",
"composer-runtime-api": "^2.0",
"psr/http-server-middleware": "1.0.1",
"ext-pcntl": "*",
"ext-sockets": "*",
"nikic/php-parser": "^v5.5.0",
"ext-inotify": "*",
"game-worker/kiri-pool": "~v1.0",
"monolog/monolog": "^2.9",
"psr/container": "^2.0",
"swiftmailer/swiftmailer": "^v6.3.0",
"ext-mongodb": "*"
},
"replace": {
"symfony/polyfill-apcu": "*",
"symfony/polyfill-php80": "*",
"symfony/polyfill-mbstring": "*",
"symfony/polyfill-ctype": "*",
"symfony/polyfill-php73": "*",
"symfony/polyfill-php72": "*",
"symfony/polyfill-php81": "*"
},
"autoload": {
"psr-4": {
"Kiri\\": "kiri-engine/",
"Kiri\\Actor\\": "kiri-actor/"
},
"files": [
"Kiri.php",
"function.php"
]
}
}
+381
View File
@@ -0,0 +1,381 @@
<?php
/**
* Created by PhpStorm.
* User: whwyy
* Date: 2024/12/19
* Time: 14:00
*/
declare(strict_types=1);
namespace Kiri\Redis;
use Kiri;
use Kiri\Exception\RedisConnectException;
use Kiri\Pool\Pool;
use MongoDB\Client;
use MongoDB\Database;
use MongoDB\Collection;
use function config;
/**
* Class MongoDB
* @package Kiri\NoSql
* @mixin Client
* @mixin Database
* @mixin Collection
*/
class MongoDB
{
public string $host = 'localhost';
public int $port = 27017;
public string $database = '';
public string $username = '';
public string $password = '';
public string $authSource = 'admin';
public int $timeout = 30;
public array $options = [];
/**
* @var array|int[]
*/
public array $pool = ['min' => 1, 'max' => 100];
/**
* 初始化
*/
public function __construct()
{
Kiri::configure($this, config('mongodb', []));
}
/**
* @return void
* @throws
*/
public function init(): void
{
}
/**
* @param $name
* @param $arguments
* @return mixed
* @throws
*/
public function __call($name, $arguments): mixed
{
if (method_exists($this, $name)) {
return $this->{$name}(...$arguments);
} else {
return $this->proxy($name, $arguments);
}
}
/**
* 获取数据库实例
* @param string|null $database
* @return Database
* @throws
*/
public function getDatabase(?string $database = null): Database
{
$dbName = $database ?? $this->database;
if (empty($dbName)) {
throw new RedisConnectException('MongoDB database name is required.');
}
return $this->getClient()->selectDatabase($dbName);
}
/**
* 获取集合实例
* @param string $collection
* @param string|null $database
* @return Collection
* @throws
*/
public function getCollection(string $collection, ?string $database = null): Collection
{
return $this->getDatabase($database)->selectCollection($collection);
}
/**
* 代理方法调用到 MongoDB Client
* @param $name
* @param $arguments
* @return mixed
* @throws
*/
public function proxy($name, $arguments): mixed
{
$client = $this->getClient();
try {
// 如果方法存在于 Client,直接调用
if (method_exists($client, $name)) {
return $client->{$name}(...$arguments);
}
// 如果方法存在于 Database,通过默认数据库调用
$database = $this->getDatabase();
if (method_exists($database, $name)) {
return $database->{$name}(...$arguments);
}
throw new \BadMethodCallException("Method {$name} does not exist on MongoDB Client or Database.");
} catch (\Throwable $throwable) {
return trigger_print_error(throwable($throwable));
} finally {
// MongoDB 连接是持久的,不需要释放
$this->pool()->push($this->getName(), $client);
}
}
/**
* 执行 MongoDB 命令
* @param array|object $command
* @param array $options
* @param string|null $database
* @return array|object
* @throws
*/
public function command(array|object $command, array $options = [], ?string $database = null): array|object
{
$db = $this->getDatabase($database);
return $db->command($command, $options);
}
/**
* 插入文档
* @param string $collection
* @param array|object $document
* @param array $options
* @param string|null $database
* @return \MongoDB\InsertOneResult|\MongoDB\InsertManyResult
* @throws
*/
public function insert(string $collection, array|object $document, array $options = [], ?string $database = null)
{
$coll = $this->getCollection($collection, $database);
if (is_array($document) && isset($document[0]) && is_array($document[0])) {
// 批量插入
return $coll->insertMany($document, $options);
}
// 单条插入
return $coll->insertOne($document, $options);
}
/**
* 查找文档
* @param string $collection
* @param array $filter
* @param array $options
* @param string|null $database
* @return \MongoDB\Driver\Cursor
* @throws
*/
public function find(string $collection, array $filter = [], array $options = [], ?string $database = null)
{
$coll = $this->getCollection($collection, $database);
return $coll->find($filter, $options);
}
/**
* 查找单个文档
* @param string $collection
* @param array $filter
* @param array $options
* @param string|null $database
* @return array|object|null
* @throws
*/
public function findOne(string $collection, array $filter = [], array $options = [], ?string $database = null): array|object|null
{
$coll = $this->getCollection($collection, $database);
return $coll->findOne($filter, $options);
}
/**
* 更新文档
* @param string $collection
* @param array $filter
* @param array $update
* @param array $options
* @param string|null $database
* @return \MongoDB\UpdateResult
* @throws
*/
public function update(string $collection, array $filter, array $update, array $options = [], ?string $database = null)
{
$coll = $this->getCollection($collection, $database);
if (isset($options['multi']) && $options['multi']) {
unset($options['multi']);
return $coll->updateMany($filter, $update, $options);
}
return $coll->updateOne($filter, $update, $options);
}
/**
* 删除文档
* @param string $collection
* @param array $filter
* @param array $options
* @param string|null $database
* @return \MongoDB\DeleteResult
* @throws
*/
public function delete(string $collection, array $filter, array $options = [], ?string $database = null)
{
$coll = $this->getCollection($collection, $database);
if (isset($options['limit']) && $options['limit'] == 1) {
unset($options['limit']);
return $coll->deleteOne($filter, $options);
}
return $coll->deleteMany($filter, $options);
}
/**
* 统计文档数量
* @param string $collection
* @param array $filter
* @param array $options
* @param string|null $database
* @return int
* @throws
*/
public function count(string $collection, array $filter = [], array $options = [], ?string $database = null): int
{
$coll = $this->getCollection($collection, $database);
return $coll->countDocuments($filter, $options);
}
/**
* @return void
* @throws
*/
public function destroy(): void
{
$this->pool()->close($this->getName());
}
/**
* 获取 MongoDB 客户端
* @return Client
* @throws
*/
private function getClient(): Client
{
return $this->pool()->get($this->getName());
}
/**
* @return Pool
* @throws
*/
protected function pool(): Pool
{
$pool = Kiri::getPool();
if (!$pool->hasChannel($this->getName())) {
$pool->created($this->getName(), $this->pool['max'], [$this, 'connect']);
}
return $pool;
}
/**
* @return string
*/
private function getName(): string
{
return 'mongodb.' . $this->host . '.' . $this->database;
}
/**
* 创建 MongoDB 连接
* @return Client
* @throws
*/
protected function connect(): Client
{
$uri = $this->buildUri();
$clientOptions = $this->buildClientOptions();
try {
$client = new Client($uri, $clientOptions);
// 测试连接
$client->selectDatabase($this->database)->command(['ping' => 1]);
return $client;
} catch (\Throwable $e) {
throw new RedisConnectException(sprintf('MongoDB Connect %s Fail: %s', $uri, $e->getMessage()));
}
}
/**
* 构建 MongoDB 连接 URI
* @return string
*/
private function buildUri(): string
{
$auth = '';
if (!empty($this->username) && !empty($this->password)) {
$auth = $this->username . ':' . urlencode($this->password) . '@';
}
// 支持多种 host 格式:host:port 或 host1,host2:port
$hosts = $this->host;
if (!str_contains($hosts, ',') && !str_contains($hosts, ':')) {
$hosts = $hosts . ':' . $this->port;
}
$uri = 'mongodb://' . $auth . $hosts;
// 添加数据库和认证源
$query = [];
if (!empty($this->database)) {
$query[] = 'database=' . $this->database;
}
if (!empty($this->authSource)) {
$query[] = 'authSource=' . $this->authSource;
}
if (!empty($query)) {
$uri .= '/?' . implode('&', $query);
}
return $uri;
}
/**
* 构建客户端选项
* @return array
*/
private function buildClientOptions(): array
{
return array_merge([
'connectTimeoutMS' => $this->timeout * 1000,
'serverSelectionTimeoutMS' => $this->timeout * 1000,
'socketTimeoutMS' => $this->timeout * 1000,
], $this->options);
}
}
@@ -15,7 +15,7 @@ use Kiri\Pool\Pool;
use function config;
/**
* Class Redis
* Class NoSql
* @package Kiri\Cache
* @mixin \Redis
*/
@@ -197,10 +197,10 @@ SCRIPT;
{
$redis = new \Redis();
if (!$redis->connect($this->host, $this->port, $this->timeout)) {
throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $this->host, $this->port));
throw new RedisConnectException(sprintf('The NoSql Connect %s::%d Fail.', $this->host, $this->port));
}
if (!empty($this->auth) && !$redis->auth($this->auth)) {
throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth));
throw new RedisConnectException(sprintf('NoSql Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth));
}
$redis->select($this->databases);
if ($this->read_timeout > 0) {