evals.executors#

class AsyncExecutor(generation_fn: ~typing.Callable[[~typing.Any], ~typing.Coroutine[~typing.Any, ~typing.Any, ~typing.Any]], concurrency: int = 3, tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>, termination_signal: ~signal.Signals = Signals.SIGINT)#

Bases: Executor

A class that provides asynchronous execution of tasks using a producer-consumer pattern.

An async interface is provided by the execute method, which returns a coroutine, and a sync interface is provided by the run method.

Parameters:
  • generation_fn (Callable[[Any], Coroutine[Any, Any, Any]]) – A coroutine function that

  • executed. (generates tasks to be)

  • concurrency (int, optional) – The number of concurrent consumers. Defaults to 3.

  • tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults

  • None. (to)

  • max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to

  • 10.

  • exit_on_error (bool, optional) – Whether to exit execution on the first encountered error.

  • True. (Defaults to)

  • fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks

  • _unset. (that encounter errors. Defaults to)

  • termination_signal (signal.Signals, optional) – The signal handled to terminate the executor.

async consumer(outputs: List[Any], execution_details: List[ExecutionDetails], queue: asyncio.PriorityQueue[Tuple[int, Any]], done_producing: asyncio.Event, termination_event: asyncio.Event, progress_bar: tqdm[Any]) None#
async execute(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]]#
async producer(inputs: Sequence[Any], queue: PriorityQueue[Tuple[int, Any]], max_fill: int, done_producing: Event, termination_signal: Event) None#
run(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]]#
class ExecutionDetails#

Bases: object

complete() None#
fail() None#
log_exception(exc: Exception) None#
log_runtime(start_time: float) None#
class ExecutionStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: Enum

COMPLETED = 'COMPLETED'#
COMPLETED_WITH_RETRIES = 'COMPLETED WITH RETRIES'#
DID_NOT_RUN = 'DID NOT RUN'#
FAILED = 'FAILED'#
class Executor(*args, **kwargs)#

Bases: Protocol

run(inputs: Sequence[Any]) Tuple[List[Any], List[ExecutionDetails]]#
class SyncExecutor(generation_fn: ~typing.Callable[[~typing.Any], ~typing.Any], tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>, termination_signal: ~signal.Signals | None = Signals.SIGINT)#

Bases: Executor

Synchronous executor for generating outputs from inputs using a given generation function.

Parameters:
  • generation_fn (Callable[[Any], Any]) – The generation function that takes an input and

  • output. (returns an)

  • tqdm_bar_format (Optional[str], optional) – The format string for the progress bar. Defaults

  • None. (to)

  • max_retries (int, optional) – The maximum number of times to retry on exceptions. Defaults to

  • 10.

  • exit_on_error (bool, optional) – Whether to exit execution on the first encountered error.

  • True. (Defaults to)

  • fallback_return_value (Union[Unset, Any], optional) – The fallback return value for tasks

  • _unset. (that encounter errors. Defaults to)

run(inputs: Sequence[Any]) Tuple[List[Any], List[Any]]#
class Unset#

Bases: object

get_executor_on_sync_context(sync_fn: ~typing.Callable[[~typing.Any], ~typing.Any], async_fn: ~typing.Callable[[~typing.Any], ~typing.Coroutine[~typing.Any, ~typing.Any, ~typing.Any]], run_sync: bool = False, concurrency: int = 3, tqdm_bar_format: str | None = None, max_retries: int = 10, exit_on_error: bool = True, fallback_return_value: ~evals.executors.Unset | ~typing.Any = <evals.executors.Unset object>) Executor#