Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions deeplabcut/gui/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#
import json
import re
import shutil
import sys
import urllib.request
from collections.abc import Callable

Expand Down Expand Up @@ -234,3 +236,57 @@ def _is_up_to_date(installed: str, latest: str) -> bool:
except InvalidVersion:
return installed == latest
return installed == latest


def package_specs_for_update(packages: list[str]) -> list[str]:
"""Return package specs to install for GUI updates.

DeepLabCut itself should be updated with the GUI extra so GUI dependencies
are kept in sync.
"""
specs = []

for package in packages:
normalized = package.strip()

if normalized == "deeplabcut":
specs.append("deeplabcut[gui]")
else:
specs.append(normalized)

return specs


def build_update_commands(packages: list[str]) -> list[tuple[str, str, list[str]]]:
"""Build installer commands, ordered from preferred to fallback.

Returns tuples of:

(backend_name, program, arguments)

The order is:
1. uv, if available
2. current-interpreter pip fallback
"""
Comment on lines +241 to +270
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nitpick:

These functions seem designed as a generic public function for building normalized package specs install commands, but in practice they only work for very specific input (e.g. case-sensitive exact match for "deeplabcut"). Also they are specifically intended for the auto-update flow, not general GUI utilities. Maybe make these functions private or document this somehow, e.g.:

Suggested change
def package_specs_for_update(packages: list[str]) -> list[str]:
"""Return package specs to install for GUI updates.
DeepLabCut itself should be updated with the GUI extra so GUI dependencies
are kept in sync.
"""
specs = []
for package in packages:
normalized = package.strip()
if normalized == "deeplabcut":
specs.append("deeplabcut[gui]")
else:
specs.append(normalized)
return specs
def build_update_commands(packages: list[str]) -> list[tuple[str, str, list[str]]]:
"""Build installer commands, ordered from preferred to fallback.
Returns tuples of:
(backend_name, program, arguments)
The order is:
1. uv, if available
2. current-interpreter pip fallback
"""
DLCPackage: TypeAlias = Literal["deeplabcut", "napari-deeplabcut"]
def _package_specs_for_update(packages: list[DLCPackage]) -> list[str]:
"""Return package specs to install for GUI updates.
DeepLabCut itself should be updated with the GUI extra so GUI dependencies
are kept in sync.
"""
specs = []
for package in packages:
normalized = package.strip()
if normalized == "deeplabcut":
specs.append("deeplabcut[gui]")
else:
specs.append(normalized)
return specs
def _build_update_commands(packages: list[DLCPackage]) -> list[tuple[str, str, list[str]]]:
"""Build installer commands, ordered from preferred to fallback.
Returns tuples of:
(backend_name, program, arguments)
The order is:
1. uv, if available
2. current-interpreter pip fallback
"""

specs = package_specs_for_update(packages)
commands = []

uv = shutil.which("uv")
if uv:
commands.append(
(
"uv",
uv,
["pip", "install", "--python", sys.executable, "-U", *specs],
)
)

commands.append(
(
"pip",
sys.executable,
["-m", "pip", "install", "-U", *specs],
)
)

return commands
170 changes: 136 additions & 34 deletions deeplabcut/gui/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
UnsupervizedIdTracking,
VideoEditor,
)
from deeplabcut.gui.utils import UpdateChecker
from deeplabcut.gui.utils import UpdateChecker, build_update_commands
from deeplabcut.gui.widgets import StreamReceiver, StreamWriter

