Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
When neither `-n/--name` nor `-s/--server` is provided, the default server is
used automatically. `rsconnect login` sets the server as default unless
`--no-set-default` is passed. `CONNECT_SERVER` still takes precedence.
- New `environment` subcommand for managing execution environments on Connect.

## [1.29.0] - 2026-04-29

Expand Down
3 changes: 3 additions & 0 deletions docs/commands/environment.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
::: mkdocs-click
:module: rsconnect.main
:command: environment
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nav:
- content: commands/content.md
- deploy: commands/deploy.md
- details: commands/details.md
- environment: commands/environment.md
- info: commands/info.md
- list: commands/list.md
- login: commands/login.md
Expand Down
160 changes: 160 additions & 0 deletions rsconnect/actions_environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""
Public API for managing execution environments on Posit Connect.
"""

from __future__ import annotations

from typing import Optional, Union

from .api import RSConnectClient, RSConnectServer, SPCSConnectServer
from .models import (
EnvironmentCreateInput,
EnvironmentInstallation,
EnvironmentInstallations,
EnvironmentPermissionInput,
EnvironmentPermissionV1,
EnvironmentUpdateInput,
EnvironmentV1,
EnvironmentVolumeMount,
)


def list_environments(
connect_server: Union[RSConnectServer, SPCSConnectServer],
) -> list[EnvironmentV1]:
with RSConnectClient(connect_server) as client:
return client.environment_list()


def get_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
) -> EnvironmentV1:
with RSConnectClient(connect_server) as client:
return client.environment_get(guid)


def create_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
image: str,
title: Optional[str] = None,
description: Optional[str] = None,
matching: Optional[str] = None,
supervisor: Optional[str] = None,
python: Optional[list[EnvironmentInstallation]] = None,
quarto: Optional[list[EnvironmentInstallation]] = None,
r: Optional[list[EnvironmentInstallation]] = None,
tensorflow: Optional[list[EnvironmentInstallation]] = None,
volume_mounts: Optional[list[EnvironmentVolumeMount]] = None,
user_guids: Optional[list[str]] = None,
group_guids: Optional[list[str]] = None,
) -> EnvironmentV1:
body: EnvironmentCreateInput = {
"cluster_name": "Kubernetes",
"name": image,
}
if title is not None:
body["title"] = title
if description is not None:
body["description"] = description
if matching is not None:
body["matching"] = matching
if supervisor is not None:
body["supervisor"] = supervisor
if python is not None:
body["python"] = _make_installations(python)
if quarto is not None:
body["quarto"] = _make_installations(quarto)
if r is not None:
body["r"] = _make_installations(r)
if tensorflow is not None:
body["tensorflow"] = _make_installations(tensorflow)
if volume_mounts is not None:
body["volume_mounts"] = volume_mounts

with RSConnectClient(connect_server) as client:
result = client.environment_create(body)
if user_guids is not None or group_guids is not None:
_sync_permissions(client, result["guid"], user_guids, group_guids)
return client.environment_get(result["guid"])


def update_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
title: Optional[str] = None,
description: Optional[str] = None,
matching: Optional[str] = None,
supervisor: Optional[str] = None,
python: Optional[list[EnvironmentInstallation]] = None,
quarto: Optional[list[EnvironmentInstallation]] = None,
r: Optional[list[EnvironmentInstallation]] = None,
tensorflow: Optional[list[EnvironmentInstallation]] = None,
volume_mounts: Optional[list[EnvironmentVolumeMount]] = None,
user_guids: Optional[list[str]] = None,
group_guids: Optional[list[str]] = None,
) -> EnvironmentV1:
with RSConnectClient(connect_server) as client:
existing = client.environment_get(guid)

body: EnvironmentUpdateInput = {
"title": title if title is not None else existing["title"],
"description": description if description is not None else existing["description"],
"matching": matching if matching is not None else existing["matching"],
"supervisor": supervisor if supervisor is not None else existing["supervisor"],
"python": _make_installations(python) if python is not None else existing["python"],
"quarto": _make_installations(quarto) if quarto is not None else existing["quarto"],
"r": _make_installations(r) if r is not None else existing["r"],
"tensorflow": _make_installations(tensorflow) if tensorflow is not None else existing["tensorflow"],
"volume_mounts": volume_mounts if volume_mounts is not None else existing["volume_mounts"],
}

result = client.environment_update(guid, body)

if user_guids is not None or group_guids is not None:
_sync_permissions(client, guid, user_guids, group_guids)
return client.environment_get(guid)

return result


def delete_environment(
connect_server: Union[RSConnectServer, SPCSConnectServer],
guid: str,
) -> None:
with RSConnectClient(connect_server) as client:
client.environment_delete(guid)


def _make_installations(items: list[EnvironmentInstallation]) -> EnvironmentInstallations:
return {"installations": items}


def _sync_permissions(
client: RSConnectClient,
env_guid: str,
user_guids: Optional[list[str]],
group_guids: Optional[list[str]],
) -> list[EnvironmentPermissionV1]:
existing = client.environment_permission_list(env_guid)

desired_users = set(user_guids or [])
desired_groups = set(group_guids or [])

existing_users = {p["user_guid"]: p for p in existing if p["user_guid"] is not None}
existing_groups = {p["group_guid"]: p for p in existing if p["group_guid"] is not None}

results: list[EnvironmentPermissionV1] = []
for g in desired_users - set(existing_users.keys()):
body: EnvironmentPermissionInput = {"user_guid": g}
results.append(client.environment_permission_add(env_guid, body))
for g in desired_groups - set(existing_groups.keys()):
body = {"group_guid": g}
results.append(client.environment_permission_add(env_guid, body))

for g in set(existing_users.keys()) - desired_users:
client.environment_permission_delete(env_guid, existing_users[g]["guid"])
for g in set(existing_groups.keys()) - desired_groups:
client.environment_permission_delete(env_guid, existing_groups[g]["guid"])

return results
52 changes: 52 additions & 0 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@
ContentItemV1,
DeleteInputDTO,
DeleteOutputDTO,
EnvironmentCreateInput,
EnvironmentPermissionInput,
EnvironmentPermissionV1,
EnvironmentUpdateInput,
EnvironmentV1,
ListEntryOutputDTO,
PyInfo,
ServerSettings,
Expand Down Expand Up @@ -730,6 +735,53 @@ def system_caches_runtime_delete(self, target: DeleteInputDTO) -> DeleteOutputDT
response = self._server.handle_bad_response(response)
return response

def environment_list(self) -> list[EnvironmentV1]:
response = cast(Union[List[EnvironmentV1], HTTPResponse], self.get("v1/environments"))
response = self._server.handle_bad_response(response)
return response

def environment_get(self, guid: str) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.get(f"v1/environments/{guid}"))
response = self._server.handle_bad_response(response)
return response

def environment_create(self, body: EnvironmentCreateInput) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.post("v1/environments", body=body))
response = self._server.handle_bad_response(response)
return response

def environment_update(self, guid: str, body: EnvironmentUpdateInput) -> EnvironmentV1:
response = cast(Union[EnvironmentV1, HTTPResponse], self.put(f"v1/environments/{guid}", body=body))
response = self._server.handle_bad_response(response)
return response

def environment_delete(self, guid: str) -> None:
response = cast(HTTPResponse, self.delete(f"v1/environments/{guid}", decode_response=False))
self._server.handle_bad_response(response, is_httpresponse=True)

def environment_permission_list(self, env_guid: str) -> list[EnvironmentPermissionV1]:
response = cast(
Union[List[EnvironmentPermissionV1], HTTPResponse],
self.get(f"v1/environments/{env_guid}/permissions"),
)
response = self._server.handle_bad_response(response)
return response

def environment_permission_add(self, env_guid: str, body: EnvironmentPermissionInput) -> EnvironmentPermissionV1:
response = cast(
Union[EnvironmentPermissionV1, HTTPResponse],
self.post(f"v1/environments/{env_guid}/permissions", body=body),
)
response = self._server.handle_bad_response(response)
return response

def environment_permission_delete(self, env_guid: str, permission_guid: str) -> None:
response = cast(
HTTPResponse,
self.delete(f"v1/environments/{env_guid}/permissions/{permission_guid}", decode_response=False),
)
self._server.handle_bad_response(response, is_httpresponse=True)

def task_get(
self,
task_id: str,
Expand Down
Loading
Loading