418 lines
10 KiB
PHP
418 lines
10 KiB
PHP
<?php
|
|
/**
|
|
* Created by PhpStorm.
|
|
* User: whwyy
|
|
* Date: 2024/12/19
|
|
* Time: 14:00
|
|
*/
|
|
declare(strict_types=1);
|
|
|
|
namespace Kiri\NoSql;
|
|
|
|
use Kiri;
|
|
use Kiri\Exception\RedisConnectException;
|
|
use Kiri\Pool\Pool;
|
|
use MongoDB\Client;
|
|
use MongoDB\Database;
|
|
use MongoDB\Collection;
|
|
|
|
use function config;
|
|
|
|
/**
|
|
* Class MongoDB
|
|
* @package Kiri\NoSql
|
|
* @mixin Client
|
|
* @mixin Database
|
|
* @mixin Collection
|
|
*/
|
|
class MongoDB
|
|
{
|
|
|
|
public string $host = 'localhost';
|
|
public int $port = 27017;
|
|
public string $database = '';
|
|
public string $username = '';
|
|
public string $password = '';
|
|
public string $authSource = 'admin';
|
|
public int $timeout = 30;
|
|
public array $options = [];
|
|
|
|
/**
|
|
* @var array|int[]
|
|
*/
|
|
public array $pool = ['min' => 1, 'max' => 100];
|
|
|
|
|
|
/**
|
|
* 初始化
|
|
*/
|
|
public function __construct()
|
|
{
|
|
Kiri::configure($this, config('mongodb', []));
|
|
}
|
|
|
|
/**
|
|
* @return void
|
|
* @throws
|
|
*/
|
|
public function init(): void
|
|
{
|
|
}
|
|
|
|
|
|
/**
|
|
* @param $name
|
|
* @param $arguments
|
|
* @return mixed
|
|
* @throws
|
|
*/
|
|
public function __call($name, $arguments): mixed
|
|
{
|
|
if (method_exists($this, $name)) {
|
|
return $this->{$name}(...$arguments);
|
|
} else {
|
|
return $this->proxy($name, $arguments);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* 获取数据库实例
|
|
* @param string|null $database
|
|
* @return Database
|
|
* @throws
|
|
*/
|
|
public function getDatabase(?string $database = null): Database
|
|
{
|
|
$dbName = $database ?? $this->database;
|
|
if (empty($dbName)) {
|
|
throw new RedisConnectException('MongoDB database name is required.');
|
|
}
|
|
return $this->getClient()->selectDatabase($dbName);
|
|
}
|
|
|
|
|
|
/**
|
|
* 获取集合实例
|
|
* @param string $collection
|
|
* @param string|null $database
|
|
* @return Collection
|
|
* @throws
|
|
*/
|
|
public function getCollection(string $collection, ?string $database = null): Collection
|
|
{
|
|
return $this->getDatabase($database)->selectCollection($collection);
|
|
}
|
|
|
|
|
|
/**
|
|
* 代理方法调用到 MongoDB Client,内置连接健康检查和回收
|
|
* 异常时关闭连接并回退计数器,防止断连对象污染连接池
|
|
* @param $name
|
|
* @param $arguments
|
|
* @return mixed
|
|
* @throws
|
|
*/
|
|
public function proxy($name, $arguments): mixed
|
|
{
|
|
$client = $this->getClient();
|
|
try {
|
|
// 如果方法存在于 Client,直接调用
|
|
if (method_exists($client, $name)) {
|
|
return $client->{$name}(...$arguments);
|
|
}
|
|
|
|
// 如果方法存在于 Database,通过默认数据库调用
|
|
$database = $this->getDatabase();
|
|
if (method_exists($database, $name)) {
|
|
$result = $database->{$name}(...$arguments);
|
|
$this->returnClient($client);
|
|
return $result;
|
|
}
|
|
|
|
throw new \BadMethodCallException("Method {$name} does not exist on MongoDB Client or Database.");
|
|
} catch (\Throwable $throwable) {
|
|
\Kiri::getLogger()->json_log($throwable);
|
|
|
|
$this->closeClient($client);
|
|
|
|
return false;
|
|
}
|
|
$this->returnClient($client);
|
|
}
|
|
|
|
|
|
/**
|
|
* 归还连接
|
|
* @param Client $client
|
|
* @return void
|
|
*/
|
|
private function returnClient(Client $client): void
|
|
{
|
|
try {
|
|
$this->pool()->push($this->getName(), $client);
|
|
} catch (\Throwable) {
|
|
$this->closeClient($client);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* 关闭连接并回退计数器
|
|
* @param Client $client
|
|
* @return void
|
|
*/
|
|
private function closeClient(Client $client): void
|
|
{
|
|
try {
|
|
$client->close();
|
|
} catch (\Throwable) {
|
|
}
|
|
$this->pool()->abandon($this->getName());
|
|
}
|
|
|
|
|
|
/**
|
|
* 执行 MongoDB 命令
|
|
* @param array|object $command
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return array|object
|
|
* @throws
|
|
*/
|
|
public function command(array|object $command, array $options = [], ?string $database = null): array|object
|
|
{
|
|
$db = $this->getDatabase($database);
|
|
return $db->command($command, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 插入文档
|
|
* @param string $collection
|
|
* @param array|object $document
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return \MongoDB\InsertOneResult|\MongoDB\InsertManyResult
|
|
* @throws
|
|
*/
|
|
public function insert(string $collection, array|object $document, array $options = [], ?string $database = null)
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
if (is_array($document) && isset($document[0]) && is_array($document[0])) {
|
|
// 批量插入
|
|
return $coll->insertMany($document, $options);
|
|
}
|
|
// 单条插入
|
|
return $coll->insertOne($document, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 查找文档
|
|
* @param string $collection
|
|
* @param array $filter
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return \MongoDB\Driver\Cursor
|
|
* @throws
|
|
*/
|
|
public function find(string $collection, array $filter = [], array $options = [], ?string $database = null)
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
return $coll->find($filter, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 查找单个文档
|
|
* @param string $collection
|
|
* @param array $filter
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return array|object|null
|
|
* @throws
|
|
*/
|
|
public function findOne(string $collection, array $filter = [], array $options = [], ?string $database = null): array|object|null
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
return $coll->findOne($filter, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 更新文档
|
|
* @param string $collection
|
|
* @param array $filter
|
|
* @param array $update
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return \MongoDB\UpdateResult
|
|
* @throws
|
|
*/
|
|
public function update(string $collection, array $filter, array $update, array $options = [], ?string $database = null)
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
if (isset($options['multi']) && $options['multi']) {
|
|
unset($options['multi']);
|
|
return $coll->updateMany($filter, $update, $options);
|
|
}
|
|
return $coll->updateOne($filter, $update, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 删除文档
|
|
* @param string $collection
|
|
* @param array $filter
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return \MongoDB\DeleteResult
|
|
* @throws
|
|
*/
|
|
public function delete(string $collection, array $filter, array $options = [], ?string $database = null)
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
if (isset($options['limit']) && $options['limit'] == 1) {
|
|
unset($options['limit']);
|
|
return $coll->deleteOne($filter, $options);
|
|
}
|
|
return $coll->deleteMany($filter, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* 统计文档数量
|
|
* @param string $collection
|
|
* @param array $filter
|
|
* @param array $options
|
|
* @param string|null $database
|
|
* @return int
|
|
* @throws
|
|
*/
|
|
public function count(string $collection, array $filter = [], array $options = [], ?string $database = null): int
|
|
{
|
|
$coll = $this->getCollection($collection, $database);
|
|
return $coll->countDocuments($filter, $options);
|
|
}
|
|
|
|
|
|
/**
|
|
* @return void
|
|
* @throws
|
|
*/
|
|
public function destroy(): void
|
|
{
|
|
$this->pool()->close($this->getName());
|
|
}
|
|
|
|
|
|
/**
|
|
* 获取 MongoDB 客户端
|
|
* @return Client
|
|
* @throws
|
|
*/
|
|
private function getClient(): Client
|
|
{
|
|
return $this->pool()->get($this->getName());
|
|
}
|
|
|
|
|
|
/**
|
|
* @return Pool
|
|
* @throws
|
|
*/
|
|
protected function pool(): Pool
|
|
{
|
|
$pool = Kiri::getPool();
|
|
if (!$pool->hasChannel($this->getName())) {
|
|
$pool->created($this->getName(), $this->pool['max'], [$this, 'connect']);
|
|
}
|
|
return $pool;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return string
|
|
*/
|
|
private function getName(): string
|
|
{
|
|
return 'mongodb.' . $this->host . '.' . $this->database;
|
|
}
|
|
|
|
|
|
/**
|
|
* 创建 MongoDB 连接
|
|
* @return Client
|
|
* @throws
|
|
*/
|
|
protected function connect(): Client
|
|
{
|
|
$uri = $this->buildUri();
|
|
$clientOptions = $this->buildClientOptions();
|
|
|
|
try {
|
|
$client = new Client($uri, $clientOptions);
|
|
|
|
// 测试连接
|
|
$client->selectDatabase($this->database)->command(['ping' => 1]);
|
|
|
|
return $client;
|
|
} catch (\Throwable $e) {
|
|
\Kiri::getLogger()->json_log($e);
|
|
|
|
throw new RedisConnectException(sprintf('MongoDB Connect %s Fail: %s', $uri, $e->getMessage()));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* 构建 MongoDB 连接 URI
|
|
* @return string
|
|
*/
|
|
private function buildUri(): string
|
|
{
|
|
$auth = '';
|
|
if (!empty($this->username) && !empty($this->password)) {
|
|
$auth = $this->username . ':' . urlencode($this->password) . '@';
|
|
}
|
|
|
|
// 支持多种 host 格式:host:port 或 host1,host2:port
|
|
$hosts = $this->host;
|
|
if (!str_contains($hosts, ',') && !str_contains($hosts, ':')) {
|
|
$hosts = $hosts . ':' . $this->port;
|
|
}
|
|
|
|
$uri = 'mongodb://' . $auth . $hosts;
|
|
|
|
// 添加数据库和认证源
|
|
$query = [];
|
|
if (!empty($this->database)) {
|
|
$query[] = 'database=' . $this->database;
|
|
}
|
|
if (!empty($this->authSource)) {
|
|
$query[] = 'authSource=' . $this->authSource;
|
|
}
|
|
|
|
if (!empty($query)) {
|
|
$uri .= '/?' . implode('&', $query);
|
|
}
|
|
|
|
return $uri;
|
|
}
|
|
|
|
|
|
/**
|
|
* 构建客户端选项
|
|
* @return array
|
|
*/
|
|
private function buildClientOptions(): array
|
|
{
|
|
return array_merge([
|
|
'connectTimeoutMS' => $this->timeout * 1000,
|
|
'serverSelectionTimeoutMS' => $this->timeout * 1000,
|
|
'socketTimeoutMS' => $this->timeout * 1000,
|
|
], $this->options);
|
|
}
|
|
}
|