Skip to content

Commit bbdb64a

Browse files
authored
Merge pull request #1 from datopian/feat/gen-target-schema
CKAN Harvester implementation
2 parents 7843b42 + 07bb18e commit bbdb64a

16 files changed

Lines changed: 473 additions & 111 deletions

.env.example

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# CKAN source
2+
SOURCE_CKAN_URL=https://ckan.com
3+
SOURCE_CKAN_API_KEY=
4+
SOURCE_CKAN_ORG_ID=
5+
6+
# PortalJS Cloud target
7+
PORTALJS_CKAN_URL=https://my-org.portaljs.com
8+
PORTALJS_CKAN_API_KEY=xyz
9+
PORTALJS_ORG_ID=my-org
10+
11+
# Harvest behavior
12+
CONCURRENCY=4
13+
RATE_LIMIT_RPS=2
14+
RETRY_MAX_ATTEMPTS=2
15+
RETRY_BASE_MS=500
16+
17+
# Incremental window
18+
# If set, harvest only datasets with metadata_modified >= SINCE_ISO
19+
SINCE_ISO=2025-02-01T00:00:00Z
20+
# Alternatively, roll-forward state (persisted between runs)
21+
STATE_FILE=.harvest_state.json
22+

.harvest_state.json

Lines changed: 0 additions & 3 deletions
This file was deleted.

README.md

