1, 'max' => 100]; /** * 初始化 */ public function __construct() { Kiri::configure($this, config('mongodb', [])); } /** * @return void * @throws */ public function init(): void { } /** * @param $name * @param $arguments * @return mixed * @throws */ public function __call($name, $arguments): mixed { if (method_exists($this, $name)) { return $this->{$name}(...$arguments); } else { return $this->proxy($name, $arguments); } } /** * 获取数据库实例 * @param string|null $database * @return Database * @throws */ public function getDatabase(?string $database = null): Database { $dbName = $database ?? $this->database; if (empty($dbName)) { throw new RedisConnectException('MongoDB database name is required.'); } return $this->getClient()->selectDatabase($dbName); } /** * 获取集合实例 * @param string $collection * @param string|null $database * @return Collection * @throws */ public function getCollection(string $collection, ?string $database = null): Collection { return $this->getDatabase($database)->selectCollection($collection); } /** * 代理方法调用到 MongoDB Client * @param $name * @param $arguments * @return mixed * @throws */ public function proxy($name, $arguments): mixed { $client = $this->getClient(); try { // 如果方法存在于 Client,直接调用 if (method_exists($client, $name)) { return $client->{$name}(...$arguments); } // 如果方法存在于 Database,通过默认数据库调用 $database = $this->getDatabase(); if (method_exists($database, $name)) { return $database->{$name}(...$arguments); } throw new \BadMethodCallException("Method {$name} does not exist on MongoDB Client or Database."); } catch (\Throwable $throwable) { \Kiri::getLogger()->json_log($throwable); return false; } finally { // MongoDB 连接是持久的,不需要释放 $this->pool()->push($this->getName(), $client); } } /** * 执行 MongoDB 命令 * @param array|object $command * @param array $options * @param string|null $database * @return array|object * @throws */ public function command(array|object $command, array $options = [], ?string $database = null): array|object { $db = $this->getDatabase($database); return $db->command($command, $options); } /** * 插入文档 * @param string $collection * @param array|object $document * @param array $options * @param string|null $database * @return \MongoDB\InsertOneResult|\MongoDB\InsertManyResult * @throws */ public function insert(string $collection, array|object $document, array $options = [], ?string $database = null) { $coll = $this->getCollection($collection, $database); if (is_array($document) && isset($document[0]) && is_array($document[0])) { // 批量插入 return $coll->insertMany($document, $options); } // 单条插入 return $coll->insertOne($document, $options); } /** * 查找文档 * @param string $collection * @param array $filter * @param array $options * @param string|null $database * @return \MongoDB\Driver\Cursor * @throws */ public function find(string $collection, array $filter = [], array $options = [], ?string $database = null) { $coll = $this->getCollection($collection, $database); return $coll->find($filter, $options); } /** * 查找单个文档 * @param string $collection * @param array $filter * @param array $options * @param string|null $database * @return array|object|null * @throws */ public function findOne(string $collection, array $filter = [], array $options = [], ?string $database = null): array|object|null { $coll = $this->getCollection($collection, $database); return $coll->findOne($filter, $options); } /** * 更新文档 * @param string $collection * @param array $filter * @param array $update * @param array $options * @param string|null $database * @return \MongoDB\UpdateResult * @throws */ public function update(string $collection, array $filter, array $update, array $options = [], ?string $database = null) { $coll = $this->getCollection($collection, $database); if (isset($options['multi']) && $options['multi']) { unset($options['multi']); return $coll->updateMany($filter, $update, $options); } return $coll->updateOne($filter, $update, $options); } /** * 删除文档 * @param string $collection * @param array $filter * @param array $options * @param string|null $database * @return \MongoDB\DeleteResult * @throws */ public function delete(string $collection, array $filter, array $options = [], ?string $database = null) { $coll = $this->getCollection($collection, $database); if (isset($options['limit']) && $options['limit'] == 1) { unset($options['limit']); return $coll->deleteOne($filter, $options); } return $coll->deleteMany($filter, $options); } /** * 统计文档数量 * @param string $collection * @param array $filter * @param array $options * @param string|null $database * @return int * @throws */ public function count(string $collection, array $filter = [], array $options = [], ?string $database = null): int { $coll = $this->getCollection($collection, $database); return $coll->countDocuments($filter, $options); } /** * @return void * @throws */ public function destroy(): void { $this->pool()->close($this->getName()); } /** * 获取 MongoDB 客户端 * @return Client * @throws */ private function getClient(): Client { return $this->pool()->get($this->getName()); } /** * @return Pool * @throws */ protected function pool(): Pool { $pool = Kiri::getPool(); if (!$pool->hasChannel($this->getName())) { $pool->created($this->getName(), $this->pool['max'], [$this, 'connect']); } return $pool; } /** * @return string */ private function getName(): string { return 'mongodb.' . $this->host . '.' . $this->database; } /** * 创建 MongoDB 连接 * @return Client * @throws */ protected function connect(): Client { $uri = $this->buildUri(); $clientOptions = $this->buildClientOptions(); try { $client = new Client($uri, $clientOptions); // 测试连接 $client->selectDatabase($this->database)->command(['ping' => 1]); return $client; } catch (\Throwable $e) { \Kiri::getLogger()->json_log($e); throw new RedisConnectException(sprintf('MongoDB Connect %s Fail: %s', $uri, $e->getMessage())); } } /** * 构建 MongoDB 连接 URI * @return string */ private function buildUri(): string { $auth = ''; if (!empty($this->username) && !empty($this->password)) { $auth = $this->username . ':' . urlencode($this->password) . '@'; } // 支持多种 host 格式:host:port 或 host1,host2:port $hosts = $this->host; if (!str_contains($hosts, ',') && !str_contains($hosts, ':')) { $hosts = $hosts . ':' . $this->port; } $uri = 'mongodb://' . $auth . $hosts; // 添加数据库和认证源 $query = []; if (!empty($this->database)) { $query[] = 'database=' . $this->database; } if (!empty($this->authSource)) { $query[] = 'authSource=' . $this->authSource; } if (!empty($query)) { $uri .= '/?' . implode('&', $query); } return $uri; } /** * 构建客户端选项 * @return array */ private function buildClientOptions(): array { return array_merge([ 'connectTimeoutMS' => $this->timeout * 1000, 'serverSelectionTimeoutMS' => $this->timeout * 1000, 'socketTimeoutMS' => $this->timeout * 1000, ], $this->options); } }