Files
kiri-core/kiri-engine/NoSql/MongoDB.php
T
2026-06-24 20:11:12 +08:00

418 lines
10 KiB
PHP

<?php
/**
* Created by PhpStorm.
* User: whwyy
* Date: 2024/12/19
* Time: 14:00
*/
declare(strict_types=1);
namespace Kiri\NoSql;
use Kiri;
use Kiri\Exception\RedisConnectException;
use Kiri\Pool\Pool;
use MongoDB\Client;
use MongoDB\Database;
use MongoDB\Collection;
use function config;
/**
 * Class MongoDB
 *
 * Pooled wrapper around the mongodb/mongodb library. Client instances are
 * obtained from the Kiri pool, used for a single operation and then either
 * pushed back (success) or abandoned (failure).
 *
 * Unknown method calls are forwarded by {@see MongoDB::proxy()} to the Client
 * and, failing that, to the default Database. Collection methods are NOT
 * forwarded automatically; use {@see MongoDB::getCollection()} for those.
 *
 * @package Kiri\NoSql
 * @mixin Client
 * @mixin Database
 * @mixin Collection
 */
class MongoDB
{
    /** @var string Host name, `host:port`, or a comma-separated list of such entries. */
    public string $host = 'localhost';

    /** @var int Port appended to every host entry that does not carry its own port. */
    public int $port = 27017;

    /** @var string Default database name; may be empty when callers always pass one explicitly. */
    public string $database = '';

    /** @var string User name; credentials are only sent when both user name and password are set. */
    public string $username = '';

    /** @var string Password (raw, not percent-encoded). */
    public string $password = '';

    /** @var string Database used to authenticate the credentials. */
    public string $authSource = 'admin';

    /** @var int Timeout in seconds, applied to connect, server selection and socket operations. */
    public int $timeout = 30;

    /** @var array Extra driver URI options; these override the timeout defaults. */
    public array $options = [];

    /**
     * Pool sizing. Only `max` is read by this class.
     *
     * @var array|int[]
     */
    public array $pool = ['min' => 1, 'max' => 100];

    /**
     * Populates the public properties from the `mongodb` configuration section.
     */
    public function __construct()
    {
        Kiri::configure($this, config('mongodb', []));
    }

    /**
     * Intentionally empty.
     * NOTE(review): presumably a framework lifecycle hook - confirm before removing.
     *
     * @return void
     */
    public function init(): void
    {
    }

    /**
     * Handles calls to methods that are undefined or not accessible from the caller's scope.
     *
     * The `method_exists` branch must stay: it lets outside callers reach the
     * non-public methods of this class. The pool factory callable
     * `[$this, 'connect']` targets a protected method and presumably relies on
     * this path - verify against the Pool implementation before changing it.
     *
     * @param string $name      Method name.
     * @param array  $arguments Positional arguments.
     * @return mixed Result of the local method, or of {@see MongoDB::proxy()}.
     */
    public function __call($name, $arguments): mixed
    {
        if (method_exists($this, $name)) {
            return $this->{$name}(...$arguments);
        }

        return $this->proxy($name, $arguments);
    }

    /**
     * Returns a Database handle.
     *
     * The pooled client is handed back to the pool as soon as the handle has
     * been created; previously it was checked out and never returned.
     *
     * @param string|null $database Database name; falls back to the configured default.
     * @return Database
     * @throws RedisConnectException When no database name is available.
     */
    public function getDatabase(?string $database = null): Database
    {
        $dbName = $this->resolveDatabaseName($database);

        return $this->withClient(
            static fn(Client $client): Database => $client->selectDatabase($dbName)
        );
    }

    /**
     * Returns a Collection handle.
     *
     * @param string      $collection Collection name.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return Collection
     * @throws RedisConnectException When no database name is available.
     */
    public function getCollection(string $collection, ?string $database = null): Collection
    {
        return $this->getDatabase($database)->selectCollection($collection);
    }

    /**
     * Forwards a method call to the MongoDB Client or, if the Client does not
     * define it, to the default Database.
     *
     * On success the client goes back to the pool on BOTH paths. On any failure
     * (including an unknown method name or a missing default database) the error
     * is logged, the client is closed/abandoned so a possibly broken connection
     * cannot pollute the pool, and `false` is returned.
     *
     * @param string $name      Method name.
     * @param array  $arguments Positional arguments.
     * @return mixed The forwarded call's result, or `false` on failure.
     */
    public function proxy($name, $arguments): mixed
    {
        $client = $this->getClient();
        try {
            if (method_exists($client, $name)) {
                $result = $client->{$name}(...$arguments);
            } else {
                // Select the database on the client we already hold instead of
                // checking a second client out of the pool.
                $database = $client->selectDatabase($this->resolveDatabaseName());
                if (!method_exists($database, $name)) {
                    throw new \BadMethodCallException("Method {$name} does not exist on MongoDB Client or Database.");
                }
                $result = $database->{$name}(...$arguments);
            }
        } catch (\Throwable $throwable) {
            \Kiri::getLogger()->json_log($throwable);
            $this->closeClient($client);
            return false;
        }

        $this->returnClient($client);

        return $result;
    }

