delete kafka

This commit is contained in:
2021-08-26 15:03:19 +08:00
parent 835300e3c6
commit 8bf0941f2b
17 changed files with 102 additions and 2221 deletions
+34
View File
@@ -0,0 +1,34 @@
# Created by .ignore support plugin (hsz.mobi)
### Yii template
assets/*
!assets/.gitignore
protected/runtime/*
!protected/runtime/.gitignore
protected/data/*.db
themes/classic/views/
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
composer.lock
*.log
commands/result
config/setting.php
tests/
vendor/
runtime/
*.xml
*.lock
oot
d
composer.lock
+17
View File
@@ -0,0 +1,17 @@
<?php
namespace PHPSTORM_META {
// Reflect
use Http\Context\Context;
use Kiri\Di\Container;
override(Container::get(0), map('@'));
override(Container::newObject(0), map('@'));
// override(\Hyperf\Utils\Context::get(0), map('@'));
// override(\make(0), map('@'));
override(\di(0), map('@'));
override(\duplicate(0), map('@'));
override(Context::getContext(0), map('@'));
}
View File
+23
View File
@@ -0,0 +1,23 @@
{
"name": "game-worker/kiri-crontab",
"description": "db",
"authors": [
{
"name": "XiangLin",
"email": "as2252258@163.com"
}
],
"license": "MIT",
"require": {
"php": ">=8.0",
"ext-json": "*",
"psr/event-dispatcher": "^1.0"
},
"autoload": {
"psr-4": {
"Kiri\\Crontab\\": "src/"
}
},
"require-dev": {
}
}
-49
View File
@@ -1,49 +0,0 @@
<?php
namespace Kafka\Annotation;
use Annotation\Attribute;
use Exception;
use Kafka\ConsumerInterface;
use Kafka\KafkaProvider;
use Kiri\Kiri;
/**
* Class Kafka
* @package Annotation
*/
#[\Attribute(\Attribute::TARGET_CLASS)] class Kafka extends Attribute
{
/**
* Kafka constructor.
* @param string $topic
*/
public function __construct(public string $topic)
{
}
/**
* @param mixed $class
* @param mixed|null $method
* @return bool
* @throws Exception
*/
public function execute(mixed $class, mixed $method = null): bool
{
if (!in_array(ConsumerInterface::class, class_implements($class))) {
return false;
}
/** @var KafkaProvider $container */
$container = Kiri::getDi()->get(KafkaProvider::class);
$container->addConsumer($this->topic, $class);
return true;
}
}
File diff suppressed because it is too large Load Diff
-163
View File
@@ -1,163 +0,0 @@
<?php
namespace Kafka;
class Constant
{
const CONFIG_BUILTIN_FEATURES = 'builtin.features';
const CONFIG_CLIENT_ID = 'client.id';
const CONFIG_METADATA_BROKER_LIST = 'metadata.broker.list';
const CONFIG_BOOTSTRAP_SERVERS = 'bootstrap.servers';
const CONFIG_MESSAGE_MAX_BYTES = 'message.max.bytes';
const CONFIG_MESSAGE_COPY_MAX_BYTES = 'message.copy.max.bytes';
const CONFIG_RECEIVE_MESSAGE_MAX_BYTES = 'receive.message.max.bytes';
const CONFIG_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 'max.in.flight.requests.per.connection';
const CONFIG_MAX_IN_FLIGHT = 'max.in.flight';
const CONFIG_TOPIC_METADATA_REFRESH_INTERVAL_MS = 'topic.metadata.refresh.interval.ms';
const CONFIG_METADATA_MAX_AGE_MS = 'metadata.max.age.ms';
const CONFIG_TOPIC_METADATA_REFRESH_FAST_INTERVAL_MS = 'topic.metadata.refresh.fast.interval.ms';
const CONFIG_TOPIC_METADATA_REFRESH_FAST_CNT = 'topic.metadata.refresh.fast.cnt';
const CONFIG_TOPIC_METADATA_REFRESH_SPARSE = 'topic.metadata.refresh.sparse';
const CONFIG_TOPIC_METADATA_PROPAGATION_MAX_MS = 'topic.metadata.propagation.max.ms';
const CONFIG_TOPIC_BLACKLIST = 'topic.blacklist';
const CONFIG_DEBUG = 'debug';
const CONFIG_SOCKET_TIMEOUT_MS = 'socket.timeout.ms';
const CONFIG_SOCKET_BLOCKING_MAX_MS = 'socket.blocking.max.ms';
const CONFIG_SOCKET_SEND_BUFFER_BYTES = 'socket.send.buffer.bytes';
const CONFIG_SOCKET_RECEIVE_BUFFER_BYTES = 'socket.receive.buffer.bytes';
const CONFIG_SOCKET_KEEPALIVE_ENABLE = 'socket.keepalive.enable';
const CONFIG_SOCKET_NAGLE_DISABLE = 'socket.nagle.disable';
const CONFIG_SOCKET_MAX_FAILS = 'socket.max.fails';
const CONFIG_BROKER_ADDRESS_TTL = 'broker.address.ttl';
const CONFIG_BROKER_ADDRESS_FAMILY = 'broker.address.family';
const CONFIG_CONNECTIONS_MAX_IDLE_MS = 'connections.max.idle.ms';
const CONFIG_RECONNECT_BACKOFF_JITTER_MS = 'reconnect.backoff.jitter.ms';
const CONFIG_RECONNECT_BACKOFF_MS = 'reconnect.backoff.ms';
const CONFIG_RECONNECT_BACKOFF_MAX_MS = 'reconnect.backoff.max.ms';
const CONFIG_STATISTICS_INTERVAL_MS = 'statistics.interval.ms';
const CONFIG_ENABLED_EVENTS = 'enabled_events';
const CONFIG_THROTTLE_CB = 'throttle_cb';
const CONFIG_LOG_LEVEL = 'log_level';
const CONFIG_LOG_QUEUE = 'log.queue';
const CONFIG_LOG_THREAD_NAME = 'log.thread.name';
const CONFIG_ENABLE_RANDOM_SEED = 'enable.random.seed';
const CONFIG_LOG_CONNECTION_CLOSE = 'log.connection.close';
const CONFIG_BACKGROUND_EVENT_CB = 'background_event_cb';
const CONFIG_SOCKET_CB = 'socket_cb';
const CONFIG_CONNECT_CB = 'connect_cb';
const CONFIG_CLOSESOCKET_CB = 'closesocket_cb';
const CONFIG_OPEN_CB = 'open_cb';
const CONFIG_OPAQUE = 'opaque';
const CONFIG_DEFAULT_TOPIC_CONF = 'default_topic_conf';
const CONFIG_INTERNAL_TERMINATION_SIGNAL = 'internal.termination.signal';
const CONFIG_API_VERSION_REQUEST = 'api.version.request';
const CONFIG_API_VERSION_REQUEST_TIMEOUT_MS = 'api.version.request.timeout.ms';
const CONFIG_API_VERSION_FALLBACK_MS = 'api.version.fallback.ms';
const CONFIG_BROKER_VERSION_FALLBACK = 'broker.version.fallback';
const CONFIG_SECURITY_PROTOCOL = 'security.protocol';
const CONFIG_SSL_CIPHER_SUITES = 'ssl.cipher.suites';
const CONFIG_SSL_CURVES_LIST = 'ssl.curves.list';
const CONFIG_SSL_SIGALGS_LIST = 'ssl.sigalgs.list';
const CONFIG_SSL_KEY_LOCATION = 'ssl.key.location';
const CONFIG_SSL_KEY_PASSWORD = 'ssl.key.password';
const CONFIG_SSL_KEY_PEM = 'ssl.key.pem';
const CONFIG_SSL_KEY = 'ssl_key';
const CONFIG_SSL_CERTIFICATE_LOCATION = 'ssl.certificate.location';
const CONFIG_SSL_CERTIFICATE_PEM = 'ssl.certificate.pem';
const CONFIG_SSL_CERTIFICATE = 'ssl_certificate';
const CONFIG_SSL_CA_LOCATION = 'ssl.ca.location';
const CONFIG_SSL_CA = 'ssl_ca';
const CONFIG_SSL_CA_CERTIFICATE_STORES = 'ssl.ca.certificate.stores';
const CONFIG_SSL_CRL_LOCATION = 'ssl.crl.location';
const CONFIG_SSL_KEYSTORE_LOCATION = 'ssl.keystore.location';
const CONFIG_SSL_KEYSTORE_PASSWORD = 'ssl.keystore.password';
const CONFIG_SSL_ENGINE_LOCATION = 'ssl.engine.location';
const CONFIG_SSL_ENGINE_ID = 'ssl.engine.id';
const CONFIG_SSL_ENGINE_CALLBACK_DATA = 'ssl_engine_callback_data';
const CONFIG_ENABLE_SSL_CERTIFICATE_VERIFICATION = 'enable.ssl.certificate.verification';
const CONFIG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = 'ssl.endpoint.identification.algorithm';
const CONFIG_SSL_CERTIFICATE_VERIFY_CB = 'ssl.certificate.verify_cb';
const CONFIG_SASL_MECHANISMS = 'sasl.mechanisms';
const CONFIG_SASL_MECHANISM = 'sasl.mechanism';
const CONFIG_SASL_KERBEROS_SERVICE_NAME = 'sasl.kerberos.service.name';
const CONFIG_SASL_KERBEROS_PRINCIPAL = 'sasl.kerberos.principal';
const CONFIG_SASL_KERBEROS_KINIT_CMD = 'sasl.kerberos.kinit.cmd';
const CONFIG_SASL_KERBEROS_KEYTAB = 'sasl.kerberos.keytab';
const CONFIG_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 'sasl.kerberos.min.time.before.relogin';
const CONFIG_SASL_USERNAME = 'sasl.username';
const CONFIG_SASL_PASSWORD = 'sasl.password';
const CONFIG_SASL_OAUTHBEARER_CONFIG = 'sasl.oauthbearer.config';
const CONFIG_ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = 'enable.sasl.oauthbearer.unsecure.jwt';
const CONFIG_OAUTHBEARER_TOKEN_REFRESH_CB = 'oauthbearer_token_refresh_cb';
const CONFIG_PLUGIN_LIBRARY_PATHS = 'plugin.library.paths';
const CONFIG_INTERCEPTORS = 'interceptors';
const CONFIG_GROUP_ID = 'group.id';
const CONFIG_GROUP_INSTANCE_ID = 'group.instance.id';
const CONFIG_PARTITION_ASSIGNMENT_STRATEGY = 'partition.assignment.strategy';
const CONFIG_SESSION_TIMEOUT_MS = 'session.timeout.ms';
const CONFIG_HEARTBEAT_INTERVAL_MS = 'heartbeat.interval.ms';
const CONFIG_GROUP_PROTOCOL_TYPE = 'group.protocol.type';
const CONFIG_COORDINATOR_QUERY_INTERVAL_MS = 'coordinator.query.interval.ms';
const CONFIG_MAX_POLL_INTERVAL_MS = 'max.poll.interval.ms';
const CONFIG_ENABLE_AUTO_COMMIT = 'enable.auto.commit';
const CONFIG_AUTO_COMMIT_INTERVAL_MS = 'auto.commit.interval.ms';
const CONFIG_ENABLE_AUTO_OFFSET_STORE = 'enable.auto.offset.store';
const CONFIG_QUEUED_MIN_MESSAGES = 'queued.min.messages';
const CONFIG_QUEUED_MAX_MESSAGES_KBYTES = 'queued.max.messages.kbytes';
const CONFIG_FETCH_WAIT_MAX_MS = 'fetch.wait.max.ms';
const CONFIG_FETCH_MESSAGE_MAX_BYTES = 'fetch.message.max.bytes';
const CONFIG_MAX_PARTITION_FETCH_BYTES = 'max.partition.fetch.bytes';
const CONFIG_FETCH_MAX_BYTES = 'fetch.max.bytes';
const CONFIG_FETCH_MIN_BYTES = 'fetch.min.bytes';
const CONFIG_FETCH_ERROR_BACKOFF_MS = 'fetch.error.backoff.ms';
const CONFIG_OFFSET_STORE_METHOD = 'offset.store.method';
const CONFIG_ISOLATION_LEVEL = 'isolation.level';
const CONFIG_ENABLE_PARTITION_EOF = 'enable.partition.eof';
const CONFIG_CHECK_CRCS = 'check.crcs';
const CONFIG_ALLOW_AUTO_CREATE_TOPICS = 'allow.auto.create.topics';
const CONFIG_CLIENT_RACK = 'client.rack';
const CONFIG_TRANSACTIONAL_ID = 'transactional.id';
const CONFIG_TRANSACTION_TIMEOUT_MS = 'transaction.timeout.ms';
const CONFIG_ENABLE_IDEMPOTENCE = 'enable.idempotence';
const CONFIG_ENABLE_GAPLESS_GUARANTEE = 'enable.gapless.guarantee';
const CONFIG_QUEUE_BUFFERING_MAX_MESSAGES = 'queue.buffering.max.messages';
const CONFIG_QUEUE_BUFFERING_MAX_KBYTES = 'queue.buffering.max.kbytes';
const CONFIG_QUEUE_BUFFERING_MAX_MS = 'queue.buffering.max.ms';
const CONFIG_LINGER_MS = 'linger.ms';
const CONFIG_MESSAGE_SEND_MAX_RETRIES = 'message.send.max.retries';
const CONFIG_RETRIES = 'retries';
const CONFIG_RETRY_BACKOFF_MS = 'retry.backoff.ms';
const CONFIG_QUEUE_BUFFERING_BACKPRESSURE_THRESHOLD = 'queue.buffering.backpressure.threshold';
const CONFIG_COMPRESSION_CODEC = 'compression.codec';
const CONFIG_COMPRESSION_TYPE = 'compression.type';
const CONFIG_BATCH_NUM_MESSAGES = 'batch.num.messages';
const CONFIG_BATCH_SIZE = 'batch.size';
const CONFIG_DELIVERY_REPORT_ONLY_ERROR = 'delivery.report.only.error';
const CONFIG_DR_CB = 'dr_cb';
const CONFIG_STICKY_PARTITIONING_LINGER_MS = 'sticky.partitioning.linger.ms';
const TOPIC_CONF_REQUEST_REQUIRED_ACKS = 'request.required.acks';
const TOPIC_CONF_ACKS = 'acks';
const TOPIC_CONF_REQUEST_TIMEOUT_MS = 'request.timeout.ms';
const TOPIC_CONF_MESSAGE_TIMEOUT_MS = 'message.timeout.ms';
const TOPIC_CONF_DELIVERY_TIMEOUT_MS = 'delivery.timeout.ms';
const TOPIC_CONF_QUEUING_STRATEGY = 'queuing.strategy';
const TOPIC_CONF_PRODUCE_OFFSET_REPORT = 'produce.offset.report';
const TOPIC_CONF_PARTITIONER = 'partitioner';
const TOPIC_CONF_PARTITIONER_CB = 'partitioner_cb';
const TOPIC_CONF_MSG_ORDER_CMP = 'msg_order_cmp';
const TOPIC_CONF_OPAQUE = 'opaque';
const TOPIC_CONF_COMPRESSION_CODEC = 'compression.codec';
const TOPIC_CONF_COMPRESSION_TYPE = 'compression.type';
const TOPIC_CONF_COMPRESSION_LEVEL = 'compression.level';
const TOPIC_CONF_AUTO_COMMIT_ENABLE = 'auto.commit.enable';
const TOPIC_CONF_ENABLE_AUTO_COMMIT = 'enable.auto.commit';
const TOPIC_CONF_AUTO_COMMIT_INTERVAL_MS = 'auto.commit.interval.ms';
const TOPIC_CONF_AUTO_OFFSET_RESET = 'auto.offset.reset';
const TOPIC_CONF_OFFSET_STORE_PATH = 'offset.store.path';
const TOPIC_CONF_OFFSET_STORE_SYNC_INTERVAL_MS = 'offset.store.sync.interval.ms';
const TOPIC_CONF_OFFSET_STORE_METHOD = 'offset.store.method';
const TOPIC_CONF_CONSUME_CALLBACK_MAX_MESSAGES = 'consume.callback.max.messages';
}
-21
View File
@@ -1,21 +0,0 @@
<?php
declare(strict_types=1);
namespace Kafka;
/**
* Interface ConsumerInterface
* @package App\Kafka
*/
interface ConsumerInterface
{
/**
* @return mixed
*/
public function process(): void;
}
-194
View File
@@ -1,194 +0,0 @@
<?php
declare(strict_types=1);
namespace Kafka;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Exception;
use RdKafka\KafkaConsumer;
use Server\Abstracts\CustomProcess;
use Swoole\Process;
use Throwable;
/**
* Class Queue
* @package Queue
*/
class Kafka extends CustomProcess
{
/**
* @param array $kafkaConfig
*/
public function __construct(public array $kafkaConfig)
{
}
/**
* @param Process $process
* @return string
* @throws ConfigException
*/
public function getProcessName(Process $process): string
{
$name = Config::get('id', 'system') . '[' . $process->pid . ']';
return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic'];
}
/**
* @param Process $process
* @throws \Exception
*/
public function onHandler(Process $process): void
{
$this->waite($process, $this->kafkaConfig);
}
/**
* @param Process $process
* @param array $kafkaServer
* @throws \Exception
*/
private function waite(Process $process, array $kafkaServer)
{
try {
[$config, $topic, $conf] = $this->kafkaConfig($kafkaServer);
if (empty($config) && empty($topic) && empty($conf)) {
return;
}
$objRdKafka = new Consumer($config);
$topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
do {
if ($this->checkProcessIsStop()) {
$this->exit();
break;
}
$this->resolve($topic, $conf['interval'] ?? 1000);
} while (true);
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
}
}
/**
* @param ConsumerTopic $topic
* @param $interval
* @throws \Exception
*/
private function resolve(ConsumerTopic $topic, $interval)
{
try {
$message = $topic->consume(0, $interval);
if (!empty($message)) {
if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
$this->handlerExecute($message->topic_name, $message);
} else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
logger()->warning('No more messages; will wait for more');
} else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
logger()->error('Kafka Timed out');
} else {
logger()->error($message->errstr());
}
}
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
}
}
/**
* @param $topic
* @param $message
* @throws \Exception
*/
protected function handlerExecute($topic, $message)
{
go(function () use ($topic, $message) {
try {
$server = Kiri::app()->getSwoole();
$setting = $server->setting['worker_num'];
/** @var KafkaProvider $container */
$container = Kiri::getDi()->get(KafkaProvider::class);
$data = $container->getConsumer($topic);
if (!empty($data)) {
$server->sendMessage(new $data(new Struct($topic, $message)), random_int(0, $setting - 1));
}
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
}
});
}
/**
* @param $kafka
* @return array
* @throws \Exception
*/
private function kafkaConfig($kafka): array
{
try {
$conf = new Configuration();
$conf->setRebalanceCb([$this, 'rebalanced_cb']);
$conf->setGroupId($kafka['groupId']);
$conf->setMetadataBrokerList($kafka['brokers']);
$conf->setSocketTimeoutMs(30000);
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->setInternalTerminationSignal((string)SIGIO);
}
$topicConf = new TopicConfig();
$topicConf->setAutoCommitEnable(true);
$topicConf->setAutoCommitIntervalMs(100);
//smallest:简单理解为从头开始消费,
//largest:简单理解为从最新的开始消费
$topicConf->setAutoOffsetReset('smallest');
$topicConf->setOffsetStorePath('kafka_offset.log');
$topicConf->setOffsetStoreMethod('broker');
return [$conf, $topicConf, $kafka];
} catch (Throwable $exception) {
logger()->addError($exception, 'throwable');
return [null, null, null];
}
}
/**
* @param KafkaConsumer $kafka
* @param $err
* @param array|null $partitions
* @throws Exception
* @throws \Exception
*/
public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null)
{
if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
$kafka->assign($partitions);
} else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
$kafka->assign(NULL);
} else {
throw new \Exception($err);
}
}
}
-161
View File
@@ -1,161 +0,0 @@
<?php
namespace Kafka;
use Exception;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Exception\NotFindClassException;
use Kiri\Kiri;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use ReflectionException;
/**
*
*/
class KafkaClient
{
private Configuration $conf;
private TopicConfig $topicConf;
private bool $isAck = true;
/**
* Producer constructor.
* @param string $topic
* @param string $groupId
* @throws ConfigException
* @throws NotFindClassException
* @throws ReflectionException
*/
public function __construct(public string $topic, public string $groupId)
{
$this->conf = di(Configuration::class);
$this->topicConf = di(TopicConfig::class);
$this->setConfig();
}
/**
* @return TopicConfig
*/
public function getTopicConfig(): TopicConfig
{
return $this->topicConf;
}
/**
* @return Configuration
*/
public function getConfiguration(): Configuration
{
return $this->conf;
}
/**
* @throws ConfigException
*/
private function setConfig()
{
$config = Config::get('kafka.producers.' . $this->topic, null, true);
if (!isset($config['brokers'])) {
throw new ConfigException('Please configure relevant information.');
}
$this->conf->setMetadataBrokerList($config['brokers']);
$this->conf->setGroupId($this->groupId);
$this->conf->setClientId(current(swoole_get_local_ip()));
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
logger()->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
}
/**
* @param string $key
* @param string|array $params
* @param bool $isAck
* @throws Exception
*/
public function push(string $key, string|array $params, bool $isAck = false)
{
$this->sendMessage([$params], $key, $isAck);
}
/**
* @param string|null $key
* @param array $data
* @param bool $isAck
* @throws Exception
*/
public function batch(?string $key, array $data, bool $isAck = false)
{
$this->sendMessage($data, $key, $isAck);
}
/**
* @return Producer
* @throws Exception
*/
private function getProducer(): Producer
{
return Kiri::getDi()->get(Producer::class, [$this->conf]);
}
/**
* @param Producer $producer
* @param $topic
* @param $isAck
* @return ProducerTopic
*/
private function getProducerTopic(Producer $producer, $topic, $isAck): ProducerTopic
{
$this->topicConf->setRequestRequiredAcks($isAck ? '1' : '0');
return $producer->newTopic($topic, $this->topicConf);
}
/**
* @param array $message
* @param string $key
* @param bool $isAck
* @throws Exception
*/
private function sendMessage(array $message, string $key = '', bool $isAck = false)
{
$producer = $this->getProducer();
$producerTopic = $this->getProducerTopic($producer, $this->topic, $isAck);
if ($this->isAck) {
$this->flush($producer);
}
foreach ($message as $value) {
$producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key);
$producer->poll(0);
}
$this->flush($producer);
}
/**
* @param Producer $producer
*/
private function flush(Producer $producer)
{
while ($producer->getOutQLen() > 0) {
$result = $producer->flush(100);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
}
}
-46
View File
@@ -1,46 +0,0 @@
<?php
declare(strict_types=1);
namespace Kafka;
use Exception;
use Http\Server;
use Kiri\Abstracts\Config;
use Kiri\Abstracts\Config as SConfig;
use Kiri\Abstracts\Providers;
use Kiri\Application;
/**
* Class QueueProviders
* @package Queue
*/
class KafkaImports extends Providers
{
/**
* @param Application $application
* @throws Exception
*/
public function onImport(Application $application)
{
if (!extension_loaded('rdkafka')) {
return;
}
$kafka = SConfig::get('kafka', ['enable' => false]);
if (($kafka['enable'] ?? false) == false) {
return;
}
$kafkaServers = Config::get('kafka.consumers', []);
if (empty($kafkaServers)) {
return;
}
/** @var Server $server */
$server = $application->get('server');
foreach ($kafkaServers as $kafkaServer) {
$server->addProcess(new Kafka($kafkaServer));
}
}
}
-43
View File
@@ -1,43 +0,0 @@
<?php
namespace Kafka;
use Kiri\Abstracts\BaseObject;
/**
* Class KafkaProvider
* @package Kafka
*/
class KafkaProvider extends BaseObject
{
private array $_topics = [];
/**
* @param $topic
* @param $handler
*/
public function addConsumer($topic, $handler)
{
if (isset($this->_topics[$topic])) {
return;
}
$this->_topics[$topic] = $handler;
}
/**
* @param string $topic
* @return mixed
*/
public function getConsumer(string $topic): mixed
{
return $this->_topics[$topic] ?? null;
}
}
-115
View File
@@ -1,115 +0,0 @@
<?php
declare(strict_types=1);
namespace Kafka;
use Exception;
use Psr\Log\LoggerInterface;
use Kiri\Kiri;
/**
* Class Logger
* @package Kafka
*/
class Logger implements LoggerInterface
{
/**
* @param mixed $message
* @param array $context
*/
public function emergency(mixed $message, array $context = array())
{
// TODO: Implement emergency() method.
var_dump(func_get_args());
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function alert(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->debug($message);
}
public function critical(mixed $message, array $context = array())
{
// TODO: Implement critical() method.
var_dump(func_get_args());
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function error(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->error($message);
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function warning(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->warning($message);
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function notice(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->info($message);
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function info(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->info($message);
}
/**
* @param string $message
* @param array $context
* @throws Exception
*/
public function debug(mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->debug($message);
}
/**
* @param $level
* @param $message
* @param array $context
* @throws Exception
*/
public function log($level, mixed $message, array $context = array())
{
$logger = Kiri::app()->getLogger();
$logger->debug($message);
}
}
-36
View File
@@ -1,36 +0,0 @@
<?php
declare(strict_types=1);
namespace Kafka;
use RdKafka\Message;
class Struct
{
public ?int $offset;
public ?Message $message;
public ?string $topic;
public mixed $value;
public ?int $part;
/**
* Struct constructor.
* @param string $topic
* @param Message $message
*/
public function __construct(string $topic, Message $message)
{
$message->payload = swoole_unserialize($message->payload);
$this->topic = $topic;
$this->offset = $message->offset;
$this->part = $message->partition;
$this->message = $message;
$this->value = $message->payload;
}
}
-287
View File
@@ -1,287 +0,0 @@
<?php
namespace Kafka;
use RdKafka\TopicConf;
/**
*
*/
class TopicConfig extends TopicConf
{
/**
* @param mixed|string $request_required_acks
*
* 此配置是表明当一次produce请求被认为完成时的确认值。特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括:
* 0 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。
* 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。
* -1producer会获得所有同步replicas都收到数据的确认。同时时延最大,然而,这种方式并没有完全消除丢失消息的风险,因为同步replicas的数量可能是1.如果你想确保某些replicas接收到数据,那么你应该在topic-level设置中选项min.insync.replicas设置一下。请阅读一下设计文档,可以获得更深入的讨论。
*/
public function setRequestRequiredAcks(int $request_required_acks): void
{
$this->set(Constant::TOPIC_CONF_REQUEST_REQUIRED_ACKS, $request_required_acks);
}
/**
* @param mixed|string $acks
*
* producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
* 1acks=0 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
* (2)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
* 3acks=all 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
* (4)其他的设置,例如acks=2也是可以的,这将需要给定的acks数量,但是这种策略一般很少用。
*/
public function setAcks(int $acks): void
{
$this->set(Constant::TOPIC_CONF_ACKS, $acks);
}
/**
* @param mixed|string $request_timeout_ms
*
* broker尽力实现request.required.acks需求时的等待时间,否则会发送错误到客户端
*/
public function setRequestTimeoutMs(int $request_timeout_ms): void
{
$this->set(Constant::TOPIC_CONF_REQUEST_TIMEOUT_MS, $request_timeout_ms);
}
/**
* @param mixed|string $message_timeout_ms
*
* 本地消息超时。此值仅在本地强制执行,并限制生成的消息等待成功传递的时间。0的时间是无限的。
* 这是librdkafka用于传递消息(包括重试)的最长时间。
* 超过重试计数或消息超时时发生传递错误。
* 如果配置了transactional.id,则消息超时将自动调整为transaction.timeout.ms。
*/
public function setMessageTimeoutMs(int $message_timeout_ms): void
{
$this->set(Constant::TOPIC_CONF_MESSAGE_TIMEOUT_MS, $message_timeout_ms);
}
/**
* @param mixed|string $delivery_timeout_ms
*
* Alias for message.timeout.ms: Local message timeout.
* This value is only enforced locally and limits the time a produced message waits for successful delivery.
* A time of 0 is infinite.
* This is the maximum time librdkafka may use to deliver a message (including retries).
* Delivery error occurs when either the retry count or the message timeout are exceeded.
* The message timeout is automatically adjusted to transaction.timeout.ms if transactional.id is configured.
*/
public function setDeliveryTimeoutMs(int $delivery_timeout_ms): void
{
$this->set(Constant::TOPIC_CONF_DELIVERY_TIMEOUT_MS, $delivery_timeout_ms);
}
/**
* @param mixed|string $queuing_strategy
*
* EXPERIMENTAL: subject to change or removal.
* DEPRECATED Producer queuing strategy.
* FIFO preserves produce ordering, while LIFO prioritizes new messages
*/
public function setQueuingStrategy(mixed $queuing_strategy): void
{
$this->set(Constant::TOPIC_CONF_QUEUING_STRATEGY, $queuing_strategy);
}
/**
* @param mixed|string $produce_offset_report
*
* DEPRECATED No longer used.
*/
public function setProduceOffsetReport(bool $produce_offset_report): void
{
$this->set(Constant::TOPIC_CONF_PRODUCE_OFFSET_REPORT, $produce_offset_report);
}
/**
* @param mixed|string $partitioner
*
* Partitioner: random - random distribution,
* consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition),
* consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned),
* murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
* murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned.
* This is functionally equivalent to the default partitioner in the Java Producer.),
* fnv1a - FNV-1a hash of key (NULL keys are mapped to single partition),
* fnv1a_random - FNV-1a hash of key (NULL keys are randomly partitioned).
*/
public function setPartitioner(mixed $partitioner): void
{
$this->set(Constant::TOPIC_CONF_PARTITIONER, $partitioner);
}
/**
* @param mixed|string $partitioner_cb
*
* Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb())
*/
public function setPartitionerCb(mixed $partitioner_cb): void
{
$this->set(Constant::TOPIC_CONF_PARTITIONER_CB, $partitioner_cb);
}
/**
* @param mixed|string $msg_order_cmp
*
* EXPERIMENTAL: subject to change or removal.
* DEPRECATED Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()).
* Also see queuing.strategy.
*
*/
public function setMsgOrderCmp(mixed $msg_order_cmp): void
{
$this->set(Constant::TOPIC_CONF_MSG_ORDER_CMP, $msg_order_cmp);
}
/**
* @param mixed|string $opaque
*
* Application opaque (set with rd_kafka_topic_conf_set_opaque())
*/
public function setOpaque(mixed $opaque): void
{
$this->set(Constant::TOPIC_CONF_OPAQUE, $opaque);
}
/**
* @param mixed|string $compression_codec
*
* Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration.
*/
public function setCompressionCodec(mixed $compression_codec): void
{
$this->set(Constant::TOPIC_CONF_COMPRESSION_CODEC, $compression_codec);
}
/**
* @param mixed|string $compression_type
*
* Alias for compression.codec: compression codec to use for compressing message sets.
* This is the default value for all topics, may be overridden by the topic configuration property compression.codec.
*/
public function setCompressionType(mixed $compression_type): void
{
$this->set(Constant::TOPIC_CONF_COMPRESSION_TYPE, $compression_type);
}
/**
* @param mixed|string $compression_level
*
* Compression level parameter for algorithm selected by configuration property compression.codec.
* Higher values will result in better compression at the cost of more CPU usage.
* Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy;
* -1 = codec-dependent default compression level.
*/
public function setCompressionLevel(int $compression_level): void
{
$this->set(Constant::TOPIC_CONF_COMPRESSION_LEVEL, $compression_level);
}
/**
* @param mixed|string $auto_commit_enable
*
* DEPRECATED [LEGACY PROPERTY: This property is used by the simple legacy consumer only.
* When using the high-level KafkaConsumer, the global enable.auto.commit property must be used instead].
* If true, periodically commit offset of the last message handed to the application.
* This committed offset will be used when the process restarts to pick up where it left off.
* If false, the application will have to call rd_kafka_offset_store() to store an offset (optional).
* Offsets will be written to broker or local file according to offset.store.method.
*/
public function setAutoCommitEnable(bool $auto_commit_enable): void
{
$this->set(Constant::TOPIC_CONF_AUTO_COMMIT_ENABLE, $auto_commit_enable);
}
/**
* @param mixed|string $enable_auto_commit
*
* DEPRECATED Alias for auto.commit.enable: [LEGACY PROPERTY: This property is used by the simple legacy consumer only.
* When using the high-level KafkaConsumer, the global enable.auto.commit property must be used instead].
* If true, periodically commit offset of the last message handed to the application.
* This committed offset will be used when the process restarts to pick up where it left off.
* If false, the application will have to call rd_kafka_offset_store() to store an offset (optional).
* Offsets will be written to broker or local file according to offset.store.method.
*/
public function setEnableAutoCommit(bool $enable_auto_commit): void
{
$this->set(Constant::TOPIC_CONF_ENABLE_AUTO_COMMIT, $enable_auto_commit);
}
/**
* @param mixed|string $auto_commit_interval_ms
*
* [LEGACY PROPERTY: This setting is used by the simple legacy consumer only.
* When using the high-level KafkaConsumer, the global auto.commit.interval.ms property must be used instead].
* The frequency in milliseconds that the consumer offsets are committed (written) to offset storage.
*/
public function setAutoCommitIntervalMs(int $auto_commit_interval_ms): void
{
$this->set(Constant::TOPIC_CONF_AUTO_COMMIT_INTERVAL_MS, $auto_commit_interval_ms);
}
/**
* @param mixed|string $auto_offset_reset
*
* Action to take when there is no initial offset in offset store or the desired offset is out of range:
* 'smallest','earliest' - automatically reset the offset to the smallest offset,
* 'largest','latest' - automatically reset the offset to the largest offset,
* 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
*/
public function setAutoOffsetReset(mixed $auto_offset_reset): void
{
$this->set(Constant::TOPIC_CONF_AUTO_OFFSET_RESET, $auto_offset_reset);
}
/**
* @param mixed|string $offset_store_path
*
* DEPRECATED Path to local file for storing offsets.
* If the path is a directory a filename will be automatically generated in that directory based on the topic and partition.
* File-based offset storage will be removed in a future version.
*/
public function setOffsetStorePath(string $offset_store_path): void
{
$this->set(Constant::TOPIC_CONF_OFFSET_STORE_PATH, $offset_store_path);
}
/**
* @param mixed|string $offset_store_sync_interval_ms
*
* DEPRECATED fsync() interval for the offset file, in milliseconds.
* Use -1 to disable syncing, and 0 for immediate sync after each write.
* File-based offset storage will be removed in a future version.
*/
public function setOffsetStoreSyncIntervalMs(int $offset_store_sync_interval_ms): void
{
$this->set(Constant::TOPIC_CONF_OFFSET_STORE_SYNC_INTERVAL_MS, $offset_store_sync_interval_ms);
}
/**
* @param mixed|string $offset_store_method
*
* DEPRECATED Offset commit store method:
* 'file' - DEPRECATED: local file store (offset.store.path, et.al),
* 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.).
*/
public function setOffsetStoreMethod(mixed $offset_store_method): void
{
$this->set(Constant::TOPIC_CONF_OFFSET_STORE_METHOD, $offset_store_method);
}
/**
* @param mixed|string $consume_callback_max_messages
*
* Maximum number of messages to dispatch in one rd_kafka_consume_callback*()
*/
public function setConsumeCallbackMaxMessages(int $consume_callback_max_messages): void
{
$this->set(Constant::TOPIC_CONF_CONSUME_CALLBACK_MAX_MESSAGES, $consume_callback_max_messages);
}
}
-11
View File
@@ -1,11 +0,0 @@
<?php
return [
'kafka' => [
''
]
];
+28 -29
View File
@@ -22,29 +22,24 @@ class Crontab
public int $loopType = Crontab::LOOP_TYPE_MINUTE;
private int $startTime = 0;
public int $loopTime = 2;
private int|string $year = 2021;
private int|string $month = '*';
private int|string $day = '*';
private int|string $month = 8;
private int|string $hour = '*';
private int|string $day = 25;
private int|string $minute = '*/2';
private int|string $hour = 19;
private int|string $second = '1-30';
private int|string $minute = 02;
private int|string $second = 32;
private int|string $week = '*';
public function __construct()
{
$this->startTime = time();
}
@@ -63,33 +58,30 @@ class Crontab
public function next(): string
{
if ($this->loopType == Crontab::LOOP_TYPE_YEAR) $this->year = '*/' . $this->loopTime;
if ($this->loopType == Crontab::LOOP_TYPE_MONTH) $this->month = '*/' . $this->loopTime;
if ($this->loopType == Crontab::LOOP_TYPE_DAY) $this->day = '*/' . $this->loopTime;
if ($this->loopType == Crontab::LOOP_TYPE_HOUR) $this->hour = '*/' . $this->loopTime;
if ($this->loopType == Crontab::LOOP_TYPE_MINUTE) $this->minute = '*/' . $this->loopTime;
if ($this->loopType == Crontab::LOOP_TYPE_SECOND) $this->second = '*/' . $this->loopTime;
return sprintf('%s-%s-%s %s:%s:%s',
$this->format($this->year, 'Y'),
$this->format($this->month, 'm'),
$this->format($this->day, 'd'),
$this->format($this->hour, 'H'),
$this->format($this->minute, 'i'),
$this->format($this->second, 's')
$time = time();
return sprintf('%s-%s-%s %s:%s:%s %s',
date('Y'),
$this->format($time, $this->month, 'm', 'month'),
$this->format($time, $this->day, 'd', 'day'),
$this->format($time, $this->hour, 'H', 'hour'),
$this->format($time, $this->minute, 'i', 'minute'),
$this->format($time, $this->second, 's', 'second'),
$this->format($time, $this->week, 'N'),
);
}
/**
* @param int $startTime
* @param string $text
* @param string $match
* @param string|null $format
* @return string
*/
private function format(string $text, string $match): string
private function format(int &$startTime, string $text, string $match, ?string $format = null): string
{
$time = date($match);
if ($text == '*') {
if ($text == '*' || $text == '*/1') {
return $time;
}
if (str_contains($text, ',')) {
@@ -103,13 +95,13 @@ class Crontab
if (str_contains($text, '-')) {
$explode = explode('-', $text);
if ($time >= $explode[0] && $time <= $explode[1]) {
return intval($time) + 1;
return intval($time);
}
return '^';
}
if (str_contains($text, '/')) {
$explode = explode('/', $text);
if ($time % $this->loopTime !== 0) {
if ($time % $explode[1] !== 0) {
return '^';
}
if ($explode[0] != '*') {
@@ -123,6 +115,13 @@ class Crontab
}
//$date = date('Y-m-d H:i:s');
//var_dump(date('Y-m-d H:i:s', strtotime('+10 month', strtotime($date))));
//var_dump(date('Y-m-d H:i:s', strtotime('+10 day', strtotime($date))));
//var_dump(date('Y-m-d H:i:s', strtotime('+10 hour', strtotime($date))));
//var_dump(date('Y-m-d H:i:s', strtotime('+10 minute', strtotime($date))));
//var_dump(date('Y-m-d H:i:s', strtotime('+10 second', strtotime($date))));
//var_dump(date('Y-m-d H:i:s', strtotime('+10 week', strtotime($date))));
$c = new Crontab();