KafkaAdapter
class KafkaAdapter extends QueueAdapter (View source)
Constants
| EXIT_SUCCESS |
|
| EXIT_ERROR |
|
| EXIT_MEMORY_LIMIT |
|
Properties
| protected float | $start_time | Define the start time |
from QueueAdapter |
| protected float | $processing_timeout | Define the processing timeout |
from QueueAdapter |
| protected int | $timeout | Define the work time out |
from QueueAdapter |
| protected string | $queue | Determine the default watch name |
from QueueAdapter |
| protected int | $tries | The number of working attempts |
from QueueAdapter |
| protected int | $sleep | Define the sleep time |
from QueueAdapter |
| static protected bool | $suppressLogging | Whether to suppress logging (useful for testing) |
from QueueAdapter |
| protected Producer|null | $producer | ||
| protected Consumer|null | $consumer | ||
| protected array | $config | ||
| protected string | $topic | ||
| protected string | $group_id |
Methods
Enable or disable logging suppression
Configure the adapter
Update the processing timeout
Run the worker to consume tasks
Set the queue/topic name
Get the queue size
Flush the queue
Initialize the Kafka producer
Initialize the Kafka consumer
Get broker list from config
Process a consumed message
Get topic configuration
Details
static void
suppressLogging(bool $suppress = true)
Enable or disable logging suppression
QueueAdapter
configure(array $config)
Configure the adapter
bool
push(QueueTask $task)
Push a new task onto the queue
string
serializeProducer(QueueTask $task)
Create task serialization
QueueTask
unserializeProducer(string $task)
Create task unserialize
void
sleep(int $seconds)
Sleep the process
void
setTimeout(int $timeout)
Set worker timeout
void
updateProcessingTimeout(int|null $timeout = null)
Update the processing timeout
final void
work(int $timeout, int $memory)
Launch the worker
protected bool
supportsAsyncSignals()
Determine if "async" signals are supported.
protected void
listenForSignals()
Enable async signals for the process.
void
run(string|null $queue = null)
Run the worker to consume tasks
protected bool
timeoutReached(int $timeout)
Determine if the timeout is reached
void
kill(int $status = 0)
Kill the process.
void
setTries(int $tries)
Set task tries
int
getTries()
Get task tries
void
setSleep(int $sleep)
Set sleep time
string
getQueue(string|null $queue = null)
Get the queue or return the default.
void
setQueue(string $queue)
Set the queue/topic name
int
size(string|null $queue = null)
Get the queue size
void
flush(string|null $queue = null)
Flush the queue
protected void
logError(Throwable $exception)
Log an error
final protected string
generateId()
Generate the task id
protected void
logProcessingTask(QueueTask $task)
Log processing task
protected void
logProcessedTask(QueueTask $task)
Log processed task
protected void
initProducer()
Initialize the Kafka producer
protected void
initConsumer()
Initialize the Kafka consumer
protected string
getBrokers()
Get broker list from config
protected void
processMessage(Message $message)
Process a consumed message
protected TopicConf
getTopicConf()
Get topic configuration