Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ apis/python/src/tiledbsoma/libtiledb.*
apis/python/src/tiledbsoma/libtiledbsoma.*

/.quarto/
/tags

/NOTES/
18 changes: 8 additions & 10 deletions apis/python/src/tiledbsoma/query_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import ast
from dataclasses import dataclass, field
from typing import Any, Callable, List, Tuple, Type, Union
from typing import Any, Callable, List, Tuple, Union

import numpy as np
import tiledb
Expand Down Expand Up @@ -99,15 +99,15 @@ class QueryCondition:
>>> # Select cells where the attribute values for `foo` are less than 5
>>> # and `bar` equal to string "asdf".
>>> # Note precedence is equivalent to:
>>> # tiledb.QueryCondition("foo > 5 or ('asdf' == attr('b a r') and baz <= val(1.0))")
>>> qc = tiledb.QueryCondition("foo > 5 or 'asdf' == attr('b a r') and baz <= val(1.0)")
>>> # tiledbsoma.QueryCondition("foo > 5 or ('asdf' == attr('b a r') and baz <= val(1.0))")
>>> qc = tiledbsoma.QueryCondition("foo > 5 or 'asdf' == attr('b a r') and baz <= val(1.0)")
>>> A.query(attr_cond=qc)
>>>
>>> # Select cells where the attribute values for `foo` are equal to
>>> # 1, 2, or 3.
>>> # Note this is equivalent to:
>>> # tiledb.QueryCondition("foo == 1 or foo == 2 or foo == 3")
>>> A.query(attr_cond=tiledb.QueryCondition("foo in [1, 2, 3]"))
>>> # tiledbsoma.QueryCondition("foo == 1 or foo == 2 or foo == 3")
>>> A.query(attr_cond=tiledbsoma.QueryCondition("foo in [1, 2, 3]"))
"""

expression: str
Expand Down Expand Up @@ -183,7 +183,7 @@ def visit_In(self, node):
def visit_List(self, node):
return list(node.elts)

def visit_Compare(self, node: Type[ast.Compare]) -> PyQueryCondition:
def visit_Compare(self, node: ast.Compare) -> PyQueryCondition:
operator = self.visit(node.ops[0])

if operator in (
Expand Down Expand Up @@ -323,10 +323,8 @@ def get_att_from_node(self, node: QueryConditionNodeElem) -> Any:
raise tiledb.TileDBError(f"Attribute `{att}` not found in schema.")

if att not in self.query_attrs:
raise tiledb.TileDBError(
f"Attribute `{att}` given to filter in query's `attr_cond` "
"arg but not found in `attr` arg."
)
# https://github.com/TileDB-Inc/TileDB-Py/pull/1333/files
self.query_attrs.append(att)

return att

Expand Down
198 changes: 80 additions & 118 deletions apis/python/src/tiledbsoma/soma_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import pyarrow as pa
import tiledb

from . import util, util_arrow, util_tiledb
# This package's pybind11 code
import tiledbsoma.libtiledbsoma as clib

from . import util, util_arrow
from .logging import log_io
from .query_condition import QueryCondition # type: ignore
from .soma_collection import SOMACollectionBase
from .tiledb_array import TileDBArray
from .types import Ids, NTuple, SOMAResultOrder
Expand Down Expand Up @@ -172,40 +176,38 @@ def read(

**Indexing**: the ``ids`` parameter will support, per dimension: a row offset (uint), a row-offset range (slice), or a list of both.
"""
tiledb_result_order = util_tiledb.tiledb_result_order_from_soma_result_order(
result_order, accept=["rowid-ordered", "unordered"]
)