Lines changed: 117 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,134 @@
1-
Open-source framework and scripts for harvesting datasets into [PortalJS](https://portaljs.com).
2-
This repo is designed as a **template** — fork or clone it to quickly set up your own dataset harvesting pipelines.
1+
# PortalJS CKAN Harvester
32

4-
It includes:
3+
A template harvester that pulls datasets from a **CKAN source** and upserts them into a **PortalJS CKAN target**.
54

6-
* Reusable scripts for extracting datasets from common sources (APIs, CSVs, spreadsheets, etc.)
7-
* A plug-and-play **ETL framework** for transforming and publishing datasets
8-
* GitHub Actions workflow for automated harvesting
9-
* Config-driven setup — no need to hard-wire pipelines
5+
**fetch → map → upsert**
106

11-
## 🚀 Quickstart
7+
---
128

13-
1. **Use this template**
14-
Click **“Use this template”** on GitHub to bootstrap your own repo.
9+
## Quick Start
1510

16-
2. **Configure harvesters**
17-
Edit `config.yml` to define dataset sources and pipelines:
11+
```bash
12+
npm install
13+
cp .env.example .env # or edit the existing .env
14+
npm start # run the harvester
15+
```
1816

19-
```yaml
20-
sources:
21-
- name: world-bank
22-
type: api
23-
url: https://api.worldbank.org/v2/
24-
format: json
25-
```
17+
---
2618

27-
3. **Run**
19+
## Environment Variables (.env)
2820

29-
TODO
21+
Use these exact names. Example values are placeholders:
3022

31-
4. **Automate with GitHub Actions**
32-
Push your repo — harvesting will run on schedule using the included workflow (`.github/workflows/harvest.yml`).
23+
```env
24+
# CKAN source
25+
SOURCE_CKAN_URL=<https://source-ckan.example.org>
26+
SOURCE_CKAN_API_KEY=<source-api-key-or-empty>
27+
SOURCE_CKAN_ORG_ID=<org-slug-or-empty>
3328
34-
## 🛠 Features
29+
# PortalJS Cloud target
30+
PORTALJS_CKAN_URL=<http://localhost:5000>
31+
PORTALJS_CKAN_API_KEY=<target-api-key>
32+
PORTALJS_ORG_ID=<target-org-id>
3533
36-
* **Modular scripts** – add your own connectors or reuse provided ones
37-
* **Config-driven** – no need to edit code for new datasets
38-
* **CI/CD ready** – run pipelines directly in GitHub Actions
39-
* **Extensible** – works with PortalJS or standalone
34+
# Harvest behavior
35+
CONCURRENCY=4
36+
RATE_LIMIT_RPS=2
37+
RETRY_MAX_ATTEMPTS=2
38+
RETRY_BASE_MS=500
4039
41-
## 📦 Repo Structure
40+
# Incremental window
41+
SINCE_ISO=2025-02-01T00:00:00Z
42+
STATE_FILE=.harvest_state.json
4243
43-
TODO
44+
```
4445

45-
## 🤝 Contributing
46+
* **`SOURCE_CKAN_URL`** – source CKAN base URL
4647

47-
PRs and new connectors welcome!
48-
Please open an issue if you’d like to propose a new feature or source integration.
48+
* **`SOURCE_CKAN_API_KEY`** – source API key (optional)
4949

50-
## 📄 License
50+
* **`SOURCE_CKAN_ORG_ID`** – restrict harvest to one org (optional, empty = harvest all)
5151

52-
MIT License. See [LICENSE](./LICENSE) for details.
52+
* **`PORTALJS_CKAN_URL`** – target CKAN base URL
53+
54+
* **`PORTALJS_CKAN_API_KEY`** – target API key (**required**)
55+
56+
* **`PORTALJS_ORG_ID`** – target org where datasets will be created (must exist first)
57+
58+
* **`CONCURRENCY`** – how many datasets to process in parallel (optional, default 4)
59+
60+
* **`RATE_LIMIT_RPS`** – max HTTP requests per second (optional, default 2)
61+
62+
* **`RETRY_MAX_ATTEMPTS`** – number of retry attempts on failure (optional, default 2)
63+
64+
* **`RETRY_BASE_MS`** – base delay (ms) for exponential backoff (optional, default 500)
65+
66+
* **`SINCE_ISO`** – harvest only datasets modified after this date (overrides state file) (optional)
67+
68+
* **`STATE_FILE`** – JSON file used to track last run. Stores `lastRunISO`. Lets the harvester run incrementally instead of fetching everything every time.
69+
70+
---
71+
72+
## How It Works
73+
74+
1. **Discover** datasets from source CKAN (`package_search`), filtered by org and/or date.
75+
2. **Map** each dataset from source schema → target schema.
76+
3. **Upsert** into target CKAN (update if exists, create if not).
77+
4. **Persist state** in `STATE_FILE` for the next incremental run.
78+
79+
---
80+
81+
## Project Structure
82+
83+
84+
85+
* **`index.ts`** – main entry. Loads env + state, chooses full vs incremental run, loops datasets, maps, upserts, logs results, updates state.
86+
* **`config.ts`** – loads `.env` with `dotenv` and validates using **Zod**.
87+
* **`gen-schema.ts`** – generates `schemas/target-schema.d.ts` from target CKAN scheming API.
88+
* **`.github/workflows/run-index.yml`** – GitHub Action to run on schedule or manual trigger.
89+
90+
* **`schemas/`**
91+
92+
* **`source-schema.d.ts`** – interface for source datasets.
93+
* **`target-schema.d.ts`** – auto-generated interface for target datasets.
94+
95+
* **`src/`**
96+
97+
* **`source.ts`** – source CKAN client.
98+
99+
* `iterSourcePackages()` async generator over `package_search`.
100+
* Supports org filter and incremental filtering (`metadata_modified >= …`).
101+
102+
* **`target.ts`** – target CKAN helpers.
103+
104+
* Preloads dataset list with `package_list`.
105+
* `upsertPortalDataset()` creates or updates dataset with API key.
106+
107+
* **`map.ts`** – mapping logic.
108+
109+
* Sets `owner_org` to `PORTALJS_ORG_ID`.
110+
* Prefixes dataset `name` with `<owner_org>--` (unique, PortalJS-friendly).
111+
* Maps `title`, `notes`, resources, and ensures defaults (language = EN, description fallback, etc.).
112+
113+
* **`state.ts`** – reads/writes the `STATE_FILE` JSON.
114+
115+
* **`utils.ts`** – small helpers (`withRetry()`, `sleep()`, etc.).
116+
117+
---
118+
119+
## Running
120+
121+
1. Edit `.env`.
122+
2. Run `npm start`.
123+
3. Logs will show:
124+
125+
* “Full harvest mode” or “Incremental mode since <ISO>”
126+
* Final summary: `total=… upserts=… failures=…`
127+
128+
---
129+
130+
## Extending
131+
132+
* **Mapping** – extend `src/map.ts` to add fields (tags, extras, licenses, etc.).
133+
* **Filters** – extend `iterSourcePackages()` to filter by groups, tags, etc.
134+
* **Retries** – tweak retry/backoff logic in `utils.ts`.

config.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@ config(); // loads .env
55
import { z } from "zod";
66

77
const EnvSchema = z.object({
8-
CKAN_BASE_URL: z.string().url(),
9-
CKAN_API_KEY: z.string().optional(),
10-
CKAN_ORG_ID: z.string().optional(),
8+
SOURCE_CKAN_URL: z.string().url(),
9+
SOURCE_CKAN_API_KEY: z.string().optional(),
10+
SOURCE_CKAN_ORG_ID: z.string().optional(),
1111

12-
PORTALJS_BASE_URL: z.string().url(),
13-
PORTALJS_API_TOKEN: z.string().min(1),
12+
PORTALJS_CKAN_URL: z.string().url(),
13+
PORTALJS_CKAN_API_KEY: z.string().min(1),
1414
PORTALJS_ORG_ID: z.string().min(1),
1515

1616
CONCURRENCY: z.coerce.number().default(4),
1717
RATE_LIMIT_RPS: z.coerce.number().default(2),
18-
RETRY_MAX_ATTEMPTS: z.coerce.number().default(5),
18+
RETRY_MAX_ATTEMPTS: z.coerce.number().default(2),
1919
RETRY_BASE_MS: z.coerce.number().default(500),
20-
DRY_RUN: z.preprocess(v => String(v).toLowerCase() === "true", z.boolean()).default(false),
20+
DRY_RUN: z.coerce.boolean().default(false),
2121

2222
SINCE_ISO: z.string().optional(),
2323
STATE_FILE: z.string().default(".harvest_state.json"),

gen-schema.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// generate-types.ts
2+
import fs from "fs";
3+
import { env } from "./config";
4+
import { capitalize } from "./src/utils";
5+
6+
interface SchemaField {
7+
field_name: string;
8+
choices?: Array<{ value: string; label: string }>;
9+
validators?: string | string[];
10+
}
11+
12+
async function generateTypes(type: string, schemaUrl: string) {
13+
const res = await fetch(schemaUrl);
14+
if (!res.ok) {
15+
const text = await res.text().catch(() => "");
16+
throw new Error(
17+
`Failed to fetch schema from ${schemaUrl}: ${res.status} ${
18+
res.statusText
19+
} ${text.slice(0, 300)}`
20+
);
21+
}
22+
const data = await res.json();
23+
const { dataset_fields, resource_fields } = data.result;
24+
25+
const datasetProps = dataset_fields.map((f: SchemaField) => {
26+
let tsType = "string";
27+
28+
if (f.choices) {
29+
tsType = f.choices.map((c) => JSON.stringify(String(c.value))).join(" | ");
30+
}
31+
32+
const validators = Array.isArray(f.validators) ? f.validators : (f.validators ? [f.validators] : []);
33+
const isRequired = validators.some(v => v.includes("not_empty") || v.includes("scheming_required"));
34+
35+
return ` ${f.field_name}${isRequired ? "" : "?"}: ${tsType};`;
36+
});
37+
38+
const resourceProps = resource_fields.map((f: SchemaField) => {
39+
return ` ${f.field_name}?: string;`;
40+
});
41+
42+
const content = `// Auto-generated from CKAN schema
43+
export interface ${capitalize(type)}Schema {
44+
${datasetProps.join("\n")}
45+
resources?: CkanResource[];
46+
}
47+
48+
export interface CkanResource {
49+
${resourceProps.join("\n")}
50+
}
51+
`;
52+
53+
fs.mkdirSync("schemas", { recursive: true });
54+
fs.writeFileSync(`schemas/${type}-schema.d.ts`, content);
55+
console.log(`✅ Types generated in schemas/${type}-schema.d.ts`);
56+
}
57+
58+
generateTypes(
59+
"target",
60+
env.PORTALJS_CKAN_URL +
61+
"/api/3/action/scheming_dataset_schema_show?type=dataset"
62+
);

index.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import Bottleneck from "bottleneck";
22
import { env } from "./config";
3-
import { iterSourcePackages } from "./src/ckan";
3+
import { iterSourcePackages } from "./src/source";
44
import { mapCkanToPortalJS } from "./src/map";
5-
import { upsertPortalDataset } from "./src/cloud";
5+
import { upsertPortalDataset } from "./src/target";
66
import { readState, writeState } from "./src/state";
77
import { withRetry } from "./src/utils";
88

@@ -29,7 +29,6 @@ async function main() {
2929
const job = async () => {
3030
try {
3131
const payload = mapCkanToPortalJS(ds, env.PORTALJS_ORG_ID);
32-
3332
await withRetry(() => upsertPortalDataset(payload), `upsert ${ds.name}`);
3433
upserts++;
3534
} catch (err: any) {

0 commit comments

Comments
 (0)