    /**
     * Borrows a client, runs `$operation` with it and hands the client back.
     *
     * @param callable $operation Receives the Client and returns the result.
     * @return mixed Whatever `$operation` returns.
     * @throws \Throwable Rethrows any error from `$operation` after abandoning the client.
     */
    private function withClient(callable $operation): mixed
    {
        $client = $this->getClient();
        try {
            $result = $operation($client);
        } catch (\Throwable $throwable) {
            $this->closeClient($client);
            throw $throwable;
        }

        $this->returnClient($client);

        return $result;
    }

    /**
     * Picks the explicit database name or the configured default.
     *
     * @param string|null $database Explicit name, or null for the default.
     * @return string Non-empty database name.
     * @throws RedisConnectException When the resulting name is empty.
     */
    private function resolveDatabaseName(?string $database = null): string
    {
        $dbName = $database ?? $this->database;
        // Strict comparison: empty() would also reject the valid name "0".
        if ($dbName === '') {
            throw new RedisConnectException('MongoDB database name is required.');
        }

        return $dbName;
    }

    /**
     * Returns a client to the pool; if the pool refuses it, the client is abandoned.
     *
     * @param Client $client
     * @return void
     */
    private function returnClient(Client $client): void
    {
        try {
            $this->pool()->push($this->getName(), $client);
        } catch (\Throwable) {
            $this->closeClient($client);
        }
    }

    /**
     * Closes a client (best effort) and rolls back the pool counter.
     *
     * @param Client $client
     * @return void
     */
    private function closeClient(Client $client): void
    {
        try {
            // NOTE(review): the mongodb/mongodb Client does not appear to expose
            // close(); the guard avoids relying on a swallowed Error. Confirm.
            if (method_exists($client, 'close')) {
                $client->close();
            }
        } catch (\Throwable) {
            // Best effort: the client is being discarded either way.
        }
        $this->pool()->abandon($this->getName());
    }

    /**
     * Runs a database command.
     *
     * @param array|object $command  Command document.
     * @param array        $options  Command options.
     * @param string|null  $database Database name; falls back to the configured default.
     * @return array|object
     * @throws RedisConnectException When no database name is available.
     */
    public function command(array|object $command, array $options = [], ?string $database = null): array|object
    {
        return $this->getDatabase($database)->command($command, $options);
    }

    /**
     * Inserts one document, or many when `$document` is an array whose element 0 is itself an array.
     *
     * @param string       $collection Collection name.
     * @param array|object $document   A single document or a list of array documents.
     * @param array        $options    Insert options.
     * @param string|null  $database   Database name; falls back to the configured default.
     * @return \MongoDB\InsertOneResult|\MongoDB\InsertManyResult
     * @throws RedisConnectException When no database name is available.
     */
    public function insert(string $collection, array|object $document, array $options = [], ?string $database = null)
    {
        $coll = $this->getCollection($collection, $database);
        $isBatch = is_array($document) && isset($document[0]) && is_array($document[0]);

        return $isBatch
            ? $coll->insertMany($document, $options)
            : $coll->insertOne($document, $options);
    }

    /**
     * Finds documents matching `$filter`.
     *
     * @param string      $collection Collection name.
     * @param array       $filter     Query filter.
     * @param array       $options    Find options.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return \MongoDB\Driver\Cursor
     * @throws RedisConnectException When no database name is available.
     */
    public function find(string $collection, array $filter = [], array $options = [], ?string $database = null)
    {
        return $this->getCollection($collection, $database)->find($filter, $options);
    }

    /**
     * Finds the first document matching `$filter`.
     *
     * @param string      $collection Collection name.
     * @param array       $filter     Query filter.
     * @param array       $options    Find options.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return array|object|null
     * @throws RedisConnectException When no database name is available.
     */
    public function findOne(string $collection, array $filter = [], array $options = [], ?string $database = null): array|object|null
    {
        return $this->getCollection($collection, $database)->findOne($filter, $options);
    }

    /**
     * Updates one document, or all matching documents when `$options['multi']` is truthy.
     *
     * @param string      $collection Collection name.
     * @param array       $filter     Query filter.
     * @param array       $update     Update document.
     * @param array       $options    Update options plus the wrapper-only `multi` flag.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return \MongoDB\UpdateResult
     * @throws RedisConnectException When no database name is available.
     */
    public function update(string $collection, array $filter, array $update, array $options = [], ?string $database = null)
    {
        $coll = $this->getCollection($collection, $database);

        // `multi` is a flag of this wrapper; never forward it to the driver.
        $multi = !empty($options['multi']);
        unset($options['multi']);

        return $multi
            ? $coll->updateMany($filter, $update, $options)
            : $coll->updateOne($filter, $update, $options);
    }

