-
Notifications
You must be signed in to change notification settings - Fork 233
Expand file tree
/
Copy path_resolvers.py
More file actions
132 lines (104 loc) · 4.28 KB
/
_resolvers.py
File metadata and controls
132 lines (104 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright 2017-2020 Palantir Technologies, Inc.
# Copyright 2021- Python Language Server Contributors.
import logging
from collections import defaultdict
from time import time
from jedi.api.classes import Completion
from pylsp import lsp
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# ---- Base class
# -----------------------------------------------------------------------------
class Resolver:
    """Resolve per-completion data through a callback, with optional caching.

    Results are cached only for completions whose ``full_name`` starts with
    one of ``cached_modules``. Cache keys embed the current time bucket (see
    ``time_key``), so a cached value is only reused within the bucket in
    which it was stored.
    """

    def __init__(self, callback, resolve_on_error, time_to_live=60 * 30) -> None:
        """Set up an empty cache.

        Args:
            callback: Called as ``callback(completion, signatures)``; its
                return value is the resolved result.
            resolve_on_error: Value returned by ``resolve`` when resolving
                raises an exception.
            time_to_live: Divisor applied to ``time()`` to obtain the time
                bucket, i.e. the bucket width in seconds.
        """
        self.callback = callback
        self.resolve_on_error = resolve_on_error
        self._time_to_live = time_to_live
        # Cleanup is only attempted while the bucket number is a multiple of this.
        self._clear_every = 2
        # cache key -> resolved value
        self._cache = {}
        # time bucket -> cache keys registered during that bucket
        self._cache_ttl = defaultdict(set)
        # see https://github.com/davidhalter/jedi/blob/master/jedi/inference/helpers.py#L194-L202
        self._cached_modules = {"pandas", "numpy", "tensorflow", "matplotlib"}

    @property
    def cached_modules(self):
        """Set of top-level module names whose completions are cached."""
        return self._cached_modules

    @cached_modules.setter
    def cached_modules(self, new_value):
        # Always store a set copy, whatever iterable was supplied.
        self._cached_modules = set(new_value)

    def clear_outdated(self) -> None:
        """Drop every cached entry registered in a bucket older than the current one."""
        current_bucket = self.time_key()
        # Snapshot the stale buckets first: the mapping is mutated below.
        stale_buckets = [
            bucket for bucket in self._cache_ttl if bucket < current_bucket
        ]
        for bucket in stale_buckets:
            stale_ids = self._cache_ttl[bucket]
            for cache_id in stale_ids:
                del self._cache[cache_id]
            del self._cache_ttl[bucket]

    def time_key(self):
        """Return the current time bucket: ``time()`` divided by the TTL, truncated to int."""
        elapsed_buckets = time() / self._time_to_live
        return int(elapsed_buckets)

    def get_or_create(self, completion: Completion):
        """Return the resolved value for ``completion``, using the cache when allowed.

        Completions without a ``full_name``, or whose first dotted component
        is not in ``cached_modules``, are resolved on every call.
        """
        full_name = completion.full_name
        if not full_name:
            return self.resolve(completion)

        module_parts = full_name.split(".")
        if not (module_parts and module_parts[0] in self._cached_modules):
            return self.resolve(completion)

        cache_id = self._create_completion_id(completion)
        if cache_id in self._cache:
            return self._cache[cache_id]

        # Cache miss: take the opportunity to evict old buckets, but only
        # while the current bucket number is a multiple of ``_clear_every``.
        if self.time_key() % self._clear_every == 0:
            self.clear_outdated()

        self._cache[cache_id] = self.resolve(completion)
        self._cache_ttl[self.time_key()].add(cache_id)
        return self._cache[cache_id]

    def _create_completion_id(self, completion: Completion):
        """Build the cache key: the completion's name and location plus the current time bucket."""
        location = (
            completion.full_name,
            completion.module_path,
            completion.line,
            completion.column,
        )
        return (*location, self.time_key())

    def resolve(self, completion):
        """Run the callback on ``completion`` and its signatures.

        Any exception is logged as a warning and ``resolve_on_error`` is
        returned instead.
        """
        try:
            return self.callback(completion, completion.get_signatures())
        except Exception as e:
            message = f"Something went wrong when resolving label for {completion}: {e}"
            log.warning(message)
            return self.resolve_on_error
# ---- Label resolver
# -----------------------------------------------------------------------------
def format_label(completion, sig):
    """Build the display label for a completion.

    Functions and methods that have at least one signature are rendered as
    ``name(param1, param2, ...)`` using the first signature; everything else
    is rendered as the bare completion name.
    """
    if not sig or completion.type not in ("function", "method"):
        return completion.name

    param_names = [param.name for param in sig[0].params]
    params = ", ".join(param_names)
    return f"{completion.name}({params})"
# Module-level resolver for completion labels; yields "" when resolving fails.
LABEL_RESOLVER = Resolver(
    callback=format_label,
    resolve_on_error="",
)
# ---- Snippets resolver
# -----------------------------------------------------------------------------
def format_snippet(completion, sig):
    """Build the insert-text fields of a completion item from its first signature.

    Returns an empty dict when there is no signature. Otherwise the result
    always has ``insertText``; ``insertTextFormat`` is added only when at
    least one parameter without a default is present:

    * several such parameters -> ``name(${1:a}, ${2:b})$0``
    * exactly one             -> ``name($0)``
    * none                    -> ``name()`` (plain text)
    """
    if not sig:
        return {}

    # Keep parameters without a default value, skipping the bare "/" and "*"
    # markers.
    required = [
        param
        for param in sig[0].params
        if "=" not in param.description and param.name not in {"/", "*"}
    ]

    if not required:
        return {"insertText": completion.name + "()"}

    if len(required) == 1:
        insert_text = completion.name + "($0)"
    else:
        # For completions with params, we can generate a snippet instead
        placeholders = ", ".join(
            "${{{}:{}}}".format(position, param.name)
            for position, param in enumerate(required, start=1)
        )
        insert_text = completion.name + "(" + placeholders + ")$0"

    return {
        "insertTextFormat": lsp.InsertTextFormat.Snippet,
        "insertText": insert_text,
    }
# Module-level resolver for snippet fields; yields {} when resolving fails.
SNIPPET_RESOLVER = Resolver(
    callback=format_snippet,
    resolve_on_error={},
)