This commit is contained in:
2021-08-05 16:47:59 +08:00
parent c75ac94d68
commit d8e7b88811
3 changed files with 32 additions and 18 deletions
-7
View File
@@ -12,13 +12,6 @@ interface ConsumerInterface
{
/**
* @param Struct $struct
*/
public function setParams(Struct $struct): void;
/**
* @return mixed
*/
+1 -11
View File
@@ -124,17 +124,7 @@ class Kafka implements CustomProcess
$setting = $server->setting['worker_num'];
$container = Snowflake::app()->get('kafka-container');
$handler = $container->getConsumer($topic);
var_dump($container);
if (!empty($handler)) {
/** @var ConsumerInterface $data */
$data = new $handler();
$data->setParams(new Struct($topic, $message));
$server->sendMessage($data, random_int(0, $setting - 1));
}
$server->sendMessage(new Message(new Struct($topic, $message)), random_int(0, $setting - 1));
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
}
+31
View File
@@ -0,0 +1,31 @@
<?php
namespace Kafka;
use Server\SInterface\PipeMessage;
/**
*
*/
class Message implements ConsumerInterface, PipeMessage
{
/**
* @param Struct $struct
*/
public function __construct(public Struct $struct)
{
}
/**
*
*/
public function process(): void
{
// TODO: Implement process() method.
}
}