-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathquery.py
More file actions
419 lines (339 loc) · 14.6 KB
/
query.py
File metadata and controls
419 lines (339 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
from collections.abc import Sequence
from json import loads as json_loads
from typing import Optional, Sequence, Union
import numpy as np
import tiledb.libtiledb as lt
from .array import Array
from .ctx import Ctx, CtxMixin, default_ctx
from .datatypes import DataType
from .subarray import Subarray
class Query(CtxMixin, lt.Query):
"""
Represents a TileDB query.
"""
def __init__(
self,
array: Array,
ctx: Optional[Ctx] = None,
attrs: Optional[Union[Sequence[str], Sequence[int]]] = None,
cond: Optional[str] = None,
dims: Union[bool, Sequence[str]] = False,
has_coords: bool = False,
index_col: Optional[Union[bool, Sequence[int]]] = True,
order: Optional[str] = None,
use_arrow: Optional[bool] = None,
return_arrow: bool = False,
return_incomplete: bool = False,
):
"""Class representing a query on a TileDB Array.
Allows easy subarray queries of cells for an item or region of the array
across one or more attributes. Optionally subselect over attributes, return
dense result coordinate values, and specify a layout a result layout / cell-order.
For write mode arrays, the Query can be used to write data with explicit control
over submit() and finalize() operations.
:param array: the Array object to query.
:param ctx: the TileDB context.
:param attrs: the attributes to subselect over.
If attrs is None (default) all array attributes will be returned.
Array attributes can be defined by name or by positional index.
:param cond: the str expression to filter attributes or dimensions on. The expression must be parsable by tiledb.QueryCondition(). See help(tiledb.QueryCondition) for more details.
:param dims: the dimensions to subselect over. If dims is False (default), no specific selection is made.
If True, all dimensions are returned. Otherwise, specify a list of dimension names.
:param has_coords: (deprecated) if True, return array of coordinate value (default False).
:param index_col: For dataframe queries, override the saved index information,
and only set specified index(es) in the final dataframe, or None.
:param order: 'C', 'F', 'G', or 'U' (row-major, col-major, global order, unordered).
:param use_arrow: if True, return dataframes via PyArrow if applicable.
:param return_arrow: if True, return results as a PyArrow Table if applicable.
:param return_incomplete: if True, initialize and return an iterable Query object over the indexed range.
Consuming this iterable returns a result set for each TileDB incomplete query.
If False (default), queries will be internally run to completion by resizing buffers and
resubmitting until query is complete.
"""
if array.mode not in ("r", "d", "w"):
raise ValueError("array mode must be read, delete, or write mode")
if dims not in (False, None) and has_coords == True:
raise ValueError("Cannot pass both dims and has_coords=True to Query")
if return_incomplete and not array.schema.sparse:
raise lt.TileDBError(
"Incomplete queries are only supported for sparse arrays at this time"
)
# reference to the array we are querying
self._array = array
query_type_map = {
"r": lt.QueryType.READ,
"d": lt.QueryType.DELETE,
"w": lt.QueryType.WRITE,
}
query_type = query_type_map[array.mode]
super().__init__(
ctx, lt.Array(ctx if ctx is not None else default_ctx(), array), query_type
)
if order is None:
if array.schema.sparse:
order = "U" # unordered
else:
order = "C" # row-major
layout_map = {
"C": lt.LayoutType.ROW_MAJOR,
"F": lt.LayoutType.COL_MAJOR,
"G": lt.LayoutType.GLOBAL_ORDER,
"U": lt.LayoutType.UNORDERED,
}
if order not in layout_map:
raise ValueError(
f"order must be one of {list(layout_map.keys())}, got '{order}'"
)
self.layout = layout_map[order]
self._order = order
self._dims = dims
if dims == True or has_coords == True:
domain = array.schema.domain
self._dims = [domain.dim(i).name for i in range(domain.ndim)]
elif dims:
domain = array.schema.domain
for dname in dims:
if not domain.has_dim(dname):
raise lt.TileDBError(
f"Selected dimension does not exist: '{dname}'"
)
self._dims = dims
else:
self._dims = None
if attrs is not None:
for name in attrs:
if not array.schema.has_attr(name):
raise lt.TileDBError(f"Selected attribute does not exist: '{name}'")
self._attrs = attrs
self._cond = cond
self._has_coords = has_coords
self._index_col = index_col
self._return_arrow = return_arrow
self._use_arrow = use_arrow
self._return_incomplete = return_incomplete
if array.mode in ("r", "d") and return_arrow:
if use_arrow is None:
use_arrow = True
if not use_arrow:
raise lt.TileDBError(
"Cannot initialize return_arrow with use_arrow=False"
)
def subarray(self) -> Subarray:
"""Subarray with the ranges this query is on.
:rtype: Subarray
"""
return Subarray.from_pybind11(self._ctx, self._subarray)
def __getitem__(self, selection):
if self._return_arrow:
raise lt.TileDBError("`return_arrow=True` requires .df indexer`")
return self._array.subarray(
selection,
attrs=self._attrs,
cond=self._cond,
coords=self._has_coords if self._has_coords else self._dims,
order=self._order,
)
def agg(self, aggs):
"""
Calculate an aggregate operation for a given attribute. Available
operations are sum, min, max, mean, count, and null_count (for nullable
attributes only). Aggregates may be combined with other query operations
such as query conditions and slicing.
The input may be a single operation, a list of operations, or a
dictionary with attribute mapping to a single operation or list of
operations.
For undefined operations on max and min, which can occur when a nullable
attribute contains only nulled data at the given coordinates or when
there is no data read for the given query (e.g. query conditions that do
not match any values or coordinates that contain no data)), invalid
results are represented as np.nan for attributes of floating point types
and None for integer types.
>>> import tiledb, tempfile, numpy as np
>>> path = tempfile.mkdtemp()
>>> with tiledb.from_numpy(path, np.arange(1, 10)) as A:
... pass
>>> # Note that tiledb.from_numpy creates anonymous attributes, so the
>>> # name of the attribute is represented as an empty string
>>> with tiledb.open(path, 'r') as A:
... A.query().agg("sum")[:]
45
>>> with tiledb.open(path, 'r') as A:
... A.query(cond="attr('') < 5").agg(["count", "mean"])[:]
{'count': 9, 'mean': 2.5}
>>> with tiledb.open(path, 'r') as A:
... A.query().agg({"": ["max", "min"]})[2:7]
{'max': 7, 'min': 3}
:param agg: The input attributes and operations to apply aggregations on
:returns: single value for single operation on one attribute, a dictionary
of attribute keys associated with a single value for a single operation
across multiple attributes, or a dictionary of attribute keys that maps
to a dictionary of operation labels with the associated value
"""
schema = self._array.schema
attr_to_aggs_map = {}
if isinstance(aggs, dict):
attr_to_aggs_map = {
a: (tuple([aggs[a]]) if isinstance(aggs[a], str) else tuple(aggs[a]))
for a in aggs
}
elif isinstance(aggs, str):
attrs = tuple(schema.attr(i).name for i in range(schema.nattr))
attr_to_aggs_map = {a: (aggs,) for a in attrs}
elif isinstance(aggs, Sequence):
attrs = tuple(schema.attr(i).name for i in range(schema.nattr))
attr_to_aggs_map = {a: tuple(aggs) for a in attrs}
from .aggregation import Aggregation
return Aggregation(self, attr_to_aggs_map)
    @property
    def array(self):
        """The Array object this query was constructed over."""
        return self._array

    @array.setter
    def array(self, value):
        # Rebinds the query to another array; no validation is performed here.
        self._array = value

    @property
    def attrs(self):
        """List of attributes to include in Query, or None for all attributes."""
        return self._attrs

    @attrs.setter
    def attrs(self, value):
        self._attrs = value

    @property
    def cond(self):
        """QueryCondition expression used to filter attributes or dimensions in Query."""
        return self._cond

    @cond.setter
    def cond(self, value):
        self._cond = value
    @property
    def dims(self):
        """List of dimensions to include in Query, or None if none were selected."""
        return self._dims

    @property
    def has_coords(self):
        """
        True if query should include (return) coordinate values.

        :rtype: bool
        """
        return self._has_coords

    @property
    def order(self):
        """Return underlying Array order ('C', 'F', 'G', or 'U')."""
        return self._order

    @order.setter
    def order(self, value):
        # Expected values are 'C', 'F', 'G', or 'U' (see __init__);
        # not re-validated here.
        self._order = value

    @property
    def index_col(self):
        """List of columns to set as index for dataframe queries, or None."""
        return self._index_col

    @property
    def use_arrow(self):
        """Whether dataframe results go through PyArrow (None means auto-select)."""
        return self._use_arrow

    @property
    def return_arrow(self):
        """True if the .df indexer returns results as a PyArrow Table."""
        return self._return_arrow

    @property
    def return_incomplete(self):
        """True if reads return an iterable over incomplete query results."""
        return self._return_incomplete
    @property
    def domain_index(self):
        """Apply Array.domain_index with query parameters."""
        return self._domain_index

    def label_index(self, labels):
        """Apply Array.label_index with query parameters.

        :param labels: iterable of dimension-label names to index on
        """
        # Delayed to avoid circular import
        from .multirange_indexing import LabelIndexer

        return LabelIndexer(self._array, tuple(labels), query=self)

    @property
    def multi_index(self):
        """Apply Array.multi_index with query parameters."""
        # Delayed to avoid circular import
        from .multirange_indexing import MultiRangeIndexer

        return MultiRangeIndexer(self._array, query=self)

    @property
    def df(self):
        """Apply Array.multi_index with query parameters and return result
        as a Pandas dataframe."""
        # Delayed to avoid circular import
        from .multirange_indexing import DataFrameIndexer

        return DataFrameIndexer(self._array, query=self, use_arrow=self._use_arrow)
def get_stats(self, print_out=True, json=False):
"""Retrieves the stats from a TileDB query.
:param print_out: Print string to console (default True), or return as string
:param json: Return stats JSON object (default: False)
"""
pyquery = self._array.pyquery
if pyquery is None:
return ""
stats = self._array.pyquery.get_stats()
if json:
stats = json_loads(stats)
if print_out:
print(stats)
else:
return stats
def submit(self):
"""Submit the query.
For read/delete queries: an alias for calling the regular indexer [:].
For write queries: submits the write query with current buffers.
"""
if self._array.mode in ("r", "d"):
return self[:]
else:
# Write mode - submit the underlying query
return self._submit()
    def finalize(self):
        """Finalize a query (delegates to the underlying lt.Query base class)."""
        super().finalize()
def set_data(self, data):
"""Set data buffers for write queries.
:param data: Dictionary mapping attribute/dimension names to numpy arrays,
or a single numpy array if the array has a single attribute.
:raises ValueError: if array is not in write mode or invalid data provided
Example:
>>> import tiledb, numpy as np
>>> with tiledb.open(uri, 'w') as A:
... q = tiledb.Query(A, order='G')
... q.set_data({'d1': np.array([1, 5, 10]), 'a1': np.array([100, 200, 300])})
... q.submit()
... q.set_data({'d1': np.array([15, 20]), 'a1': np.array([400, 500])})
... q.submit()
... q.finalize()
"""
if self._array.mode != "w":
raise ValueError("set_data() is only supported for arrays in write mode")
schema = self._array.schema
# Convert single array to dict
if isinstance(data, np.ndarray):
if schema.nattr != 1:
raise ValueError(
"Single array provided but schema has multiple attributes"
)
data = {schema.attr(0).name: data}
if not isinstance(data, dict):
raise ValueError("data must be a dict or numpy array")
# Set buffers for each attribute/dimension
for name, buffer in data.items():
if not isinstance(buffer, np.ndarray):
buffer = np.array(buffer)
# Determine ncells based on datatype
if schema.has_attr(name):
dtype = schema.attr(name).dtype
elif schema.domain.has_dim(name):
dtype = schema.domain.dim(name).dtype
else:
raise ValueError(f"Unknown attribute or dimension: {name}")
ncells = DataType.from_numpy(dtype).ncells
buffer_size = np.uint64(len(buffer) * ncells)
self.set_data_buffer(name, buffer, buffer_size)
def set_subarray_ranges(self, ranges):
"""Set subarray for dense array writes.
:param ranges: List of (start, end) tuples, one per dimension.
"""
if self._array.mode != "w":
raise ValueError(
"set_subarray_ranges() is only supported for arrays in write mode"
)
subarray = Subarray(self._array, self._ctx)
dim_ranges = [[r] for r in ranges]
subarray.add_ranges(dim_ranges)
self.set_subarray(subarray)