evals.executors#
- class AsyncExecutor(generation_fn: ~typing.Callable[[~typing.Any], ~typing.Coroutine[~typing.Any, ~typing.Any, ~typing.Any]], concurrency: int = 3, tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>, termination_signal: ~signal.Signals = Signals.SIGINT)#
Bases:
Executor
A class that provides asynchronous execution of tasks using a producer-consumer pattern.
An async interface is provided by the execute method, which returns a coroutine, and a sync interface is provided by the run method.
- Parameters:
generation_fn (Callable[[Any], Coroutine[Any, Any, Any]]) – A coroutine function that
executed. (generates tasks to be)
concurrency (int, optional) – The number of concurrent consumers. Defaults to 3.
tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults
None. (to)
max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to
10.
exit_on_error (bool, optional) – Whether to exit execution on the first encountered error.
True. (Defaults to)
fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks
_unset. (that encounter errors. Defaults to)
termination_signal (signal.Signals, optional) – The signal handled to terminate the executor.
- async consumer(outputs: List[Any], execution_details: List[ExecutionDetails], queue: asyncio.PriorityQueue[Tuple[int, Any]], done_producing: asyncio.Event, termination_event: asyncio.Event, progress_bar: tqdm[Any]) None #
- async execute(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]] #
- async producer(inputs: Sequence[Any], queue: PriorityQueue[Tuple[int, Any]], max_fill: int, done_producing: Event, termination_signal: Event) None #
- run(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]] #
- class ExecutionDetails#
Bases:
object
- complete() None #
- fail() None #
- log_exception(exc: Exception) None #
- log_runtime(start_time: float) None #
- class ExecutionStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases:
Enum
- COMPLETED = 'COMPLETED'#
- COMPLETED_WITH_RETRIES = 'COMPLETED WITH RETRIES'#
- DID_NOT_RUN = 'DID NOT RUN'#
- FAILED = 'FAILED'#
- class Executor(*args, **kwargs)#
Bases:
Protocol
- run(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]] #
- class SyncExecutor(generation_fn: ~typing.Callable[[~typing.Any], ~typing.Any], tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>, termination_signal: ~signal.Signals | None = Signals.SIGINT)#
Bases:
Executor
Synchronous executor for generating outputs from inputs using a given generation function.
- Parameters:
generation_fn (Callable[[Any], Any]) – The generation function that takes an input and
output. (returns an)
tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults
None. (to)
max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to
10.
exit_on_error (bool, optional) – Whether to exit execution on the first encountered error.
True. (Defaults to)
fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks
_unset. (that encounter errors. Defaults to)
- run(inputs: Sequence[Any]) Tuple[List[Any], List[Any]] #
- class Unset#
Bases:
object
- get_executor_on_sync_context(sync_fn: ~typing.Callable[[~typing.Any], ~typing.Any], async_fn: ~typing.Callable[[~typing.Any], ~typing.Coroutine[~typing.Any, ~typing.Any, ~typing.Any]], run_sync: bool = False, concurrency: int = 3, tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>) Executor #