Utilities#

Executors#

class AsyncExecutor(generation_fn, concurrency=3, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, termination_signal=signal.SIGINT, timeout=None, *, enable_dynamic_concurrency=True, dynamic_initial_target=None, dynamic_window_seconds=5.0, dynamic_increase_step=1, dynamic_decrease_ratio=0.5, dynamic_inactive_check_interval=1.0)#

Bases: Executor

A class that provides asynchronous execution of tasks using a producer-consumer pattern.

An async interface is provided by the execute method, which returns a coroutine, and a sync interface is provided by the run method.

Parameters:
  • generation_fn (Callable[[Any], Coroutine[Any, Any, Any]]) – A coroutine function that generates tasks to be executed.

  • concurrency (int, optional) – The number of concurrent consumers. Defaults to 3.

  • tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults to None.

  • max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to 10.

  • exit_on_error (bool, optional) – Whether to exit execution on the first encountered error. Defaults to True.

  • fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks that encounter errors. Defaults to _unset.

  • termination_signal (signal.Signals, optional) – The signal handled to terminate the executor. Defaults to signal.SIGINT.
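The producer-consumer pattern mentioned above can be illustrated with a minimal, self-contained asyncio sketch. It is not the AsyncExecutor implementation: the helper name run_producer_consumer, the sentinel-based shutdown, and the queue size are assumptions made for illustration, and retries, timeouts, progress bars, and dynamic concurrency are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_producer_consumer(
    generation_fn: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    concurrency: int = 3,
) -> List[Any]:
    """Hypothetical helper: fan inputs out to `concurrency` consumers."""
    # A bounded queue keeps the producer from racing far ahead of the consumers.
    queue: asyncio.Queue = asyncio.Queue(maxsize=2 * concurrency)
    outputs: List[Any] = [None] * len(inputs)

    async def producer() -> None:
        for index, item in enumerate(inputs):
            await queue.put((index, item))
        # One sentinel per consumer signals that production is finished.
        for _ in range(concurrency):
            await queue.put(None)

    async def consumer() -> None:
        while True:
            task = await queue.get()
            if task is None:
                return
            index, item = task
            # Store by index so results keep the order of the inputs.
            outputs[index] = await generation_fn(item)

    await asyncio.gather(producer(), *(consumer() for _ in range(concurrency)))
    return outputs


async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return 2 * x


results = asyncio.run(run_producer_consumer(double, [1, 2, 3, 4]))
```

Writing each result to its input index keeps the outputs aligned with the inputs even when consumers finish out of order.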

async consumer(outputs, execution_details, queue, done_producing, termination_event, progress_bar, worker_index)#
async execute(inputs)#
async producer(inputs, queue, max_fill, done_producing, termination_signal)#
run(inputs)#
class ConcurrencyController(*, max_concurrency, initial_target, window_seconds=5, increase_step=0.5, decrease_ratio=0.5, inactive_check_interval=1.0, smoothing_factor=0.2, collapse_window_seconds=15.0, collapse_error_threshold=2)#

Bases: object

AIMD (Additive Increase/Multiplicative Decrease) controller for target concurrency.

Per window: if no error was observed, increase the target by a (increase_step); otherwise multiply the target by β (decrease_ratio). The result is clamped to [1, max_concurrency].

Steady-state guide for choosing feedback constants:

concurrency ~= a * (1 - r_e) / ((1 - β) * r_e)

where r_e is the fraction of windows that observe at least one error. To tend toward a single active worker when errors are frequent, select (a, β) so that

concurrency <= 1 when r_e >= a / (a + 1 - β).

Example: a=1, β=0.5 ⇒ threshold r_e >= 2/3.
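The update rule and the steady-state guide above can be checked numerically. The following is a small sketch, not the ConcurrencyController code; the function names are hypothetical and only the formulas come from the description above.

```python
def aimd_step(
    target: float,
    saw_error: bool,
    a: float = 1.0,
    beta: float = 0.5,
    max_concurrency: float = 10.0,
) -> float:
    """One window of AIMD: add `a` on success, multiply by `beta` on error."""
    proposed = target * beta if saw_error else target + a
    # Clamp to the interval [1, max_concurrency].
    return min(max(proposed, 1.0), max_concurrency)


def steady_state_concurrency(a: float, beta: float, error_rate: float) -> float:
    """concurrency ~= a * (1 - r_e) / ((1 - beta) * r_e)."""
    return a * (1.0 - error_rate) / ((1.0 - beta) * error_rate)


def single_worker_threshold(a: float, beta: float) -> float:
    """Smallest r_e at which the steady-state estimate drops to 1 or below."""
    return a / (a + 1.0 - beta)


threshold = single_worker_threshold(a=1.0, beta=0.5)
at_threshold = steady_state_concurrency(a=1.0, beta=0.5, error_rate=threshold)
```

With a=1 and β=0.5, threshold evaluates to 2/3 and at_threshold to 1 (up to floating-point rounding), matching the example above.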

record_error()#
record_success(latency_seconds)#
record_timeout()#
property target_concurrency#
EvalsRateLimitError#

alias of _EvalsRateLimitErrorFallback

class ExecutionDetails#

Bases: object

complete()#
fail()#
log_exception(exc)#
log_runtime(start_time)#
class ExecutionStatus(value)#

Bases: Enum

COMPLETED = 'COMPLETED'#
COMPLETED_WITH_RETRIES = 'COMPLETED WITH RETRIES'#
DID_NOT_RUN = 'DID NOT RUN'#
FAILED = 'FAILED'#
class Executor(*args, **kwargs)#

Bases: Protocol

run(inputs)#
class SyncExecutor(generation_fn, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, termination_signal=signal.SIGINT)#

Bases: Executor

Synchronous executor for generating outputs from inputs using a given generation function.

Parameters:
  • generation_fn (Callable[[Any], Any]) – The generation function that takes an input and returns an output.

  • tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults to None. If None, the progress bar is disabled.

  • max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to 10.

  • exit_on_error (bool, optional) – Whether to exit execution on the first encountered error. Defaults to True.

  • fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks that encounter errors. Defaults to _unset.
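As a rough illustration of how max_retries and fallback_return_value could interact, here is a self-contained sketch. It is an assumption about the general idea, not the SyncExecutor implementation: the helper run_with_retries is hypothetical, and exit_on_error, signal handling, and the progress bar are not modeled.

```python
from typing import Any, Callable, Dict, List, Sequence


def run_with_retries(
    generation_fn: Callable[[Any], Any],
    inputs: Sequence[Any],
    max_retries: int = 10,
    fallback: Any = None,
) -> List[Any]:
    """Hypothetical helper: retry each input, then fall back."""
    outputs: List[Any] = []
    for item in inputs:
        result = fallback
        # One initial attempt plus up to `max_retries` retries.
        for _attempt in range(max_retries + 1):
            try:
                result = generation_fn(item)
                break
            except Exception:
                continue
        outputs.append(result)
    return outputs


calls: Dict[int, int] = {}


def flaky(x: int) -> int:
    """Fails on the first call for each input, then succeeds."""
    calls[x] = calls.get(x, 0) + 1
    if calls[x] < 2:
        raise RuntimeError("transient failure")
    return x * 10


def always_fails(x: int) -> int:
    raise ValueError("permanent failure")


recovered = run_with_retries(flaky, [1, 2], max_retries=3)
fell_back = run_with_retries(always_fails, [1], max_retries=2, fallback=-1)
```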

run(inputs)#
class Unset#

Bases: object

get_executor_on_sync_context(sync_fn, async_fn, run_sync=False, concurrency=3, tqdm_bar_format=None, max_retries=10, exit_on_error=True, fallback_return_value=_unset, timeout=None)#

Rate Limiters#

class AdaptiveTokenBucket(initial_per_second_request_rate, maximum_per_second_request_rate=None, minimum_per_second_request_rate=0.1, enforcement_window_minutes=1, rate_reduction_factor=0.5, rate_increase_factor=0.01, cooldown_seconds=5)#

Bases: object

An adaptive rate-limiter that adjusts the rate based on the number of rate limit errors.

This rate limiter does not need to know the exact rate limit. Instead, it starts with a high rate and reduces it whenever a rate limit error occurs. The rate is increased slowly over time if no further errors occur.

Parameters:
  • initial_per_second_request_rate (float) – The allowed request rate.

  • maximum_per_second_request_rate (float) – The maximum allowed request rate.

  • enforcement_window_minutes (float) – The time window over which the rate limit is enforced.

  • rate_reduction_factor (float) – Multiplier used to reduce the rate limit after an error.

  • rate_increase_factor (float) – Exponential factor increasing the rate limit over time.

  • cooldown_seconds (float) – The minimum time before allowing the rate limit to decrease again.
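The adapt-on-error behavior described above can be modeled with a toy class. This is a sketch under stated assumptions, not the AdaptiveTokenBucket implementation: the class AdaptiveRateSketch and its methods back_off and grow are hypothetical, time is passed in explicitly instead of read from a clock, and token accounting is omitted.

```python
import math
from typing import Optional


class AdaptiveRateSketch:
    """Toy model: cut the rate on errors, grow it exponentially otherwise."""

    def __init__(
        self,
        initial_rate: float,
        maximum_rate: Optional[float] = None,
        minimum_rate: float = 0.1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5.0,
    ) -> None:
        self.rate = initial_rate
        self.maximum_rate = maximum_rate
        self.minimum_rate = minimum_rate
        self.rate_reduction_factor = rate_reduction_factor
        self.rate_increase_factor = rate_increase_factor
        self.cooldown_seconds = cooldown_seconds
        self.last_error_time = -math.inf

    def back_off(self, now: float) -> None:
        """Reduce the rate unless a reduction happened within the cooldown."""
        if now - self.last_error_time < self.cooldown_seconds:
            return
        self.rate = max(self.rate * self.rate_reduction_factor, self.minimum_rate)
        self.last_error_time = now

    def grow(self, elapsed_seconds: float) -> None:
        """Grow the rate exponentially with elapsed error-free time."""
        grown = self.rate * math.exp(self.rate_increase_factor * elapsed_seconds)
        if self.maximum_rate is not None:
            grown = min(grown, self.maximum_rate)
        self.rate = grown


limiter = AdaptiveRateSketch(initial_rate=10.0, maximum_rate=20.0)
limiter.back_off(now=0.0)   # rate halves to 5.0
limiter.back_off(now=1.0)   # ignored: still inside the 5-second cooldown
limiter.back_off(now=6.0)   # rate halves again to 2.5
rate_after_errors = limiter.rate
limiter.grow(elapsed_seconds=1000.0)  # long quiet period, capped at the maximum
rate_after_recovery = limiter.rate
```

The cooldown keeps a burst of simultaneous rate limit errors from collapsing the rate several times in a row.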

async async_wait_until_ready(max_wait_time=10)#
available_requests()#
increase_rate()#
make_request_if_ready()#
max_tokens()#
on_rate_limit_error(request_start_time, verbose=False)#
wait_until_ready(max_wait_time=300)#
exception RateLimitError(message='Exceeded rate limit retries', *, current_rate_tokens_per_sec=None, initial_rate_tokens_per_sec=None, enforcement_window_seconds=None)#

Bases: PhoenixException

class RateLimiter(rate_limit_error=None, max_rate_limit_retries=0, initial_per_second_request_rate=1.0, maximum_per_second_request_rate=None, enforcement_window_minutes=1, rate_reduction_factor=0.5, rate_increase_factor=0.01, cooldown_seconds=5, verbose=False)#

Bases: object

alimit(fn)#
limit(fn)#
exception UnavailableTokensError#

Bases: PhoenixException

Configuration#

get_base_url()#
get_env_client_headers()#
get_env_collector_endpoint()#
get_env_host()#
get_env_host_root_path()#
get_env_phoenix_api_key()#
get_env_port()#
get_env_project_name()#
getenv(key, default=None)#

Retrieves the value of an environment variable.

Parameters:
  • key (str) – The name of the environment variable.

  • default (Optional[str], optional) – The default value to return if the environment variable is not set. Defaults to None.

Returns:

The value of the environment variable, or default if the variable is not set. Leading and trailing whitespace is stripped from the value, on the assumption that it was added inadvertently.

Return type:

Optional[str]
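The documented behavior can be approximated with the standard library alone. The sketch below is an assumption about how such a helper might look, not the library's code; getenv_stripped and the environment variable names are hypothetical.

```python
import os
from typing import Optional


def getenv_stripped(key: str, default: Optional[str] = None) -> Optional[str]:
    """Hypothetical stand-in: read a variable and strip surrounding whitespace."""
    value = os.getenv(key)
    if value is None:
        return default
    return value.strip()


# Made-up variable names, used only for this illustration.
os.environ["GR_SKETCH_HOST"] = "  localhost \n"
os.environ.pop("GR_SKETCH_MISSING", None)

host = getenv_stripped("GR_SKETCH_HOST")
missing = getenv_stripped("GR_SKETCH_MISSING", default="fallback")
```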

Template Formatters#

class BaseTemplateFormatter#

Bases: ABC

format(template, /, *, variables=MappingProxyType({}))#

Format the template with the given variables.

Parameters:
  • template (str) – The template string to format.

  • variables (Mapping[str, str]) – A mapping of variable names to their values.

Returns:

The formatted template string.

Return type:

str

Raises:

TemplateFormatterError – If required template variables are missing.

abstractmethod parse(template)#

Parse the template and return a set of variable names.

Parameters:

template (str) – The template string to parse.

Returns:

A set of variable names found in the template.

Return type:

set[str]

class FStringBaseTemplateFormatter#

Bases: BaseTemplateFormatter

F-string style template formatter using Python string formatting.

This formatter uses Python’s built-in string formatting with curly braces to substitute variables in templates.

Example:

formatter = FStringBaseTemplateFormatter()
result = formatter.format("{hello}", variables={"hello": "world"})
# result == "world"
parse(template)#

Parse the template and return a set of variable names.

Parameters:

template (str) – The template string to parse.

Returns:

A set of variable names found in the template.

Return type:

set[str]
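One way to collect variable names from a curly-brace template is the standard library's string.Formatter. The sketch below is an illustration of that technique only; whether FStringBaseTemplateFormatter.parse works this way is an assumption, and parse_fstring_variables is a hypothetical name.

```python
from string import Formatter
from typing import Set


def parse_fstring_variables(template: str) -> Set[str]:
    """Hypothetical helper: names of replacement fields in `template`."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for chunks without a replacement field.
    return {field for _, field, _, _ in Formatter().parse(template) if field}


names = parse_fstring_variables("{hello}, {name}! {{literal}}")
```

Doubled braces are treated as escaped literals, so {{literal}} does not contribute a variable name.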

class MustacheBaseTemplateFormatter#

Bases: BaseTemplateFormatter

Mustache-style template formatter using double curly braces.

This formatter uses Mustache-style syntax with double curly braces and optional whitespace to substitute variables in templates.

Example:

formatter = MustacheBaseTemplateFormatter()
result = formatter.format("{{ hello }}", variables={"hello": "world"})
# result == "world"
PATTERN = re.compile('(?<!\\\\){{\\s*(\\w+)\\s*}}')#
parse(template)#

Parse the template and return a set of variable names.

Parameters:

template (str) – The template string to parse.

Returns:

A set of variable names found in the template.

Return type:

set[str]
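The PATTERN shown above can be exercised directly with the re module. This sketch reuses the same regular expression; the helper names are hypothetical, and the substitution step is an assumption about how rendering could be done rather than the library's format method.

```python
import re
from typing import Mapping, Set

# Same expression as PATTERN above, written as a raw string.
MUSTACHE_PATTERN = re.compile(r"(?<!\\){{\s*(\w+)\s*}}")


def parse_mustache_variables(template: str) -> Set[str]:
    """Hypothetical helper: names inside unescaped {{ ... }} tags."""
    return set(MUSTACHE_PATTERN.findall(template))


def render_mustache(template: str, variables: Mapping[str, str]) -> str:
    """Hypothetical helper: replace each tag with its value."""
    return MUSTACHE_PATTERN.sub(lambda match: variables[match.group(1)], template)


template = "Hello {{ name }}, age {{age}}"
found = parse_mustache_variables(template)
rendered = render_mustache(template, {"name": "Ada", "age": "36"})
escaped = parse_mustache_variables(r"\{{ skipped }}")
```

The negative lookbehind means a tag preceded by a backslash is not treated as a variable.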

class NoOpFormatterBase#

Bases: BaseTemplateFormatter

No-op template formatter that returns templates unchanged.

This formatter does not perform any variable substitution and simply returns the template string as-is.

Example:

formatter = NoOpFormatterBase()
result = formatter.format("hello")
# result == "hello"
parse(template)#

Parse the template and return a set of variable names.

Parameters:

template (str) – The template string to parse.

Returns:

A set of variable names found in the template.

Return type:

set[str]

class TemplateFormatter(*args, **kwargs)#

Bases: Protocol

format(template, /, *, variables)#
exception TemplateFormatterError#

Bases: Exception

An error raised when template formatting fails.

This exception is raised when required template variables are missing or when template formatting encounters other errors.

to_formatter(obj)#

Convert a PromptVersionData object to the appropriate template formatter.

Parameters:

obj (v1.PromptVersionData) – The prompt version data containing template format information.

Returns:

The appropriate formatter based on the template format.

Return type:

BaseTemplateFormatter

ID Handling#

is_node_id(s, node_type)#

Check if a string is a valid node ID.

Parameters:
  • s (str) – The string to check.

  • node_type (str) – The type of node.

Environment Headers#

parse_env_headers(s)#

Parse s, a string containing HTTP headers encoded for use in environment variables. The encoding follows the W3C Baggage HTTP header format (https://www.w3.org/TR/baggage/#baggage-http-header-format), except that additional semicolon-delimited metadata is not supported.

src: open-telemetry/opentelemetry-python
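In the Baggage header format, entries are comma-separated key=value pairs with percent-encoded values. A simplified sketch of parsing such a string follows; it is not the library's parse_env_headers, the name parse_headers_sketch is hypothetical, and details such as key normalization or validation are assumptions left out here.

```python
from typing import Dict
from urllib.parse import unquote


def parse_headers_sketch(s: str) -> Dict[str, str]:
    """Hypothetical helper: parse 'k1=v1,k2=v2' into a dict."""
    headers: Dict[str, str] = {}
    for member in s.split(","):
        name, separator, value = member.partition("=")
        name = name.strip()
        # Skip entries that have no '=' or an empty name.
        if not separator or not name:
            continue
        headers[unquote(name)] = unquote(value.strip())
    return headers


parsed = parse_headers_sketch("api_key=abc%20123, x-team = evals,malformed")
```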

Path Encoding#

encode_path_param(s)#

Quoting function for FastAPI path parameters.

Encodes a string for use in URL paths, raising ValueError if it contains characters that would cause routing issues (/, ?, #).

Parameters:

s (str) – The string to encode

Returns:

The encoded string safe for path parameters

Return type:

str

Raises:

ValueError – If the string contains /, ?, or # characters
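The documented contract (reject /, ?, and #, then percent-encode) can be sketched with urllib.parse.quote. This is an illustration under that assumption, not the library's encode_path_param; the name encode_path_param_sketch is hypothetical.

```python
from urllib.parse import quote


def encode_path_param_sketch(s: str) -> str:
    """Hypothetical helper: encode `s` for use as a single URL path segment."""
    forbidden = [ch for ch in "/?#" if ch in s]
    if forbidden:
        raise ValueError(f"path parameter contains forbidden characters: {forbidden}")
    # safe="" percent-encodes every reserved character.
    return quote(s, safe="")


encoded = encode_path_param_sketch("my project")

try:
    encode_path_param_sketch("a/b")
    rejected = False
except ValueError:
    rejected = True
```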