变更
This commit is contained in:
@@ -2,8 +2,10 @@
|
||||
|
||||
namespace Database;
|
||||
|
||||
use Co\Channel;
|
||||
use Exception;
|
||||
use Kiri\Di\LocalService;
|
||||
use Swoole\Process;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
@@ -69,10 +71,14 @@ class BackupCommand extends Command
|
||||
|
||||
$path = $input->getArgument('path');
|
||||
|
||||
$data = $input->getOption('data');
|
||||
|
||||
if (!is_dir($path)) {
|
||||
mkdir($path);
|
||||
}
|
||||
|
||||
|
||||
$processes = [];
|
||||
foreach ($table as $value) {
|
||||
$tableInfo = $data->createCommand('show create table `' . $data->database . '`.`' . $value . '`')->one();
|
||||
|
||||
@@ -90,6 +96,17 @@ class BackupCommand extends Command
|
||||
}
|
||||
|
||||
file_put_contents($tmp, $tableCreator . ';' . PHP_EOL . PHP_EOL, FILE_APPEND);
|
||||
|
||||
if ($data == 1) {
|
||||
$process = new Process(fn(Process $process) => $this->writeData($process, $database, $value), false,
|
||||
2, true);
|
||||
$process->start();
|
||||
|
||||
$processes[] = $process;
|
||||
}
|
||||
}
|
||||
foreach ($processes as $process) {
|
||||
Process::wait();
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
$output->writeln($throwable->getMessage());
|
||||
@@ -98,4 +115,63 @@ class BackupCommand extends Command
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Process $process
|
||||
* @param string $dbname
|
||||
* @param string $value
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function writeData(Process $process, string $dbname, string $tableName, string $path): void
|
||||
{
|
||||
$offset = 0;
|
||||
$size = 1000;
|
||||
|
||||
$channel = new Channel(200);
|
||||
for ($i = 0; $i < $channel->length(); $i++) {
|
||||
go(function () use ($channel, $path, $dbname, $tableName) {
|
||||
while ($channel->errCode != SWOOLE_CHANNEL_CLOSED) {
|
||||
$value = $channel->pop();
|
||||
|
||||
$value = $this->toSql($dbname, $tableName, $value);
|
||||
|
||||
file_put_contents($path, $value . PHP_EOL, FILE_APPEND);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** @var Connection $database */
|
||||
$database = \Kiri::service()->get($dbname);
|
||||
while (true) {
|
||||
$data = $database->createCommand("SELECT * FROM $tableName LIMIT $offset,$size")->all();
|
||||
$channel->push($data);
|
||||
if (count($data) < $size) {
|
||||
break;
|
||||
}
|
||||
$offset += $size;
|
||||
}
|
||||
|
||||
$channel->close();
|
||||
|
||||
$process->exit(0);
|
||||
}
|
||||
|
||||
|
||||
public function toSql(string $dbname, string $value, array $data): string
|
||||
{
|
||||
$strings = ['INSERT INTO ' . $dbname . '.' . $value];
|
||||
foreach ($data as $datum) {
|
||||
|
||||
if (count($strings) == 1) {
|
||||
$keys = array_keys($datum);
|
||||
$strings[] = '(' . implode(',', $keys) . ') VALUES';
|
||||
} else {
|
||||
$keys = array_values($datum);
|
||||
$strings[] = '(' . implode(',', $keys) . '),';
|
||||
}
|
||||
}
|
||||
return implode('', $strings);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user