kithara.core

Public API for simple RabbitMQ consumer creation.

batching-consumer

added in 0.1.2

(batching-consumer batch-handler)(batching-consumer batch-handler {:keys [batch-size interval-ms offer-timeout-ms queue-size], :or {batch-size 128, interval-ms 200, offer-timeout-ms 1000, queue-size 256}, :as opts})

Generate a consumer component based on the given batch processing function.

  • :batch-size: the maximum size of a single batch,
  • :interval-ms: the maximum time between the processing of two batches,
  • :offer-timeout-ms: the maximum waiting time for a spot in the queue,
  • :queue-size: the capacity of the internal BlockingQueue.

All consumer options are allowed as well.

The batch processing function gets a seq of messages and produces either:

  • a single confirmation map for all messages, or
  • a seq of in-order confirmation maps for each individual message.

If less confirmations are returned than messages were given, nil will be used for the remaining ones, triggering the default confirmation handling of the consumer.

consumer

(consumer handler)(consumer handler {:keys [consumer-name], :or {consumer-name "kithara"}, :as opts})

Create a new kithara BaseConsumer using the given handler.

Options:

  • :consumer-name: the consumer’s name,
  • :default-confirmation: the default confirmation map,
  • :error-confirmation: the confirmation map in case of exception,
  • :as: the coercer to use for incoming message bodies,
  • :consumer-tag,
  • :local?,
  • :exclusive?,
  • :arguments.

See the documentation of basic.consume for an explanation of :consumer-tag, :local?, :exclusive? and :arguments.

The following values are valid for :as:

  • :bytes (default),
  • :string
  • any function taking the raw byte array as input,
  • any value implementing kithara.protocols/Coercer.

The handler function gets a message map with the following keys:

  • :channel: the channel the message was received on,
  • :exchange: the exchange the message was published to,
  • :routing-key: the message’s routing key,
  • :body: the coerced body,
  • :body-raw: the body as a byte array,
  • :properties: a map of message properties,
  • :redelivered?: whether the message was redelivered,
  • :delivery-tag: the message’s delivery tag.

Messages will be confirmed based on the return value of the handler function, a map providing a :status key having one of the following values:

  • :ack
  • :nack
  • :reject
  • :error (an uncaught exception occured)
  • :done (do nothing since the message was already explicitly handled)

For :nack, :reject and :error, the key :requeue? can be given to indicate whether or not the message should be requeued.

Additionally, :message (a string) and :error (a Throwable) keys can be added to trigger a log message. See wrap-confirmation and wrap-logging.

publisher

(publisher options & [properties])

Create a publisher component. Accepts all options supported by kithara.config/connection and kithara.config/behaviour.

properties will be used for each message unless explicitly overridden (see AMQP’s BasicProperties).

The resulting component implements IFn and can thus be called like a simple function to publish a message.

with-channel

(with-channel consumers)(with-channel consumers channel-options)

Wrap the given component(s) with setup/teardown of a RabbitMQ channel. The following options can be given:

  • :channel-number
  • :prefetch-count
  • :prefetch-size
  • :prefetch-global?

If no options are given, a channel with server-side default settings will be set up.

Note: set-channel will be used to inject the channel.

with-connection

(with-connection consumers)(with-connection consumers connection-options)

Wrap the given consumer(s) with connection setup/teardown. Accepts all options supported by kithara.config/connection and kithara.config/behaviour.

(defonce rabbitmq-consumer
  (with-connection
    ...
    {:host     "rabbitmq.host.com"
     :vhost    "/internal"
     :username "kithara"
     :password "i-am-secret"}))

Note: set-connection will be used to inject the connection.

with-durable-queue

(with-durable-queue consumers queue-name)(with-durable-queue consumers queue-name queue-options & more-bindings)

See with-queue. Will create/expect a durable, non-exclusive and non-auto-delete queue.

with-env

added in 0.1.2

(with-env components)(with-env components env)

Let each message have an :env key with the message handling environment. It is possible to assoc more keys into the resulting component before startup, allowing it to participate in com.stuartsierra/component systems, e.g.:

(defonce system
  (component/system-map
    :consumer (-> message-handler
                  (kithara.core/consumer ...)
                  ...
                  (kithara.core/with-env)
                  (component/using [:db :elastic]))
    :db       (map->DB {...})
    :elastic  (map->ES {...})))

For this to work, with-env has to be top-most layer of the kithara consumer. If you’re using the two-parameter version, you can explicitly specify the environment to inject.

with-prefetch-channel

(with-prefetch-channel consumers prefetch-count & [channel-options])

See with-channel. Convenience function setting the per-channel prefetch count directly.

with-queue

(with-queue consumers queue-name)(with-queue consumers queue-name queue-options & more-bindings)

Wrap the given consumer(s) with queue setup/teardown. If options are given, the queue will be actively declared using the following keys:

  • :durable?
  • :exclusive?
  • :auto-delete?
  • :declare-arguments

The queue can be bound to an exchange by specifying the following keys in queue-options or as additional parameter maps:

  • :exchange
  • :routing-keys
  • :arguments

Example:

(defonce rabbitmq-consumer
  (with-queue
    ...
    "rabbitm-queue"
    {:exchange "exchange", :routing-keys ["#"]}
    {:exchange "other", :routing-keys ["*.message"]}))

Note: set-queue will be used to inject the queue.

with-server-named-queue

(with-server-named-queue consumers & bindings)

Wrap the given consumer(s) with setup/teardown of a server-named, exclusive, non-durable, auto-deleted queue.

The queue can be bound to an exchange by specifying the following keys in as additional parameter maps:

  • :exchange
  • :routing-keys
  • :arguments

Example:

(defonce rabbitmq-consumer
  (with-server-named-queue
    ...
    {:exchange "exchange", :routing-keys ["#"]}))

Note: set-queue will be used to inject the queue.