Skip to content
Wiktor Petryszyn edited this page Jan 27, 2022 · 3 revisions

Welcome to the ktor-rabbitmq wiki!

Gradle configuration

ktor-rabbitmq is hosted on JitPack, to use it in your project you have to:

Add JitPack at the END of your repositories:

allprojects {
    repositories {
        ...
        maven { url 'https://jitpack.io' }
    }
}

Add these dependencies to your module:

implementation "com.github.JUtupe:ktor-rabbitmq:$ktor_rabbitmq_version"
implementation "com.rabbitmq:amqp-client:$rabbitmq_version"

Plugin configuration

install(RabbitMQ) {
    uri = "amqp://guest:guest@localhost:5672"
    connectionName = "Connection name"

    enableLogging()

    //serialize and deserialize functions are required
    serialize { jacksonObjectMapper().writeValueAsBytes(it) }
    deserialize { bytes, type -> jacksonObjectMapper().readValue(bytes, type.javaObjectType) }
    
    //example initialization logic (create queues etc.)
    initialize { // RabbitMQ Channel.() block ->
        exchangeDeclare(/* exchange = */ "exchange", /* type = */ "direct", /* durable = */ true)
        queueDeclare(
            /* queue = */ "queue",
            /* durable = */true,
            /* exclusive = */false,
            /* autoDelete = */false,
            /* arguments = */emptyMap()
        )
        queueBind(/* queue = */ "queue", /* exchange = */ "exchange", /* routingKey = */ "routingKey")
    }
}

Consuming messages

Basic

//consume with autoack example
rabbitConsumer {
    consume<MyObject>("queue") { body ->
        println("Consumed message $body")
    }
}

Manual ack

//consume work queue with manual ack example
rabbitConsumer {
    consume<MyObject>("work_queue", /* autoAck= */ false, /* basicQos = */ 1) { body ->
        println("Consumed task $body")
        
        // We can omit 'this' part
        this.ack()
    }
}

Consumer-scope actions

Actions that you can invoke within the consume scope.

  • ack()
  • nack()
  • reject()

Publishing messages

  • publishing from call
routing {
    get("anyEndpoint") {
        call.publish("exchange", "routingKey", /* props= */ null, MyObject("test name"))
    }
}

Dependency injection compatibility

In case you need to initialize RabbitMQ prior to starting it (e. g. to kickstart your DI injection), you can pass pre-created RabbitMQ instance to the plugin:

install(RabbitMQ) {
    rabbitMQInstance = RabbitMQInstance(RabbitMQConfiguration.create()
        .apply {
            uri = "amqp://guest:guest@${rabbit.host}:${rabbit.amqpPort}"
            connectionName = "Connection name"

            serialize { jacksonObjectMapper().writeValueAsBytes(it) }
            deserialize { bytes, type -> jacksonObjectMapper().readValue(bytes, type.javaObjectType) }

            initialize {
                // ...
            }
        })
}

You can also use pre-created instance to publish messages:

rabbitMQInstance.publish("exchange", "routingKey", /* props= */ null, MyObject("test name"))