-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
fix(webui): enhance password crypto method #7338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
90b79d5
a0159c8
a3abb28
b749f62
a47e39d
5c27cf1
a64d4e7
fb1cb4e
58da2f1
2009901
e041f0e
6744329
eada50d
0c2f796
a18e2d4
bd7fc50
7c02af8
75f5f48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| """Utilities for dashboard password hashing and verification.""" | ||
|
|
||
| import hashlib | ||
| import hmac | ||
| import re | ||
| import secrets | ||
| import string | ||
|
|
||
| _PBKDF2_ITERATIONS = 600_000 | ||
| _PBKDF2_SALT_BYTES = 16 | ||
| _PBKDF2_ALGORITHM = "pbkdf2_sha256" | ||
| _PBKDF2_FORMAT = f"{_PBKDF2_ALGORITHM}$" | ||
| _LEGACY_MD5_LENGTH = 32 | ||
| _DASHBOARD_PASSWORD_MIN_LENGTH = 8 | ||
| _GENERATED_DASHBOARD_PASSWORD_LENGTH = 24 | ||
| DEFAULT_DASHBOARD_PASSWORD = "astrbot" | ||
|
|
||
|
|
||
| def generate_dashboard_password() -> str: | ||
| """Generate a strong dashboard password that satisfies the complexity policy.""" | ||
| alphabet = string.ascii_letters + string.digits | ||
| password_chars = [ | ||
| secrets.choice(string.ascii_uppercase), | ||
| secrets.choice(string.ascii_lowercase), | ||
| secrets.choice(string.digits), | ||
| *( | ||
| secrets.choice(alphabet) | ||
| for _ in range(_GENERATED_DASHBOARD_PASSWORD_LENGTH - 3) | ||
| ), | ||
| ] | ||
| secrets.SystemRandom().shuffle(password_chars) | ||
| return "".join(password_chars) | ||
|
|
||
|
|
||
| def hash_dashboard_password(raw_password: str) -> str: | ||
| """Return a salted hash for dashboard password using PBKDF2-HMAC-SHA256.""" | ||
| if not isinstance(raw_password, str) or raw_password == "": | ||
| raise ValueError("Password cannot be empty") | ||
|
|
||
| salt = secrets.token_hex(_PBKDF2_SALT_BYTES) | ||
| digest = hashlib.pbkdf2_hmac( | ||
| "sha256", | ||
| raw_password.encode("utf-8"), | ||
| bytes.fromhex(salt), | ||
| _PBKDF2_ITERATIONS, | ||
| ).hex() | ||
| return f"{_PBKDF2_FORMAT}{_PBKDF2_ITERATIONS}${salt}${digest}" | ||
|
|
||
|
|
||
| def hash_legacy_dashboard_password(raw_password: str) -> str: | ||
| """Return legacy MD5 hash for downgrade compatibility only.""" | ||
| if not isinstance(raw_password, str) or raw_password == "": | ||
| raise ValueError("Password cannot be empty") | ||
| return hashlib.md5(raw_password.encode("utf-8")).hexdigest() | ||
|
|
||
|
|
||
| def validate_dashboard_password(raw_password: str) -> None: | ||
| """Validate whether dashboard password meets the minimal complexity policy.""" | ||
| if not isinstance(raw_password, str) or raw_password == "": | ||
| raise ValueError("Password cannot be empty") | ||
| if len(raw_password) < _DASHBOARD_PASSWORD_MIN_LENGTH: | ||
| raise ValueError( | ||
| f"Password must be at least {_DASHBOARD_PASSWORD_MIN_LENGTH} characters long" | ||
| ) | ||
|
|
||
| if not re.search(r"[A-Z]", raw_password): | ||
| raise ValueError("Password must include at least one uppercase letter") | ||
| if not re.search(r"[a-z]", raw_password): | ||
| raise ValueError("Password must include at least one lowercase letter") | ||
| if not re.search(r"\d", raw_password): | ||
| raise ValueError("Password must include at least one digit") | ||
|
Soulter marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def _is_legacy_md5_hash(stored: str) -> bool: | ||
| return ( | ||
| isinstance(stored, str) | ||
| and len(stored) == _LEGACY_MD5_LENGTH | ||
| and all(c in "0123456789abcdefABCDEF" for c in stored) | ||
| ) | ||
|
|
||
|
|
||
| def _is_pbkdf2_hash(stored: str) -> bool: | ||
| return isinstance(stored, str) and stored.startswith(_PBKDF2_FORMAT) | ||
|
|
||
|
|
||
| def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider refactoring
For example: from enum import Enum, auto
class _HashScheme(Enum):
LEGACY_MD5 = auto()
PBKDF2 = auto()
UNKNOWN = auto()
def _get_hash_scheme(stored_hash: str) -> _HashScheme:
if _is_legacy_md5_hash(stored_hash):
return _HashScheme.LEGACY_MD5
if _is_pbkdf2_hash(stored_hash):
return _HashScheme.PBKDF2
return _HashScheme.UNKNOWNThen split the verification logic into named functions so the compatibility behavior is explicit: def _verify_legacy_md5_hash(stored_hash: str, candidate_password: str) -> bool:
# Keep compatibility with existing md5-based deployments:
# new clients send plain password, old clients may send md5 of it.
candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest()
stored = stored_hash.lower()
return (
hmac.compare_digest(stored, candidate_md5.lower())
or hmac.compare_digest(stored, candidate_password.lower())
)
def _verify_pbkdf2_hash(stored_hash: str, candidate_password: str) -> bool:
parts: list[str] = stored_hash.split("$")
if len(parts) != 4:
return False
_, iterations_s, salt, digest = parts
try:
iterations = int(iterations_s)
stored_key = bytes.fromhex(digest)
salt_bytes = bytes.fromhex(salt)
except (TypeError, ValueError):
return False
candidate_key = hashlib.pbkdf2_hmac(
"sha256",
candidate_password.encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(stored_key, candidate_key)
def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool:
if not isinstance(stored_hash, str) or not isinstance(candidate_password, str):
return False
scheme = _get_hash_scheme(stored_hash)
if scheme is _HashScheme.LEGACY_MD5:
return _verify_legacy_md5_hash(stored_hash, candidate_password)
if scheme is _HashScheme.PBKDF2:
return _verify_pbkdf2_hash(stored_hash, candidate_password)
return FalseThis keeps all existing behavior (including the legacy MD5 dual comparison and default-password checks) but makes each piece self-contained and easier to reason about and test individually. |
||
| """Verify password against legacy md5 or new PBKDF2-SHA256 format.""" | ||
| if not isinstance(stored_hash, str) or not isinstance(candidate_password, str): | ||
| return False | ||
|
|
||
| if _is_legacy_md5_hash(stored_hash): | ||
| # Keep compatibility with existing MD5-based deployments while requiring | ||
| # the real plaintext password, not the stored MD5 value itself. | ||
| candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest() | ||
| return hmac.compare_digest(stored_hash.lower(), candidate_md5.lower()) | ||
|
|
||
| if _is_pbkdf2_hash(stored_hash): | ||
| parts: list[str] = stored_hash.split("$") | ||
| if len(parts) != 4: | ||
| return False | ||
| _, iterations_s, salt, digest = parts | ||
| try: | ||
| iterations = int(iterations_s) | ||
| stored_key = bytes.fromhex(digest) | ||
| salt_bytes = bytes.fromhex(salt) | ||
| except (TypeError, ValueError): | ||
| return False | ||
| candidate_key = hashlib.pbkdf2_hmac( | ||
| "sha256", | ||
| candidate_password.encode("utf-8"), | ||
| salt_bytes, | ||
| iterations, | ||
| ) | ||
| return hmac.compare_digest(stored_key, candidate_key) | ||
|
|
||
| return False | ||
|
|
||
|
|
||
| def is_default_dashboard_password(stored_hash: str) -> bool: | ||
| """Check whether the password still equals the built-in default value.""" | ||
| return verify_dashboard_password(stored_hash, DEFAULT_DASHBOARD_PASSWORD) | ||
|
|
||
|
|
||
| def is_legacy_dashboard_password(stored_hash: str) -> bool: | ||
| """Check whether the password is still stored with legacy MD5.""" | ||
| return _is_legacy_md5_hash(stored_hash) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| from astrbot.core.config.astrbot_config import AstrBotConfig | ||
| from astrbot.core.db import BaseDatabase | ||
| from astrbot.core.utils.auth_password import ( | ||
| hash_dashboard_password, | ||
| hash_legacy_dashboard_password, | ||
| is_legacy_dashboard_password, | ||
| ) | ||
|
|
||
| PASSWORD_STORAGE_UPGRADED_KEY = "password_storage_upgraded" | ||
| PASSWORD_CHANGE_REQUIRED_KEY = "password_change_required" | ||
|
|
||
|
|
||
| def _set_dashboard_flag(config: AstrBotConfig, key: str, value: bool) -> None: | ||
| if config["dashboard"].get(key) == bool(value): | ||
| return | ||
| config["dashboard"][key] = bool(value) | ||
| config.save_config() | ||
|
|
||
|
|
||
| def _has_usable_pbkdf2_password(config: AstrBotConfig) -> bool: | ||
| password = config["dashboard"].get("pbkdf2_password", "") | ||
| if not isinstance(password, str) or not password.startswith("pbkdf2_sha256$"): | ||
| return False | ||
|
|
||
| parts = password.split("$") | ||
| if len(parts) != 4: | ||
| return False | ||
|
|
||
| _, iterations, salt, digest = parts | ||
| try: | ||
| int(iterations) | ||
| bytes.fromhex(salt) | ||
| bytes.fromhex(digest) | ||
| except ValueError: | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| async def is_password_storage_upgraded( | ||
| db: BaseDatabase, | ||
| config: AstrBotConfig, | ||
| ) -> bool: | ||
| config_upgraded = _has_usable_pbkdf2_password(config) | ||
| if config["dashboard"].get(PASSWORD_STORAGE_UPGRADED_KEY) != config_upgraded: | ||
| _set_dashboard_flag(config, PASSWORD_STORAGE_UPGRADED_KEY, config_upgraded) | ||
| return config_upgraded | ||
|
|
||
|
|
||
| async def set_password_storage_upgraded( | ||
| db: BaseDatabase, | ||
| config: AstrBotConfig, | ||
| upgraded: bool, | ||
| ) -> None: | ||
| _set_dashboard_flag(config, PASSWORD_STORAGE_UPGRADED_KEY, upgraded) | ||
|
|
||
|
|
||
| async def is_password_change_required( | ||
| db: BaseDatabase, | ||
| config: AstrBotConfig, | ||
| ) -> bool: | ||
| stored = config["dashboard"].get(PASSWORD_CHANGE_REQUIRED_KEY, None) | ||
| if stored is not None: | ||
| return bool(stored) | ||
|
|
||
| required = bool( | ||
| getattr(config, "_generated_dashboard_password_change_required", False) | ||
| or getattr(config, "_dashboard_password_change_required_from_config", False) | ||
| ) | ||
| if required: | ||
| _set_dashboard_flag(config, PASSWORD_CHANGE_REQUIRED_KEY, True) | ||
| return required | ||
|
|
||
|
|
||
| async def set_password_change_required( | ||
| db: BaseDatabase, | ||
| config: AstrBotConfig, | ||
| required: bool, | ||
| ) -> None: | ||
| _set_dashboard_flag(config, PASSWORD_CHANGE_REQUIRED_KEY, required) | ||
|
|
||
|
|
||
| def get_dashboard_password_hash(config: AstrBotConfig, *, upgraded: bool) -> str: | ||
| if upgraded and _has_usable_pbkdf2_password(config): | ||
| return config["dashboard"].get("pbkdf2_password", "") | ||
|
|
||
| legacy_password = config["dashboard"].get("password", "") | ||
| if upgraded and not is_legacy_dashboard_password(legacy_password): | ||
| return "" | ||
| return legacy_password | ||
|
|
||
|
|
||
| def set_dashboard_password_hashes(config: AstrBotConfig, raw_password: str) -> None: | ||
| config["dashboard"]["pbkdf2_password"] = hash_dashboard_password(raw_password) | ||
| config["dashboard"]["password"] = hash_legacy_dashboard_password(raw_password) |
Uh oh!
There was an error while loading. Please reload this page.