This commit is contained in:
2021-08-05 16:26:51 +08:00
parent 64df33b1f2
commit 6f9d48f779
+4 -3
View File
@@ -127,14 +127,15 @@ class Kafka implements CustomProcess
$setting = $server->setting['worker_num'];
/** @var KafkaProvider $container */
$container = Snowflake::app()->get('kafka-container');
$handler = $container->getConsumer($topic);
if (!empty($handler)) {
$data = new Struct($topic, $message);
/** @var ConsumerInterface $data */
$data = new $handler();
$data->setParams(new Struct($topic, $message));
$server->sendMessage(new $handler($data), random_int(0, $setting - 1));
$server->sendMessage($data, random_int(0, $setting - 1));
}
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');