    /**
     * Deletes all matching documents, or only the first when `$options['limit']` equals 1.
     *
     * @param string      $collection Collection name.
     * @param array       $filter     Query filter.
     * @param array       $options    Delete options plus the wrapper-only `limit` flag.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return \MongoDB\DeleteResult
     * @throws RedisConnectException When no database name is available.
     */
    public function delete(string $collection, array $filter, array $options = [], ?string $database = null)
    {
        $coll = $this->getCollection($collection, $database);

        // `limit` is a flag of this wrapper; never forward it to the driver.
        $limit = $options['limit'] ?? null;
        unset($options['limit']);

        // Loose comparison kept on purpose so "1" and true still select deleteOne.
        if ($limit !== null && $limit == 1) {
            return $coll->deleteOne($filter, $options);
        }

        return $coll->deleteMany($filter, $options);
    }

    /**
     * Counts documents matching `$filter`.
     *
     * @param string      $collection Collection name.
     * @param array       $filter     Query filter.
     * @param array       $options    Count options.
     * @param string|null $database   Database name; falls back to the configured default.
     * @return int
     * @throws RedisConnectException When no database name is available.
     */
    public function count(string $collection, array $filter = [], array $options = [], ?string $database = null): int
    {
        return $this->getCollection($collection, $database)->countDocuments($filter, $options);
    }

    /**
     * Closes this instance's pool channel.
     *
     * @return void
     */
    public function destroy(): void
    {
        $this->pool()->close($this->getName());
    }

    /**
     * Takes a client from the pool. Callers must hand it back with
     * returnClient() or discard it with closeClient().
     *
     * @return Client
     */
    private function getClient(): Client
    {
        return $this->pool()->get($this->getName());
    }

    /**
     * Returns the shared pool, registering this instance's channel on first use.
     *
     * @return Pool
     */
    protected function pool(): Pool
    {
        $pool = Kiri::getPool();
        $name = $this->getName();
        if (!$pool->hasChannel($name)) {
            // Fall back to the default size when a config override omits `max`.
            $pool->created($name, $this->pool['max'] ?? 100, [$this, 'connect']);
        }

        return $pool;
    }

    /**
     * Pool channel name for this host/database pair.
     *
     * @return string
     */
    private function getName(): string
    {
        return 'mongodb.' . $this->host . '.' . $this->database;
    }

    /**
     * Creates a client and verifies it with a `ping` command.
     *
     * @return Client
     * @throws RedisConnectException When the client cannot be created or the ping fails.
     */
    protected function connect(): Client
    {
        $uri = $this->buildUri();
        $clientOptions = $this->buildClientOptions();
        // selectDatabase('') is invalid, so ping `admin` when no default database is configured.
        $pingDatabase = $this->database !== '' ? $this->database : 'admin';

        try {
            $client = new Client($uri, $clientOptions);
            $client->selectDatabase($pingDatabase)->command(['ping' => 1]);

            return $client;
        } catch (\Throwable $e) {
            \Kiri::getLogger()->json_log($e);
            // Never put the credentials embedded in the URI into an exception message.
            throw new RedisConnectException(
                sprintf('MongoDB Connect %s Fail: %s', $this->redactUri($uri), $e->getMessage())
            );
        }
    }

    /**
     * Replaces the `user:password@` part of a connection URI with `***@`.
     *
     * @param string $uri Connection URI.
     * @return string URI that is safe to log.
     */
    private function redactUri(string $uri): string
    {
        return preg_replace('#^(mongodb(?:\+srv)?://)[^@/]*@#', '$1***@', $uri) ?? 'mongodb://***';
    }

    /**
     * Builds the connection URI:
     * `mongodb://[user:pass@]host[:port][,host[:port]...][/[database][?authSource=...]]`.
     *
     * - User name and password are percent-encoded with rawurlencode()
     *   (urlencode() would turn a space into a literal `+`).
     * - Every host entry without an explicit port gets the configured port.
     * - The database goes into the URI path; `database=` is not a connection
     *   string option.
     *
     * @return string
     */
    private function buildUri(): string
    {
        $auth = '';
        if ($this->username !== '' && $this->password !== '') {
            $auth = rawurlencode($this->username) . ':' . rawurlencode($this->password) . '@';
        }

        $hosts = [];
        foreach (explode(',', $this->host) as $entry) {
            $entry = trim($entry);
            if ($entry === '') {
                continue;
            }
            // An entry containing ':' already carries a port (or is an IPv6 literal); leave it alone.
            $hosts[] = str_contains($entry, ':') ? $entry : $entry . ':' . $this->port;
        }

        $uri = 'mongodb://' . $auth . implode(',', $hosts);

        $path = $this->database !== '' ? rawurlencode($this->database) : '';
        $query = $this->authSource !== '' ? 'authSource=' . rawurlencode($this->authSource) : '';
        if ($path === '' && $query === '') {
            return $uri;
        }

        $uri .= '/' . $path;
        if ($query !== '') {
            $uri .= '?' . $query;
        }

        return $uri;
    }

    /**
     * Builds the driver URI options: timeout defaults (milliseconds) overridden by `$options`.
     *
     * @return array
     */
    private function buildClientOptions(): array
    {
        $timeoutMs = $this->timeout * 1000;

        return array_merge([
            'connectTimeoutMS' => $timeoutMs,
            'serverSelectionTimeoutMS' => $timeoutMs,
            'socketTimeoutMS' => $timeoutMs,
        ], $this->options);
    }
}