Add API rate limit handler #371
Conversation
src/neo4j_graphrag/llm/rate_limit.py
Outdated
        raise convert_to_rate_limit_error(e)
    raise

return active_handler.handle_sync(inner_func)()
Do we need to call the returned function here?
I'm not so familiar with defining decorators, but don't we still need this to return the retry version of the inner function?
The decorator should return a function; that's why I tend to think the final () is not needed, but I haven't tested.
If we omit the trailing (), the decorator would return the wrapped function object instead of the function’s result.
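A tiny sketch of that call semantics, using a stand-in handler (not the real RetryRateLimitHandler):

```python
import functools


def handle_sync(func):
    # Stand-in for a handler's handle_sync: it returns a wrapped
    # callable and does not execute func itself.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def inner_func():
    return 42


wrapped = handle_sync(inner_func)   # a function object, not 42
result = handle_sync(inner_func)()  # the trailing () actually runs it
print(callable(wrapped), result)    # True 42
```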
src/neo4j_graphrag/llm/rate_limit.py
Outdated
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
Did you manage to test it in async mode? I'm worried we'll have some weird behaviour in concurrent mode if retries use await asyncio.sleep(..): that would allow other calls to be "enqueued", and almost all of the following calls will have to wait, then still fail pretty quickly since they are all waiting and retrying at the same time.
I haven't thought about that, but we should definitely avoid this. One solution could be adding randomisation to delay retries, something like the below:
def handle_async(self, func: AF) -> AF:
    @retry(
        retry=retry_if_exception_type(RateLimitError),
        stop=stop_after_attempt(self.max_attempts),
        wait=wait_exponential(
            multiplier=self.multiplier,
            min=self.min_wait,
            max=self.max_wait,
        ) + (wait_random(0, 1) if self.jitter else wait_fixed(0)),  # add jitter
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
wdyt?
To be tested ;)
Looks like Tenacity already has a function that does exponential backoff + jitter, so I ended up using it: 8cfacb4
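The behaviour of that kind of strategy can be approximated in plain Python; this is a rough sketch of full-jitter exponential backoff, not Tenacity's exact formula:

```python
import random


def jittered_wait(attempt: int, multiplier: float = 1.0, max_wait: float = 60.0) -> float:
    """Full-jitter backoff: draw uniformly from [0, cap], where the cap
    grows exponentially with the attempt number. Rough approximation of
    a randomized exponential wait, not Tenacity's implementation."""
    cap = min(max_wait, multiplier * 2 ** attempt)
    return random.uniform(0, cap)


# Concurrent retriers draw different waits, so they stop retrying in lockstep.
waits = [jittered_wait(3) for _ in range(5)]
```

Because each caller draws an independent wait, simultaneous retries spread out over the window instead of hammering the API at the same instant.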
class CustomRateLimitHandler(RateLimitHandler):
    """Implement your custom rate limiting strategy."""
    # Implement required methods: handle_sync, handle_async
Which parameters are these methods getting as input?
A single argument, func, representing the sync/async function that needs rate limit handling applied to it, as is the case for the current RetryRateLimitHandler (the default one).
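Sketched as an abstract base class (signatures inferred from this discussion, not copied from the actual source):

```python
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])
AF = TypeVar("AF", bound=Callable[..., Awaitable[Any]])


class RateLimitHandler(ABC):
    """Each method takes the function to protect and returns a
    wrapped version with rate limit handling applied."""

    @abstractmethod
    def handle_sync(self, func: F) -> F: ...

    @abstractmethod
    def handle_async(self, func: AF) -> AF: ...


class NoOpRateLimitHandler(RateLimitHandler):
    # Minimal concrete example: applies no rate limiting at all.
    def handle_sync(self, func: F) -> F:
        return func

    def handle_async(self, func: AF) -> AF:
        return func
```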
Do you think this interface could be used to limit the number of requests per second for instance?
If we would like to extend our strategy for rate limit handling and do throttling, we can define a new subclass:
class ThrottlingRateLimitHandler(RateLimitHandler):
    def __init__(self, requests_per_window: int = 10, window_size: float = 1.0):
        self.requests_per_window = requests_per_window
        self.window_size = window_size
        ...

    def handle_sync(self, func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # handle throttling here
            return func(*args, **kwargs)
        return wrapper

    ...
Then we can basically use the same decorators with the invoke functions of the LLM classes; we just need to ensure we pass the right rate limit handler in their constructors.
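Filling in the elided parts, a minimal runnable version of that sliding-window idea might look like this (sync side only; the naming mirrors the snippet above, but the base class is omitted and the details are illustrative):

```python
import functools
import threading
import time
from collections import deque
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class ThrottlingRateLimitHandler:
    """Allow at most `requests_per_window` calls per `window_size` seconds,
    tracked with a sliding window of call timestamps."""

    def __init__(self, requests_per_window: int = 10, window_size: float = 1.0):
        self.requests_per_window = requests_per_window
        self.window_size = window_size
        self._timestamps = deque()
        self._lock = threading.Lock()  # holding it while sleeping serializes throttled callers

    def _wait_if_needed(self) -> None:
        with self._lock:
            now = time.monotonic()
            # Drop timestamps that have fallen out of the window.
            while self._timestamps and now - self._timestamps[0] >= self.window_size:
                self._timestamps.popleft()
            if len(self._timestamps) >= self.requests_per_window:
                # Sleep until the oldest call exits the window.
                time.sleep(self.window_size - (now - self._timestamps[0]))
                self._timestamps.popleft()
            self._timestamps.append(time.monotonic())

    def handle_sync(self, func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            self._wait_if_needed()
            return func(*args, **kwargs)
        return wrapper  # type: ignore[return-value]
```

An instance of this would then be passed as the rate limit handler in an LLM class constructor, exactly as described above for the default handler.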
Description
- RateLimitHandler interface
- RateLimitError
- RateLimitHandler interface

Type of Change

Complexity
Complexity: Medium

How Has This Been Tested?

Checklist
The following requirements should have been met (depending on the changes in the branch):