Skip to content

Commit ed4f1c8

Browse files
authored
add tinybird api clients (#6)
1 parent 467bef2 commit ed4f1c8

22 files changed

+2032
-64
lines changed

README.md

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ Verdin
99
<a href="https://github.com/psf/black"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
1010
</p>
1111

12-
Verdin is a [tiny bird](https://en.wikipedia.org/wiki/Verdin), and also a [Tinybird](https://tinybird.co) SDK for Python
13-
.
12+
Verdin is a [tiny bird](https://en.wikipedia.org/wiki/Verdin), and also a [Tinybird](https://tinybird.co) SDK for Python.
1413

1514
Install
1615
-------
@@ -58,7 +57,7 @@ response: tinybird.PipeJsonResponse = pipe.query({"key": "val"})
5857
print(response.data)
5958
```
6059

61-
### Append to a DataSource
60+
### Append to a data source
6261

6362
```python
6463
from verdin import tinybird
@@ -75,6 +74,19 @@ datasource.append([
7574
])
7675
```
7776

77+
### Append to a data source using high-frequency ingest
78+
79+
The `DataSource` object also gives you access to `/v0/events`, the high-frequency ingest endpoint, for appending data.
80+
Use the `send_events` method and pass JSON serializable documents to it.
81+
82+
```python
83+
datasource.send_events(records=[
84+
{"key": "val1"},
85+
{"key": "val2"},
86+
...
87+
])
88+
```
89+
7890
### Queue and batch records into a DataSource
7991

8092
Verdin provides a way to queue and batch data continuously:
@@ -100,6 +112,61 @@ records.put(("col1-row1", "col2-row1"))
100112
records.put(("col1-row2", "col2-row2"))
101113
```
102114

115+
### API access
116+
117+
The DataSource and Pipe objects presented so far are high-level abstractions that provide a convenient Python API
118+
to deal with the most common use cases. Verdin also provides more low-level access to APIs via `client.api`.
119+
The following APIs are available:
120+
121+
* `/v0/datasources`: `client.api.datasources`
122+
* `/v0/events`: `client.api.events`
123+
* `/v0/pipes`: `client.api.pipes`
124+
* `/v0/tokens`: `client.api.tokens`
125+
* `/v0/variables`: `client.api.variables`
126+
127+
Note that for some APIs (datasources, pipes, tokens), manipulation operations are not implemented, as these are typically done
128+
through `tb` deployments rather than through the API.
129+
130+
Also note that API clients do not take care of retries or rate limiting. The caller is expected to handle fault
131+
tolerance.
132+
133+
#### Example (Querying a pipe)
134+
135+
You can query a pipe through the pipes API as follows:
136+
137+
```python
138+
from verdin import tinybird
139+
140+
client = tinybird.Client(...)
141+
142+
response = client.api.pipes.query(
143+
"my_pipe",
144+
parameters={"my_param": "..."},
145+
query="SELECT * FROM _ LIMIT 10",
146+
)
147+
148+
for record in response.data:
149+
# each record is a dictionary
150+
...
151+
```
152+
153+
#### Example (High-frequency ingest)
154+
155+
You can use the HFI endpoint `/v0/events` through the `events` API. As records, you can pass a list of JSON-serializable
156+
documents.
157+
158+
```python
159+
from verdin import tinybird
160+
161+
client = tinybird.Client(...)
162+
163+
response = client.api.events.send("my_datasource", records=[
164+
{"id": "...", "value": "..."},
165+
...
166+
])
167+
assert response.quarantined_rows == 0
168+
```
169+
103170
Develop
104171
-------
105172

tests/integration/conftest.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,21 @@
33

44
import pytest
55

6+
from verdin.api import ApiError
67
from verdin.client import Client
78
from verdin.test.cli import TinybirdCli
89
from verdin.test.container import TinybirdLocalContainer
910

11+
# os.environ["SKIP_TINYBIRD_LOCAL_START"] = "1"
12+
13+
14+
def _is_skip_tinybird_local_start() -> bool:
15+
"""
16+
Set SKIP_TINYBIRD_LOCAL_START=1 if you have a tb local container running already with the project deployed. This
17+
allows faster iterations.
18+
"""
19+
return os.environ.get("SKIP_TINYBIRD_LOCAL_START") in ["1", "true", "True", True]
20+
1021

1122
@pytest.fixture(scope="session")
1223
def client(tinybird_local_container) -> Client:
@@ -33,17 +44,44 @@ def tinybird_local_container():
3344

3445
container = TinybirdLocalContainer(cwd=project_dir)
3546

36-
container.start()
47+
if not _is_skip_tinybird_local_start():
48+
container.start()
49+
3750
container.wait_is_up()
3851

3952
yield container
4053

4154
# cleanup
42-
container.stop()
55+
if not _is_skip_tinybird_local_start():
56+
container.stop()
4357

4458

4559
@pytest.fixture(scope="session", autouse=True)
4660
def deployed_project(cli):
61+
if _is_skip_tinybird_local_start():
62+
yield
63+
return
64+
4765
time.sleep(5)
4866
cli.deploy(wait=True, auto=True)
4967
yield
68+
69+
70+
@pytest.fixture(autouse=True)
71+
def _truncate_datasource(client):
72+
# make sure to truncate "simple" datasource and its quarantine table before and after each test
73+
74+
client.api.datasources.truncate("simple")
75+
try:
76+
# also truncate the quarantine table if it exists
77+
client.api.datasources.truncate("simple_quarantine")
78+
except ApiError:
79+
pass
80+
81+
yield
82+
client.api.datasources.truncate("simple")
83+
84+
try:
85+
client.api.datasources.truncate("simple_quarantine")
86+
except ApiError:
87+
pass
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
VERSION 0
2+
3+
DESCRIPTION >
4+
Endpoint to select specific keys from the table
5+
6+
NODE endpoint
7+
SQL >
8+
%
9+
SELECT *
10+
FROM simple
11+
WHERE 1=1
12+
{% if defined(key) %} AND key == {{ String(key) }} {% end %}
13+
14+
TYPE ENDPOINT
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import json
2+
3+
import pytest
4+
5+
from verdin.api import ApiError
6+
from verdin.api.datasources import DataSourceNotFoundError
7+
from tests.utils import retry
8+
9+
10+
class TestDataSourcesApi:
11+
def test_list(self, client):
12+
response = client.api.datasources.list()
13+
14+
assert len(response.datasources) >= 1
15+
16+
# find "simple" datasource in the list of data sources
17+
ds = None
18+
for datasource in response.datasources:
19+
if datasource["name"] == "simple":
20+
ds = datasource
21+
break
22+
23+
assert ds
24+
25+
# smoke tests some attributes
26+
assert ds["engine"]["engine"] == "MergeTree"
27+
assert "simple_kv" in [x["name"] for x in ds["used_by"]]
28+
29+
def test_get_information(self, client):
30+
response = client.api.datasources.get_information("simple")
31+
32+
# smoke tests some attributes
33+
assert response.info["name"] == "simple"
34+
assert response.info["engine"]["engine"] == "MergeTree"
35+
assert "simple_kv" in [x["name"] for x in response.info["used_by"]]
36+
37+
def test_get_information_on_non_existing_datasource(self, client):
38+
with pytest.raises(DataSourceNotFoundError) as e:
39+
client.api.datasources.get_information("non_existing_datasource")
40+
41+
e.match('Data Source "non_existing_datasource" does not exist')
42+
assert e.value.status_code == 404
43+
44+
def test_truncate(self, client):
45+
ds = client.datasource("simple")
46+
ds.append_ndjson(
47+
[
48+
{
49+
"Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
50+
"Timestamp": "2024-01-23T10:30:00.123456",
51+
"Key": "foo",
52+
"Value": "bar",
53+
},
54+
{
55+
"Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
56+
"Timestamp": "2024-02-23T11:45:00.234567",
57+
"Key": "baz",
58+
"Value": "ed",
59+
},
60+
]
61+
)
62+
63+
def _wait_for_count(cnt: int):
64+
query = client.sql("SELECT count(*) as cnt FROM simple")
65+
assert query.json().data == [{"cnt": cnt}]
66+
67+
retry(_wait_for_count, args=(2,))
68+
69+
client.api.datasources.truncate("simple")
70+
71+
retry(_wait_for_count, args=(0,))
72+
73+
def test_append_to_non_existing_data_source(self, client):
74+
with pytest.raises(ApiError) as e:
75+
client.api.datasources.append("non_existing_datasource", "foo,bar\n")
76+
77+
# this is odd behavior, but currently, this raises a 403, with the error
78+
# "Adding or modifying data sources to this workspace can only be done via deployments"
79+
# due to the way tinybird behaves (apparently it doesn't check mode=append)
80+
81+
assert e.value.status_code == 403
82+
83+
def test_append_csv(self, client):
84+
ds = client.api.datasources
85+
86+
data = "5b6859d2-e060-40a4-949a-7e7fab8e3207,2024-01-23T10:30:00.123456,foo,bar\n"
87+
data += "af49ffce-559c-426e-9787-ddb08628b547,2024-02-23T11:45:00.234567,baz,ed"
88+
89+
response = ds.append("simple", data)
90+
assert not response.error
91+
assert response.quarantine_rows == 0
92+
assert response.invalid_lines == 0
93+
assert response.datasource["name"] == "simple"
94+
95+
assert client.sql("SELECT * FROM simple").json().data == [
96+
{
97+
"id": "5b6859d2-e060-40a4-949a-7e7fab8e3207",
98+
"timestamp": "2024-01-23 10:30:00.123456",
99+
"key": "foo",
100+
"value": "bar",
101+
},
102+
{
103+
"id": "af49ffce-559c-426e-9787-ddb08628b547",
104+
"timestamp": "2024-02-23 11:45:00.234567",
105+
"key": "baz",
106+
"value": "ed",
107+
},
108+
]
109+
110+
def test_append_csv_with_invalid_data(self, client):
111+
ds = client.api.datasources
112+
113+
data = "5b6859d2-e060-40a4-949a-7e7fab8e3207,2024-01-23T10:30:00.123456,foo,bar\n"
114+
data += "af49ffce-559c-426e-9787-ddb08628b5472024-02-23T11:45:00.234567,baz,ed" # error in this line
115+
116+
response = ds.append("simple", data)
117+
assert (
118+
response.error
119+
== "There was an error with file contents: 1 row in quarantine and 1 invalid line."
120+
)
121+
assert response.invalid_lines == 1
122+
assert response.quarantine_rows == 1
123+
124+
def test_append_ndjson(self, client):
125+
ds = client.api.datasources
126+
ds.truncate("simple")
127+
128+
records = [
129+
{
130+
"Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
131+
"Timestamp": "2024-01-23T10:30:00.123456",
132+
"Key": "foo",
133+
"Value": "bar",
134+
},
135+
{
136+
"Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
137+
"Timestamp": "2024-02-23T11:45:00.234567",
138+
"Key": "baz",
139+
"Value": "ed",
140+
},
141+
]
142+
143+
def _data():
144+
for r in records:
145+
yield json.dumps(r) + "\n"
146+
147+
response = ds.append("simple", _data(), format="ndjson")
148+
assert not response.error
149+
assert response.quarantine_rows == 0
150+
assert response.invalid_lines == 0
151+
assert response.datasource["name"] == "simple"
152+
153+
assert client.sql("SELECT * FROM simple").json().data == [
154+
{
155+
"id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
156+
"timestamp": "2024-01-23 10:30:00.123456",
157+
"key": "foo",
158+
"value": "bar",
159+
},
160+
{
161+
"id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
162+
"timestamp": "2024-02-23 11:45:00.234567",
163+
"key": "baz",
164+
"value": "ed",
165+
},
166+
]

0 commit comments

Comments
 (0)