Skip to content

viv CLI

The viv CLI is a command-line tool for interacting with Vivaria.

Commands are documented below, in three groups:

  • Under Config, documentation for viv config subcommands: viv config get, viv config list, and viv config set.
  • Under Vivaria, documentation for viv subcommands.
  • Under Task, documentation for viv task subcommands.

viv_cli.main

viv CLI.

Config

Group within the CLI for managing configuration.

Source code in cli/viv_cli/main.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class Config:
    """CLI command group for inspecting and changing the viv configuration."""

    @typechecked
    def get(self, key: str) -> None:
        """Print the value stored under one config key.

        Args:
            key: Name of the config key to look up.
        """
        # Work from the raw config dict rather than get_user_config().dict(), so a value can
        # still be read when the config as a whole is invalid.
        raw_config = get_user_config_dict()
        if key not in raw_config:
            err_exit(f"{key} not set")
        value = raw_config[key]
        print(f"{key}: {value}")

    @typechecked
    def list(self) -> None:
        """Print the config path, file contents, defaults, env overrides and current config."""
        # The sections are built in the same order they are printed, one per output line group.
        sections = [
            "user config path:",
            f"\t{user_config_path}",
            json.dumps(get_config_from_file(), indent=2),
            "",
            "default config:\n",
            json.dumps(default_config.model_dump(), indent=2),
            "",
            "environment variable overrides:",
            "\n".join(
                f"\t{name}: {env_var} ({os.environ.get(env_var, '')!r})"
                for name, env_var in env_overrides
            ),
        ]
        print(*sections, sep="\n")

        current_config = json.dumps(get_user_config().model_dump(), indent=2)
        print("\ncurrent config including env overrides:\n", current_config)

    @typechecked
    def set(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Store a value under a config key.

        Args:
            key: Name of the config key to set.
            value: Value to store under that key.
        """
        update = {key: value}
        set_user_config(update)

get(key)

Get the value of a config key.

Source code in cli/viv_cli/main.py
118
119
120
121
122
123
124
125
@typechecked
def get(self, key: str) -> None:
    """Print the value stored under one config key.

    Args:
        key: Name of the config key to look up.
    """
    # Work from the raw config dict rather than get_user_config().dict(), so a value can
    # still be read when the config as a whole is invalid.
    raw_config = get_user_config_dict()
    if key not in raw_config:
        err_exit(f"{key} not set")
    value = raw_config[key]
    print(f"{key}: {value}")

list()

Print config and config path.

Source code in cli/viv_cli/main.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@typechecked
def list(self) -> None:
    """Print the config path, file contents, defaults, env overrides and current config."""
    # The sections are built in the same order they are printed, one per output line group.
    sections = [
        "user config path:",
        f"\t{user_config_path}",
        json.dumps(get_config_from_file(), indent=2),
        "",
        "default config:\n",
        json.dumps(default_config.model_dump(), indent=2),
        "",
        "environment variable overrides:",
        "\n".join(
            f"\t{name}: {env_var} ({os.environ.get(env_var, '')!r})"
            for name, env_var in env_overrides
        ),
    ]
    print(*sections, sep="\n")

    current_config = json.dumps(get_user_config().model_dump(), indent=2)
    print("\ncurrent config including env overrides:\n", current_config)

set(key, value)

Set the value of a config key.

Source code in cli/viv_cli/main.py
147
148
149
150
@typechecked
def set(self, key: str, value: Any) -> None:  # noqa: ANN401
    """Store a value under a config key.

    Args:
        key: Name of the config key to set.
        value: Value to store under that key.
    """
    update = {key: value}
    set_user_config(update)

RunBatch

Commands for managing run batches.

Source code in cli/viv_cli/main.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class RunBatch:
    """CLI command group for working with run batches."""

    @typechecked
    def update(self, name: str, concurrency_limit: int) -> None:
        """Set a new concurrency limit on a run batch.

        Args:
            name: The name of the run batch.
            concurrency_limit: The new concurrency limit. Must be zero or greater.
        """
        limit_is_valid = concurrency_limit >= 0
        if not limit_is_valid:
            err_exit("concurrency limit must not be negative")

        viv_api.update_run_batch(name, concurrency_limit)

update(name, concurrency_limit)

Update the concurrency limit for a run batch.

Parameters:

Name Type Description Default
name str

The name of the run batch.

required
concurrency_limit int

The new concurrency limit.

required
Source code in cli/viv_cli/main.py
552
553
554
555
556
557
558
559
560
561
562
563
@typechecked
def update(self, name: str, concurrency_limit: int) -> None:
    """Set a new concurrency limit on a run batch.

    Args:
        name: The name of the run batch.
        concurrency_limit: The new concurrency limit. Must be zero or greater.
    """
    limit_is_valid = concurrency_limit >= 0
    if not limit_is_valid:
        err_exit("concurrency limit must not be negative")

    viv_api.update_run_batch(name, concurrency_limit)

Task

Task environment management.

Group within the CLI for managing task environments.

Source code in cli/viv_cli/main.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
class Task:
    """Task environment management.

    Group within the CLI for managing task environments.
    """

    def __init__(self) -> None:
        """Initialize the task command group."""
        self._ssh = SSH()

    def _setup_task_commit(self, ignore_workdir: bool = False) -> viv_api.GitRepoTaskSource:
        """Set up git commit for task environment.

        Creates a working-tree permalink for the current GitHub org/repo, prints it, and returns
        a "gitRepo" task source naming the repo and the commit.

        Args:
            ignore_workdir: Forwarded to gh.create_working_tree_permalink.

        Returns:
            A task source dict with "type", "repoName" and "commitId" keys.
        """
        org, repo = gh.get_org_and_repo()
        _, commit, permalink = gh.create_working_tree_permalink(
            org=org, repo=repo, ignore_workdir=ignore_workdir
        )
        print("GitHub permalink to task commit:", permalink)
        return {"type": "gitRepo", "repoName": f"{org}/{repo}", "commitId": commit}

    def _get_final_json_from_response(self, response_lines: list[str]) -> dict | None:
        """Parse the last line of a response as a JSON object.

        Args:
            response_lines: Lines of the response returned by the Vivaria API call.

        Returns:
            The decoded JSON object, or None when there are no lines, the last line is not valid
            JSON, or the last line decodes to something other than a JSON object.
        """
        if not response_lines:
            # No output at all, so there is no final JSON line to read.
            return None

        try:
            final_json = json.loads(response_lines[-1])
        except json.JSONDecodeError:
            # If the last line of the response isn't JSON, it's probably an error message. We don't
            # want to print the JSONDecodeError and make it hard to see the error message from
            # Vivaria.
            return None

        # Callers call .get() on the result, so only a JSON object is usable here.
        return final_json if isinstance(final_json, dict) else None

    @typechecked
    def start(  # noqa: PLR0913
        self,
        taskId: str,  # noqa: ANN001, RUF100, N803 (CLI argument so can't change)
        dont_cache: bool = False,
        ssh: bool = False,
        ssh_user: SSHUser = "root",
        task_family_path: str | None = None,
        env_file_path: str | None = None,
        ignore_workdir: bool = False,
        k8s: bool | None = None,
    ) -> None:
        """Start a task environment.

        Start a task environment that you can use to manually test a task, or as an environment
        for a QA run or a human baseline.

        Builds a Docker image for a particular task, starts a container from that image, and runs
        TaskFamily#start in the container.

        Args:
            taskId: The task to test.
            dont_cache: Rebuild the task environment primary machine's Docker image from scratch.
            ssh: SSH into the task environment after starting it.
            ssh_user: User to SSH into the task environment as.
            task_family_path: Path to a task family directory to use. If not provided, Vivaria may
                look up the task family directory from a Git repo that it's configured to use.
            env_file_path: Path to a file of environment variables that Vivaria will set in some
                TaskFamily methods. You can only provide this argument if you also provide
                task_family_path. If neither task_family_path nor env_file_path is provided,
                Vivaria will read environment variables from a file called secrets.env in a Git repo
                that Vivaria is configured to use.
            ignore_workdir: Start task from the current commit while ignoring any uncommitted
                changes.
            k8s: Start the task environment in a Kubernetes cluster.
        """
        if task_family_path is None:
            if env_file_path is not None:
                err_exit("env_file_path cannot be provided without task_family_path")
            task_source = self._setup_task_commit(ignore_workdir=ignore_workdir)
        else:
            task_source = viv_api.upload_task_family(
                pathlib.Path(task_family_path).expanduser(),
                pathlib.Path(env_file_path).expanduser() if env_file_path is not None else None,
            )

        response_lines = viv_api.start_task_environment(
            taskId,
            task_source,
            dont_cache,
            k8s=k8s,
        )

        # Without a usable final JSON line there is no environment name to record or SSH into.
        final_json = self._get_final_json_from_response(response_lines)
        if final_json is None:
            return

        environment_name = final_json.get("environmentName")
        if environment_name is None:
            return

        _set_last_task_environment_name(environment_name)

        if ssh:
            self.ssh(environment_name=environment_name, user=ssh_user)

    @typechecked
    def stop(self, environment_name: str | None = None) -> None:
        """Stop a task environment."""
        viv_api.stop_task_environment(_get_task_environment_name_to_use(environment_name))

    @typechecked
    def restart(self, environment_name: str | None = None) -> None:
        """Stop (if running) and restart a task environment.

        Stops the Docker container associated with a task environment (if it's running), then
        restarts it. Doesn't rerun any TaskFamily methods or make any changes to the container's
        filesystem.

        If the task environment has an aux VM, Vivaria will reboot it. The command will wait until
        the aux VM is accessible over SSH before exiting.
        """
        viv_api.restart_task_environment(_get_task_environment_name_to_use(environment_name))

    @typechecked
    def destroy(self, environment_name: str | None = None) -> None:
        """Destroy a task environment."""
        viv_api.destroy_task_environment(_get_task_environment_name_to_use(environment_name))

    @typechecked
    def score(
        self, environment_name: str | None = None, submission: str | float | dict | None = None
    ) -> None:
        """Score a task environment.

        Run `TaskFamily#score` in a task environment, using a submission passed on the command line
        or read from /home/agent/submission.txt in the environment.
        """
        viv_api.score_task_environment(
            _get_task_environment_name_to_use(environment_name),
            parse_submission(submission) if submission is not None else None,
        )

    @typechecked
    def grant_ssh_access(
        self,
        ssh_public_key_or_key_path: str,
        environment_name: str | None = None,
        user: SSHUser = "agent",
    ) -> None:
        """Grant SSH access to a task environment.

        Allow the person with the SSH private key matching the given public key to SSH into the task
        environment as the given user.

        Args:
            ssh_public_key_or_key_path: SSH public key or path to a file containing the public key.
            environment_name: Name of the task environment to grant access to.
            user: User to grant access to.
        """
        viv_api.grant_ssh_access_to_task_environment(
            _get_task_environment_name_to_use(environment_name),
            resolve_ssh_public_key(ssh_public_key_or_key_path),
            user,
        )

    @typechecked
    def grant_user_access(self, user_email: str, environment_name: str | None = None) -> None:
        """Grant another user access to a task environment.

        Allow the person with the given email to run `viv task` commands on this task environment.
        """
        viv_api.grant_user_access_to_task_environment(
            _get_task_environment_name_to_use(environment_name), user_email
        )

    @typechecked
    def ssh(
        self, environment_name: str | None = None, user: SSHUser = "root", aux_vm: bool = False
    ) -> None:
        """SSH into a task environment as the given user.

        Fails if the task environment has been stopped.
        """
        task_environment = _get_task_environment_name_to_use(environment_name)
        if aux_vm:
            aux_vm_details = viv_api.get_aux_vm_details(container_name=task_environment)
            # The temporary key file only needs to exist for the duration of the SSH session.
            with _temp_key_file(aux_vm_details) as f:
                opts = _aux_vm_ssh_opts(f.name, aux_vm_details)
                self._ssh.ssh(opts)
        else:
            ip_address = viv_api.get_task_environment_ip_address(task_environment)
            env = viv_api.get_env_for_task_environment(task_environment, user)

            opts = _container_ssh_opts(ip_address, user, env)
            self._ssh.ssh(opts)

    @typechecked
    def scp(
        self,
        source: str,
        destination: str,
        recursive: bool = False,
        user: SSHUser = "root",
        aux_vm: bool = False,
    ) -> None:
        """Use scp to copy a file from your local machine to a task env/aux VM, or vice versa.

        Task environment: Uses the given user, fails if the task environment isn't running.

        Aux VM: Uses the provisioned user on the aux VM.

        Example:
            viv task scp path/to/local/file environment-name:/root/path/to/remote/file
            viv task scp environment-name:~/path/to/remote/file . --user=agent

        Args:
            source: Source file path.
            destination: Destination file path.
            recursive: Whether to copy source recursively.
            user: User to SSH into the task environment as.
            aux_vm: Whether to use the aux VM instead of the task environment.

        Raises:
            ValueError: Defensive fallback only. When source and destination are both local or
                both remote, the problem is reported through err_exit before this can be reached.
        """
        # A path counts as remote when it contains a colon (environment-name:path).
        source_split = source.split(":")
        destination_split = destination.split(":")
        if (len(source_split) == 1) == (len(destination_split) == 1):
            error_message = (
                "Exactly one of the source and destination must start with a task environment"
                " name, e.g. environment-name:/root/path/to/remote/file"
            )
            err_exit(error_message)

        # The environment name is the part before the first colon of the remote path.
        if len(source_split) == 1:
            environment_name = destination_split[0]
        elif len(destination_split) == 1:
            environment_name = source_split[0]
        else:
            error_message = "How did we get here?"
            raise ValueError(error_message)

        if aux_vm:
            aux_vm_details = viv_api.get_aux_vm_details(container_name=environment_name)
            with _temp_key_file(aux_vm_details) as f:
                opts = _aux_vm_ssh_opts(f.name, aux_vm_details)
                self._ssh.scp(source, destination, opts, recursive=recursive)
        else:
            ip_address = viv_api.get_task_environment_ip_address(environment_name)
            opts = _container_ssh_opts(ip_address, user)
            self._ssh.scp(source, destination, opts, recursive=recursive)

    @typechecked
    def code(
        self,
        environment_name: str | None = None,
        user: SSHUser = "root",
        aux_vm: bool = False,
        editor: CodeEditor = VSCODE,
    ) -> None:
        """Open a code editor (default is VS Code) window.

        For container: Opens the home folder of the given user in the task environment container,
        and fails if the task environment has been stopped.

        For aux VM: Opens the home folder of the provisioned user on the aux VM.

        NOTE: This command may edit your ~/.ssh/config.
        """
        task_environment = _get_task_environment_name_to_use(environment_name)
        if aux_vm:
            aux_vm_details = viv_api.get_aux_vm_details(container_name=task_environment)
            with _temp_key_file(aux_vm_details) as f:
                opts = _aux_vm_ssh_opts(f.name, aux_vm_details)
                self._ssh.open_editor(_aux_vm_host(opts), opts, editor=editor)
        else:
            ip_address = viv_api.get_task_environment_ip_address(task_environment)
            env = viv_api.get_env_for_task_environment(task_environment, user)
            opts = _container_ssh_opts(ip_address, user, env=env)
            host = f"{task_environment}--{user}"
            self._ssh.open_editor(host, opts, editor=editor)

    @typechecked
    def ssh_command(
        self, environment_name: str | None = None, user: SSHUser = "agent", aux_vm: bool = False
    ) -> None:
        """Print a ssh command to connect to a task environment as the given user, or to an aux VM.

        For task environment: Fails if the task environment has been stopped.

        For aux VM: Uses the provisioned user on the aux VM.
        """
        task_environment = _get_task_environment_name_to_use(environment_name)
        if aux_vm:
            # We can't use the `with` form here because the user will likely want to access the file
            # after this function returns.
            aux_vm_details = viv_api.get_aux_vm_details(container_name=task_environment)
            f = _temp_key_file(aux_vm_details)
            args = self._ssh.ssh_args(_aux_vm_ssh_opts(f.name, aux_vm_details))
        else:
            ip_address = viv_api.get_task_environment_ip_address(task_environment)
            opts = _container_ssh_opts(ip_address, user)
            args = self._ssh.ssh_args(opts)

        print(" ".join(args))

    @typechecked
    def test(  # noqa: PLR0913
        self,
        taskId: str,  # noqa: ANN001, RUF100, N803 (CLI argument so can't change)
        test_name: str = "",
        dont_cache: bool = False,
        ssh: bool = False,
        ssh_user: SSHUser = "root",
        verbose: bool = False,
        task_family_path: str | None = None,
        env_file_path: str | None = None,
        destroy: bool = False,
        ignore_workdir: bool = False,
        k8s: bool | None = None,
    ) -> None:
        """Start a task environment and run tests.

        Args:
            taskId: The task to test.
            test_name: Test file to run tests from.
            dont_cache: Rebuild the task environment primary machine's Docker image from scratch.
            ssh: SSH into the task environment after starting it.
            ssh_user: User to SSH into the task environment as.
            verbose: Log the output of all tests, on success or failure.
            task_family_path: Path to a task family directory to use. If not provided, Vivaria may
                look up the task family directory from a Git repo that it's configured to use.
            env_file_path: Path to a file of environment variables that Vivaria will set in some
                TaskFamily methods. You can only provide this argument if you also provide
                task_family_path. If neither task_family_path nor env_file_path is provided,
                Vivaria will read environment variables from a file called secrets.env in a Git repo
                that Vivaria is configured to use.
            destroy: Destroy the task environment after running tests.
            ignore_workdir: Run tests on the current commit while ignoring any uncommitted
                changes.
            k8s: Start the task environment in a Kubernetes cluster.
        """
        if task_family_path is None:
            if env_file_path is not None:
                err_exit("env_file_path cannot be provided without task_family_path")

            task_source = self._setup_task_commit(ignore_workdir=ignore_workdir)
        else:
            task_source = viv_api.upload_task_family(
                task_family_path=pathlib.Path(task_family_path).expanduser(),
                env_file_path=pathlib.Path(env_file_path).expanduser()
                if env_file_path is not None
                else None,
            )

        response_lines = viv_api.start_task_test_environment(
            taskId,
            task_source,
            dont_cache,
            test_name,
            include_final_json=True,
            verbose=verbose,
            destroy_on_exit=destroy,
            k8s=k8s,
        )

        final_json = self._get_final_json_from_response(response_lines)
        if final_json is None:
            return

        test_status_code = final_json.get("testStatusCode")

        # With no environment name there is nothing to record or SSH into, so exit with the
        # reported test status (0 when none was reported).
        environment_name = final_json.get("environmentName")
        if environment_name is None:
            sys.exit(test_status_code or 0)

        _set_last_task_environment_name(environment_name)

        if ssh:
            self.ssh(environment_name=environment_name, user=ssh_user)
        else:
            sys.exit(test_status_code or 0)

    @typechecked
    def list(
        self, verbose: bool = False, all_states: bool = False, all_users: bool = False
    ) -> None:
        """List active task environments.

        Args:
            verbose: Whether to print detailed information about each task environment.
            all_states: Whether to list running and stopped task environments, not just running
                ones.
            all_users: Whether to list all users' task environments, not just your own.
        """
        task_environments = viv_api.list_task_environments(
            all_states=all_states, all_users=all_users
        )

        # The non-verbose form prints one container name per line.
        if not verbose:
            for task_environment in task_environments:
                print(task_environment["containerName"])
            return

        print(format_task_environments(task_environments, all_states=all_states))

__init__()

Initialize the task command group.

Source code in cli/viv_cli/main.py
159
160
161
def __init__(self) -> None:
    """Create the SSH helper and keep it on the instance as ``_ssh``."""
    ssh_client = SSH()
    self._ssh = ssh_client

code(environment_name=None, user='root', aux_vm=False, editor=VSCODE)

Open a code editor (default is VS Code) window.

For container: Opens the home folder of the given user in the task environment container, and fails if the task environment has been stopped.

For aux VM: Opens the home folder of the provisioned user on the aux VM.

NOTE: This command may edit your ~/.ssh/config.

Source code in cli/viv_cli/main.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
@typechecked
def code(
    self,
    environment_name: str | None = None,
    user: SSHUser = "root",
    aux_vm: bool = False,
    editor: CodeEditor = VSCODE,
) -> None:
    """Open a code editor window (VS Code unless another editor is given).

    Container: opens the given user's home folder inside the task environment container. This
    fails if the task environment has been stopped.

    Aux VM: opens the home folder of the provisioned user on the aux VM.

    NOTE: This command may edit your ~/.ssh/config.
    """
    container_name = _get_task_environment_name_to_use(environment_name)

    if not aux_vm:
        container_ip = viv_api.get_task_environment_ip_address(container_name)
        container_env = viv_api.get_env_for_task_environment(container_name, user)
        ssh_opts = _container_ssh_opts(container_ip, user, env=container_env)
        # The host alias handed to the editor combines the environment name and the user.
        self._ssh.open_editor(f"{container_name}--{user}", ssh_opts, editor=editor)
        return

    vm_details = viv_api.get_aux_vm_details(container_name=container_name)
    # The key file is only needed while the editor is being opened.
    with _temp_key_file(vm_details) as key_file:
        ssh_opts = _aux_vm_ssh_opts(key_file.name, vm_details)
        self._ssh.open_editor(_aux_vm_host(ssh_opts), ssh_opts, editor=editor)

destroy(environment_name=None)

Destroy a task environment.

Source code in cli/viv_cli/main.py
265
266
267
268
@typechecked
def destroy(self, environment_name: str | None = None) -> None:
    """Destroy a task environment.

    Args:
        environment_name: Passed through _get_task_environment_name_to_use to pick the
            environment to destroy.
    """
    target_environment = _get_task_environment_name_to_use(environment_name)
    viv_api.destroy_task_environment(target_environment)

grant_ssh_access(ssh_public_key_or_key_path, environment_name=None, user='agent')

Grant SSH access to a task environment.

Allow the person with the SSH private key matching the given public key to SSH into the task environment as the given user.

Parameters:

Name Type Description Default
ssh_public_key_or_key_path str

SSH public key or path to a file containing the public key.

required
environment_name str | None

Name of the task environment to grant access to.

None
user SSHUser

User to grant access to.

'agent'
Source code in cli/viv_cli/main.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
@typechecked
def grant_ssh_access(
    self,
    ssh_public_key_or_key_path: str,
    environment_name: str | None = None,
    user: SSHUser = "agent",
) -> None:
    """Grant SSH access to a task environment.

    Lets whoever holds the private key matching the given public key SSH into the task
    environment as the given user.

    Args:
        ssh_public_key_or_key_path: SSH public key or path to a file containing the public key.
        environment_name: Name of the task environment to grant access to.
        user: User to grant access to.
    """
    target_environment = _get_task_environment_name_to_use(environment_name)
    # The argument may be a key or a path to one; resolve_ssh_public_key handles both.
    public_key = resolve_ssh_public_key(ssh_public_key_or_key_path)
    viv_api.grant_ssh_access_to_task_environment(target_environment, public_key, user)

grant_user_access(user_email, environment_name=None)

Grant another user access to a task environment.

Allow the person with the given email to run viv task commands on this task environment.

Source code in cli/viv_cli/main.py
307
308
309
310
311
312
313
314
315
@typechecked
def grant_user_access(self, user_email: str, environment_name: str | None = None) -> None:
    """Grant another user access to a task environment.

    Lets the person with the given email run `viv task` commands on this task environment.
    """
    target_environment = _get_task_environment_name_to_use(environment_name)
    viv_api.grant_user_access_to_task_environment(target_environment, user_email)

list(verbose=False, all_states=False, all_users=False)

List active task environments.

Parameters:

Name Type Description Default
verbose bool

Whether to print detailed information about each task environment.

False
all_states bool

Whether to list running and stopped task environments, not just running ones.

False
all_users bool

Whether to list all users' task environments, not just your own.

False
Source code in cli/viv_cli/main.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
@typechecked
def list(
    self, verbose: bool = False, all_states: bool = False, all_users: bool = False
) -> None:
    """List active task environments.

    Args:
        verbose: Whether to print detailed information about each task environment.
        all_states: Whether to list running and stopped task environments, not just running
            ones.
        all_users: Whether to list all users' task environments, not just your own.
    """
    environments = viv_api.list_task_environments(all_states=all_states, all_users=all_users)

    if verbose:
        print(format_task_environments(environments, all_states=all_states))
        return

    # The short form prints one container name per line.
    for environment in environments:
        print(environment["containerName"])

restart(environment_name=None)

Stop (if running) and restart a task environment.

Stops the Docker container associated with a task environment (if it's running), then restarts it. Doesn't rerun any TaskFamily methods or make any changes to the container's filesystem.

If the task environment has an aux VM, Vivaria will reboot it. The command will wait until the aux VM is accessible over SSH before exiting.

Source code in cli/viv_cli/main.py
252
253
254
255
256
257
258
259
260
261
262
263
@typechecked
def restart(self, environment_name: str | None = None) -> None:
    """Stop (if running) and restart a task environment.

    The Docker container associated with the task environment is stopped if it's running and
    then restarted. No TaskFamily methods are rerun and the container's filesystem is left
    unchanged.

    If the task environment has an aux VM, Vivaria will reboot it. The command will wait until
    the aux VM is accessible over SSH before exiting.
    """
    target_environment = _get_task_environment_name_to_use(environment_name)
    viv_api.restart_task_environment(target_environment)

score(environment_name=None, submission=None)

Score a task environment.

Run TaskFamily#score in a task environment, using a submission passed on the command line or read from /home/agent/submission.txt in the environment.

Source code in cli/viv_cli/main.py
270
271
272
273
274
275
276
277
278
279
280
281
282
@typechecked
def score(
    self, environment_name: str | None = None, submission: str | float | dict | None = None
) -> None:
    """Score a task environment.

    Runs `TaskFamily#score` in a task environment, using a submission passed on the command
    line or read from /home/agent/submission.txt in the environment.
    """
    target_environment = _get_task_environment_name_to_use(environment_name)
    # A missing submission is forwarded as None rather than being parsed.
    parsed_submission = None if submission is None else parse_submission(submission)
    viv_api.score_task_environment(target_environment, parsed_submission)

scp(source, destination, recursive=False, user='root', aux_vm=False)

Use scp to copy a file from your local machine to a task env/aux VM, or vice versa.

Task environment: Uses the given user, fails if the task environment isn't running.

Aux VM: Uses the provisioned user on the aux VM.

Example

viv task scp path/to/local/file environment-name:/root/path/to/remote/file
viv task scp environment-name:~/path/to/remote/file . --user=agent

Parameters:

Name Type Description Default
source str

Source file path.

required
destination str

Destination file path.

required
recursive bool

Whether to copy source recursively.

False
user SSHUser

User to SSH into the task environment as.

'root'
aux_vm bool

Whether to use the aux VM instead of the task environment.

False

Raises:

Type Description
ValueError

If both source and destination are local or remote paths.

Source code in cli/viv_cli/main.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
@typechecked
def scp(
    self,
    source: str,
    destination: str,
    recursive: bool = False,
    user: SSHUser = "root",
    aux_vm: bool = False,
) -> None:
    """Use scp to copy a file from your local machine to a task env/aux VM, or vice versa.

    Task environment: Uses the given user, fails if the task environment isn't running.

    Aux VM: Uses the provisioned user on the aux VM.

    A path is treated as remote when it contains a colon; the text before the first colon is
    used as the task environment name.

    Example:
        viv task scp path/to/local/file environment-name:/root/path/to/remote/file
        viv task scp environment-name:~/path/to/remote/file . --user=agent

    Args:
        source: Source file path.
        destination: Destination file path.
        recursive: Whether to copy source recursively.
        user: User to SSH into the task environment as.
        aux_vm: Whether to use the aux VM instead of the task environment.

    Raises:
        ValueError: Defensive fallback only. When both paths are local or both are remote,
            the error is reported through `err_exit` before this can be reached.
    """
    source_parts = source.split(":")
    destination_parts = destination.split(":")
    source_is_local = len(source_parts) == 1
    destination_is_local = len(destination_parts) == 1

    # Exactly one side has to name a task environment.
    if source_is_local == destination_is_local:
        err_exit(
            "Exactly one of the source and destination must start with a task environment"
            " name, e.g. environment-name:/root/path/to/remote/file"
        )

    if source_is_local:
        environment_name = destination_parts[0]
    elif destination_is_local:
        environment_name = source_parts[0]
    else:
        raise ValueError("How did we get here?")

    if not aux_vm:
        ip_address = viv_api.get_task_environment_ip_address(environment_name)
        container_opts = _container_ssh_opts(ip_address, user)
        self._ssh.scp(source, destination, container_opts, recursive=recursive)
        return

    aux_vm_details = viv_api.get_aux_vm_details(container_name=environment_name)
    with _temp_key_file(aux_vm_details) as key_file:
        aux_vm_opts = _aux_vm_ssh_opts(key_file.name, aux_vm_details)
        self._ssh.scp(source, destination, aux_vm_opts, recursive=recursive)

ssh(environment_name=None, user='root', aux_vm=False)

SSH into a task environment as the given user.

Fails if the task environment has been stopped.

Source code in cli/viv_cli/main.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
@typechecked
def ssh(
    self, environment_name: str | None = None, user: SSHUser = "root", aux_vm: bool = False
) -> None:
    """SSH into a task environment as the given user.

    Fails if the task environment has been stopped.
    """
    resolved_name = _get_task_environment_name_to_use(environment_name)

    if not aux_vm:
        container_ip = viv_api.get_task_environment_ip_address(resolved_name)
        container_env = viv_api.get_env_for_task_environment(resolved_name, user)
        self._ssh.ssh(_container_ssh_opts(container_ip, user, container_env))
        return

    aux_vm_details = viv_api.get_aux_vm_details(container_name=resolved_name)
    # The key file is only needed while the SSH session is open.
    with _temp_key_file(aux_vm_details) as key_file:
        self._ssh.ssh(_aux_vm_ssh_opts(key_file.name, aux_vm_details))

ssh_command(environment_name=None, user='agent', aux_vm=False)

Print a ssh command to connect to a task environment as the given user, or to an aux VM.

For task environment: Fails if the task environment has been stopped.

For aux VM: Uses the provisioned user on the aux VM.

Source code in cli/viv_cli/main.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
@typechecked
def ssh_command(
    self, environment_name: str | None = None, user: SSHUser = "agent", aux_vm: bool = False
) -> None:
    """Print a ssh command to connect to a task environment as the given user, or to an aux VM.

    For task environment: Fails if the task environment has been stopped.

    For aux VM: Uses the provisioned user on the aux VM.
    """
    resolved_name = _get_task_environment_name_to_use(environment_name)

    if not aux_vm:
        container_ip = viv_api.get_task_environment_ip_address(resolved_name)
        ssh_opts = _container_ssh_opts(container_ip, user)
    else:
        aux_vm_details = viv_api.get_aux_vm_details(container_name=resolved_name)
        # Deliberately not a `with` block: the user will likely want to access the key file
        # after this function returns.
        key_file = _temp_key_file(aux_vm_details)
        ssh_opts = _aux_vm_ssh_opts(key_file.name, aux_vm_details)

    print(" ".join(self._ssh.ssh_args(ssh_opts)))

start(taskId, dont_cache=False, ssh=False, ssh_user='root', task_family_path=None, env_file_path=None, ignore_workdir=False, k8s=None)

Start a task environment.

Start a task environment that you can use to manually test a task, or as an environment for a QA run or a human baseline.

Builds a Docker image for a particular task, starts a container from that image, and runs TaskFamily#start in the container.

Parameters:

Name Type Description Default
taskId str

The task to test.

required
dont_cache bool

Rebuild the task environment primary machine's Docker image from scratch.

False
ssh bool

SSH into the task environment after starting it.

False
ssh_user SSHUser

User to SSH into the task environment as.

'root'
task_family_path str | None

Path to a task family directory to use. If not provided, Vivaria may look up the task family directory from a Git repo that it's configured to use.

None
env_file_path str | None

Path to a file of environment variables that Vivaria will set in some TaskFamily methods. You can only provide this argument if you also provide task_family_path. If neither task_family_path nor env_file_path is provided, Vivaria will read environment variables from a file called secrets.env in a Git repo that Vivaria is configured to use.

None
ignore_workdir bool

Start task from the current commit while ignoring any uncommitted changes.

False
k8s bool | None

Start the task environment in a Kubernetes cluster.

None
Source code in cli/viv_cli/main.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@typechecked
def start(  # noqa: PLR0913
    self,
    taskId: str,  # noqa: ANN001, RUF100, N803 (CLI argument so can't change)
    dont_cache: bool = False,
    ssh: bool = False,
    ssh_user: SSHUser = "root",
    task_family_path: str | None = None,
    env_file_path: str | None = None,
    ignore_workdir: bool = False,
    k8s: bool | None = None,
) -> None:
    """Start a task environment.

    Start a task environment that you can use to manually test a task, or as an environment
    for a QA run or a human baseline.

    Builds a Docker image for a particular task, starts a container from that image, and runs
    TaskFamily#start in the container.

    Args:
        taskId: The task to test.
        dont_cache: Rebuild the task environment primary machine's Docker image from scratch.
        ssh: SSH into the task environment after starting it.
        ssh_user: User to SSH into the task environment as.
        task_family_path: Path to a task family directory to use. If not provided, Vivaria may
            look up the task family directory from a Git repo that it's configured to use.
        env_file_path: Path to a file of environment variables that Vivaria will set in some
            TaskFamily methods. You can only provide this argument if you also provide
            task_family_path. If neither task_family_path nor env_file_path is provided,
            Vivaria will read environment variables from a file called secrets.env in a Git repo
            that Vivaria is configured to use.
        ignore_workdir: Start task from the current commit while ignoring any uncommitted
            changes.
        k8s: Start the task environment in a Kubernetes cluster.
    """
    if task_family_path is not None:
        # Upload the local task family, plus the env file when one was given.
        family_dir = pathlib.Path(task_family_path).expanduser()
        env_file = None if env_file_path is None else pathlib.Path(env_file_path).expanduser()
        task_source = viv_api.upload_task_family(family_dir, env_file)
    else:
        if env_file_path is not None:
            err_exit("env_file_path cannot be provided without task_family_path")
        task_source = self._setup_task_commit(ignore_workdir=ignore_workdir)

    response_lines = viv_api.start_task_environment(taskId, task_source, dont_cache, k8s=k8s)

    final_json = self._get_final_json_from_response(response_lines)
    environment_name = None if final_json is None else final_json.get("environmentName")
    if environment_name is None:
        # Without an environment name there is nothing to remember or connect to.
        return

    _set_last_task_environment_name(environment_name)

    if ssh:
        self.ssh(environment_name=environment_name, user=ssh_user)

stop(environment_name=None)

Stop a task environment.

Source code in cli/viv_cli/main.py
247
248
249
250
@typechecked
def stop(self, environment_name: str | None = None) -> None:
    """Stop a task environment."""
    target_environment = _get_task_environment_name_to_use(environment_name)
    viv_api.stop_task_environment(target_environment)

test(taskId, test_name='', dont_cache=False, ssh=False, ssh_user='root', verbose=False, task_family_path=None, env_file_path=None, destroy=False, ignore_workdir=False, k8s=None)

Start a task environment and run tests.

Parameters:

Name Type Description Default
taskId str

The task to test.

required
test_name str

Test file to run tests from.

''
dont_cache bool

Rebuild the task environment primary machine's Docker image from scratch.

False
ssh bool

SSH into the task environment after starting it.

False
ssh_user SSHUser

User to SSH into the task environment as.

'root'
verbose bool

Log the output of all tests, on success or failure.

False
task_family_path str | None

Path to a task family directory to use. If not provided, Vivaria may look up the task family directory from a Git repo that it's configured to use.

None
env_file_path str | None

Path to a file of environment variables that Vivaria will set in some TaskFamily methods. You can only provide this argument if you also provide task_family_path. If neither task_family_path nor env_file_path is provided, Vivaria will read environment variables from a file called secrets.env in a Git repo that Vivaria is configured to use.

None
destroy bool

Destroy the task environment after running tests.

False
ignore_workdir bool

Run tests on the current commit while ignoring any uncommitted changes.

False
k8s bool | None

Start the task environment in a Kubernetes cluster.

None
Source code in cli/viv_cli/main.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
@typechecked
def test(  # noqa: PLR0913
    self,
    taskId: str,  # noqa: ANN001, RUF100, N803 (CLI argument so can't change)
    test_name: str = "",
    dont_cache: bool = False,
    ssh: bool = False,
    ssh_user: SSHUser = "root",
    verbose: bool = False,
    task_family_path: str | None = None,
    env_file_path: str | None = None,
    destroy: bool = False,
    ignore_workdir: bool = False,
    k8s: bool | None = None,
) -> None:
    """Start a task environment and run tests.

    If the response carries no final JSON the command simply returns. Otherwise the process
    exits with the reported test status code (0 when none is reported), unless `ssh` is set
    and an environment name was returned, in which case an SSH session is opened instead.

    Args:
        taskId: The task to test.
        test_name: Test file to run tests from.
        dont_cache: Rebuild the task environment primary machine's Docker image from scratch.
        ssh: SSH into the task environment after starting it.
        ssh_user: User to SSH into the task environment as.
        verbose: Log the output of all tests, on success or failure.
        task_family_path: Path to a task family directory to use. If not provided, Vivaria may
            look up the task family directory from a Git repo that it's configured to use.
        env_file_path: Path to a file of environment variables that Vivaria will set in some
            TaskFamily methods. You can only provide this argument if you also provide
            task_family_path. If neither task_family_path nor env_file_path is provided,
            Vivaria will read environment variables from a file called secrets.env in a Git repo
            that Vivaria is configured to use.
        destroy: Destroy the task environment after running tests.
        ignore_workdir: Run tests on the current commit while ignoring any uncommitted
            changes.
        k8s: Start the task environment in a Kubernetes cluster.
    """
    if task_family_path is not None:
        family_dir = pathlib.Path(task_family_path).expanduser()
        env_file = None if env_file_path is None else pathlib.Path(env_file_path).expanduser()
        task_source = viv_api.upload_task_family(
            task_family_path=family_dir,
            env_file_path=env_file,
        )
    else:
        if env_file_path is not None:
            err_exit("env_file_path cannot be provided without task_family_path")
        task_source = self._setup_task_commit(ignore_workdir=ignore_workdir)

    response_lines = viv_api.start_task_test_environment(
        taskId,
        task_source,
        dont_cache,
        test_name,
        include_final_json=True,
        verbose=verbose,
        destroy_on_exit=destroy,
        k8s=k8s,
    )

    final_json = self._get_final_json_from_response(response_lines)
    if final_json is None:
        return

    # A missing (or zero) status code maps to a successful exit.
    exit_code = final_json.get("testStatusCode") or 0

    environment_name = final_json.get("environmentName")
    if environment_name is None:
        sys.exit(exit_code)

    _set_last_task_environment_name(environment_name)

    if not ssh:
        sys.exit(exit_code)
    self.ssh(environment_name=environment_name, user=ssh_user)

Vivaria

viv CLI.

CLI for running agents on tasks and managing task environments. To exit help use ctrl+\\.

Source code in cli/viv_cli/main.py
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
class Vivaria:
    r"""viv CLI.

    CLI for running agents on tasks and managing task environments. To exit help use `ctrl+\\`.
    """

    def __init__(self, dev: bool = False) -> None:
        """Initialise the CLI.

        Args:
            dev: Value stored on `GlobalOptions.dev_mode`.
        """
        # The dev flag is recorded globally before any of the helpers below are constructed.
        GlobalOptions.dev_mode = dev
        self._ssh = SSH()
        # Add groups of commands
        self.config = Config()
        self.task = Task()
        self.run_batch = RunBatch()

    @typechecked
    def run(  # noqa: PLR0912, PLR0913, C901
        self,
        task: str,
        path: str | None = None,
        yes: bool = False,
        verbose: bool = False,
        open_browser: bool = False,
        max_tokens: int = 300_000,
        max_actions: int = 1_000,
        max_total_seconds: int = 60 * 60 * 24 * 7,
        max_cost: float = 100,
        checkpoint_tokens: int | None = None,
        checkpoint_actions: int | None = None,
        checkpoint_total_seconds: int | None = None,
        checkpoint_cost: float | None = None,
        intervention: bool = False,
        agent_starting_state: str | dict | None = None,
        agent_starting_state_file: str | None = None,
        agent_settings_override: str | dict | None = None,
        agent_settings_pack: str | None = None,
        name: str | None = None,
        metadata: dict[str, str] = {},  # noqa: B006
        repo: str | None = None,
        branch: str | None = None,
        commit: str | None = None,
        priority: Literal["low", "high"] | None = None,
        low_priority: bool | None = None,
        parent: int | None = None,
        batch_name: str | None = None,
        batch_concurrency_limit: int | None = None,
        dangerously_ignore_global_limits: bool = False,
        keep_task_environment_running: bool = False,
        agent_path: str | None = None,
        task_family_path: str | None = None,
        env_file_path: str | None = None,
        k8s: bool | None = None,
        task_repo: str | None = None,
    ) -> None:
        """Construct a task environment and run an agent in it.

        You can either run this command from a clone of an agent repo on your computer, or you can
        specify the repo, branch, and commit to use.

        Args:
            task: The task to run. Specified as `taskId@ref`, with the ref defaulting to
                `origin/main`. The ref can be a branch, tag, or commit.
            path: The path to the git repo containing the agent code. Defaults to the current
                directory. Should not be specified if the `repo`, `branch`, and `commit` arguments,
                or the `agent_path` argument, are specified instead.
            yes: Whether to skip the confirmation prompt before starting the agent.
            verbose: Whether to print verbose output.
            open_browser: Whether to open the agent run page in the default browser.
            max_tokens: The maximum number of tokens the agent can use.
            max_actions: The maximum number of actions the agent can take.
            max_total_seconds: The maximum number of seconds the agent can run for.
            max_cost: The maximum cost of the tokens the agent can use. The currency depends on the
                Vivaria installation you're using.
            checkpoint_tokens: If provided, the agent will pause and wait for human input
                after using this many tokens.
            checkpoint_actions: If provided, the agent will pause and wait for human input
                after taking this many actions.
            checkpoint_total_seconds: If provided, the agent will pause and wait for human input
                after running for this many seconds.
            checkpoint_cost: If provided, the agent will pause and wait for human input
                after spending this much on tokens. The currency depends on the Vivaria installation
                you're using.
            intervention: Whether the agent requires human intervention.
            agent_starting_state: The starting state of the agent, as a JSON string.
            agent_starting_state_file: The path to a file containing the starting state of the
                agent.
            agent_settings_override: The agent settings to override, as a JSON string.
            agent_settings_pack: The agent settings pack to use.
            name: The name of the agent run.
            metadata: Metadata to attach to the agent run.
            repo: The git repo containing the agent code.
            branch: The branch of the git repo containing the agent code.
            commit: The commit of the git repo containing the agent code.
            priority: The priority of the agent run. Can be low or high. Use low priority for
                batches of runs. Use high priority for single runs, if you want the run to start
                quickly and labs not to rate-limit the agent as often.
            low_priority: Deprecated. Use --priority instead. Whether to run the agent in low
                priority mode.
            parent: The ID of the parent run.
            batch_name: The name of the batch to run the agent in.
            batch_concurrency_limit: The maximum number of agents that can run in the batch at the
                same time.
            dangerously_ignore_global_limits: A flag to allow arbitrarily high
                values for max_tokens, max_actions, and max_total_seconds.
            keep_task_environment_running: A flag to keep the task environment running if the agent
                or task crashes. Can still be killed by user.
            agent_path: Optionally specify a path to an agent folder rather than
                using the content of a git repo
            task_family_path: Path to a task family directory to use. If not provided, Vivaria may
                look up the task family directory from a Git repo that it's configured to use.
            env_file_path: Path to a file of environment variables that Vivaria will set in some
                TaskFamily methods. You can only provide this argument if you also provide
                task_family_path. If neither task_family_path nor env_file_path is provided,
                Vivaria will read environment variables from a file called secrets.env in a Git repo
                that Vivaria is configured to use.
            k8s: Run the agent in a Kubernetes cluster.
            task_repo: Optionally specify the task repository. Should include the owner name,
                e.g. METR/mp4-tasks.
        """
        # Set global options
        GlobalOptions.yes_mode = yes
        GlobalOptions.verbose = verbose

        if task_family_path is None and env_file_path is not None:
            err_exit("env_file_path cannot be provided without task_family_path")
        if priority is not None and low_priority is not None:
            err_exit("cannot specify both priority and low_priority")

        uploaded_agent_path = None
        if agent_path is not None:
            if repo is not None or branch is not None or commit is not None or path is not None:
                err_exit("Either specify agent_path or git details but not both.")
            uploaded_agent_path = viv_api.upload_folder(pathlib.Path(agent_path).expanduser())
        elif repo is None:
            # Capture the absolute working directory. `os.path.curdir` is just the string ".",
            # so changing back to it would leave the process inside `path`.
            original_cwd = os.getcwd()
            try:
                os.chdir(path if path is not None else ".")
                _assert_current_directory_is_repo_in_org()
                gh.ask_pull_repo_or_exit()
                org, repo = gh.get_org_and_repo()
                branch, commit, link = gh.create_working_tree_permalink(org=org, repo=repo)
                print_if_verbose(link)
                print_if_verbose("Requesting agent run on server")
            except AssertionError as e:
                err_exit(str(e))
            finally:
                # Restore the caller's directory so later relative paths resolve as expected.
                os.chdir(original_cwd)

        if agent_starting_state is not None and agent_starting_state_file is not None:
            err_exit("Cannot specify both agent starting state and agent starting state file")

        agent_starting_state = agent_starting_state or agent_starting_state_file

        starting_state = _get_input_json(agent_starting_state, "agent starting state")
        settings_override = _get_input_json(agent_settings_override, "agent settings override")

        # `task` is `taskId` or `taskId@ref`; "main" is sent as the branch when no ref is given.
        task_parts = task.split("@")
        task_id = task_parts[0]
        task_branch = task_parts[1] if len(task_parts) > 1 else "main"

        if batch_concurrency_limit is not None:
            if batch_name is None:
                err_exit("To use --batch-concurrency-limit, you must also specify --batch-name")
            if batch_concurrency_limit < 0:
                err_exit("--batch-concurrency-limit must not be negative")

        if task_family_path is not None:
            task_source: viv_api.TaskSource = viv_api.upload_task_family(
                task_family_path=pathlib.Path(task_family_path).expanduser(),
                env_file_path=pathlib.Path(env_file_path).expanduser()
                if env_file_path is not None
                else None,
            )
        else:
            task_source = viv_api.GitRepoTaskSource(
                type="gitRepo",
                repoName=task_repo or get_user_config().tasksRepoSlug,
                commitId=None,
            )

        # Map the deprecated boolean flag onto the newer priority value.
        if priority is None and low_priority is not None:
            priority = "low" if low_priority else "high"

        viv_api.setup_and_run_agent(
            {
                "agentRepoName": repo,
                "agentBranch": branch,
                "agentCommitId": commit,
                "uploadedAgentPath": uploaded_agent_path,
                "taskId": task_id,
                "taskBranch": task_branch,
                "name": name,
                "metadata": metadata,
                "usageLimits": {
                    "tokens": max_tokens,
                    "actions": max_actions,
                    "total_seconds": max_total_seconds,
                    "cost": max_cost,
                },
                "checkpoint": {
                    "tokens": checkpoint_tokens,
                    "actions": checkpoint_actions,
                    "total_seconds": checkpoint_total_seconds,
                    "cost": checkpoint_cost,
                },
                "requiresHumanIntervention": intervention,
                "agentStartingState": starting_state,
                "agentSettingsOverride": settings_override,
                "agentSettingsPack": agent_settings_pack,
                "priority": priority,
                # TODO: Stop sending isLowPriority once Vivaria instances stop expecting it.
                "isLowPriority": priority != "high",
                "parentRunId": parent,
                "batchName": str(batch_name) if batch_name is not None else None,
                "batchConcurrencyLimit": batch_concurrency_limit,
                "dangerouslyIgnoreGlobalLimits": dangerously_ignore_global_limits,
                "keepTaskEnvironmentRunning": keep_task_environment_running,
                "taskSource": task_source,
                "isK8s": k8s,
            },
            verbose=verbose,
            open_browser=open_browser,
        )

    @typechecked
    def get_run(self, run_id: int) -> None:
        """Get a run and print it as indented JSON."""
        run_details = viv_api.get_run(run_id)
        print(json.dumps(run_details, indent=2))

    @typechecked
    def get_run_status(self, run_id: int) -> None:
        """Get the status of a run and print it as indented JSON."""
        run_status = viv_api.get_run_status(run_id)
        print(json.dumps(run_status, indent=2))

    @typechecked
    def query(
        self,
        query: str | None = None,
        output_format: Literal["csv", "json", "jsonl"] = "jsonl",
        output: str | pathlib.Path | None = None,
    ) -> None:
        """Query vivaria database.

        Args:
            query: The query to execute, or the path to a query. If not provided, runs the default
                query.
            output_format: The format to output the runs in. One of "csv", "json", or "jsonl".
            output: The path to a file to output the runs to. If not provided, prints to stdout.
        """
        if query is not None:
            # When the argument names an existing file, its contents become the query text.
            candidate_path = pathlib.Path(query).expanduser()
            if candidate_path.exists():
                with candidate_path.open() as query_handle:
                    query = query_handle.read()

        rows = viv_api.query_runs(query).get("rows", [])

        output_path = None
        if output is not None:
            output_path = pathlib.Path(output).expanduser()
            output_path.parent.mkdir(parents=True, exist_ok=True)

        # nullcontext keeps stdout open when the `with` block exits.
        if output_path is None:
            sink_context = contextlib.nullcontext(sys.stdout)
        else:
            sink_context = output_path.open("w")

        with sink_context as sink:
            if output_format == "json":
                json.dump(rows, sink, indent=2)
            elif output_format == "csv":
                if not rows:
                    return
                # Column names come from the first row.
                writer = csv.DictWriter(sink, fieldnames=rows[0].keys(), lineterminator="\n")
                writer.writeheader()
                writer.writerows(rows)
            else:
                sink.writelines(json.dumps(row) + "\n" for row in rows)

    @typechecked
    def get_agent_state(self, run_id: int, index: int, agent_branch_number: int = 0) -> None:
        """Get the state of an agent run for the given index and agent branch.

        The state is printed as indented JSON.
        """
        agent_state = viv_api.get_agent_state(run_id, index, agent_branch_number)
        print(json.dumps(agent_state, indent=2))

    @typechecked
    def get_run_usage(self, run_id: int, branch_number: int = 0) -> None:
        """Get the time and token usage of an agent run and print it as indented JSON."""
        usage = viv_api.get_run_usage(run_id, branch_number)
        print(json.dumps(usage, indent=2))

    @typechecked
    def register_ssh_public_key(self, ssh_public_key_path: str) -> None:
        """Register your SSH public key.

        This is done so that you can use viv ssh and viv scp on agent containers you create.

        After registering, the matching private key path (the same path without ".pub") is
        written to the viv config if that file exists; otherwise a warning is printed.
        """
        if not ssh_public_key_path.endswith(".pub"):
            err_exit(
                f'Exiting because the path {ssh_public_key_path} does not end with ".pub". '
                "Please confirm that the file contains a public key, then rename it so "
                'it ends in ".pub".'
            )

        public_key_file = pathlib.Path(ssh_public_key_path).expanduser()
        try:
            with public_key_file.open() as key_handle:
                ssh_public_key = key_handle.read().strip()
        except FileNotFoundError:
            err_exit(f"File {ssh_public_key_path} not found")

        viv_api.register_ssh_public_key(ssh_public_key)

        # The private key is expected next to the public key, minus the ".pub" suffix.
        private_key_path = (
            pathlib.Path(ssh_public_key_path.removesuffix(".pub")).expanduser().resolve()
        )
        if private_key_path.exists():
            set_user_config({"sshPrivateKeyPath": str(private_key_path)})
            print(
                "Successfully registered your SSH public key and wrote the path to your private"
                " key to viv config.\nThis applies to new runs you create, and doesn't allow you"
                " to ssh into old runs."
            )
        else:
            print(
                "WARNING: You must have a private key file corresponding to that public key locally"
                f" named {private_key_path} to access your runs."
            )

    @typechecked
    def score(self, run_id: int, submission: str | float | dict) -> None:
        """Score a run.

        Run `TaskFamily#score` in a run's agent container, using a submission passed on the command
        line.

        Args:
            run_id: ID of the run to score.
            submission: The submission; it is passed through `parse_submission` before being sent.
        """
        parsed_submission = parse_submission(submission)
        viv_api.score_run(run_id, parsed_submission)

    @typechecked
    def grant_ssh_access(
        self, run_id: int, ssh_public_key_or_key_path: str, user: SSHUser = "agent"
    ) -> None:
        """Grant SSH access to a run.

        Whoever holds the SSH private key matching the given public key will be able to SSH into
        the run as the given user.

        Args:
            run_id: ID of the run to grant access to.
            ssh_public_key_or_key_path: SSH public key or path to a file containing the public key.
            user: User to grant access to.
        """
        ssh_public_key = resolve_ssh_public_key(ssh_public_key_or_key_path)
        viv_api.grant_ssh_access_to_run(run_id, ssh_public_key, user)

    @typechecked
    def ssh(self, run_id: int, user: SSHUser = "root", aux_vm: bool = False) -> None:
        """SSH into the agent container or aux VM for a run ID.

        For agent containers: Starts the agent container if necessary and uses the given user
        (defaulting to root).

        For aux VMs: Uses the provisioned aux VM user.
        """
        if not aux_vm:
            viv_api.start_agent_container(run_id)
            container_ip = viv_api.get_agent_container_ip_address(run_id)
            run_env = viv_api.get_env_for_run(run_id, user)
            self._ssh.ssh(_container_ssh_opts(container_ip, user, run_env))
            return

        details = viv_api.get_aux_vm_details(run_id=run_id)
        # The key file is only needed while the SSH session is open.
        with _temp_key_file(details) as key_file:
            self._ssh.ssh(_aux_vm_ssh_opts(key_file.name, details))

    @typechecked
    def ssh_command(self, run_id: int, user: SSHUser = "agent", aux_vm: bool = False) -> None:
        """Print a ssh command to connect to an agent container as the given user, or to an aux VM.

        For agent container: Fails if the agent container has been stopped.

        For aux VM: Uses the provisioned user on the aux VM.
        """
        if aux_vm:
            details = viv_api.get_aux_vm_details(run_id=run_id)
            # Deliberately not a `with` block: the user will likely want to access the key file
            # after this function returns.
            key_file = _temp_key_file(details)
            opts = _aux_vm_ssh_opts(key_file.name, details)
        else:
            container_ip = viv_api.get_agent_container_ip_address(run_id)
            opts = _container_ssh_opts(container_ip, user)

        command_args = self._ssh.ssh_args(opts)
        print(" ".join(command_args))

    @typechecked
    def scp(
        self,
        source: str,
        destination: str,
        recursive: bool = False,
        user: SSHUser = "root",
        aux_vm: bool = False,
    ) -> None:
        """SCP.

        Use scp to copy a file from your local machine to the agent container/aux VM for a run ID,
        or vice versa.

        For agent container: Starts the agent container if necessary and SSHes into the agent
        container as the given user, defaulting to root.

        For aux VM: Uses the provisioned aux VM user.

        Example:
            viv scp path/to/local/file 12345:/root/path/to/remote/file
            viv scp 67890:~/path/to/remote/file . --user=agent

        Args:
            source: Source file path.
            destination: Destination file path.
            recursive: Whether to copy source recursively.
            user: User to SSH into the agent container as.
            aux_vm: Whether to SCP to the aux VM.

        Raises:
            ValueError: If both source and destination are local or remote paths.
        """
        source_parts = source.split(":")
        destination_parts = destination.split(":")
        # A path without a colon has no run ID prefix and is therefore local.
        source_is_local = len(source_parts) == 1
        destination_is_local = len(destination_parts) == 1

        if source_is_local == destination_is_local:
            err_exit(
                "Exactly one of the source and destination must start with a run ID"
                ", e.g. 12345:/root/path/to/remote/file"
            )

        def to_run_id(raw: str) -> int:
            try:
                return int(raw)
            except (TypeError, ValueError):
                err_exit(f"Invalid run ID {raw}")

        if source_is_local:
            run_id = to_run_id(destination_parts[0])
        elif destination_is_local:
            run_id = to_run_id(source_parts[0])
        else:
            raise ValueError("How did we get here?")

        if not aux_vm:
            viv_api.start_agent_container(run_id)
            container_ip = viv_api.get_agent_container_ip_address(run_id)
            container_opts = _container_ssh_opts(container_ip, user)
            self._ssh.scp(source, destination, recursive=recursive, opts=container_opts)
            return

        details = viv_api.get_aux_vm_details(run_id=run_id)
        with _temp_key_file(details) as key_file:
            vm_opts = _aux_vm_ssh_opts(key_file.name, details)
            self._ssh.scp(source, destination, vm_opts, recursive=recursive)

    @typechecked
    def code(
        self, run_id: int, user: SSHUser = "root", aux_vm: bool = False, editor: CodeEditor = VSCODE
    ) -> None:
        """Open a code editor (default is VSCode) window to the agent/task container or aux VM.

        For container: Starts the container if necessary, then opens the home folder of the
        given user on the task/agent container for a run ID.

        For aux VM: Opens the home folder of the provisioned user on the aux VM.

        NOTE: This command may edit your ~/.ssh/config.
        """
        if not aux_vm:
            viv_api.start_agent_container(run_id)
            container_ip = viv_api.get_agent_container_ip_address(run_id)
            run_env = viv_api.get_env_for_run(run_id, user)
            container_opts = _container_ssh_opts(container_ip, user, env=run_env)
            self._ssh.open_editor(f"viv-vm-{user}-{run_id}", container_opts, editor=editor)
            return

        details = viv_api.get_aux_vm_details(run_id=run_id)
        with _temp_key_file(details) as key_file:
            vm_opts = _aux_vm_ssh_opts(key_file.name, details)
            self._ssh.open_editor(_aux_vm_host(vm_opts), vm_opts, editor=editor)

    @typechecked
    def print_git_details(self, path: str = ".", dont_commit_new_changes: bool = False) -> None:
        """Print the git details for a repo directory and optionally push the latest commit.

        The details are printed as `--repo '<repo>' --branch '<branch>' --commit '<commit>'`.
        The working directory is changed to `path` while the details are collected and is
        restored before returning.

        Args:
            path: Path to the git repo to inspect. Defaults to the current directory.
            dont_commit_new_changes: If set, report the current branch and its latest commit
                and push that branch to origin. Otherwise the branch and commit come from
                `gh.create_working_tree_permalink`.
        """
        # Remember the real working directory. `os.curdir` is just the string ".", so
        # chdir-ing back to it would leave the process inside `path`.
        original_cwd = os.getcwd()
        try:
            os.chdir(path)
            _assert_current_directory_is_repo_in_org()

            if dont_commit_new_changes:
                _, repo = gh.get_org_and_repo()

                branch = gh.get_branch() or err_exit(
                    "Error: can't start run from detached head (must be on branch)"
                )
                commit = gh.get_latest_commit_id()
                execute(f"git push -u origin {branch}", error_out=True, log=True)
            else:
                gh.ask_pull_repo_or_exit()
                org, repo = gh.get_org_and_repo()
                branch, commit, _link = gh.create_working_tree_permalink(org=org, repo=repo)

            print(f"--repo '{repo}' --branch '{branch}' --commit '{commit}'")
        except AssertionError as e:
            err_exit(str(e))
        finally:
            os.chdir(original_cwd)

    @typechecked
    def upgrade(self) -> None:
        """Upgrade the CLI.

        Force-reinstalls viv-cli with pip from the `main` branch of the repo configured as
        `mp4RepoUrl` in the user config.
        """
        repo_url = get_user_config().mp4RepoUrl
        install_command = (
            "pip install --force-reinstall --exists-action=w "
            f"git+{repo_url}@main#egg=viv-cli&subdirectory=cli"
        )
        execute(install_command, log=True, error_out=True)

    @typechecked
    def kill(self, run_id: int) -> None:
        """Kill a run.

        Args:
            run_id: ID of the run to kill.
        """
        viv_api.kill_run(run_id)

    @typechecked
    def unkill(self, run_id: int, branch_number: int = 0) -> None:
        """Unkill a run.

        Args:
            run_id: ID of the run to unkill.
            branch_number: Number of the agent branch to unkill. Defaults to 0.
        """
        viv_api.unkill_branch(run_id, branch_number)

__init__(dev=False)

Initialise the CLI.

Source code in cli/viv_cli/main.py
572
573
574
575
576
577
578
579
def __init__(self, dev: bool = False) -> None:
    """Initialise the CLI.

    Args:
        dev: Value stored in `GlobalOptions.dev_mode`.
    """
    GlobalOptions.dev_mode = dev
    # SSH helper used by the ssh, ssh_command, scp and code commands.
    self._ssh = SSH()
    # Add groups of commands
    self.config = Config()
    self.task = Task()
    self.run_batch = RunBatch()

code(run_id, user='root', aux_vm=False, editor=VSCODE)

Open a code editor (default is VSCode) window to the agent/task container or aux VM.

For container: Opens the home folder of the given user on the task/agent container for a run ID, and starts the container if necessary.

For aux VM: Opens the home folder of the provisioned user on the aux VM.

NOTE: This command may edit your ~/.ssh/config.

Source code in cli/viv_cli/main.py
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
@typechecked
def code(
    self, run_id: int, user: SSHUser = "root", aux_vm: bool = False, editor: CodeEditor = VSCODE
) -> None:
    """Open a code editor (default is VSCode) window to the agent/task container or aux VM.

    For container: Starts the container if necessary, then opens the home folder of the
    given user on the task/agent container for a run ID.

    For aux VM: Opens the home folder of the provisioned user on the aux VM.

    NOTE: This command may edit your ~/.ssh/config.
    """
    if not aux_vm:
        viv_api.start_agent_container(run_id)
        container_ip = viv_api.get_agent_container_ip_address(run_id)
        run_env = viv_api.get_env_for_run(run_id, user)
        container_opts = _container_ssh_opts(container_ip, user, env=run_env)
        self._ssh.open_editor(f"viv-vm-{user}-{run_id}", container_opts, editor=editor)
        return

    details = viv_api.get_aux_vm_details(run_id=run_id)
    with _temp_key_file(details) as key_file:
        vm_opts = _aux_vm_ssh_opts(key_file.name, details)
        self._ssh.open_editor(_aux_vm_host(vm_opts), vm_opts, editor=editor)

get_agent_state(run_id, index, agent_branch_number=0)

Get the last state of an agent run.

Source code in cli/viv_cli/main.py
845
846
847
848
@typechecked
def get_agent_state(self, run_id: int, index: int, agent_branch_number: int = 0) -> None:
    """Get the last state of an agent run and print it as indented JSON.

    Args:
        run_id: ID of the run.
        index: Index forwarded to `viv_api.get_agent_state`.
        agent_branch_number: Number of the agent branch to look at.
    """
    agent_state = viv_api.get_agent_state(run_id, index, agent_branch_number)
    print(json.dumps(agent_state, indent=2))

get_run(run_id)

Get a run.

Source code in cli/viv_cli/main.py
790
791
792
793
@typechecked
def get_run(self, run_id: int) -> None:
    """Get a run and print it as indented JSON.

    Args:
        run_id: ID of the run to fetch.
    """
    run_details = viv_api.get_run(run_id)
    print(json.dumps(run_details, indent=2))

get_run_status(run_id)

Get the status of a run.

Source code in cli/viv_cli/main.py
795
796
797
798
@typechecked
def get_run_status(self, run_id: int) -> None:
    """Get the status of a run and print it as indented JSON.

    Args:
        run_id: ID of the run whose status to fetch.
    """
    run_status = viv_api.get_run_status(run_id)
    print(json.dumps(run_status, indent=2))

get_run_usage(run_id, branch_number=0)

Get the time and token usage of an agent run.

Source code in cli/viv_cli/main.py
850
851
852
853
@typechecked
def get_run_usage(self, run_id: int, branch_number: int = 0) -> None:
    """Get the time and token usage of an agent run and print it as indented JSON.

    Args:
        run_id: ID of the run.
        branch_number: Number of the agent branch to report usage for.
    """
    usage = viv_api.get_run_usage(run_id, branch_number)
    print(json.dumps(usage, indent=2))

grant_ssh_access(run_id, ssh_public_key_or_key_path, user='agent')

Grant SSH access to a run.

Allow the person with the SSH private key matching the given public key to SSH into the run as the given user.

Parameters:

Name Type Description Default
run_id int

ID of the run to grant access to.

required
ssh_public_key_or_key_path str

SSH public key or path to a file containing the public key.

required
user SSHUser

User to grant access to.

'agent'
Source code in cli/viv_cli/main.py
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
@typechecked
def grant_ssh_access(
    self, run_id: int, ssh_public_key_or_key_path: str, user: SSHUser = "agent"
) -> None:
    """Grant SSH access to a run.

    Whoever holds the SSH private key matching the given public key will be able to SSH into
    the run as the given user.

    Args:
        run_id: ID of the run to grant access to.
        ssh_public_key_or_key_path: SSH public key or path to a file containing the public key.
        user: User to grant access to.
    """
    ssh_public_key = resolve_ssh_public_key(ssh_public_key_or_key_path)
    viv_api.grant_ssh_access_to_run(run_id, ssh_public_key, user)

kill(run_id)

Kill a run.

Source code in cli/viv_cli/main.py
1096
1097
1098
1099
@typechecked
def kill(self, run_id: int) -> None:
    """Kill a run.

    Args:
        run_id: ID of the run to kill.
    """
    viv_api.kill_run(run_id)

print_git_details(path='.', dont_commit_new_changes=False)

Print the git details for the current directory and optionally push the latest commit.

Source code in cli/viv_cli/main.py
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
@typechecked
def print_git_details(self, path: str = ".", dont_commit_new_changes: bool = False) -> None:
    """Print the git details for a repo directory and optionally push the latest commit.

    The details are printed as `--repo '<repo>' --branch '<branch>' --commit '<commit>'`.
    The working directory is changed to `path` while the details are collected and is
    restored before returning.

    Args:
        path: Path to the git repo to inspect. Defaults to the current directory.
        dont_commit_new_changes: If set, report the current branch and its latest commit
            and push that branch to origin. Otherwise the branch and commit come from
            `gh.create_working_tree_permalink`.
    """
    # Remember the real working directory. `os.curdir` is just the string ".", so
    # chdir-ing back to it would leave the process inside `path`.
    original_cwd = os.getcwd()
    try:
        os.chdir(path)
        _assert_current_directory_is_repo_in_org()

        if dont_commit_new_changes:
            _, repo = gh.get_org_and_repo()

            branch = gh.get_branch() or err_exit(
                "Error: can't start run from detached head (must be on branch)"
            )
            commit = gh.get_latest_commit_id()
            execute(f"git push -u origin {branch}", error_out=True, log=True)
        else:
            gh.ask_pull_repo_or_exit()
            org, repo = gh.get_org_and_repo()
            branch, commit, _link = gh.create_working_tree_permalink(org=org, repo=repo)

        print(f"--repo '{repo}' --branch '{branch}' --commit '{commit}'")
    except AssertionError as e:
        err_exit(str(e))
    finally:
        os.chdir(original_cwd)

query(query=None, output_format='jsonl', output=None)

Query vivaria database.

Parameters:

Name Type Description Default
query str | None

The query to execute, or the path to a query. If not provided, runs the default query.

None
output_format Literal['csv', 'json', 'jsonl']

The format to output the runs in. One of "csv", "json", or "jsonl".

'jsonl'
output str | Path | None

The path to a file to output the runs to. If not provided, prints to stdout.

None
Source code in cli/viv_cli/main.py
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
@typechecked
def query(
    self,
    query: str | None = None,
    output_format: Literal["csv", "json", "jsonl"] = "jsonl",
    output: str | pathlib.Path | None = None,
) -> None:
    """Query vivaria database.

    Args:
        query: The query to execute, or the path to a query. If not provided, runs the default
            query.
        output_format: The format to output the runs in. One of "csv", "json", or "jsonl".
        output: The path to a file to output the runs to. If not provided, prints to stdout.
    """
    if query is not None:
        query_file = pathlib.Path(query).expanduser()
        try:
            query_is_file = query_file.exists()
        except OSError:
            # Inline query text is often not a usable path (e.g. it exceeds the OS file name
            # length limit), which makes the existence check raise instead of returning
            # False. Treat that as "not a file" and send the text as the query itself.
            query_is_file = False
        if query_is_file:
            with query_file.open() as file:
                query = file.read()

    runs = viv_api.query_runs(query).get("rows", [])

    if output is not None:
        output_file = pathlib.Path(output).expanduser()
        # Create missing parent directories so the output file can be opened for writing.
        output_file.parent.mkdir(parents=True, exist_ok=True)
    else:
        output_file = None

    # nullcontext keeps sys.stdout open after the block; a real output file is closed on exit.
    with contextlib.nullcontext(sys.stdout) if output_file is None else output_file.open(
        "w"
    ) as file:
        if output_format == "csv":
            # The CSV header comes from the first row, so nothing is written without rows.
            if not runs:
                return
            writer = csv.DictWriter(file, fieldnames=runs[0].keys(), lineterminator="\n")
            writer.writeheader()
            for run in runs:
                writer.writerow(run)
        elif output_format == "json":
            json.dump(runs, file, indent=2)
        else:
            # jsonl: one JSON document per line.
            for run in runs:
                file.write(json.dumps(run) + "\n")

register_ssh_public_key(ssh_public_key_path)

Register your SSH public key.

This is done so that you can use viv ssh and viv scp on agent containers you create.

Source code in cli/viv_cli/main.py
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
@typechecked
def register_ssh_public_key(self, ssh_public_key_path: str) -> None:
    """Register your SSH public key.

    This is done so that you can use viv ssh and viv scp on agent containers you create.

    If a file exists at the same path without the ".pub" suffix, its resolved path is also
    written to the viv config as `sshPrivateKeyPath`; otherwise a warning is printed.

    Args:
        ssh_public_key_path: Path to the public key file. Must end with ".pub".
    """
    if not ssh_public_key_path.endswith(".pub"):
        err_exit(
            f'Exiting because the path {ssh_public_key_path} does not end with ".pub". '
            "Please confirm that the file contains a public key, then rename it so "
            'it ends in ".pub".'
        )

    public_key_file = pathlib.Path(ssh_public_key_path).expanduser()
    try:
        with public_key_file.open() as key_file:
            ssh_public_key = key_file.read().strip()
    except FileNotFoundError:
        err_exit(f"File {ssh_public_key_path} not found")

    viv_api.register_ssh_public_key(ssh_public_key)

    # The private key is expected to sit next to the public key, minus the ".pub" suffix.
    private_key_file = pathlib.Path(ssh_public_key_path.removesuffix(".pub"))
    private_key_path = private_key_file.expanduser().resolve()

    if private_key_path.exists():
        set_user_config({"sshPrivateKeyPath": str(private_key_path)})
        print(
            "Successfully registered your SSH public key and wrote the path to your private key to"
            " viv config.\nThis applies to new runs you create, and doesn't allow you to ssh into "
            "old runs."
        )
    else:
        print(
            "WARNING: You must have a private key file corresponding to that public key locally"
            f" named {private_key_path} to access your runs."
        )

run(task, path=None, yes=False, verbose=False, open_browser=False, max_tokens=300000, max_actions=1000, max_total_seconds=60 * 60 * 24 * 7, max_cost=100, checkpoint_tokens=None, checkpoint_actions=None, checkpoint_total_seconds=None, checkpoint_cost=None, intervention=False, agent_starting_state=None, agent_starting_state_file=None, agent_settings_override=None, agent_settings_pack=None, name=None, metadata={}, repo=None, branch=None, commit=None, priority=None, low_priority=None, parent=None, batch_name=None, batch_concurrency_limit=None, dangerously_ignore_global_limits=False, keep_task_environment_running=False, agent_path=None, task_family_path=None, env_file_path=None, k8s=None, task_repo=None)

Construct a task environment and run an agent in it.

You can either run this command from a clone of an agent repo on your computer, or you can specify the repo, branch, and commit to use.

Parameters:

Name Type Description Default
task str

The task to run. Specified as taskId@ref, with the ref defaulting to origin/main. The ref can be a branch, tag, or commit.

required
path str | None

The path to the git repo containing the agent code. Defaults to the current directory. Should not be specified if the repo, branch, and commit arguments, or the agent_path argument, are specified instead.

None
yes bool

Whether to skip the confirmation prompt before starting the agent.

False
verbose bool

Whether to print verbose output.

False
open_browser bool

Whether to open the agent run page in the default browser.

False
max_tokens int

The maximum number of tokens the agent can use.

300000
max_actions int

The maximum number of actions the agent can take.

1000
max_total_seconds int

The maximum number of seconds the agent can run for.

60 * 60 * 24 * 7
max_cost float

The maximum cost of the tokens the agent can use. The currency depends on the Vivaria installation you're using.

100
checkpoint_tokens int | None

If provided, the agent will pause and wait for human input after using this many tokens.

None
checkpoint_actions int | None

If provided, the agent will pause and wait for human input after taking this many actions.

None
checkpoint_total_seconds int | None

If provided, the agent will pause and wait for human input after running for this many seconds.

None
checkpoint_cost float | None

If provided, the agent will pause and wait for human input after spending this much on tokens. The currency depends on the Vivaria installation you're using.

None
intervention bool

Whether the agent requires human intervention.

False
agent_starting_state str | dict | None

The starting state of the agent, as a JSON string.

None
agent_starting_state_file str | None

The path to a file containing the starting state of the agent.

None
agent_settings_override str | dict | None

The agent settings to override, as a JSON string.

None
agent_settings_pack str | None

The agent settings pack to use.

None
name str | None

The name of the agent run.

None
metadata dict[str, str]

Metadata to attach to the agent run.

{}
repo str | None

The git repo containing the agent code.

None
branch str | None

The branch of the git repo containing the agent code.

None
commit str | None

The commit of the git repo containing the agent code.

None
priority Literal['low', 'high'] | None

The priority of the agent run. Can be low or high. Use low priority for batches of runs. Use high priority for single runs, if you want the run to start quickly and labs not to rate-limit the agent as often.

None
low_priority bool | None

Deprecated. Use --priority instead. Whether to run the agent in low priority mode.

None
parent int | None

The ID of the parent run.

None
batch_name str | None

The name of the batch to run the agent in.

None
batch_concurrency_limit int | None

The maximum number of agents that can run in the batch at the same time.

None
dangerously_ignore_global_limits bool

A flag to allow arbitrarily high values for max_tokens, max_actions, and max_total_seconds.

False
keep_task_environment_running bool

A flag to keep the task environment running if the agent or task crashes. Can still be killed by user.

False
agent_path str | None

Optionally specify a path to an agent folder rather than using the content of a git repo

None
task_family_path str | None

Path to a task family directory to use. If not provided, Vivaria may look up the task family directory from a Git repo that it's configured to use.

None
env_file_path str | None

Path to a file of environment variables that Vivaria will set in some TaskFamily methods. You can only provide this argument if you also provide task_family_path. If neither task_family_path nor env_file_path is provided, Vivaria will read environment variables from a file called secrets.env in a Git repo that Vivaria is configured to use.

None
k8s bool | None

Run the agent in a Kubernetes cluster.

None
task_repo str | None

Optionally specify the task repository. Should include the owner name, e.g. METR/mp4-tasks.

None
Source code in cli/viv_cli/main.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
@typechecked
def run(  # noqa: PLR0912, PLR0913, C901
    self,
    task: str,
    path: str | None = None,
    yes: bool = False,
    verbose: bool = False,
    open_browser: bool = False,
    max_tokens: int = 300_000,
    max_actions: int = 1_000,
    max_total_seconds: int = 60 * 60 * 24 * 7,
    max_cost: float = 100,
    checkpoint_tokens: int | None = None,
    checkpoint_actions: int | None = None,
    checkpoint_total_seconds: int | None = None,
    checkpoint_cost: float | None = None,
    intervention: bool = False,
    agent_starting_state: str | dict | None = None,
    agent_starting_state_file: str | None = None,
    agent_settings_override: str | dict | None = None,
    agent_settings_pack: str | None = None,
    name: str | None = None,
    metadata: dict[str, str] = {},  # noqa: B006
    repo: str | None = None,
    branch: str | None = None,
    commit: str | None = None,
    priority: Literal["low", "high"] | None = None,
    low_priority: bool | None = None,
    parent: int | None = None,
    batch_name: str | None = None,
    batch_concurrency_limit: int | None = None,
    dangerously_ignore_global_limits: bool = False,
    keep_task_environment_running: bool = False,
    agent_path: str | None = None,
    task_family_path: str | None = None,
    env_file_path: str | None = None,
    k8s: bool | None = None,
    task_repo: str | None = None,
) -> None:
    """Construct a task environment and run an agent in it.

    You can either run this command from a clone of an agent repo on your computer, or you can
    specify the repo, branch, and commit to use.

    Args:
        task: The task to run. Specified as `taskId@ref`, with the ref defaulting to
            `origin/main`. The ref can be a branch, tag, or commit.
        path: The path to the git repo containing the agent code. Defaults to the current
            directory. Should not be specified if the `repo`, `branch`, and `commit` arguments,
            or the `agent_path` argument, are specified instead.
        yes: Whether to skip the confirmation prompt before starting the agent.
        verbose: Whether to print verbose output.
        open_browser: Whether to open the agent run page in the default browser.
        max_tokens: The maximum number of tokens the agent can use.
        max_actions: The maximum number of actions the agent can take.
        max_total_seconds: The maximum number of seconds the agent can run for.
        max_cost: The maximum cost of the tokens the agent can use. The currency depends on the
            Vivaria installation you're using.
        checkpoint_tokens: If provided, the agent will pause and wait for human input
            after using this many tokens.
        checkpoint_actions: If provided, the agent will pause and wait for human input
            after taking this many actions.
        checkpoint_total_seconds: If provided, the agent will pause and wait for human input
            after running for this many seconds.
        checkpoint_cost: If provided, the agent will pause and wait for human input
            after spending this much on tokens. The currency depends on the Vivaria installation
            you're using.
        intervention: Whether the agent requires human intervention.
        agent_starting_state: The starting state of the agent, as a JSON string.
        agent_starting_state_file: The path to a file containing the starting state of the
            agent.
        agent_settings_override: The agent settings to override, as a JSON string.
        agent_settings_pack: The agent settings pack to use.
        name: The name of the agent run.
        metadata: Metadata to attach to the agent run.
        repo: The git repo containing the agent code.
        branch: The branch of the git repo containing the agent code.
        commit: The commit of the git repo containing the agent code.
        priority: The priority of the agent run. Can be low or high. Use low priority for
            batches of runs. Use high priority for single runs, if you want the run to start
            quickly and labs not to rate-limit the agent as often.
        low_priority: Deprecated. Use --priority instead. Whether to run the agent in low
            priority mode.
        parent: The ID of the parent run.
        batch_name: The name of the batch to run the agent in.
        batch_concurrency_limit: The maximum number of agents that can run in the batch at the
            same time.
        dangerously_ignore_global_limits: A flag to allow arbitrarily high
            values for max_tokens, max_actions, and max_total_seconds.
        keep_task_environment_running: A flag to keep the task environment running if the agent
            or task crashes. Can still be killed by user.
        agent_path: Optionally specify a path to an agent folder rather than
            using the content of a git repo
        task_family_path: Path to a task family directory to use. If not provided, Vivaria may
            look up the task family directory from a Git repo that it's configured to use.
        env_file_path: Path to a file of environment variables that Vivaria will set in some
            TaskFamily methods. You can only provide this argument if you also provide
            task_family_path. If neither task_family_path nor env_file_path is provided,
            Vivaria will read environment variables from a file called secrets.env in a Git repo
            that Vivaria is configured to use.
        k8s: Run the agent in a Kubernetes cluster.
        task_repo: Optionally specify the task repository. Should include the owner name,
            e.g. METR/mp4-tasks.
    """
    # Set global options
    GlobalOptions.yes_mode = yes
    GlobalOptions.verbose = verbose

    if task_family_path is None and env_file_path is not None:
        err_exit("env_file_path cannot be provided without task_family_path")
    if priority is not None and low_priority is not None:
        err_exit("cannot specify both priority and low_priority")

    uploaded_agent_path = None
    if agent_path is not None:
        if repo is not None or branch is not None or commit is not None or path is not None:
            err_exit("Either specify agent_path or git details but not both.")
        uploaded_agent_path = viv_api.upload_folder(pathlib.Path(agent_path).expanduser())
    elif repo is None:
        # Remember the real working directory. `os.path.curdir` is just the string ".", so
        # chdir-ing back to it would leave the process inside `path` and make later relative
        # paths (task_family_path, env_file_path) resolve against the agent repo.
        original_cwd = os.getcwd()
        try:
            os.chdir(path if path is not None else ".")
            _assert_current_directory_is_repo_in_org()
            gh.ask_pull_repo_or_exit()
            org, repo = gh.get_org_and_repo()
            branch, commit, link = gh.create_working_tree_permalink(org=org, repo=repo)
            print_if_verbose(link)
            print_if_verbose("Requesting agent run on server")
        except AssertionError as e:
            err_exit(str(e))
        finally:
            os.chdir(original_cwd)

    if agent_starting_state is not None and agent_starting_state_file is not None:
        err_exit("Cannot specify both agent starting state and agent starting state file")

    agent_starting_state = agent_starting_state or agent_starting_state_file

    starting_state = _get_input_json(agent_starting_state, "agent starting state")
    settings_override = _get_input_json(agent_settings_override, "agent settings override")

    # `task` has the form taskId[@ref]; without a ref, "main" is sent as the task branch.
    task_parts = task.split("@")
    task_id = task_parts[0]
    task_branch = task_parts[1] if len(task_parts) > 1 else "main"

    if batch_concurrency_limit is not None:
        if batch_name is None:
            err_exit("To use --batch-concurrency-limit, you must also specify --batch-name")
        if batch_concurrency_limit < 0:
            err_exit("--batch-concurrency-limit must not be negative")

    if task_family_path is not None:
        task_source: viv_api.TaskSource = viv_api.upload_task_family(
            task_family_path=pathlib.Path(task_family_path).expanduser(),
            env_file_path=pathlib.Path(env_file_path).expanduser()
            if env_file_path is not None
            else None,
        )
    else:
        task_source = viv_api.GitRepoTaskSource(
            type="gitRepo",
            repoName=task_repo or get_user_config().tasksRepoSlug,
            commitId=None,
        )

    # Map the deprecated low_priority flag onto priority when only the former was given.
    if priority is None and low_priority is not None:
        priority = "low" if low_priority else "high"

    viv_api.setup_and_run_agent(
        {
            "agentRepoName": repo,
            "agentBranch": branch,
            "agentCommitId": commit,
            "uploadedAgentPath": uploaded_agent_path,
            "taskId": task_id,
            "taskBranch": task_branch,
            "name": name,
            "metadata": metadata,
            "usageLimits": {
                "tokens": max_tokens,
                "actions": max_actions,
                "total_seconds": max_total_seconds,
                "cost": max_cost,
            },
            "checkpoint": {
                "tokens": checkpoint_tokens,
                "actions": checkpoint_actions,
                "total_seconds": checkpoint_total_seconds,
                "cost": checkpoint_cost,
            },
            "requiresHumanIntervention": intervention,
            "agentStartingState": starting_state,
            "agentSettingsOverride": settings_override,
            "agentSettingsPack": agent_settings_pack,
            "priority": priority,
            # TODO: Stop sending isLowPriority once Vivaria instances stop expecting it.
            "isLowPriority": priority != "high",
            "parentRunId": parent,
            "batchName": str(batch_name) if batch_name is not None else None,
            "batchConcurrencyLimit": batch_concurrency_limit,
            "dangerouslyIgnoreGlobalLimits": dangerously_ignore_global_limits,
            "keepTaskEnvironmentRunning": keep_task_environment_running,
            "taskSource": task_source,
            "isK8s": k8s,
        },
        verbose=verbose,
        open_browser=open_browser,
    )

score(run_id, submission)

Score a run.

Run TaskFamily#score in a run's agent container, using a submission passed on the command line.

Source code in cli/viv_cli/main.py
894
895
896
897
898
899
900
901
@typechecked
def score(self, run_id: int, submission: str | float | dict) -> None:
    """Score a run.

    Run `TaskFamily#score` in a run's agent container, using a submission passed on the command
    line.

    Args:
        run_id: ID of the run to score.
        submission: Submission to score. It is passed through `parse_submission` before being
            handed to the API.
    """
    parsed_submission = parse_submission(submission)
    viv_api.score_run(run_id, parsed_submission)

scp(source, destination, recursive=False, user='root', aux_vm=False)

SCP.

Use scp to copy a file from your local machine to the agent container/aux VM for a run ID, or vice versa.

For agent container: Starts the agent container if necessary and SSHes into the agent container as the given user, defaulting to root.

For aux VM: Uses the provisioned aux VM user.

Example

viv scp path/to/local/file 12345:/root/path/to/remote/file
viv scp 67890:~/path/to/remote/file . --user=agent

Parameters:

Name Type Description Default
source str

Source file path.

required
destination str

Destination file path.

required
recursive bool

Whether to copy source recursively.

False
user SSHUser

User to SSH into the agent container as.

'root'
aux_vm bool

Whether to SCP to the aux VM.

False

Raises:

Type Description
ValueError

If both source and destination are local or remote paths.

Source code in cli/viv_cli/main.py
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
@typechecked
def scp(
    self,
    source: str,
    destination: str,
    recursive: bool = False,
    user: SSHUser = "root",
    aux_vm: bool = False,
) -> None:
    """SCP.

    Use scp to copy a file between your local machine and the agent container/aux VM for a
    run ID, in either direction.

    The remote side is written as `<run ID>:<path>`; a path without a colon is treated as
    local. Exactly one of `source` and `destination` must be remote; otherwise the problem is
    reported through `err_exit`, as is a run ID that is not an integer.

    For agent container: Starts the agent container if necessary and SSHes into the agent
    container as the given user, defaulting to root.

    For aux VM: Uses the provisioned aux VM user.

    Example:
        viv scp path/to/local/file 12345:/root/path/to/remote/file
        viv scp 67890:~/path/to/remote/file . --user=agent

    Args:
        source: Source file path.
        destination: Destination file path.
        recursive: Whether to copy source recursively.
        user: User to SSH into the agent container as.
        aux_vm: Whether to SCP to the aux VM.

    Raises:
        ValueError: Defensive fallback if neither path turns out to be local after the
            initial validation; not expected in practice.
    """
    source_parts = source.split(":")
    destination_parts = destination.split(":")

    # A path with no colon has no run ID prefix and is therefore local.
    source_is_local = len(source_parts) == 1
    destination_is_local = len(destination_parts) == 1

    if source_is_local == destination_is_local:
        err_exit(
            "Exactly one of the source and destination must start with a run ID"
            ", e.g. 12345:/root/path/to/remote/file"
        )

    def parse_run_id(val: str) -> int:
        """Convert the prefix before the first colon into an integer run ID."""
        try:
            return int(val)
        except (TypeError, ValueError):
            err_exit(f"Invalid run ID {val}")

    # The run ID is taken from whichever side is remote.
    if source_is_local:
        run_id = parse_run_id(destination_parts[0])
    elif destination_is_local:
        run_id = parse_run_id(source_parts[0])
    else:
        error_message = "How did we get here?"
        raise ValueError(error_message)

    if not aux_vm:
        viv_api.start_agent_container(run_id)
        container_ip = viv_api.get_agent_container_ip_address(run_id)
        opts = _container_ssh_opts(container_ip, user)
        self._ssh.scp(source, destination, recursive=recursive, opts=opts)
        return

    aux_vm_details = viv_api.get_aux_vm_details(run_id=run_id)
    # The key file is only needed for the duration of the copy.
    with _temp_key_file(aux_vm_details) as key_file:
        opts = _aux_vm_ssh_opts(key_file.name, aux_vm_details)
        self._ssh.scp(source, destination, opts, recursive=recursive)

ssh(run_id, user='root', aux_vm=False)

SSH into the agent container or aux VM for a run ID.

For agent containers: Starts the agent container if necessary and uses the given user (defaulting to root).

For aux VMs: Uses the provisioned aux VM user.

Source code in cli/viv_cli/main.py
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
@typechecked
def ssh(self, run_id: int, user: SSHUser = "root", aux_vm: bool = False) -> None:
    """SSH into the agent container or aux VM for a run ID.

    For agent containers: Starts the agent container if necessary and uses the given user
    (defaulting to root). The environment fetched for the run and user is passed along with
    the SSH options.

    For aux VMs: Uses the provisioned aux VM user.

    Args:
        run_id: ID of the run to connect to.
        user: User to SSH into the agent container as.
        aux_vm: Whether to SSH into the aux VM instead of the agent container.
    """
    if not aux_vm:
        viv_api.start_agent_container(run_id)
        container_ip = viv_api.get_agent_container_ip_address(run_id)
        run_env = viv_api.get_env_for_run(run_id, user)
        self._ssh.ssh(_container_ssh_opts(container_ip, user, run_env))
        return

    aux_vm_details = viv_api.get_aux_vm_details(run_id=run_id)
    # Keep the key file open for the whole session; it is cleaned up when the block exits.
    with _temp_key_file(aux_vm_details) as key_file:
        self._ssh.ssh(_aux_vm_ssh_opts(key_file.name, aux_vm_details))

ssh_command(run_id, user='agent', aux_vm=False)

Print a ssh command to connect to an agent container as the given user, or to an aux VM.

For agent container: Fails if the agent container has been stopped.

For aux VM: Uses the provisioned user on the aux VM.

Source code in cli/viv_cli/main.py
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
@typechecked
def ssh_command(self, run_id: int, user: SSHUser = "agent", aux_vm: bool = False) -> None:
    """Print a ssh command to connect to an agent container as the given user, or to an aux VM.

    For agent container: Fails if the agent container has been stopped.

    For aux VM: Uses the provisioned user on the aux VM.

    Args:
        run_id: ID of the run to build the command for.
        user: User to SSH into the agent container as.
        aux_vm: Whether to print the command for the aux VM instead of the agent container.
    """
    if aux_vm:
        aux_vm_details = viv_api.get_aux_vm_details(run_id=run_id)
        # Deliberately not a `with` block: the user will likely want to access the key file
        # after this function returns.
        key_file = _temp_key_file(aux_vm_details)
        opts = _aux_vm_ssh_opts(key_file.name, aux_vm_details)
    else:
        container_ip = viv_api.get_agent_container_ip_address(run_id)
        opts = _container_ssh_opts(container_ip, user)

    command_parts = self._ssh.ssh_args(opts)
    print(" ".join(command_parts))

unkill(run_id, branch_number=0)

Unkill a run.

Source code in cli/viv_cli/main.py
1101
1102
1103
1104
@typechecked
def unkill(self, run_id: int, branch_number: int = 0) -> None:
    """Unkill a run.

    Args:
        run_id: ID of the run to unkill.
        branch_number: Number of the branch within the run to unkill. Defaults to 0.
    """
    viv_api.unkill_branch(run_id, branch_number)

upgrade()

Upgrade the CLI.

Source code in cli/viv_cli/main.py
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
@typechecked
def upgrade(self) -> None:
    """Upgrade the CLI.

    Force-reinstalls `viv-cli` with pip from the `cli` subdirectory on the `main` branch of the
    repository given by the `mp4RepoUrl` config value.
    """
    repo_url = get_user_config().mp4RepoUrl
    package_spec = f"git+{repo_url}@main#egg=viv-cli&subdirectory=cli"
    command = f"pip install --force-reinstall --exists-action=w {package_spec}"
    execute(command, log=True, error_out=True)

main()

Main entry point for the CLI.

Source code in cli/viv_cli/main.py
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
def main() -> None:
    """Main entry point for the CLI.

    Moves old config files, initialises Sentry from the raw user config, then hands control
    to Fire with the `Vivaria` command group. Type-check failures on command arguments are
    reported through `err_exit`.
    """
    _move_old_config_files()

    # Read the raw dict rather than calling get_user_config: the user's config might be
    # invalid, and we still want Sentry set up in that case.
    raw_config = get_user_config_dict()

    # TODO: improve type hints if Sentry releases their types
    def sentry_before_send(event: Any, hint: Any) -> Any:  # noqa: ANN401
        """Drop events caused by KeyboardInterrupt; pass everything else through."""
        if "exc_info" not in hint:
            return event
        _exc_type, exc_value, _traceback = hint["exc_info"]
        return None if isinstance(exc_value, KeyboardInterrupt) else event

    sentry_sdk.init(
        dsn=raw_config.get("sentryDsn"),
        before_send=sentry_before_send,
        # Enable performance monitoring
        enable_tracing=True,
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0,
    )
    sentry_sdk.set_tag("api_url", raw_config.get("apiUrl"))

    try:
        fire.Fire(Vivaria)
    except TypeCheckError as type_error:
        err_exit(str(type_error))