Utilities#
Executors#
- class AsyncExecutor(generation_fn, concurrency=3, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, termination_signal=signal.SIGINT, timeout=None, *, enable_dynamic_concurrency=True, dynamic_initial_target=None, dynamic_window_seconds=5.0, dynamic_increase_step=1, dynamic_decrease_ratio=0.5, dynamic_inactive_check_interval=1.0)#
Bases:
Executor
A class that provides asynchronous execution of tasks using a producer-consumer pattern.
An async interface is provided by the execute method, which returns a coroutine, and a sync interface is provided by the run method.
- Parameters:
generation_fn (Callable[[Any], Coroutine[Any, Any, Any]]) – A coroutine function that generates tasks to be executed.
concurrency (int, optional) – The number of concurrent consumers. Defaults to 3.
tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults to None.
max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to 10.
exit_on_error (bool, optional) – Whether to exit execution on the first encountered error. Defaults to True.
fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks that encounter errors. Defaults to _unset.
termination_signal (signal.Signals, optional) – The signal handled to terminate the executor. Defaults to signal.SIGINT.
- async consumer(outputs, execution_details, queue, done_producing, termination_event, progress_bar, worker_index)#
- async execute(inputs)#
- async producer(inputs, queue, max_fill, done_producing, termination_signal)#
- run(inputs)#
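The producer-consumer pattern and the execute/run split described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: execute_sketch, run_sketch, and the sentinel-based shutdown are hypothetical names and choices, and retries, progress bars, timeouts, and dynamic concurrency are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence

_DONE = object()  # hypothetical sentinel that tells a consumer to stop


async def execute_sketch(
    generation_fn: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    concurrency: int = 3,
) -> List[Any]:
    """Run generation_fn over inputs with a fixed number of consumers."""
    outputs: List[Any] = [None] * len(inputs)
    # A bounded queue keeps the producer from running far ahead of consumers.
    queue: asyncio.Queue = asyncio.Queue(maxsize=2 * concurrency)

    async def producer() -> None:
        for index, item in enumerate(inputs):
            await queue.put((index, item))
        # One sentinel per consumer so that every consumer exits.
        for _ in range(concurrency):
            await queue.put(_DONE)

    async def consumer() -> None:
        while True:
            entry = await queue.get()
            if entry is _DONE:
                return
            index, item = entry
            # Store by index so outputs keep the order of inputs.
            outputs[index] = await generation_fn(item)

    await asyncio.gather(producer(), *(consumer() for _ in range(concurrency)))
    return outputs


def run_sketch(generation_fn, inputs, concurrency: int = 3) -> List[Any]:
    """Synchronous wrapper around the coroutine interface."""
    return asyncio.run(execute_sketch(generation_fn, inputs, concurrency))


async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return 2 * x


if __name__ == "__main__":
    print(run_sketch(double, [1, 2, 3, 4, 5], concurrency=2))
```

Writing results by index rather than appending them is what keeps the output order stable even though consumers finish in arbitrary order.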
- class ConcurrencyController(*, max_concurrency, initial_target, window_seconds=5, increase_step=0.5, decrease_ratio=0.5, inactive_check_interval=1.0, smoothing_factor=0.2, collapse_window_seconds=15.0, collapse_error_threshold=2)#
Bases:
object
AIMD (Additive Increase/Multiplicative Decrease) controller for target concurrency.
Per window: if no error is observed, increase the target by +a (increase_step); otherwise decrease it by a factor of β (decrease_ratio). The result is clamped to [1, max_concurrency].
- Steady-state guide for choosing feedback constants:
concurrency ~= a * (1 - r_e) / ((1 - β) * r_e)
where r_e is the fraction of windows that observe at least one error. To tend toward a single active worker when errors are frequent, select (a, β) so that
concurrency <= 1 when r_e >= a / (a + 1 - β).
Example: a=1, β=0.5 ⇒ threshold r_e >= 2/3.
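The steady-state guide above can be checked numerically. The sketch below evaluates the two formulas as written and adds a one-window update rule; the function names are hypothetical, and aimd_step assumes that "decrease by a factor of β" means multiplying the target by β.

```python
def steady_state_concurrency(a: float, beta: float, r_e: float) -> float:
    """Approximate steady-state target: a * (1 - r_e) / ((1 - beta) * r_e)."""
    if not 0.0 < r_e <= 1.0:
        raise ValueError("r_e must be in (0, 1]")
    if not 0.0 <= beta < 1.0:
        raise ValueError("beta must be in [0, 1)")
    return a * (1.0 - r_e) / ((1.0 - beta) * r_e)


def single_worker_threshold(a: float, beta: float) -> float:
    """Error-window fraction at which the estimate drops to 1: a / (a + 1 - beta)."""
    return a / (a + 1.0 - beta)


def aimd_step(
    target: float, had_error: bool, a: float, beta: float, max_concurrency: int
) -> float:
    """One window of AIMD (assumed form): add a on success, multiply by beta on error."""
    new_target = target * beta if had_error else target + a
    # Clamp to [1, max_concurrency] as described in the text.
    return float(min(max_concurrency, max(1.0, new_target)))


if __name__ == "__main__":
    threshold = single_worker_threshold(a=1.0, beta=0.5)
    print(threshold)
    print(steady_state_concurrency(a=1.0, beta=0.5, r_e=threshold))
```

Setting the steady-state expression to 1 and solving for r_e gives the threshold directly: a(1 - r_e) = (1 - β) r_e, so r_e = a / (a + 1 - β).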
- record_error()#
- record_success(latency_seconds)#
- record_timeout()#
- property target_concurrency#
- EvalsRateLimitError#
alias of
_EvalsRateLimitErrorFallback
- class ExecutionDetails#
Bases:
object
- complete()#
- fail()#
- log_exception(exc)#
- log_runtime(start_time)#
- class ExecutionStatus(value)#
Bases:
Enum
- COMPLETED = 'COMPLETED'#
- COMPLETED_WITH_RETRIES = 'COMPLETED WITH RETRIES'#
- DID_NOT_RUN = 'DID NOT RUN'#
- FAILED = 'FAILED'#
- class SyncExecutor(generation_fn, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, termination_signal=signal.SIGINT)#
Bases:
Executor
Synchronous executor for generating outputs from inputs using a given generation function.
- Parameters:
generation_fn (Callable[[Any], Any]) – The generation function that takes an input and returns an output.
tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults to None. If None, the progress bar is disabled.
max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to 10.
exit_on_error (bool, optional) – Whether to exit execution on the first encountered error. Defaults to True.
fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks that encounter errors. Defaults to _unset.
- run(inputs)#
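How max_retries, exit_on_error, and fallback_return_value might interact can be sketched as a plain loop. This is an assumption-laden illustration rather than the library's behavior: run_sync_sketch is a hypothetical name, and the sketch assumes that an input is attempted up to max_retries + 1 times, that exit_on_error re-raises once retries are exhausted, and that the fallback value fills the slot otherwise.

```python
from typing import Any, Callable, Iterable, List


def run_sync_sketch(
    generation_fn: Callable[[Any], Any],
    inputs: Iterable[Any],
    max_retries: int = 10,
    exit_on_error: bool = True,
    fallback_return_value: Any = None,
) -> List[Any]:
    """Apply generation_fn to each input, retrying on exceptions."""
    outputs: List[Any] = []
    for item in inputs:
        result = fallback_return_value
        for attempt in range(max_retries + 1):
            try:
                result = generation_fn(item)
                break  # success, stop retrying this input
            except Exception:
                # Assumed policy: give up after the last attempt and either
                # propagate the error or keep the fallback value.
                if attempt == max_retries and exit_on_error:
                    raise
        outputs.append(result)
    return outputs


if __name__ == "__main__":
    def always_fail(_: Any) -> Any:
        raise ValueError("boom")

    print(
        run_sync_sketch(
            always_fail,
            [1, 2],
            max_retries=1,
            exit_on_error=False,
            fallback_return_value="n/a",
        )
    )
```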
- class Unset#
Bases:
object
- get_executor_on_sync_context(sync_fn, async_fn, run_sync=False, concurrency=3, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, timeout=None)#
Rate Limiters#
- class AdaptiveTokenBucket(initial_per_second_request_rate, maximum_per_second_request_rate=None, minimum_per_second_request_rate=0.1, enforcement_window_minutes=1, rate_reduction_factor=0.5, rate_increase_factor=0.01, cooldown_seconds=5)#
Bases:
object
An adaptive rate-limiter that adjusts the rate based on the number of rate limit errors.
This rate limiter does not need to know the exact rate limit. Instead, it starts with a high rate and reduces it whenever a rate limit error occurs. The rate is increased slowly over time if no further errors occur.
- Parameters:
initial_per_second_request_rate (float) – The allowed request rate.
maximum_per_second_request_rate (float) – The maximum allowed request rate.
enforcement_window_minutes (float) – The time window over which the rate limit is enforced.
rate_reduction_factor (float) – Multiplier used to reduce the rate limit after an error.
rate_increase_factor (float) – Exponential factor increasing the rate limit over time.
cooldown_seconds (float) – The minimum time before allowing the rate limit to decrease again.
- async async_wait_until_ready(max_wait_time=10)#
- available_requests()#
- increase_rate()#
- make_request_if_ready()#
- max_tokens()#
- on_rate_limit_error(request_start_time, verbose=False)#
- wait_until_ready(max_wait_time=300)#
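The adaptive behavior described above (reduce the rate on a rate limit error, respect a cooldown, and grow the rate slowly otherwise) can be sketched as follows. AdaptiveRateSketch is a hypothetical class, not the library's AdaptiveTokenBucket. It assumes that the increase is rate * exp(rate_increase_factor * elapsed_seconds) and that a missing maximum means no cap. Token accounting and waiting are omitted, and timestamps are passed in explicitly to keep the example deterministic.

```python
import math
from typing import Optional


class AdaptiveRateSketch:
    """Tracks only the adaptive request rate, not the token bucket itself."""

    def __init__(
        self,
        initial_rate: float,
        maximum_rate: Optional[float] = None,
        minimum_rate: float = 0.1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5.0,
    ) -> None:
        self.rate = initial_rate
        # Assumption for this sketch: no maximum means the rate is uncapped.
        self.maximum_rate = math.inf if maximum_rate is None else maximum_rate
        self.minimum_rate = minimum_rate
        self.rate_reduction_factor = rate_reduction_factor
        self.rate_increase_factor = rate_increase_factor
        self.cooldown_seconds = cooldown_seconds
        self.last_reduction_time: Optional[float] = None

    def on_rate_limit_error(self, now: float) -> None:
        """Cut the rate unless a reduction happened within the cooldown."""
        if (
            self.last_reduction_time is not None
            and now - self.last_reduction_time < self.cooldown_seconds
        ):
            return
        self.rate = max(self.minimum_rate, self.rate * self.rate_reduction_factor)
        self.last_reduction_time = now

    def increase_rate(self, elapsed_seconds: float) -> None:
        """Grow the rate exponentially with error-free time (assumed form)."""
        grown = self.rate * math.exp(self.rate_increase_factor * elapsed_seconds)
        self.rate = min(self.maximum_rate, grown)


if __name__ == "__main__":
    limiter = AdaptiveRateSketch(initial_rate=10.0)
    limiter.on_rate_limit_error(now=0.0)  # rate halves
    limiter.on_rate_limit_error(now=1.0)  # ignored, still cooling down
    limiter.increase_rate(elapsed_seconds=100.0)
    print(limiter.rate)
```

The cooldown matters because a burst of concurrent requests can all fail at once; without it, a single throttling event would reduce the rate many times over.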
- exception RateLimitError(message='Exceeded rate limit retries', *, current_rate_tokens_per_sec=None, initial_rate_tokens_per_sec=None, enforcement_window_seconds=None)#
Bases:
PhoenixException
- class RateLimiter(rate_limit_error=None, max_rate_limit_retries=0, initial_per_second_request_rate=1.0, maximum_per_second_request_rate=None, enforcement_window_minutes=1, rate_reduction_factor=0.5, rate_increase_factor=0.01, cooldown_seconds=5, verbose=False)#
Bases:
object
- alimit(fn)#
- limit(fn)#
Bases:
PhoenixException
Configuration#
- get_base_url()#
- get_env_client_headers()#
- get_env_collector_endpoint()#
- get_env_host()#
- get_env_host_root_path()#
- get_env_phoenix_api_key()#
- get_env_port()#
- get_env_project_name()#
- getenv(key, default=None)#
Retrieves the value of an environment variable.
- Parameters:
key (str) – The name of the environment variable.
default (Optional[str], optional) – The default value to return if the environment variable is not set. Defaults to None.
- Returns:
The value of the environment variable, or default if the variable is not set. Leading and trailing whitespace is stripped from the value, on the assumption that it was added inadvertently.
- Return type:
Optional[str]
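The documented behavior of getenv can be approximated with os.getenv. This is a minimal sketch under a stated assumption: getenv_sketch is a hypothetical name, and the sketch strips only values read from the environment, returning default untouched.

```python
import os
from typing import Optional


def getenv_sketch(key: str, default: Optional[str] = None) -> Optional[str]:
    """Return the stripped value of an environment variable, or default."""
    value = os.getenv(key)
    if value is None:
        return default
    # Whitespace around the value is assumed to be accidental.
    return value.strip()


if __name__ == "__main__":
    os.environ["SKETCH_EXAMPLE_VAR"] = "  http://localhost:6006 \n"
    print(repr(getenv_sketch("SKETCH_EXAMPLE_VAR")))
    del os.environ["SKETCH_EXAMPLE_VAR"]
    print(repr(getenv_sketch("SKETCH_EXAMPLE_VAR", "fallback")))
```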
Template Formatters#
- class BaseTemplateFormatter#
Bases:
ABC
- format(template, /, *, variables=MappingProxyType({}))#
Format the template with the given variables.
- Parameters:
template (str) – The template string to format.
variables (Mapping[str, str]) – A mapping of variable names to their values.
- Returns:
The formatted template string.
- Return type:
str
- Raises:
TemplateFormatterError – If required template variables are missing.
- abstractmethod parse(template)#
Parse the template and return a set of variable names.
- Parameters:
template (str) – The template string to parse.
- Returns:
A set of variable names found in the template.
- Return type:
set[str]
- class FStringBaseTemplateFormatter#
Bases:
BaseTemplateFormatter
F-string style template formatter using Python string formatting.
This formatter uses Python’s built-in string formatting with curly braces to substitute variables in templates.
Example:
formatter = FStringBaseTemplateFormatter()
result = formatter.format("{hello}", variables={"hello": "world"})
# result == "world"
- parse(template)#
Parse the template and return a set of variable names.
- Parameters:
template (str) – The template string to parse.
- Returns:
A set of variable names found in the template.
- Return type:
set[str]
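One way to collect variable names from a curly-brace template is the standard library's string.Formatter. The helper below is a hypothetical sketch of that approach and may differ from how FStringBaseTemplateFormatter.parse is actually implemented.

```python
from string import Formatter
from typing import Set


def parse_fstring_variables(template: str) -> Set[str]:
    """Collect the field names referenced by a str.format-style template."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for pure literal segments, including escaped braces.
    return {field for _, field, _, _ in Formatter().parse(template) if field}


if __name__ == "__main__":
    print(sorted(parse_fstring_variables("{greeting}, {name!r:>10}! {{literal}}")))
```

Escaped braces such as {{literal}} are reported as literal text, so they do not appear in the result.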
- class MustacheBaseTemplateFormatter#
Bases:
BaseTemplateFormatter
Mustache-style template formatter using double curly braces.
This formatter uses Mustache-style syntax with double curly braces and optional whitespace to substitute variables in templates.
Example:
formatter = MustacheBaseTemplateFormatter()
result = formatter.format("{{ hello }}", variables={"hello": "world"})
# result == "world"
- PATTERN = re.compile('(?<!\\\\){{\\s*(\\w+)\\s*}}')#
- parse(template)#
Parse the template and return a set of variable names.
- Parameters:
template (str) – The template string to parse.
- Returns:
A set of variable names found in the template.
- Return type:
set[str]
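The PATTERN attribute shown above is enough to sketch both parsing and substitution. The two helpers below are hypothetical and use only the regular expression from the class listing; raising KeyError for missing variables is a choice made for this sketch, whereas the documented formatter raises TemplateFormatterError.

```python
import re
from typing import Mapping, Set

# Same expression as the PATTERN attribute in the listing: double braces,
# optional whitespace, a word-character name, and no preceding backslash.
PATTERN = re.compile(r"(?<!\\){{\s*(\w+)\s*}}")


def parse_mustache_variables(template: str) -> Set[str]:
    """Return the variable names referenced by a Mustache-style template."""
    return set(PATTERN.findall(template))


def format_mustache(template: str, variables: Mapping[str, str]) -> str:
    """Substitute every {{ name }} placeholder with its value."""
    missing = parse_mustache_variables(template) - set(variables)
    if missing:
        raise KeyError(f"missing template variables: {sorted(missing)}")
    return PATTERN.sub(lambda match: variables[match.group(1)], template)


if __name__ == "__main__":
    template = "Hello {{ name }}, welcome to {{city}}."
    print(sorted(parse_mustache_variables(template)))
    print(format_mustache(template, {"name": "Ada", "city": "Paris"}))
```

The negative lookbehind means a placeholder preceded by a backslash is left alone, which gives templates a way to include literal double braces.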
- class NoOpFormatterBase#
Bases:
BaseTemplateFormatter
No-op template formatter that returns templates unchanged.
This formatter does not perform any variable substitution and simply returns the template string as-is.
Example:
formatter = NoOpFormatterBase()
result = formatter.format("hello")
# result == "hello"
- parse(template)#
Parse the template and return a set of variable names.
- Parameters:
template (str) – The template string to parse.
- Returns:
A set of variable names found in the template.
- Return type:
set[str]
- exception TemplateFormatterError#
Bases:
Exception
An error raised when template formatting fails.
This exception is raised when required template variables are missing or when template formatting encounters other errors.
- to_formatter(obj)#
Convert a PromptVersionData object to the appropriate template formatter.
- Parameters:
obj (v1.PromptVersionData) – The prompt version data containing template format information.
- Returns:
The appropriate formatter based on the template format.
- Return type:
ID Handling#
- is_node_id(s, node_type)#
Check if a string is a valid node ID.
- Parameters:
s (str) – The string to check.
node_type (str) – The type of node.
Environment Headers#
- parse_env_headers(s)#
Parse s, which is a str instance containing HTTP headers encoded for use in ENV variables per the W3C Baggage HTTP header format at https://www.w3.org/TR/baggage/#baggage-http-header-format, except that additional semicolon-delimited metadata is not supported.
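A simplified reading of that format is a comma-separated list of key=value members with percent-encoded values. The sketch below follows that reading; parse_env_headers_sketch is a hypothetical name, and silently skipping malformed members and members that carry semicolon metadata is an assumption of this sketch rather than documented behavior.

```python
from typing import Dict
from urllib.parse import unquote


def parse_env_headers_sketch(s: str) -> Dict[str, str]:
    """Parse 'k1=v1,k2=v2' into a dict, percent-decoding keys and values."""
    headers: Dict[str, str] = {}
    for member in s.split(","):
        member = member.strip()
        if not member:
            continue
        if ";" in member:
            # Semicolon-delimited metadata is not supported; skip (assumed policy).
            continue
        key, separator, value = member.partition("=")
        if not separator or not key.strip():
            continue  # not a key=value pair
        headers[unquote(key.strip())] = unquote(value.strip())
    return headers


if __name__ == "__main__":
    print(parse_env_headers_sketch("authorization=Bearer%20abc, x-team=ml"))
```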
Path Encoding#
- encode_path_param(s)#
Quoting function for FastAPI path parameters.
Encodes a string for use in URL paths, raising ValueError if it contains characters that would cause routing issues (/, ?, #).
- Parameters:
s (str) – The string to encode
- Returns:
The encoded string safe for path parameters
- Return type:
str
- Raises:
ValueError – If the string contains /, ?, or # characters
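The behavior described above (reject /, ?, and #, then percent-encode the rest) can be sketched with urllib.parse.quote. encode_path_param_sketch is a hypothetical helper; using safe="" so that every reserved character is encoded is an assumption of this sketch.

```python
from urllib.parse import quote

_FORBIDDEN = "/?#"  # characters the text says would cause routing issues


def encode_path_param_sketch(s: str) -> str:
    """Percent-encode s for use as a single URL path segment."""
    for character in _FORBIDDEN:
        if character in s:
            raise ValueError(f"path parameter must not contain {character!r}: {s!r}")
    # safe="" encodes every reserved character instead of leaving "/" alone.
    return quote(s, safe="")


if __name__ == "__main__":
    print(encode_path_param_sketch("my project"))
```

Rejecting these characters instead of encoding them avoids depending on how a server or proxy decodes %2F, %3F, and %23 before routing.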