-
Notifications
You must be signed in to change notification settings - Fork 6.4k
Expand file tree
/
Copy pathenctag.py
More file actions
98 lines (77 loc) · 3.11 KB
/
enctag.py
File metadata and controls
98 lines (77 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Module: CephFS Volume Encryption Tag
This module provides the `CephFSVolumeEncryptionTag` class, which is designed to manage encryption
tags of subvolumes within a CephFS filesystem. The encryption tag mechanism allows
administrators to tag specific subvolumes with identifiers that indicate encryption information,
such as a keyid or other itentifier tags.
Key Features:
- **Set Encryption Tag**: Assigns an tag to a subvolume.
- **Get Encryption Tag**: Retrieves the existing tag of a subvolume, if any.
- **Remove Tag**: Removes the tag from a subvolume, making it available for reallocation.
supported top-level scopes.
"""
import errno
import logging
from typing import Optional, Tuple, Protocol
log = logging.getLogger(__name__)
XATTR_SUBVOLUME_ENCTAG_NAME = 'user.ceph.subvolume.enctag'
class FSOperations(Protocol):
"""Protocol class representing the file system operations earmarking
classes will perform.
"""
def setxattr(
self, path: str, key: str, value: bytes, flags: int
) -> None: ...
def getxattr(self, path: str, key: str) -> bytes: ...
class EncryptionTagException(Exception):
def __init__(self, error_code: int, error_message: str) -> None:
self.errno = error_code
self.error_str = error_message
def to_tuple(self) -> Tuple[int, Optional[str], str]:
return self.errno, "", self.error_str
def __str__(self) -> str:
return f"{self.errno} ({self.error_str})"
class CephFSVolumeEncryptionTag:
ENCTAG_MAX = 255
def __init__(self, fs: FSOperations, path: str) -> None:
self.fs = fs
self.path = path
def _handle_cephfs_error(self, e: Exception, action: str) -> None:
if isinstance(e, ValueError):
raise EncryptionTagException(
-errno.EINVAL, f"Invalid encryption tag specified: {e}"
) from e
elif isinstance(e, OSError):
log.error(f"Error {action} encryption tag: {e}")
raise EncryptionTagException(-e.errno, e.strerror) from e
else:
log.error(f"Unexpected error {action} encryption tag: {e}")
raise EncryptionTagException(
-errno.EIO, "Unexpected error"
) from e
def get_tag(self) -> Optional[str]:
try:
enc_tag_value = self.fs.getxattr(
self.path, XATTR_SUBVOLUME_ENCTAG_NAME
).decode('utf-8')
return enc_tag_value
except Exception as e:
self._handle_cephfs_error(e, "getting")
return None
def set_tag(self, enc_tag: str) -> None:
try:
if len(enc_tag) > self.ENCTAG_MAX:
raise ValueError(
f"length '{len(enc_tag)} > {self.ENCTAG_MAX}'"
)
self.fs.setxattr(
self.path,
XATTR_SUBVOLUME_ENCTAG_NAME,
enc_tag.encode('utf-8'),
0,
)
log.info(f"Encryption Tag '{enc_tag}' set on {self.path}.")
except Exception as e:
self._handle_cephfs_error(e, "setting")
def clear_tag(self) -> None:
self.set_tag("")