Data retrieval service with exponential backoff

Posted on Posted in java

Here we will create Data retrieval service with exponential backoff that we covered in the previous post.

Implementation

 
package com.rms.blueprint.data;
 
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;
import java.util.function.Supplier;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class DataRetrievalWithBackoff implements Runnable
{
    public final Logger LOGGER = LoggerFactory.getLogger(DataRetrievalWithBackoff.class);
 
    private final Supplier<Long> pendingSupplier;
 
    private final ObjLongConsumer<Long> readyConsumer;
 
    private final Function<Long, Long> capacitySupplier;
 
    private final long minLoadDurationInSeconds;
 
    private final long capacity;
 
    private final long maxBackoffDelayInSeconds;
 
    /**
     * @param config
     *            Configuration properties
     * @param pendingSupplier
     *            Supplier that tells us how many items is being processed at
     *            this time
     * @param readyConsumer
     *            Consumer that will be called when data is ready to load
     * @param capacitySupplier
     *            Function to calculate current capacity
     */
    private DataRetrievalWithBackoff(final long capacity, final long minLoadDurationInSeconds,
            final long maxBackoffDelayInSeconds, final Supplier<Long> pendingSupplier,
            final ObjLongConsumer<Long> readyConsumer, final Function<Long, Long> capacitySupplier)
    {
        Objects.requireNonNull(pendingSupplier);
        Objects.requireNonNull(readyConsumer);
        Objects.requireNonNull(capacitySupplier);
 
        this.capacity = capacity;
        this.minLoadDurationInSeconds = minLoadDurationInSeconds;
        this.maxBackoffDelayInSeconds = maxBackoffDelayInSeconds;
 
        this.pendingSupplier = pendingSupplier;
        this.readyConsumer = readyConsumer;
        this.capacitySupplier = capacitySupplier;
 
        LOGGER.info(String.format("capacity ", capacity));
        LOGGER.info(String.format("minLoadDurationInSeconds ", minLoadDurationInSeconds));
        LOGGER.info(String.format("maxBackoffDelayInSeconds ", maxBackoffDelayInSeconds));
 
    }
 
    @Override
    public void run()
    {
        LOGGER.info("Running Data Retrieval");
 
        long lastLoadedTime = 0l;
        int attempt = 0;
 
        while (true)
        {
            if (Thread.currentThread().isInterrupted())
            {
                LOGGER.trace("Interrupted stopping [while]");
                break;
            }
 
            final long delta = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastLoadedTime);
            final long pending = pendingSupplier.get();
 
            final long backoffTime = DataUtil.backoff(attempt,
                                                      maxBackoffDelayInSeconds,
                                                      minLoadDurationInSeconds / 2.0);
 
            LOGGER.trace("Loading : lastLoaded : {} > {}  delta(s) {} pending : {} backoffTime  = {}",
                         new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending, backoffTime });
 
            if (delta >= minLoadDurationInSeconds && pending <= capacitySupplier.apply(capacity))
            {
                LOGGER.info("Loading : lastLoaded :  {} >  {}  delta(s) {} pending : {}",
                            new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending });
 
                // let the consumer know that we are ready
                readyConsumer.accept(backoffTime, attempt);
 
                if (pending == 0)
                    ++attempt;
                else
                    attempt = 0;
 
                lastLoadedTime = System.currentTimeMillis();
            }
            else
            {
                ++attempt;
            }
 
            try
            {
 
                Thread.sleep(TimeUnit.SECONDS.toMillis(backoffTime));
            }
            catch (final InterruptedException e)
            {
                LOGGER.trace("Interrupted stopping  [sleep]");
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
 
    public static class Builder
    {
        private final Function<Long, Long> DEFAULT_CAPACITY_SUPPLIER = (capacity) -> capacity / 2;
 
        private long minLoadDurationInSeconds = 60;
 
        private long capacity = 100;
 
        private long maxBackoffDelayInSeconds = 120;
 
        private Supplier<Long> pendingSupplier;
 
        private ObjLongConsumer<Long> readyConsumer;
 
        private Function<Long, Long> capacitySupplier;
 
        public Builder capacity(final long capacity)
        {
            this.capacity = capacity;
            return this;
        }
 
        public Builder maxBackoffDelay(final long duration, final TimeUnit unit)
        {
            Objects.requireNonNull(unit);
            this.maxBackoffDelayInSeconds = unit.toSeconds(duration);
            return this;
        }
 
        public Builder minLoadDuration(final long duration, final TimeUnit unit)
        {
            Objects.requireNonNull(unit);
            this.minLoadDurationInSeconds = unit.toSeconds(duration);
            return this;
        }
 
        public Builder readyConsumer(final ObjLongConsumer<Long> readyConsumer)
        {
            Objects.requireNonNull(readyConsumer);
            this.readyConsumer = readyConsumer;
            return this;
        }
 
        public Builder capacitySupplier(final Function<Long, Long> capacitySupplier)
        {
            Objects.requireNonNull(capacitySupplier);
            this.capacitySupplier = capacitySupplier;
            return this;
        }
 
        public Builder pendingSupplier(final Supplier<Long> pendingSupplier)
        {
            Objects.requireNonNull(capacitySupplier);
            this.pendingSupplier = pendingSupplier;
            return this;
        }
 
        public DataRetrievalWithBackoff build()
        {
            // check invariant
            Objects.requireNonNull(pendingSupplier, "Pening items supplier not provided");
            Objects.requireNonNull(readyConsumer, "Ready Consumer not provided");
 
            if (capacitySupplier == null)
                capacitySupplier = DEFAULT_CAPACITY_SUPPLIER;
 
            return new DataRetrievalWithBackoff(capacity,
                                                minLoadDurationInSeconds,
                                                maxBackoffDelayInSeconds,
                                                pendingSupplier,
                                                readyConsumer,
                                                capacitySupplier == null ? DEFAULT_CAPACITY_SUPPLIER
                                                    : capacitySupplier);
        }
    }
}

Usage

 
   //@formatter:off
            final DataRetrievalWithBackoff service = new DataRetrievalWithBackoff.Builder()
                    .capacity(1000)
                    .maxBackoffDelay(100, TimeUnit.SECONDS)
                    .minLoadDuration(10, TimeUnit.SECONDS)
                    .pendingSupplier(() -> getNumberOfPendingItemsToProcess())
                    .readyConsumer((time, attempt) -> fire(new DataLoadEvent()))
                .build(); 
           //@formatter:on

Leave a Reply

Your email address will not be published. Required fields are marked *