Asdfasf

Friday, July 13, 2012

Throttling


The problem to solve is this: our application must accept a fixed number of requests in a given interval and reject the rest. In other words, we want to hold the system's TPS (transactions per second) at the level we have guaranteed.

While searching the internet for a solution, I came across one by Sergey from Microsoft. Thanks Sergey, I owe a lot to you (http://1-800-magic.blogspot.com/).

Below I tried to use Sergey's throttling mechanism together with a thread pool.
Let's limit requests to 10 TPS and work them off through a 20-slot BlockingQueue using a thread pool.
  • At most 10 TPS is allowed:
RequestThrottler throttler = new RequestThrottler(10, 1000);
  • Requests are worked off through a queue with 20 slots:
new ArrayBlockingQueue<Runnable>(20);
  • Each request takes 2 seconds to process:

System.out.println("Executing index:" + index + " by "
        + Thread.currentThread().getName());
Thread.sleep(2000);

Our test class looks like this:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test implements Runnable {
    int index;

    @Override
    public void run() {
        try {
            System.out.println("Executing index:" + index + " by "
                    + Thread.currentThread().getName());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Allow at most 10 requests per 1000 ms
        RequestThrottler throttler = new RequestThrottler(10, 1000);

        TestRejectedExecutionHandler execHandler = new TestRejectedExecutionHandler();

        BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(20);
        ThreadPoolExecutor ex = new ThreadPoolExecutor(4, 10, 20,
                TimeUnit.SECONDS, q);
        ex.setRejectedExecutionHandler(execHandler);

        for (int i = 0; i < 10000; i++) {
            // Offer a request about every 20 ms (roughly 50 per second).
            Thread.sleep(20);

            if (throttler.tryStartRequest() == 0) {
                System.out.println("Request Accepted " + i + " "
                        + System.currentTimeMillis());
                Test test = new Test();
                test.index = i;
                ex.execute(test);
            } else {
                System.err.println("Request Rejected by Throttler " + i);
            }
        }
        // Let queued tasks finish, then allow the JVM to exit.
        ex.shutdown();
    }
}

class TestRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
        Test test = (Test) arg0;
        System.out.println(test.index + " is rejected");
    }
} 
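
To see when TestRejectedExecutionHandler actually fires, it helps to recall the order in which ThreadPoolExecutor uses its resources: core threads first, then the queue, then extra threads up to the maximum, and only then rejection. The sketch below is an illustration only and not part of Sergey's code. RejectionOrderDemo and countRejections are made-up names, and the tasks block on a latch instead of sleeping so that the count is deterministic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
public class RejectionOrderDemo {

    /** Submits `tasks` blocking tasks and returns how many the pool rejected. */
    static int countRejections(int core, int max, int queueSize, int tasks)
            throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 20,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize),
                (task, executor) -> rejected.incrementAndGet());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown(); // let every accepted task finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Same pool shape as the test above: 4 core threads, 10 max, queue of 20.
        System.out.println("rejected: " + countRejections(4, 10, 20, 35));
    }
}
```

With the pool shape from the test (4 core, 10 max, queue of 20), 35 blocked tasks give 4 + 20 + 6 = 30 accepted and 5 rejected. In the real test each task takes 2 seconds, so 10 threads finish about 5 tasks per second, which is half of the 10 TPS the throttler lets through. Rejections by the executor are therefore expected once the queue fills.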

The code for RequestThrottler is below. For the original, see: http://1-800-magic.blogspot.com/2008/02/throttling-requests-java.html

In broad strokes, it stores the timestamp of each accepted request in a circular buffer, long[] _ticks, and on every call drops the timestamps that are older than _interval. A new request is accepted only if fewer than the configured maximum number of calls remain in that sliding window; nothing is reset on a fixed schedule.

/**
 * Class that throttles requests. Ensures that startRequest cannot be called
 * more than a given number of times in any given interval.
 * 
 * startRequest blocks until this requirement is satisfied.
 * 
 * tryStartRequest returns 0 if the request was cleared, or a non-zero number
 * of milliseconds to sleep before the next attempt.
 * 
 * Simple usage, 10 requests per second:
 * 
 * RequestThrottler t = new RequestThrottler(10, 1000); ...
 * serviceRequest(RequestThrottler t, ...) { t.startRequest(); .. do work ..
 * 
 * @author sergey@solyanik.com (Sergey Solyanik)
 *         http://1-800-magic.blogspot.com/2008/02/throttling-requests-java.html
 * 
 */
public class RequestThrottler {
    /**
     * The interval (ms) within which we enforce the max number of calls.
     */
    private final long _interval;

    /**
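     * Size of the circular buffer: the maximum number of calls that can be
     * made within the interval, plus one slot that always stays empty so a
     * full buffer can be told apart from an empty one.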
     */
    private final int _maxCalls;

    /**
     * Previous calls within the interval.
     */
    private final long[] _ticks;

    /**
     * Available element at the insertion point (back of the queue).
     */
    private int _tickNext;

    /**
     * Element at the removal point (front of the queue).
     */
    private int _tickLast;

    /**
     * The time when the last expired request occurred. Might be used to
     * automatically disable some services altogether. Volatile because it is
     * written and read outside the synchronized method.
     */
    private volatile long _lastExpiredMaxWait = 0;

    /**
     * Constructor.
     * 
     * @param maxCalls
     *            Max number of calls that can be made within the interval.
     * @param interval
     *            The interval.
     */
    public RequestThrottler(final int maxCalls, final long interval) {
        if (maxCalls < 1) {
            throw new IllegalArgumentException("maxCalls must be >=1, was "
                    + maxCalls);
        }
        if (interval < 1) {
            throw new IllegalArgumentException("interval must be >=1, was "
                    + interval);
        }
        _interval = interval;
        _maxCalls = maxCalls + 1;
        _ticks = new long[_maxCalls];
        _tickLast = _tickNext = 0;
    }

    /**
     * Returns the next element in the queue.
     * 
     * @param index
     *            The element for which to compute the next.
     * @return The index after it, wrapping to 0 at the end of the buffer.
     */
    private int next(int index) {
        index += 1;
        return index < _maxCalls ? index : 0;
    }

    /**
     * Attempts to clear the request.
     * 
     * @return Returns 0 if successful, or a time hint (ms) in which we should
     *         attempt to clear it again.
     */
    public synchronized long tryStartRequest() {
        long result = 0;
        final long now = System.currentTimeMillis();
        while (_tickLast != _tickNext) {
            if (now - _ticks[_tickLast] < _interval) {
                break;
            }
            _tickLast = next(_tickLast);
        }

        final int next = next(_tickNext);
        if (next != _tickLast) {
            _ticks[_tickNext] = now;
            _tickNext = next;
        } else {
            result = _interval - (now - _ticks[_tickLast]);
        }
        return result;
    }

    /**
     * Clears the request. Blocks until the request can execute or the calling
     * thread is interrupted.
     */
    public void startRequest() {
        startRequest(Integer.MAX_VALUE);
    }

    /**
     * Clears the request. Blocks until the request can execute or until maxWait
     * would be exceeded.
     * 
     * @param maxWait
     *            Maximum total time (ms) to wait; zero or negative means no
     *            limit.
     * @return true if successful or false if request should not execute
     */
    public boolean startRequest(final int maxWait) {
        long sleep;
        long total = 0;
        while ((sleep = tryStartRequest()) > 0) {
            if (maxWait > 0 && (total += sleep) > maxWait) {
                _lastExpiredMaxWait = System.currentTimeMillis();
                return false;
            } else {
                try {
                    Thread.sleep(sleep);
                } catch (final InterruptedException ex) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }

    public long getLastExpiredMaxWait() {
        return _lastExpiredMaxWait;
    }

}
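
The sliding-window idea behind the throttler can be sketched more compactly with an ArrayDeque in place of the hand-rolled circular buffer. This is not Sergey's code, only a simplified illustration of the same check. The class name SlidingWindowSketch and the method tryStart are made up for this example, and the current time is passed in as a parameter (an assumption made here so the behaviour can be followed without real sleeping).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplified throttler, for illustration only.
public class SlidingWindowSketch {
    private final int maxCalls;
    private final long intervalMillis;
    // Timestamps of accepted requests, oldest first.
    private final Deque<Long> ticks = new ArrayDeque<>();

    public SlidingWindowSketch(int maxCalls, long intervalMillis) {
        this.maxCalls = maxCalls;
        this.intervalMillis = intervalMillis;
    }

    /**
     * Returns 0 if the request is accepted at time `now` (ms), otherwise the
     * number of milliseconds until the oldest request leaves the window.
     */
    public synchronized long tryStart(long now) {
        // Drop timestamps that are no longer inside the interval.
        while (!ticks.isEmpty() && now - ticks.peekFirst() >= intervalMillis) {
            ticks.pollFirst();
        }
        if (ticks.size() < maxCalls) {
            ticks.addLast(now);
            return 0;
        }
        return intervalMillis - (now - ticks.peekFirst());
    }

    public static void main(String[] args) {
        SlidingWindowSketch t = new SlidingWindowSketch(2, 1000);
        System.out.println(t.tryStart(0));    // 0: accepted
        System.out.println(t.tryStart(100));  // 0: accepted
        System.out.println(t.tryStart(200));  // 800: window is full
        System.out.println(t.tryStart(1000)); // 0: request from t=0 expired
    }
}
```

The deque grows to at most maxCalls entries, so memory use matches the array version. The array in RequestThrottler avoids boxing the timestamps, which is a reasonable choice for a hot path.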
