-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathkg.py
More file actions
128 lines (105 loc) · 3.84 KB
/
kg.py
File metadata and controls
128 lines (105 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Diffbot Knowledge Graph APIs: DQL search and entity enhancement."""
import asyncio
import pathlib
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
from .ontology import Ontology
if TYPE_CHECKING:
from .client import Diffbot, DiffbotAsync
# URL that the DQL query helpers in this module (dql, dql_async) send GET requests to.
KG_DQL_ENDPOINT = "https://kg.diffbot.com/kg/v3/dql"
# URL that the ontology helpers in this module download the ontology document from.
KG_ONTOLOGY_ENDPOINT = "https://kg.diffbot.com/kg/ontology"
def _build_dql_params(
    client: Any,
    query: str,
    size: int,
    from_: int,
    format: str,
    filter: Optional[str],
    exportspec: Optional[str],
    extra: Optional[Dict[str, str]],
) -> Dict[str, Any]:
    """Assemble the query-string parameters for a DQL request.

    ``token`` (read from ``client.token``), ``query`` and ``size`` are always
    present. The remaining parameters are only included when they carry a
    non-default value:

    * ``from`` -- when ``from_`` is truthy (so ``0`` is omitted),
    * ``format`` -- when it is anything other than ``"json"``,
    * ``filter`` / ``exportspec`` -- when they are not ``None`` (an empty
      string is still sent).

    ``extra`` is merged last, so its entries override any key set above.

    Returns:
        A new dict suitable for passing as ``params=`` to an HTTP GET call.
    """
    payload: Dict[str, Any] = {"token": client.token, "query": query, "size": size}

    # (wire name, value, should it be sent?) -- listed in the order the keys
    # must appear in the resulting dict.
    optional_fields = (
        ("from", from_, bool(from_)),
        ("format", format, format != "json"),
        ("filter", filter, filter is not None),
        ("exportspec", exportspec, exportspec is not None),
    )
    payload.update({key: value for key, value, wanted in optional_fields if wanted})

    # Caller-supplied extras win over everything computed above.
    payload.update(extra or {})
    return payload
def dql(
    client: "Diffbot",
    query: str,
    *,
    size: int = 10,
    from_: int = 0,
    format: str = "json",
    filter: Optional[str] = None,
    exportspec: Optional[str] = None,
    extra: Optional[Dict[str, str]] = None,
    raw: bool = False,
) -> Union[Dict[str, Any], bytes]:
    """Run a single DQL query with a synchronous client.

    The request parameters are assembled by :func:`_build_dql_params` and sent
    as a GET to ``KG_DQL_ENDPOINT`` through ``client._http``.

    Args:
        client: Synchronous client; its ``token``, ``_http`` and
            ``_raise_for_status`` members are used.
        query: DQL query string, sent as the ``query`` parameter.
        size: Sent as the ``size`` parameter.
        from_: Sent as ``from`` when non-zero.
        format: Sent as ``format`` unless it is ``"json"``.
        filter: Sent as ``filter`` when not ``None``.
        exportspec: Sent as ``exportspec`` when not ``None``.
        extra: Additional parameters merged over the ones above.
        raw: When true, return the undecoded response body.

    Returns:
        ``response.content`` (bytes) if ``raw`` is true, otherwise the result
        of ``response.json()``. Note that the body is JSON-decoded whenever
        ``raw`` is false, regardless of ``format``.
    """
    request_params = _build_dql_params(
        client=client,
        query=query,
        size=size,
        from_=from_,
        format=format,
        filter=filter,
        exportspec=exportspec,
        extra=extra,
    )
    reply = client._http.get(KG_DQL_ENDPOINT, params=request_params)
    # Error handling is delegated to the client before the body is touched.
    client._raise_for_status(reply)
    if raw:
        return reply.content
    return reply.json()
async def dql_async(
    client: "DiffbotAsync",
    query: str,
    *,
    size: int = 10,
    from_: int = 0,
    format: str = "json",
    filter: Optional[str] = None,
    exportspec: Optional[str] = None,
    extra: Optional[Dict[str, str]] = None,
    raw: bool = False,
) -> Union[Dict[str, Any], bytes]:
    """Run a single DQL query with an asynchronous client.

    Builds the same parameters as :func:`dql` (via :func:`_build_dql_params`)
    and awaits a GET to ``KG_DQL_ENDPOINT`` on ``client._http``.

    Args:
        client: Async client; its ``token``, ``_http`` and
            ``_raise_for_status`` members are used.
        query: DQL query string, sent as the ``query`` parameter.
        size: Sent as the ``size`` parameter.
        from_: Sent as ``from`` when non-zero.
        format: Sent as ``format`` unless it is ``"json"``.
        filter: Sent as ``filter`` when not ``None``.
        exportspec: Sent as ``exportspec`` when not ``None``.
        extra: Additional parameters merged over the ones above.
        raw: When true, return the undecoded response body.

    Returns:
        ``response.content`` (bytes) if ``raw`` is true, otherwise the result
        of ``response.json()``. The body is JSON-decoded whenever ``raw`` is
        false, regardless of ``format``.
    """
    request_params = _build_dql_params(
        client=client,
        query=query,
        size=size,
        from_=from_,
        format=format,
        filter=filter,
        exportspec=exportspec,
        extra=extra,
    )
    reply = await client._http.get(KG_DQL_ENDPOINT, params=request_params)
    # Called synchronously (not awaited), exactly once, before the body is read.
    client._raise_for_status(reply)
    if not raw:
        return reply.json()
    return reply.content
def dql_parallel(
    client: "Diffbot",
    queries: Sequence[Dict[str, Any]],
    *,
    workers: int = 8,
) -> List[Union[Dict[str, Any], bytes]]:
    """Run several DQL queries concurrently on a thread pool.

    Args:
        client: Synchronous client shared by every call.
        queries: One dict of keyword arguments per query; each is expanded
            into ``dql(client, **q)``, so it must at least supply ``query``.
        workers: Upper bound on pool threads. The pool never has more threads
            than there are queries.

    Returns:
        One result per entry of ``queries``, in the same order. An empty
        ``queries`` returns ``[]`` without creating a pool.

    Raises:
        ValueError: From ``ThreadPoolExecutor`` when ``workers`` is not
            positive and there is at least one query.
    """
    if not queries:
        return []

    def _run(spec: Dict[str, Any]) -> Union[Dict[str, Any], bytes]:
        return dql(client, **spec)

    pool_size = min(workers, len(queries))
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Materialize inside the ``with`` so results are collected before the
        # pool is shut down.
        outcomes = pool.map(_run, queries)
        return list(outcomes)
async def dql_parallel_async(
    client: "DiffbotAsync",
    queries: Sequence[Dict[str, Any]],
    *,
    workers: int = 8,
) -> List[Union[Dict[str, Any], bytes]]:
    """Run several DQL queries concurrently on one async client.

    Args:
        client: Async client shared by every call.
        queries: One dict of keyword arguments per query; each is expanded
            into ``dql_async(client, **q)``, so it must at least supply
            ``query``.
        workers: Maximum number of requests allowed in flight at once.

    Returns:
        One result per entry of ``queries``, in the same order. An empty
        ``queries`` returns ``[]`` without issuing any request.

    Raises:
        ValueError: If ``workers`` is less than 1 while there are queries to
            run.
    """
    if not queries:
        return []

    # A semaphore initialised to 0 never admits a waiter, so workers=0 would
    # make every task block forever. Fail fast instead, the same way
    # ThreadPoolExecutor does for the thread-based variant.
    if workers < 1:
        raise ValueError("workers must be greater than 0")

    sem = asyncio.Semaphore(workers)

    async def _one(q: Dict[str, Any]) -> Union[Dict[str, Any], bytes]:
        # The semaphore caps how many dql_async calls run simultaneously.
        async with sem:
            return await dql_async(client, **q)

    # gather() preserves the order of its awaitables in the returned list.
    return await asyncio.gather(*(_one(q) for q in queries))
def dql_refresh_ontology(client: "Diffbot", dest: pathlib.Path) -> None:
    """Download the ontology and write the raw response body to ``dest``.

    The ontology is fetched with a GET to ``KG_ONTOLOGY_ENDPOINT``. Missing
    parent directories of ``dest`` are created, and an existing file at
    ``dest`` is overwritten. Nothing is written if the client's status check
    raises.

    Args:
        client: Synchronous client; its ``_http`` and ``_raise_for_status``
            members are used.
        dest: File path that receives the downloaded bytes.
    """
    reply = client._http.get(KG_ONTOLOGY_ENDPOINT)
    client._raise_for_status(reply)

    # Only touch the filesystem once the response has passed the status check.
    folder = dest.parent
    folder.mkdir(parents=True, exist_ok=True)
    payload = reply.content
    dest.write_bytes(payload)
def dql_fetch_ontology(client: "Diffbot") -> Ontology:
    """Fetch the ontology and parse it into an :class:`Ontology`.

    Every call issues a fresh GET to ``KG_ONTOLOGY_ENDPOINT``; nothing is
    cached here, so keeping the result around is up to the caller. To store
    the raw bytes on disk instead, use :func:`dql_refresh_ontology`.
    """
    reply = client._http.get(KG_ONTOLOGY_ENDPOINT)
    client._raise_for_status(reply)
    payload = reply.content
    return Ontology.from_json(payload)
async def dql_fetch_ontology_async(client: "DiffbotAsync") -> Ontology:
    """Fetch the ontology with an async client and parse it into an :class:`Ontology`.

    Awaits a GET to ``KG_ONTOLOGY_ENDPOINT`` on every call; the result is not
    cached.
    """
    reply = await client._http.get(KG_ONTOLOGY_ENDPOINT)
    client._raise_for_status(reply)
    payload = reply.content
    return Ontology.from_json(payload)