with self._tiledb_open("r") as A:
dim_names, attr_names = util_tiledb.split_column_names(
A.schema, column_names
query_condition = None
if value_filter is not None:
query_condition = QueryCondition(value_filter)

# As an arg to this method, `column_names` is optional-None. For the pybind11
# code it's optional-[].
lib_column_names = [] if column_names is None else column_names

sr = clib.SOMAReader(
self._uri,
name=self.__class__.__name__,
schema=A.schema, # query_condition needs this
column_names=lib_column_names,
query_condition=query_condition,
)
if value_filter is None:
query = A.query(
return_arrow=True,
return_incomplete=True,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)
else:
qc = tiledb.QueryCondition(value_filter)
query = A.query(
return_arrow=True,
return_incomplete=True,
attr_cond=qc,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)

if ids is None:
iterator = query.df[:]
else:
iterator = query.df[ids]
if ids is not None:
# XXX TODO NEEDS TO ALWAYS BE A LIST NO MATTER WHAT
if isinstance(ids, slice):
ids = util.slice_to_list(ids)
sr.set_dim_points(ROWID, ids)

# TODO: platform_config
# TODO: batch_size
# TODO: result_order

for table in iterator:
yield table
sr.submit()

while arrow_table := sr.read_next():
# yield util_arrow.ascii_to_unicode_pyarrow_readback(batch)
yield arrow_table # XXX what other post-processing

def read_all(
self,
Expand Down Expand Up @@ -233,6 +235,52 @@ def read_all(
)
)

def read_as_pandas(
self,
*,
ids: Optional[Ids] = None,
value_filter: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# to rename index to 'obs_id' or 'var_id', if desired, for anndata
id_column_name: Optional[str] = None,
) -> Iterator[pd.DataFrame]:
"""
Reads from SOMA storage into memory. For ``to_anndata``, as well as for any interactive use where the user wants a Pandas dataframe. Returns a generator over dataframes for batched read. See also ``read_as_pandas_all`` for a convenience wrapper.

TODO: params-list
"""
for tbl in self.read(
ids=ids,
value_filter=value_filter,
column_names=column_names,
result_order=result_order,
):
yield tbl.to_pandas()

def read_as_pandas_all(
self,
*,
ids: Optional[Ids] = None,
value_filter: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# to rename index to 'obs_id' or 'var_id', if desired, for anndata
id_column_name: Optional[str] = None,
) -> pd.DataFrame:
"""
This is a convenience method around ``read``. It concatenates all partial read results into a single DataFrame. Its nominal use is to simplify unit-test cases.
"""
return pd.concat(
self.read_as_pandas(
ids=ids,
value_filter=value_filter,
column_names=column_names,
result_order=result_order,
id_column_name=id_column_name,
)
)

def _get_is_sparse(self) -> bool:
if self._cached_is_sparse is None:

Expand Down Expand Up @@ -292,92 +340,6 @@ def write(self, values: pa.Table) -> None:
with self._tiledb_open("w") as A:
A[lo : (hi + 1)] = attr_cols_map

def read_as_pandas(
self,
*,
ids: Optional[Ids] = None,
value_filter: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# to rename index to 'obs_id' or 'var_id', if desired, for anndata
id_column_name: Optional[str] = None,
) -> Iterator[pd.DataFrame]:
"""
Reads from SOMA storage into memory. For ``to_anndata``, as well as for any interactive use where the user wants a Pandas dataframe. Returns a generator over dataframes for batched read. See also ``read_as_pandas_all`` for a convenience wrapper.

TODO: params-list
"""
tiledb_result_order = util_tiledb.tiledb_result_order_from_soma_result_order(
result_order, accept=["rowid-ordered", "unordered"]
)

with self._tiledb_open() as A:
dim_names, attr_names = util_tiledb.split_column_names(
A.schema, column_names
)
if value_filter is None:
query = A.query(
return_incomplete=True,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)
else:
qc = tiledb.QueryCondition(value_filter)
query = A.query(
return_incomplete=True,
attr_cond=qc,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)

if ids is None:
iterator = query.df[:]
else:
iterator = query.df[ids]

for df in iterator:

if id_column_name is not None:
df.reset_index(inplace=True)
df.set_index(id_column_name, inplace=True)

# Don't materialize soma_rowid on read
if (
ROWID in df.columns
and column_names is not None
and ROWID not in column_names
):
yield df.drop(ROWID, axis=1)
else:
yield df

def read_as_pandas_all(
self,
*,
ids: Optional[Ids] = None,
value_filter: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# to rename index to 'obs_id' or 'var_id', if desired, for anndata
id_column_name: Optional[str] = None,
) -> pd.DataFrame:
"""
This is a convenience method around ``read``. It concatenates all partial read results into a single DataFrame. Its nominal use is to simplify unit-test cases.
"""
dataframes = []
generator = self.read_as_pandas(
ids=ids,
value_filter=value_filter,
column_names=column_names,
id_column_name=id_column_name,
result_order=result_order,
)
for dataframe in generator:
dataframes.append(dataframe)
return pd.concat(dataframes)

def write_from_pandas(
self,
dataframe: pd.DataFrame,
Expand Down Expand Up @@ -432,7 +394,7 @@ def write_from_pandas(
tiledb.from_pandas(
uri=self.uri,
dataframe=dataframe,
# name=self.name,
name=self.__class__.__name__,
sparse=True, # TODO
allows_duplicates=self._tiledb_platform_config.allows_duplicates,
offsets_filters=offsets_filters,
Expand Down
Loading