diff --git a/HttpServer/Server.php b/HttpServer/Server.php index 416bf802..4af45453 100644 --- a/HttpServer/Server.php +++ b/HttpServer/Server.php @@ -16,10 +16,12 @@ use HttpServer\Service\Websocket; use Exception; use ReflectionException; use Snowflake\Abstracts\Config; +use Snowflake\Core\Json; use Snowflake\Event; use Snowflake\Exception\ConfigException; use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; +use Swoole\Process; use Swoole\Runtime; /** @@ -63,14 +65,18 @@ class Server extends Application private array $process = []; + private array $params = []; + /** * @param $name * @param $process + * @param array $pramas */ - public function addProcess($name, $process) + public function addProcess($name, $process, $pramas = []) { $this->process[$name] = $process; + $this->params[$name] = $pramas; } @@ -241,6 +247,9 @@ class Server extends Application continue; } $system = new $process(Snowflake::app(), $name, $is_enable_coroutine); + if (isset($this->params[$name])) { + $system->write(Json::encode($this->params[$name])); + } $this->baseServer->addProcess($system); $application->set($process, $system); } diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index 612a4b19..376498b4 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -32,25 +32,13 @@ class Kafka extends \Snowflake\Process\Process /** * @param Process $process - * @throws ConfigException * @throws \Exception */ public function onHandler(Process $process) { $this->channelListener(); - $waite = new WaitGroup(); - $kafkaServers = SConfig::get('kafka.servers'); - foreach ($kafkaServers as $kafkaServer) { - $waite->add(); - go(function () use ($kafkaServer, $waite) { - defer(function () use ($waite) { - $waite->done(); - }); - $this->waite($kafkaServer); - }); - } - $waite->wait(); + $this->waite(json_decode($process->read(), true)); } diff --git a/Kafka/KafkaProviders.php b/Kafka/KafkaProviders.php index 860dbe3f..dd744a7e 100644 --- a/Kafka/KafkaProviders.php +++ b/Kafka/KafkaProviders.php @@ -6,6 +6,7 @@ namespace Kafka; use Exception; use HttpServer\Server; +use Snowflake\Abstracts\Config; use Snowflake\Abstracts\Config as SConfig; use Snowflake\Abstracts\Providers; use Snowflake\Application; @@ -34,7 +35,15 @@ class KafkaProviders extends Providers if (!extension_loaded('rdkafka')) { return; } - $server->addProcess('kafka', Kafka::class); + + $kafkaServers = Config::get('kafka.servers', false, []); + if (empty($kafkaServers)) { + return; + } + + foreach ($kafkaServers as $index => $kafkaServer) { + $server->addProcess('kafka_' . $index, Kafka::class, $kafkaServer); + } } }