Skip to content

Commit 943be3d

Browse files
committed
fix tags ARRAY column (#212)
1 parent d856ec8 commit 943be3d

5 files changed

Lines changed: 120 additions & 36 deletions

File tree

.github/workflows/update-db.yaml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
name: Update DB
22

33
on:
4-
push:
5-
branches:
6-
- main
7-
pull_request:
8-
branches:
9-
- main
104
workflow_dispatch:
115
repository_dispatch:
126
types: [update-db] # This is the event type we'll trigger from the API. It is currently used in carbonplan/offsets-db-download/.github/workflows

offsets_db_api/tasks.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import csv
22
import datetime
33
import io
4+
import json
45
import random
56
import time
67
import traceback
@@ -179,13 +180,23 @@ def process_dataframe(
179180
for col_name, dtype in dtype_dict.items():
180181
if 'ARRAY' in str(dtype) and col_name in df.columns:
181182
logger.info(f'Converting column {col_name} to PostgreSQL array format')
182-
df[col_name] = df[col_name].apply(
183-
lambda x: (
184-
'{' + ','.join(str(i) for i in x) + '}'
185-
if (hasattr(x, '__iter__') and not isinstance(x, str))
186-
else x
187-
)
188-
)
183+
184+
def _to_pg_array(x):
185+
if x is None:
186+
return x
187+
if isinstance(x, str):
188+
try:
189+
parsed = json.loads(x)
190+
if isinstance(parsed, list):
191+
return '{' + ','.join(str(i) for i in parsed) + '}'
192+
except (json.JSONDecodeError, ValueError):
193+
pass
194+
return x
195+
if hasattr(x, '__iter__'):
196+
return '{' + ','.join(str(i) for i in x) + '}'
197+
return x
198+
199+
df[col_name] = df[col_name].apply(_to_pg_array)
189200

190201
with engine.begin() as conn:
191202
if table_name == 'credit':
@@ -525,6 +536,11 @@ def truncate_clip_tables():
525536
for _, row in df.iterrows():
526537
clip_id = row['id'] # Assuming 'id' is the primary key in Clip model
527538
project_ids = row['project_ids']
539+
if isinstance(project_ids, str):
540+
try:
541+
project_ids = json.loads(project_ids)
542+
except (json.JSONDecodeError, ValueError):
543+
project_ids = [project_ids]
528544
for project_id in project_ids:
529545
clip_projects_data.append({'id': index, 'clip_id': clip_id, 'project_id': project_id})
530546
index += 1

pixi.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,12 @@
134134
db-down = "docker compose down"
135135
db-logs = "docker compose logs -f db"
136136
db-up = "docker compose up -d --wait db"
137+
# Wipe data volumes and start a completely fresh DB (WARNING: destroys all local data).
138+
db-reset = "docker compose down -v && docker compose up -d --wait db"
137139
# Start DB, wait until healthy, then apply migrations (one-time setup or after a schema change).
138140
dev-setup = { depends-on = ["db-up", "migrate"] }
141+
# Wipe DB, recreate, and apply migrations — use when the schema is broken or migrations are out of sync.
142+
dev-reset = { depends-on = ["db-reset", "migrate"] }
139143
# Start DB then launch the dev server (daily startup).
140144
dev = { depends-on = ["db-up", "serve"] }
141145

@@ -149,6 +153,8 @@
149153
# ── Database updates ──────────────────────────────────────────────────────
150154
update-db-production = "python scripts/update_database.py production --url https://offsets-db.fly.dev/files/"
151155
update-db-staging = "python scripts/update_database.py staging --url https://offsets-db-staging.fly.dev/files/"
156+
# Seed local DB from staging-files.json (requires `pixi run serve` to be running in another terminal).
157+
seed-local = "OFFSETS_DB_API_KEY_STAGING=local-dev-key python scripts/update_database.py staging --url http://127.0.0.1:8000/files/"
152158

153159
# ── Load testing ──────────────────────────────────────────────────────────
154160
loadtest = "locust -f load-testing/locustfile.py"

scripts/update_database.py

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
import json
44
import os
55
import sys
6+
import time
67

78
import fsspec
9+
import httpx
810
import pandas as pd
9-
import requests
1011

1112

1213
def generate_path(*, date: datetime.date, bucket: str, category: str) -> str:
@@ -77,43 +78,110 @@ def load_files_from_json(file_path: str) -> list[dict[str, str]]:
7778
sys.exit(1)
7879

7980

81+
def _get_api_key(env: str) -> str:
82+
var = 'OFFSETS_DB_API_KEY_PRODUCTION' if env == 'production' else 'OFFSETS_DB_API_KEY_STAGING'
83+
key = os.environ.get(var)
84+
if key is None:
85+
raise ValueError(f'{var} environment variable not set')
86+
return key
87+
88+
89+
def _request(method: str, url: str, headers: dict, timeout: float = 30, **kwargs) -> httpx.Response:
    """Issue an HTTP request via httpx, exiting the process (status 1) with a
    friendly message on connection failure or timeout."""
    try:
        response = httpx.request(method, url, headers=headers, timeout=timeout, **kwargs)
    except httpx.ConnectError:
        print(f'Error: could not connect to {url}')
        print('Is the API server running?')
        sys.exit(1)
    except httpx.TimeoutException:
        print(f'Error: request to {url} timed out after {timeout:.0f}s')
        sys.exit(1)
    return response
99+
100+
101+
def _poll_until_complete(
    *,
    base_url: str,
    file_ids: list[int],
    headers: dict,
    initial_delay: float = 2.0,
    max_delay: float = 30.0,
    timeout: float = 600.0,
) -> list[dict]:
    """Poll each file's status endpoint with exponential backoff until every
    file leaves the 'pending' state, returning the final status payloads.

    Exits the process (status 1) if *timeout* seconds elapse before all files
    complete. Per-file HTTP errors while polling are logged and retried on
    the next round rather than treated as terminal.
    """
    remaining = set(file_ids)
    finished: dict[int, dict] = {}
    deadline = time.monotonic() + timeout
    wait = initial_delay
    # Status URL root is invariant across iterations; compute once.
    status_root = base_url.rstrip('/')

    print(f'\nPolling status for {len(file_ids)} file(s)...')

    while remaining:
        if time.monotonic() > deadline:
            timed_out = [str(i) for i in remaining]
            print(f'Timed out waiting for file(s): {", ".join(timed_out)}')
            sys.exit(1)

        # Back off exponentially, capped at max_delay.
        time.sleep(wait)
        wait = min(wait * 2, max_delay)

        for file_id in list(remaining):
            resp = _request('GET', f'{status_root}/{file_id}', headers=headers)
            if not resp.is_success:
                print(f' [{file_id}] HTTP {resp.status_code} polling status — skipping')
                continue
            file = resp.json()
            if file['status'] == 'pending':
                continue
            remaining.discard(file_id)
            finished[file_id] = file
            icon = '✓' if file['status'] == 'success' else '✗'
            error = f' error: {file["error"]}' if file.get('error') else ''
            print(
                f' {icon} [{file_id}] {file["category"]:8s} {file["status"]:8s} {file["url"]}{error}'
            )

    return list(finished.values())
143+
144+
80145
def post_data_to_environment(
    *,
    env: str,
    url: str,
    files: list[dict[str, str]],
    post_timeout: float = 300,
) -> None:
    """Post file definitions to the API and poll until all are processed.

    Exits the process with status 1 if the POST fails, polling times out,
    or any file finishes with status 'failure'.
    """
    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json',
        'X-API-KEY': _get_api_key(env),
    }

    print(f'\nSending {len(files)} file(s) to {url}:')
    for file in files:
        print(f'- {file["category"]}: {file["url"]}')

    response = _request('POST', url, headers=headers, json=files, timeout=post_timeout)
    if not response.is_success:
        print(f'\nFailed in {env}: HTTP {response.status_code} {response.reason_phrase}')
        if body := response.text.strip():
            print(body)
        sys.exit(1)

    # The API echoes back the queued file records; collect their ids for polling.
    file_ids = [item['id'] for item in response.json()]
    print(f'Queued {len(file_ids)} file(s) with ids: {file_ids}')

    results = _poll_until_complete(base_url=url, file_ids=file_ids, headers=headers)

    failures = [item for item in results if item['status'] == 'failure']
    if failures:
        print(f'\n{len(failures)} file(s) failed in {env}:')
        for item in failures:
            print(f' - [{item["id"]}] {item["url"]}')
            if item.get('error'):
                print(f' {item["error"]}')
        sys.exit(1)

    print(f'\nAll {len(results)} file(s) processed successfully in {env}.')

118186
def main():
119187
parser = argparse.ArgumentParser(

0 commit comments

Comments
 (0)