PriorityDispatcher — Priority based Dispatcher in Kotlin Coroutines

PriorityDispatcher — Priority based Dispatcher in Kotlin Coroutines

Priority Queue based Dispatcher for prioritisation of task in Kotlin coroutines.

Introduction

In the world of asynchronous programming, managing tasks efficiently is important. However, not all tasks are created equal, and some may require immediate attention over others. I designed a small utility Kotlin library, PriorityDispatcher, that prioritise tasks within coroutines based on a priority queue. In this blog post, we’ll explore how PriorityDispatcher works, its key components, and how you can leverage it to for task management in your Kotlin projects.

Components

CustomPriorityDispatcher

Extends ExecutorCoroutineDispatcher and defines a custom coroutine dispatcher that dispatches tasks based on their priority and order sequence. On every dispatch, it creates PriorityRunnable and execute on executor.

/**
 * Custom priority dispatcher - On every dispatch, create [PriorityRunnable] and execute on [executor]
 *
 * @property executor ThreadPoolExecutor with PriorityBlockingQueue
 * @property sequence Order at which task gets queued
 * @property priority [Priority] of the task
 */
internal class CustomPriorityDispatcher(
    override val executor: Executor,
    private val sequence: Int,
    private val priority: Priority
) : ExecutorCoroutineDispatcher() {

    override fun close() {
        (executor as? ExecutorService)?.shutdown()
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val runnable = PriorityRunnable(priority, sequence, block)
        executor.execute(runnable)
    }
}

PriorityRunnable

PriorityRunnable implements the Runnable interface and encapsulates the priority, order sequence, and the actual task to be executed. This class just acts as a wrapper for tasks within the priority queue.

/**
 * Priority runnable - Implements [Runnable] and pass to the executor for execution based on PriorityQueue
 *
 * @property priority [Priority] of the task
 * @property orderSequence Order at which task gets queued
 * @property runnable Actual Runnable to be executed by Executor
 */
internal class PriorityRunnable(
    val priority: Priority,
    val orderSequence: Int,
    private val runnable: Runnable
) : Runnable {

    override fun run() {
        runnable.run()
    }
}

Priority

Enum class defines the three levels of priority: LOW, MEDIUM, and HIGH.

/**
 * Priority - Three levels of Priority: [LOW], [MEDIUM], [HIGH]
 * Order of enum class is important, as it is used for comparison in priority queue
 *
 */
internal enum class Priority {
    /**
     * Low level Priority
     *
     */
    LOW,

    /**
     * Medium level Priority
     *
     */
    MEDIUM,

    /**
     * High level Priority
     *
     */
    HIGH
}

Connecting the components

PriorityDispatcher

This is the base class client interacts with. It creates a ThreadPool with thread equal to number of available processor. Once all the processor are busy, all upcoming tasks gets queued up inside Priority queue.

private val backgroundTaskExecutorService: ExecutorService by lazy {
    ThreadPoolExecutor(
        cores, cores, 0L, TimeUnit.MILLISECONDS,
        PriorityBlockingQueue(DEFAULT_INITIAL_CAPACITY) { o1, o2 ->
            //+ve --> o2>o1, -ve --> o1>o2
            val p1 = o1 as PriorityRunnable
            val p2 = o2 as PriorityRunnable
            if (p1.priority == p2.priority) p1.orderSequence - p2.orderSequence //less orderSequence, high priority (FIFO)
            else p2.priority.ordinal - p1.priority.ordinal //high ordinal, high priority
        }
    )
}

backgroundTaskExecutorService is a ThreadPool that uses PriorityBlockingQueue (an unbounded queue that uses priority queue). Prioritisation is done on basis of Priority of the task, in case of task with equal priority, task gets executed based on FIFO (First in First out) order.

Functions that are exposed to client internally creates CustomPriorityDispatcher with given Priority and order of execution.

/**
 * Low: Creates [CustomPriorityDispatcher] with low priority
 *
 * @return [CoroutineDispatcher]
 */
fun low(): CoroutineDispatcher {
    return CustomPriorityDispatcher(
        backgroundTaskExecutorService,
        sequence.incrementAndGet(),
        Priority.LOW
    )
}

/**
 * Medium: Creates [CustomPriorityDispatcher] with medium priority
 *
 * @return [CoroutineDispatcher]
 */
fun medium(): CoroutineDispatcher {
    return CustomPriorityDispatcher(
        backgroundTaskExecutorService,
        sequence.incrementAndGet(),
        Priority.MEDIUM
    )
}

/**
 * High: Creates [CustomPriorityDispatcher] with high priority
 *
 * @return [CoroutineDispatcher]
 */
fun high(): CoroutineDispatcher {
    return CustomPriorityDispatcher(
        backgroundTaskExecutorService,
        sequence.incrementAndGet(),
        Priority.HIGH
    )
}

Client Usage

To use, pass the PriorityDispatcher instead of dispatcher inside coroutines.

CoroutineScope(PriorityDispatcher.low()).launch {
    // Task with low priority
}

CoroutineScope(PriorityDispatcher.medium()).launch {
    // Task with medium priority
}

CoroutineScope(PriorityDispatcher.high()).launch {
    // Task with high priority
}

Immediate Task

Sometimes there are task that need to executed immediately irrespective of current workload. Best case to do in this case is to create the thread on the go if all threads are busy, and perform the task. To do that I have created a separate threadpool.

private val immediateTaskExecutorService: ExecutorService by lazy {
    Executors.newCachedThreadPool()
}

immediateTaskExecutorService is thread pool that have 0 corePoolSize, instead it creates thread on the go that lives up to 60 seconds, and for every new task it checks if thread can be reused or it will create new thread. Task are executed in FIFO order.

Function that is exposed to client internally creates object of ExecutorCoroutineDispatcher.

/**
 * Immediate: Creates [ExecutorCoroutineDispatcher] and execute task on [immediateTaskExecutorService]
 *
 * @return [CoroutineDispatcher]
 */
fun immediate(): CoroutineDispatcher {
    return object : ExecutorCoroutineDispatcher() {
        override val executor: Executor
            get() = immediateTaskExecutorService

        override fun close() {
            (executor as? ExecutorService)?.shutdown()
        }

        override fun dispatch(context: CoroutineContext, block: Runnable) {
            executor.execute(block)
        }
    }
}

Client Usage

To use, pass the PriorityDispatcher.immediate() instead of dispatcher inside coroutines.

CoroutineScope(PriorityDispatcher.immediate()).launch {
    // Task with immediate priority
}

Reference

Check out the Github code and installation guide: https://github.com/khushpanchal/PriorityDispatcher

If this project helps you, show love ❤️ by putting a ⭐ on this project ✌️

Contact Me: LinkedIn, Twitter