{-# LANGUAGE ScopedTypeVariables #-}
module Control.RateLimit (
generateRateLimitedFunction
, RateLimit(..)
, ResultsCombiner
, dontCombine
, rateLimitInvocation
, rateLimitExecution
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Functor (($>))
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Units
-- | Strategy used by 'generateRateLimitedFunction' to enforce a rate limit
-- whose period is the wrapped value.
data RateLimit a
  = PerInvocation a
    -- ^ Each action is started on its own forked thread, so the worker only
    -- spaces out the start of actions and running actions may overlap.
  | PerExecution a
    -- ^ Each action is run to completion on the single worker thread before
    -- the next request is taken, so actions never overlap.
-- | Decides whether two queued requests can be served by a single call.
--
-- The first argument is the request about to run (possibly already the
-- result of earlier merges), the second is the next request in the queue.
-- Returning 'Nothing' leaves both requests untouched. Returning
-- @Just (merged, split)@ runs the action once on @merged@; @split@ then
-- turns the single response into a pair whose first component is delivered
-- to the earlier request(s) and whose second component is delivered to the
-- later request.
type ResultsCombiner req resp = req -> req -> Maybe (req, resp -> (resp, resp))
-- | A 'ResultsCombiner' that never merges requests: every pair is rejected
-- with 'Nothing', so each request results in its own call to the action.
dontCombine :: ResultsCombiner a b
dontCombine _ _ = Nothing
-- | Wrap @action@ in a rate-limited function using the 'PerInvocation'
-- strategy with period @pertime@ and no request combining.
--
-- This is 'generateRateLimitedFunction' specialised to
-- @'PerInvocation' pertime@ and 'dontCombine'.
rateLimitInvocation :: TimeUnit t
                    => t
                    -> (req -> IO resp)
                    -> IO (req -> IO resp)
rateLimitInvocation pertime action =
  generateRateLimitedFunction (PerInvocation pertime) action dontCombine
-- | Wrap @action@ in a rate-limited function using the 'PerExecution'
-- strategy with period @pertime@ and no request combining.
--
-- This is 'generateRateLimitedFunction' specialised to
-- @'PerExecution' pertime@ and 'dontCombine'.
rateLimitExecution :: TimeUnit t
                   => t
                   -> (req -> IO resp)
                   -> IO (req -> IO resp)
rateLimitExecution pertime action =
  generateRateLimitedFunction (PerExecution pertime) action dontCombine
-- | Build a rate-limited version of @action@.
--
-- A request queue ('TChan') is created and a single worker thread is forked
-- that loops forever taking requests off the queue. The returned function
-- enqueues its argument together with a fresh 'MVar' and blocks until the
-- worker (or a thread forked by it) fills that 'MVar' with the response.
--
-- * @ratelimit@ selects the period and whether actions are forked
--   ('PerInvocation') or run inline on the worker ('PerExecution').
-- * @action@ is the function being rate limited.
-- * @combiner@ is consulted just before an action runs to merge the
--   pending request with the requests queued behind it.
--
-- NOTE(review): nothing here catches exceptions thrown by @action@; if it
-- throws, the waiting caller's 'MVar' is never filled (and under
-- 'PerExecution' the worker thread itself dies). Confirm whether callers
-- rely on @action@ being exception-free.
generateRateLimitedFunction :: forall req resp t
                             . TimeUnit t
                            => RateLimit t
                            -> (req -> IO resp)
                            -> ResultsCombiner req resp
                            -> IO (req -> IO resp)
generateRateLimitedFunction ratelimit action combiner = do
  chan <- atomically newTChan
  void $ forkIO $ runner Nothing 0 chan
  return $ resultFunction chan
  where
    -- Current POSIX time in whole microseconds. The conversion stays in
    -- arbitrary-precision arithmetic: going through 'fromEnum' would
    -- truncate the picosecond count to a machine 'Int' and make the clock
    -- wrap around periodically.
    currentMicroseconds :: IO Integer
    currentMicroseconds = floor . (* 1000000) <$> getPOSIXTime

    -- Worker loop. @mLastRun@ is the time (in microseconds) at which the
    -- previous action was started, if any; @lastAllowance@ is the number of
    -- microseconds by which the previous iteration ran late, which is
    -- credited against the next delay.
    runner :: Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
    runner mLastRun lastAllowance chan = do
      (req, respMV) <- atomically $ readTChan chan
      let baseHandler = putMVar respMV

      beforeWait <- currentMicroseconds

      let targetPeriod = toMicroseconds $ getRate ratelimit
          -- NOTE(review): on the very first request this evaluates to
          -- @negate targetPeriod@, so @targetDelay@ below is twice the
          -- period. Confirm that delaying the first call is intended.
          timeSinceLastRun = case mLastRun of
            Just lastRun -> beforeWait - lastRun
            Nothing      -> negate targetPeriod
          targetDelay = targetPeriod - timeSinceLastRun - lastAllowance

      nextAllowance <-
        if targetDelay < 0
          -- Already late: do not sleep, and carry the lateness forward.
          then pure $ abs targetDelay
          else do
            threadDelay $ fromIntegral targetDelay
            afterWait <- currentMicroseconds
            let slept     = afterWait - beforeWait
                overslept = slept - targetDelay
            return overslept

      -- Fold in any compatible requests that queued up while waiting.
      (req', finalHandler) <- updateRequestWithFollowers chan req baseHandler
      let run = action req' >>= finalHandler

      beforeRun <- currentMicroseconds
      if shouldFork ratelimit
        then void $ forkIO run
        else run
      runner (Just beforeRun) nextAllowance chan

    -- Repeatedly try to merge @req@ with the request at the head of the
    -- queue. Stops when the queue is empty or @combiner@ rejects the pair
    -- (in which case the rejected request is pushed back onto the front of
    -- the queue). Returns the merged request and a handler that distributes
    -- the single response to every request that was merged in.
    updateRequestWithFollowers :: TChan (req, MVar resp)
                               -> req
                               -> (resp -> IO ())
                               -> IO (req, resp -> IO ())
    updateRequestWithFollowers chan req handler = do
      mCombinedAndMV <- atomically $ do
        mNext <- tryReadTChan chan
        case mNext of
          Nothing -> return Nothing
          Just tup@(next, nextRespMV) ->
            case combiner req next of
              Nothing       -> unGetTChan chan tup $> Nothing
              Just combined -> return $ Just (combined, nextRespMV)
      case mCombinedAndMV of
        Nothing ->
          return (req, handler)
        Just ((req', splitResponse), nextRespMV) ->
          updateRequestWithFollowers chan req' $ \resp -> do
            -- First component goes to the earlier request(s), second to
            -- the request that was just merged in.
            let (theirs, mine) = splitResponse resp
            putMVar nextRespMV mine
            handler theirs

    -- Whether the action runs on its own thread instead of the worker.
    shouldFork :: RateLimit t -> Bool
    shouldFork (PerInvocation _) = True
    shouldFork (PerExecution _)  = False

    -- The period carried by either constructor.
    getRate :: RateLimit t -> t
    getRate (PerInvocation x) = x
    getRate (PerExecution x)  = x

    -- The function handed back to the caller: enqueue the request with a
    -- fresh response slot, then block until the slot is filled.
    resultFunction :: TChan (req, MVar resp) -> req -> IO resp
    resultFunction chan req = do
      respMV <- newEmptyMVar
      atomically $ writeTChan chan (req, respMV)
      takeMVar respMV