-
-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path__init__.py
More file actions
157 lines (114 loc) · 4.17 KB
/
__init__.py
File metadata and controls
157 lines (114 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import binascii
import itertools
import tempfile
import zipfile
import glob
import sys
import re
from importlib.machinery import SourceFileLoader
from acid import pure
from acid.nvim.log import log_debug, log_warning
path_ns_cache = {}
def get_customization_variable(nvim, var, default=None):
return nvim.current.buffer.vars.get(var, nvim.vars.get(var, default))
def current_path(nvim):
return nvim.funcs.getcwd()
def path_to_ns(nvim, fpath=None, force=False):
if not fpath:
fpath = nvim.funcs.expand("%:p:r")
if fpath in path_ns_cache and not force:
ns = path_ns_cache[fpath]
return ns
log_debug("Namespace is not cached. Finding from path")
stop_paths = get_stop_paths(nvim)
ns = pure.path_to_ns(fpath, stop_paths)
path_ns_cache[fpath] = ns
return ns
def get_port_no(nvim):
pwd = current_path(nvim)
with open(os.path.join(pwd, ".nrepl-port")) as port:
return port.read().strip()
def repl_host_address(nvim):
path = current_path(nvim)
connection = nvim.exec_lua(
"return require('acid.connections').get(...)", path
)
if connection is not None:
return connection
log_debug("Failed to connect from lua connections.")
host = nvim.vars.get('acid_nrepl_host', '127.0.0.1')
try:
addr = [host, get_port_no(nvim)]
nvim.exec_lua(
"require('acid.connections').set(...)", path, addr, async_ = True)
log_debug("Found a .nrepl-port file, using that")
return addr
except:
return None
# Renamed the function, keeping this here to avoid breaking stuff..
localhost = repl_host_address
def format_addr(*addr):
return "{}://{}:{}".format('nrepl', *addr)
def get_acid_ns(nvim, fpath=None):
strategy = get_customization_variable(nvim, 'acid_ns_strategy', '')
if 'ns:' in strategy:
return strategy.split(':')[-1]
return path_to_ns(nvim, fpath)
def test_paths(nvim):
return {'test', *nvim.vars.get('acid_alt_test_paths', [])}
def src_paths(nvim):
return {'src', *nvim.vars.get('acid_alt_paths', [])}
def get_stop_paths(nvim):
return {'test', 'src', *test_paths(nvim), *src_paths(nvim)}
def find_file_in_path(nvim, fname, resource = None):
log_debug("finding path")
protocol, *_, fpath = fname.split(':')
log_debug("Finding path to {}", fname )
if protocol == 'file':
if os.path.exists(fpath):
return fpath
elif resource != None:
paths = get_stop_paths(nvim)
for path in paths:
attempt = os.path.join(path, resource)
if os.path.exists(attempt):
return attempt
project = resource.split('/')[0]
foreign_project_fpath = nvim.vars.get('acid_project_root', None)
if foreign_project_fpath is None:
return
for path in paths:
attempt = os.path.join(
foreign_project_fpath, project, path, resource
)
if os.path.exists(attempt):
return attempt
elif protocol == 'jar':
jarpath, fpath = fpath.split('!')
hashed = '{:X}'.format(
binascii.crc32(bytes(jarpath, 'ascii')) & 0xffffffff
)
tmppath = os.path.join(tempfile.gettempdir(), hashed)
if not os.path.exists(tmppath):
os.mkdir(tmppath)
zipf = zipfile.ZipFile(jarpath)
zipf.extractall(tmppath)
zipf.close()
fpath = fpath if not os.path.isabs(fpath) else fpath[1:]
full = os.path.join(tmppath, fpath)
if os.path.exists(full):
return full
return None
def alt_paths(path_arr, alt_paths, root, rename_fn):
# clone array so we don't overwrite last element
path = list(path_arr)[1:]
path[-1] = pure.rename_file(path[-1], rename_fn)
def existing(ap):
alt_root = os.path.join(root, ap)
if os.path.exists(alt_root):
log_debug("Path {} exists", alt_root)
return os.path.join(alt_root, *path)
log_debug("Path {} doesn't exist", alt_root)
return
return filter(lambda i: i is not None, map(existing, alt_paths))