class KafkaAdapter extends QueueAdapter (View source)

Constants

EXIT_SUCCESS

EXIT_ERROR

EXIT_MEMORY_LIMIT

Properties

protected float $start_time

Define the start time

from  QueueAdapter
protected float $processing_timeout

Define the processing timeout

from  QueueAdapter
protected int $timeout

Define the worker timeout

from  QueueAdapter
protected string $queue

Define the default queue name to watch

from  QueueAdapter
protected int $tries

The number of attempts for a task

from  QueueAdapter
protected int $sleep

Define the sleep time

from  QueueAdapter
static protected bool $suppressLogging

Whether to suppress logging (useful for testing)

from  QueueAdapter
protected Producer|null $producer
protected Consumer|null $consumer
protected array $config
protected string $topic
protected string $group_id

Methods

static void
suppressLogging(bool $suppress = true)

Enable or disable logging suppression

QueueAdapter
configure(array $config)

Configure the adapter

bool
push(QueueTask $task)

Push a new task onto the queue

string
serializeProducer(QueueTask $task)

Serialize a task

QueueTask
unserializeProducer(string $task)

Unserialize a task

void
sleep(int $seconds)

Sleep the process

void
setTimeout(int $timeout)

Set worker timeout

void
updateProcessingTimeout(int|null $timeout = null)

Update the processing timeout

void
work(int $timeout, int $memory)

Launch the worker

bool
supportsAsyncSignals()

Determine if "async" signals are supported.

void
listenForSignals()

Enable async signals for the process.

void
run(string|null $queue = null)

Run the worker to consume tasks

bool
timeoutReached(int $timeout)

Determine if the timeout is reached

void
kill(int $status = 0)

Kill the process.

void
setTries(int $tries)

Set task tries

int
getTries()

Get task tries

void
setSleep(int $sleep)

Set sleep time

string
getQueue(string|null $queue = null)

Get the queue or return the default.

void
setQueue(string $queue)

Set the queue/topic name

int
size(string|null $queue = null)

Get the queue size

void
flush(string|null $queue = null)

Flush the queue

void
logError(Throwable $exception)

Log an error

string
generateId()

Generate the task id

void
logProcessingTask(QueueTask $task)

Log processing task

void
logProcessedTask(QueueTask $task)

Log processed task

void
logFailedTask(QueueTask $task, Throwable $e)

Log failed task

void
initProducer()

Initialize the Kafka producer

void
initConsumer()

Initialize the Kafka consumer

string
getBrokers()

Get broker list from config

void
processMessage(Message $message)

Process a consumed message

TopicConf
getTopicConf()

Get topic configuration

Details

static void suppressLogging(bool $suppress = true)

Enable or disable logging suppression

Parameters

bool $suppress

Return Value

void

QueueAdapter configure(array $config)

Configure the adapter

Parameters

array $config

Return Value

QueueAdapter

bool push(QueueTask $task)

Push a new task onto the queue

Parameters

QueueTask $task

Return Value

bool

string serializeProducer(QueueTask $task)

Serialize a task

Parameters

QueueTask $task

Return Value

string

QueueTask unserializeProducer(string $task)

Unserialize a task

Parameters

string $task

Return Value

QueueTask

void sleep(int $seconds)

Sleep the process

Parameters

int $seconds

Return Value

void

void setTimeout(int $timeout)

Set worker timeout

Parameters

int $timeout

Return Value

void

void updateProcessingTimeout(int|null $timeout = null)

Update the processing timeout

Parameters

int|null $timeout

Return Value

void

final void work(int $timeout, int $memory)

Launch the worker

Parameters

int $timeout
int $memory

Return Value

void

protected bool supportsAsyncSignals()

Determine if "async" signals are supported.

Return Value

bool

protected void listenForSignals()

Enable async signals for the process.

Return Value

void

void run(string|null $queue = null)

Run the worker to consume tasks

Parameters

string|null $queue

Return Value

void

protected bool timeoutReached(int $timeout)

Determine if the timeout is reached

Parameters

int $timeout

Return Value

bool

void kill(int $status = 0)

Kill the process.

Parameters

int $status

Return Value

void

void setTries(int $tries)

Set task tries

Parameters

int $tries

Return Value

void

int getTries()

Get task tries

Return Value

int

void setSleep(int $sleep)

Set sleep time

Parameters

int $sleep

Return Value

void

string getQueue(string|null $queue = null)

Get the queue or return the default.

Parameters

string|null $queue

Return Value

string

void setQueue(string $queue)

Set the queue/topic name

Parameters

string $queue

Return Value

void

int size(string|null $queue = null)

Get the queue size

Parameters

string|null $queue

Return Value

int

void flush(string|null $queue = null)

Flush the queue

Parameters

string|null $queue

Return Value

void

protected void logError(Throwable $exception)

Log an error

Parameters

Throwable $exception

Return Value

void

final protected string generateId()

Generate the task id

Return Value

string

protected void logProcessingTask(QueueTask $task)

Log processing task

Parameters

QueueTask $task

Return Value

void

protected void logProcessedTask(QueueTask $task)

Log processed task

Parameters

QueueTask $task

Return Value

void

protected void logFailedTask(QueueTask $task, Throwable $e)

Log failed task

Parameters

QueueTask $task
Throwable $e

Return Value

void

protected void initProducer()

Initialize the Kafka producer

Return Value

void

protected void initConsumer()

Initialize the Kafka consumer

Return Value

void

protected string getBrokers()

Get broker list from config

Return Value

string

protected void processMessage(Message $message)

Process a consumed message

Parameters

Message $message

Return Value

void

protected TopicConf getTopicConf()

Get topic configuration

Return Value

TopicConf