warnings.filterwarnings(
Expand Down Expand Up @@ -98,6 +98,9 @@ def __init__(self, app):
# Update checks
self._update_process = None
self._update_process_output = []
self._update_commands = []
self._update_attempt_outputs = []
self._current_update_backend = None

self._scheduled_update_check_silent = True
self._update_check_timer = QtCore.QTimer(self)
Expand Down Expand Up @@ -309,6 +312,20 @@ def video_type(self, ext):
def video_files(self):
return self.files

def _disconnect_update_process(self, process):
if process is None:
return

for signal, slot in (
(process.finished, self._on_update_process_finished),
(process.errorOccurred, self._on_update_process_error),
(process.readyRead, self._drain_update_process_output),
):
try:
signal.disconnect(slot)
except (TypeError, RuntimeError):
pass

def check_for_updates(self, *, silent=True, delay_ms=0):
"""Start an update check immediately or schedule it after a delay."""
if self._closing:
Expand All @@ -330,6 +347,29 @@ def _trigger_scheduled_update_check(self):
return
self._updater.check(silent=self._scheduled_update_check_silent)

def _start_next_update_command(self):
"""Start the next update backend. Return True if one was started."""
if not self._update_commands:
return False

backend_name, program, arguments = self._update_commands.pop(0)
self._current_update_backend = backend_name

self.status_bar.showMessage(f"Installing updates with {backend_name}...")

self._update_process = QtCore.QProcess(self)
self._update_process.setProgram(program)
self._update_process.setArguments(arguments)

self._update_process.finished.connect(self._on_update_process_finished)
self._update_process.errorOccurred.connect(self._on_update_process_error)
self._update_process.readyRead.connect(self._drain_update_process_output)
self._update_process.setProcessChannelMode(QtCore.QProcess.MergedChannels)
self._update_process.start()

self.logger.info("Starting update command: %s %s", program, " ".join(arguments))
return True

def _run_update_command(self, packages):
if not packages:
return
Expand All @@ -341,21 +381,31 @@ def _run_update_command(self, packages):
self.status_bar.showMessage("Installing updates...")

self._update_process_output = []
self._update_process = QtCore.QProcess(self)
self._update_process.setProgram(sys.executable)
self._update_process.setArguments(["-m", "pip", "install", "-U", *packages])
self._update_attempt_outputs = []
self._update_commands = build_update_commands(packages)
self._current_update_backend = None

self._update_process.finished.connect(self._on_update_process_finished)
self._update_process.errorOccurred.connect(self._on_update_process_error)
self._update_process.readyRead.connect(self._drain_update_process_output)
self._update_process.setProcessChannelMode(QtCore.QProcess.MergedChannels)
self._update_process.start()
if not self._start_next_update_command():
self._cleanup_update_process()
QtWidgets.QMessageBox.warning(
self,
"Update failed",
"No available installer backend was found.",
)

Comment on lines +388 to +395
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_update_commands unconditionally appends the pip fallback so the list if never empty. This means that this code is unreachable, correct?

def _drain_update_process_output(self, process=None):

if process is None:
sender = self.sender()
process = sender if sender is not None else self._update_process

if process is not self._update_process:
return

def _drain_update_process_output(self):
if self._update_process is None:
if process is None:
return

Comment on lines +402 to 407
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second condition is only met when (simulataneous) self._update_process is None AND process is None. Consider changing to:

Suggested change
if process is not self._update_process:
return
def _drain_update_process_output(self):
if self._update_process is None:
if process is None:
return
if process is None or process is not self._update_process:
return

data = bytes(self._update_process.readAll())
data = bytes(process.readAll())
if not data:
return

Expand All @@ -371,62 +421,114 @@ def _drain_update_process_output(self):

def _cleanup_update_process(self):
if self._update_process is not None:
self._disconnect_update_process(self._update_process)
self._update_process.deleteLater()
self._update_process = None

self._update_process_output = []
self._update_commands = []
self._update_attempt_outputs = []
self._current_update_backend = None

self._progress_bar.hide()
self.status_bar.showMessage("www.deeplabcut.org")

def _on_update_process_error(self, error):

process = self.sender()
if process is not self._update_process:
return

if self._closing:
self._cleanup_update_process()
return

backend = self._current_update_backend or "installer"

error_strings = {
QtCore.QProcess.FailedToStart: (
"Process failed to start. Check that pip is available and you have sufficient permissions."
),
QtCore.QProcess.Crashed: "Update process crashed unexpectedly.",
QtCore.QProcess.Timedout: "Update process timed out.",
QtCore.QProcess.WriteError: "Could not write to update process.",
QtCore.QProcess.ReadError: "Could not read from update process.",
QtCore.QProcess.UnknownError: "An unknown error occurred.",
QtCore.QProcess.FailedToStart: f"The {backend} update process failed to start.",
QtCore.QProcess.Crashed: f"The {backend} update process crashed unexpectedly.",
QtCore.QProcess.Timedout: f"The {backend} update process timed out.",
QtCore.QProcess.WriteError: f"Could not write to the {backend} update process.",
QtCore.QProcess.ReadError: f"Could not read from the {backend} update process.",
QtCore.QProcess.UnknownError: f"An unknown {backend} update error occurred.",
}
message = error_strings.get(error, "An unknown error occurred.")
QtWidgets.QMessageBox.warning(self, "Update failed", message)

if process is not None:
self._drain_update_process_output(process)

output = "".join(self._update_process_output).strip()
message = error_strings.get(error, f"An unknown {backend} update error occurred.")
failed_output = f"{message}\n\n{output}" if output else message
self.logger.warning(failed_output)
self._update_attempt_outputs.append(failed_output)

self._disconnect_update_process(process)
self._update_process.deleteLater()
Comment on lines +466 to +467
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this only works because the guard (process is not self._update_processreturn). Safer is process.deleteLater() like in _on_update_process_finished:

Suggested change
self._disconnect_update_process(process)
self._update_process.deleteLater()
self._disconnect_update_process(process)
process.deleteLater()

self._update_process = None
Comment thread
C-Achard marked this conversation as resolved.
self._update_process_output = []

if self._start_next_update_command():
return

QtWidgets.QMessageBox.warning(
self,
"Update failed",
"No update backend completed successfully.\n\n" + "\n\n---\n\n".join(self._update_attempt_outputs),
)
self._cleanup_update_process()

def _on_update_process_finished(self, exit_code, exit_status):
process = self.sender()
if process is not self._update_process:
return

if self._closing:
self._cleanup_update_process()
return

if self._update_process is None:
if process is None:
return
Comment on lines +482 to 491
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
process = self.sender()
if process is not self._update_process:
return
if self._closing:
self._cleanup_update_process()
return
if self._update_process is None:
if process is None:
return
process = self.sender()
if process is None or process is not self._update_process:
return
if self._closing:
self._cleanup_update_process()
return


self._progress_bar.hide()
self._drain_update_process_output()
self._drain_update_process_output(process)

output = "".join(self._update_process_output).strip()
backend = self._current_update_backend or "installer"

if exit_status == QtCore.QProcess.NormalExit and exit_code == 0:
self._progress_bar.hide()

QtWidgets.QMessageBox.information(
self,
"Update complete",
"The update completed successfully.\n\nPlease restart DeepLabCut to use the updated packages.",
)

if output:
self.logger.info(output)
else:
QtWidgets.QMessageBox.warning(
self,
"Update failed",
"The update command did not complete successfully.\n\n"
f"{output or 'No additional output was captured.'}",
)
if output:
self.logger.error(output)

self._cleanup_update_process()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calls _progress_bar.hide() again. Maybe better to remove in one of both places to avoid confusion.

return

failed_output = (
f"{backend} failed with exit code {exit_code}.\n\n{output or 'No additional output was captured.'}"
)
self._update_attempt_outputs.append(failed_output)
self.logger.warning(failed_output)

self._disconnect_update_process(process)
process.deleteLater()
self._update_process = None
self._update_process_output = []

if self._start_next_update_command():
return

QtWidgets.QMessageBox.warning(
self,
"Update failed",
"No update backend completed successfully.\n\n" + "\n\n---\n\n".join(self._update_attempt_outputs),
)

self._cleanup_update_process()

Expand Down
Loading
Loading