
Understanding the Token Bucket Algorithm

In Android development, you frequently encounter situations where network requests need to be properly controlled. Common examples include users rapidly tapping a button, requesting numerous images while scrolling, or excessive background synchronization.

In this post, we take a deep dive into the Token Bucket algorithm, a core solution for these problems. In particular, we will focus on implementing it in Kotlin and exploring practical ways to use it in an Android environment.

Core Principles of the Token Bucket Algorithm

Token Bucket is a widely used algorithm in network traffic shaping. The basic principle works as follows:

Token Bucket Concept Diagram

  1. Token Bucket: Tokens are stored in a bucket with a fixed capacity
  2. Token Generation: Tokens are generated at a constant rate and added to the bucket
  3. Request Processing: When a request arrives, it consumes tokens to be processed
  4. Burst Allowance: If the bucket has enough tokens, it can handle a sudden burst of requests

The greatest advantage of this algorithm is that it limits the average processing rate while flexibly handling burst traffic. This aligns well with mobile app usage patterns.
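The refill arithmetic behind the four steps above fits in a few lines. Here is a small self-contained sketch (the function name `refilled` is mine, not from any library) of the pure math: tokens accumulate with elapsed time and are capped at the bucket's capacity.

```kotlin
import kotlin.math.min

// Pure refill math behind the four steps above:
// tokens(t) = min(capacity, tokens + elapsedSeconds * refillRate)
fun refilled(tokens: Double, capacity: Double, ratePerSec: Double, elapsedSeconds: Double): Double =
    min(capacity, tokens + elapsedSeconds * ratePerSec)

fun main() {
    // capacity 10, 2 tokens/second, bucket currently holding 1 token
    println(refilled(1.0, 10.0, 2.0, 3.0))  // 7.0 after 3 idle seconds
    println(refilled(1.0, 10.0, 2.0, 60.0)) // capped at 10.0
}
```

Note how the cap is what enables burst handling: an idle bucket fills up to capacity, and that stored budget can then be spent all at once.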

Implementing a Basic Token Bucket in Kotlin

Let us start by implementing the most basic form of a Token Bucket. This implementation is designed to be thread-safe.

import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min

class TokenBucket(
    private val capacity: Long,      // Maximum bucket capacity
    private val refillRate: Double,   // Tokens added per refill period
    private val refillPeriodMillis: Long = 1000  // Refill period (default 1 second)
) {
    private val tokens = AtomicLong(capacity)
    private val lastRefillTimestamp = AtomicLong(System.nanoTime())

    /**
     * Consume tokens and return whether the request is allowed
     * @param numTokens Number of tokens to consume
     * @return Whether the request is allowed
     */
    fun tryConsume(numTokens: Long = 1): Boolean {
        refill()

        // Compare-and-set loop: subtract only when enough tokens remain,
        // so a rejected request never consumes any tokens
        while (true) {
            val current = tokens.get()
            if (current < numTokens) return false
            if (tokens.compareAndSet(current, current - numTokens)) return true
        }
    }

    /**
     * Refill tokens based on elapsed time
     */
    private fun refill() {
        val now = System.nanoTime()
        val lastRefill = lastRefillTimestamp.get()
        val periodNanos = refillPeriodMillis * 1_000_000
        val elapsedPeriods = (now - lastRefill) / periodNanos
        val tokensToAdd = (elapsedPeriods * refillRate).toLong()

        // CAS on the timestamp first so that only one thread performs a given refill
        if (tokensToAdd > 0 && lastRefillTimestamp.compareAndSet(lastRefill, now)) {
            tokens.updateAndGet { current ->
                min(capacity, current + tokensToAdd)
            }
        }
    }

    /**
     * Query the number of currently available tokens
     */
    fun availableTokens(): Long {
        refill()
        return tokens.get()
    }
}

The key to this implementation is lock-free concurrency control using AtomicLong. Thread safety is essential in Android apps because network requests can originate from multiple threads simultaneously.
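To see the burst-then-reject behavior in action, here is a self-contained demo. It uses a deliberately simplified, single-threaded variant of the bucket (names and structure are mine) so the example runs on its own; the thread-safe class above behaves the same way.

```kotlin
import kotlin.math.min

// Simplified, single-threaded variant of the bucket above, just to show the behavior
class SimpleBucket(private val capacity: Long, private val ratePerSec: Double) {
    private var tokens = capacity.toDouble()
    private var last = System.nanoTime()

    fun tryConsume(n: Long = 1): Boolean {
        val now = System.nanoTime()
        // Refill proportionally to elapsed time, capped at capacity
        tokens = min(capacity.toDouble(), tokens + (now - last) / 1e9 * ratePerSec)
        last = now
        if (tokens < n) return false
        tokens -= n
        return true
    }
}

fun main() {
    val bucket = SimpleBucket(capacity = 3, ratePerSec = 1.0)
    // A burst of 3 requests succeeds immediately; the 4th is rejected
    repeat(3) { check(bucket.tryConsume()) }
    println(bucket.tryConsume()) // false
    Thread.sleep(1100)           // wait for roughly 1 token to refill
    println(bucket.tryConsume()) // true
}
```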

An Android-Optimized Implementation

Real-world Android apps require more sophisticated control. Here is an approach that works with coroutines:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.min

class AdvancedTokenBucket(
    private val capacity: Int,
    private val refillRate: Double,
    // Note: GlobalScope is used here for simplicity. In production Android code, use a lifecycle-aware scope.
    private val scope: CoroutineScope = GlobalScope
) {
    data class BucketState(
        val tokens: Double,
        val lastRefillTime: Long = System.nanoTime()
    )

    private val state = AtomicReference(BucketState(capacity.toDouble()))
    private val waitingRequests = Channel<CompletableDeferred<Boolean>>(Channel.UNLIMITED)

    init {
        // Periodically refill tokens and process waiting requests in the background
        scope.launch {
            while (isActive) {
                delay(100) // Check every 100ms
                refillTokens()
                processWaitingRequests()
            }
        }
    }

    /**
     * Non-blocking attempt to consume tokens
     */
    fun tryConsume(tokens: Int = 1): Boolean {
        while (true) {
            val current = state.get()
            val now = System.nanoTime()
            val newState = calculateNewState(current, now)

            if (newState.tokens >= tokens) {
                val consumedState = newState.copy(tokens = newState.tokens - tokens)
                if (state.compareAndSet(current, consumedState)) {
                    return true
                }
            } else {
                return false
            }
        }
    }

    /**
     * Wait until tokens become available (suspend function)
     */
    suspend fun consume(tokens: Int = 1, timeoutMillis: Long = 5000): Boolean {
        // Try immediately first
        if (tryConsume(tokens)) {
            return true
        }

        // Register a waiting request
        return withTimeoutOrNull(timeoutMillis) {
            val deferred = CompletableDeferred<Boolean>()
            waitingRequests.send(deferred)
            deferred.await()
        } ?: false
    }

    private fun calculateNewState(current: BucketState, now: Long): BucketState {
        val elapsedSeconds = (now - current.lastRefillTime) / 1_000_000_000.0
        val tokensToAdd = elapsedSeconds * refillRate
        val newTokens = min(capacity.toDouble(), current.tokens + tokensToAdd)

        return BucketState(
            tokens = newTokens,
            lastRefillTime = if (tokensToAdd > 0) now else current.lastRefillTime
        )
    }

    private fun refillTokens() {
        while (true) {
            val current = state.get()
            val now = System.nanoTime()
            val newState = calculateNewState(current, now)

            if (state.compareAndSet(current, newState)) {
                break
            }
        }
    }

    // Drain waiting requests while tokens are available; tryReceive() returns a
    // failed ChannelResult when the channel is empty, which ends the loop
    private suspend fun processWaitingRequests() {
        while (true) {
            val request = waitingRequests.tryReceive().getOrNull() ?: break
            if (tryConsume()) {
                request.complete(true)
            } else {
                // Not enough tokens: put the request back and stop.
                // Note: send() appends to the tail, so strict FIFO order is not
                // preserved for re-queued requests.
                waitingRequests.send(request)
                break
            }
        }
    }
}

This implementation manages tokens asynchronously using coroutines and efficiently processes waiting requests.

Practical Use Cases in Android Apps

1. Integration with a Retrofit Interceptor

class RateLimitInterceptor(
    private val tokenBucket: TokenBucket
) : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        // Apply different weights per API endpoint
        val request = chain.request()
        val weight = when {
            request.url.encodedPath.contains("/upload") -> 10  // Uploads cost 10 tokens
            request.url.encodedPath.contains("/search") -> 3   // Searches cost 3 tokens
            else -> 1  // Default: 1 token
        }

        // Synthesize a local 429 response when tokens are insufficient
        if (!tokenBucket.tryConsume(weight.toLong())) {
            return Response.Builder()
                .code(429)
                .message("Too Many Requests")
                .protocol(Protocol.HTTP_1_1)
                .request(request)
                .body("".toResponseBody(null))  // empty body so callers can read it safely
                .build()
        }

        return chain.proceed(request)
    }
}

// Retrofit configuration
val retrofit = Retrofit.Builder()
    .baseUrl("https://api.example.com/")
    .client(
        OkHttpClient.Builder()
            .addInterceptor(RateLimitInterceptor(tokenBucket))
            .build()
    )
    .build()

2. Integration with an Image Loading Library

class RateLimitedImageLoader(
    private val context: Context,
    private val tokenBucket: AdvancedTokenBucket
) {
    private val imageLoader = ImageLoader.Builder(context)
        .components {
            add(RateLimitFetcher.Factory(tokenBucket))
        }
        .build()

    suspend fun loadImage(url: String): Drawable? {
        val request = ImageRequest.Builder(context)
            .data(url)
            .build()

        return when (val result = imageLoader.execute(request)) {
            is SuccessResult -> result.drawable
            else -> null
        }
    }
}

// Note: Coil registers fetchers via a Fetcher.Factory; the RateLimitFetcher.Factory
// referenced above is omitted here for brevity.
class RateLimitFetcher(
    private val tokenBucket: AdvancedTokenBucket,
    private val wrapped: Fetcher
) : Fetcher {
    override suspend fun fetch(): FetchResult? {
        // Consume a token before loading the image (with timeout)
        if (!tokenBucket.consume(tokens = 1, timeoutMillis = 3000)) {
            // Note: Simplified for illustration. A real OkHttp Response.Builder requires
            // .request(request) and .protocol(Protocol.HTTP_1_1) as mandatory fields.
            throw HttpException(Response.Builder()
                .code(429)
                .message("Rate limit exceeded")
                .build())
        }

        return wrapped.fetch()
    }
}

Token Bucket in a Distributed Environment

When a mobile app communicates with multiple servers, or when server-side rate limiting is needed, a distributed Token Bucket can be implemented using Redis.

// Note: RedisClient and the eval(script, keys, args) call below are illustrative;
// real clients (Lettuce, Jedis, etc.) expose Lua script evaluation with different signatures.
class DistributedTokenBucket(
    private val redisClient: RedisClient,
    private val key: String,
    private val capacity: Int,
    private val refillRate: Double
) {
    // Lua script ensures atomic operations
    private val consumeScript = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        -- Calculate token refill
        local elapsed = math.max(0, now - last_refill)
        local refilled = math.min(capacity, tokens + (elapsed * rate))

        if refilled >= requested then
            -- Consume tokens
            redis.call('HMSET', key,
                'tokens', refilled - requested,
                'last_refill', now)
            redis.call('EXPIRE', key, 86400)  -- 24-hour TTL
            return 1
        else
            -- Insufficient tokens
            if elapsed > 0 then
                redis.call('HMSET', key,
                    'tokens', refilled,
                    'last_refill', now)
            end
            return 0
        end
    """.trimIndent()

    suspend fun tryConsume(tokens: Int = 1): Boolean {
        val result = redisClient.eval(
            script = consumeScript,
            keys = listOf(key),
            args = listOf(
                capacity.toString(),
                refillRate.toString(),
                (System.currentTimeMillis() / 1000.0).toString(),
                tokens.toString()
            )
        )

        return result == 1L
    }
}

Adaptive Rate Limiting

An adaptive Token Bucket that dynamically adjusts the rate based on network conditions or server response times can also be implemented.

import org.apache.commons.collections4.queue.CircularFifoQueue
import kotlin.math.max
import kotlin.math.min

class AdaptiveTokenBucket(
    initialCapacity: Int,
    private val minRate: Double = 1.0,
    private val maxRate: Double = 100.0
) {
    private var currentRate = minRate
    private var tokenBucket = TokenBucket(
        capacity = initialCapacity.toLong(),
        refillRate = currentRate
    )

    // Network performance metrics
    // CircularFifoQueue is from Apache Commons Collections
    private val latencyWindow = CircularFifoQueue<Long>(100)
    private val errorWindow = CircularFifoQueue<Boolean>(100)

    fun recordLatency(latencyMs: Long) {
        latencyWindow.add(latencyMs)
        adjustRateBasedOnPerformance()
    }

    fun recordResult(success: Boolean) {
        errorWindow.add(success)
        adjustRateBasedOnPerformance()
    }

    private fun adjustRateBasedOnPerformance() {
        // Avoid NaN averages before any samples have been recorded
        if (latencyWindow.isEmpty() || errorWindow.isEmpty()) return
        val avgLatency = latencyWindow.average()
        val successRate = errorWindow.count { it } / errorWindow.size.toDouble()

        // Adjust rate based on network conditions
        currentRate = when {
            avgLatency < 100 && successRate > 0.95 -> {
                // Good network conditions - increase rate
                min(maxRate, currentRate * 1.2)
            }
            avgLatency > 500 || successRate < 0.8 -> {
                // Poor network conditions - decrease rate
                max(minRate, currentRate * 0.7)
            }
            else -> currentRate
        }

        // Recreate bucket with the new rate
        tokenBucket = TokenBucket(
            capacity = (currentRate * 10).toLong(),
            refillRate = currentRate
        )
    }
}

This follows a principle similar to TCP's congestion control: ramp the rate up while conditions are good and cut it back sharply on trouble. Note that the sketch above uses a multiplicative increase (×1.2) rather than the strictly additive increase of classic AIMD (Additive Increase Multiplicative Decrease).
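For comparison, here is what a textbook AIMD update looks like: add a fixed step on success, halve on congestion. The function and its defaults are illustrative, not part of any library.

```kotlin
import kotlin.math.max
import kotlin.math.min

// Textbook AIMD: additive increase on success, multiplicative decrease on congestion
fun aimd(rate: Double, congested: Boolean, step: Double = 1.0,
         minRate: Double = 1.0, maxRate: Double = 100.0): Double =
    if (congested) max(minRate, rate * 0.5) else min(maxRate, rate + step)

fun main() {
    var r = 10.0
    repeat(3) { r = aimd(r, congested = false) } // 11.0, 12.0, 13.0
    println(r)                                   // 13.0
    r = aimd(r, congested = true)
    println(r)                                   // 6.5
}
```

AIMD's slow climb and sharp backoff is what makes it converge to a fair share under contention; the multiplicative increase above recovers faster but is more aggressive.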

Token Bucket vs Other Rate Limiting Algorithms

Token Bucket vs Leaky Bucket

Algorithm Comparison

| Algorithm | Advantages | Disadvantages | Suitable Use Cases |
| --- | --- | --- | --- |
| Token Bucket | Allows bursts, flexibility | Implementation complexity | Mobile apps, API clients |
| Leaky Bucket | Constant output rate | No burst support | Streaming, constant throughput needed |
| Fixed Window | Simple implementation | Boundary problems | Simple rate limiting |
| Sliding Window | Accuracy | Memory usage | Precise control needed |

Why Token Bucket is well-suited for mobile apps:

  - Real usage is bursty: rapid taps, fast scrolling, and app launches produce clusters of requests that the bucket can absorb while still capping the average rate.
  - It is cheap to run on-device: just a token count and a timestamp, with no server coordination required.
  - Per-request weights let expensive calls, such as uploads, consume a larger share of the budget.

Monitoring and Debugging

The following is a monitoring implementation example for measuring and optimizing the effectiveness of a Token Bucket.

class TokenBucketMonitor(
    private val tokenBucket: TokenBucket
) {
    private val metrics = mutableMapOf<String, AtomicLong>()

    init {
        metrics["totalRequests"] = AtomicLong(0)
        metrics["allowedRequests"] = AtomicLong(0)
        metrics["rejectedRequests"] = AtomicLong(0)
        metrics["totalTokensConsumed"] = AtomicLong(0)
    }

    fun recordRequest(allowed: Boolean, tokensRequested: Long = 1) {
        metrics["totalRequests"]?.incrementAndGet()

        if (allowed) {
            metrics["allowedRequests"]?.incrementAndGet()
            metrics["totalTokensConsumed"]?.addAndGet(tokensRequested)
        } else {
            metrics["rejectedRequests"]?.incrementAndGet()
        }
    }

    fun getMetrics(): Map<String, Long> {
        return metrics.mapValues { it.value.get() } + mapOf(
            "availableTokens" to tokenBucket.availableTokens(),
            "rejectionRate" to calculateRejectionRate()
        )
    }

    private fun calculateRejectionRate(): Long {
        val total = metrics["totalRequests"]?.get() ?: 0
        val rejected = metrics["rejectedRequests"]?.get() ?: 0
        return if (total > 0) (rejected * 100 / total) else 0
    }

    // Send metrics to Firebase Analytics; the caller supplies a Context
    // (alternatively, inject FirebaseAnalytics via DI)
    fun reportToAnalytics(context: Context) {
        val bundle = Bundle().apply {
            getMetrics().forEach { (key, value) ->
                putLong("token_bucket_$key", value)
            }
        }
        FirebaseAnalytics.getInstance(context).logEvent("rate_limit_metrics", bundle)
    }
}

Practical Implementation Considerations

1. Handling System Time Changes

Use System.nanoTime() rather than System.currentTimeMillis() to guard against system time changes on Android devices. It is a monotonic clock, unaffected by the user or NTP adjusting the wall-clock time, so refill calculations cannot jump forwards or backwards.
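A quick self-contained illustration of the difference:

```kotlin
// System.currentTimeMillis() tracks the wall clock and can jump backwards if the
// user or NTP changes the device time; System.nanoTime() only ever moves forward.
fun main() {
    val t1 = System.nanoTime()
    Thread.sleep(10)
    val t2 = System.nanoTime()
    println(t2 > t1) // true: elapsed time measured with nanoTime is never negative
}
```

The same comparison written with currentTimeMillis() could print false if the clock were adjusted between the two reads, which is exactly the failure mode a refill calculation must avoid.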

2. App Lifecycle Considerations

class TokenBucketManager : DefaultLifecycleObserver {
    // Swapping buckets discards the previous token count; foreground/background
    // transitions are treated as natural reset points here
    private var tokenBucket: TokenBucket? = null

    override fun onStart(owner: LifecycleOwner) {
        // Increase rate when the app comes to the foreground
        tokenBucket = TokenBucket(
            capacity = 100,
            refillRate = 10.0
        )
    }

    override fun onStop(owner: LifecycleOwner) {
        // Decrease rate when going to the background
        tokenBucket = TokenBucket(
            capacity = 10,
            refillRate = 1.0
        )
    }
}

3. Memory Efficiency

Manage Token Bucket instances through singletons or dependency injection (Hilt/Dagger) to optimize memory usage.
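A minimal sketch of the singleton approach (the one-line TokenBucket below is a placeholder standing in for the class implemented earlier; with Hilt you would instead expose the same instance via a @Provides @Singleton binding):

```kotlin
// Placeholder standing in for the TokenBucket implemented earlier in this post
class TokenBucket(val capacity: Long, val refillRate: Double)

// Process-wide singleton: every caller shares one bucket, so limits apply globally
object NetworkRateLimiter {
    val bucket: TokenBucket by lazy { TokenBucket(capacity = 100, refillRate = 10.0) }
}

fun main() {
    // Both lookups return the same lazily created instance
    println(NetworkRateLimiter.bucket === NetworkRateLimiter.bucket) // true
}
```

Sharing one instance matters: if each screen created its own bucket, each would enforce its own limit and the app-wide request rate would be capacity × number-of-screens.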

Summary

The Token Bucket algorithm is a powerful tool for effectively controlling network requests in mobile apps. By leveraging Kotlin’s coroutines and atomic operations, you can achieve an efficient and safe implementation.

Key takeaways:

  - Token Bucket limits the average request rate while still absorbing bursts, which matches real mobile usage patterns.
  - AtomicLong/AtomicReference compare-and-set loops provide lock-free thread safety; coroutines let waiting requests suspend instead of blocking threads.
  - Per-endpoint token weights, adaptive rates, and lifecycle-aware configuration tailor the limiter to real app conditions.
  - Use a monotonic clock (System.nanoTime()) for refill calculations and monitor rejection rates in production.

Token Bucket goes beyond simple rate limiting – it is an essential component for stable and efficient network communication. Especially considering the unstable networks and limited resources of mobile environments, proper traffic control can significantly improve app quality.


#algorithm #rate-limiting #network