This commit is contained in:
2021-08-05 16:04:59 +08:00
parent 300a016dd2
commit e03f8427ed
+5 -6
View File
@@ -11,6 +11,7 @@ use RdKafka\Exception;
use RdKafka\KafkaConsumer;
use RdKafka\TopicConf;
use ReflectionException;
use Server\ServerManager;
use Server\SInterface\CustomProcess;
use Snowflake\Abstracts\Config;
use Snowflake\Exception\ConfigException;
@@ -133,13 +134,11 @@ class Kafka implements CustomProcess
$container = Snowflake::app()->get('kafka-container');
$handler = $container->getConsumer($topic);
if (empty($handler)) {
return;
if (!empty($handler)) {
$data = new Struct($topic, $message);
$server->sendMessage(new $handler($data), random_int(0, $setting - 1));
}
$message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]);
$server->sendMessage($message, random_int(0, $setting - 1));
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
}