Skip to content

Commit 23d1dd3

Browse files
authored
Merge pull request #707 from slingdata-io/v1.5.8
V1.5.8
2 parents f827d86 + 7fe76ba commit 23d1dd3

7 files changed

Lines changed: 250 additions & 13 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ Sling is a passion project turned into a free CLI Product which offers an easy s
3434
* Database to Database
3535
* File System to Database
3636
* Database to File System
37+
* API to Database
38+
* API to File System
3739

3840

3941
https://github.com/slingdata-io/sling-cli/assets/7671010/e10ee716-1de8-4d53-8eb2-95c6d9d7f9f0
@@ -60,7 +62,7 @@ Example [Replication](https://docs.slingdata.io/sling-cli/run/configuration/repl
6062
---
6163

6264
Available Connectors:
63-
- **Databases**: [`bigquery`](https://docs.slingdata.io/connections/database-connections/bigquery) [`bigtable`](https://docs.slingdata.io/connections/database-connections/bigtable) [`clickhouse`](https://docs.slingdata.io/connections/database-connections/clickhouse) [`duckdb`](https://docs.slingdata.io/connections/database-connections/duckdb) [`mariadb`](https://docs.slingdata.io/connections/database-connections/mariadb) [`motherduck`](https://docs.slingdata.io/connections/database-connections/motherduck) [`mysql`](https://docs.slingdata.io/connections/database-connections/mysql) [`oracle`](https://docs.slingdata.io/connections/database-connections/oracle) [`postgres`](https://docs.slingdata.io/connections/database-connections/postgres) [`redshift`](https://docs.slingdata.io/connections/database-connections/redshift) [`snowflake`](https://docs.slingdata.io/connections/database-connections/snowflake) [`sqlite`](https://docs.slingdata.io/connections/database-connections/sqlite) [`sqlserver`](https://docs.slingdata.io/connections/database-connections/sqlserver) [`starrocks`](https://docs.slingdata.io/connections/database-connections/starrocks) [`prometheus`](https://docs.slingdata.io/connections/database-connections/prometheus) [`proton`](https://docs.slingdata.io/connections/database-connections/proton) [`databricks`](https://docs.slingdata.io/connections/database-connections/databricks) [`exasol`](https://docs.slingdata.io/connections/database-connections/exasol)
65+
- **Databases**: [`adbc`](https://docs.slingdata.io/connections/database-connections/adbc) [`azuredwh`](https://docs.slingdata.io/connections/database-connections/azuredwh) [`azuresql`](https://docs.slingdata.io/connections/database-connections/azuresql) [`azuretable`](https://docs.slingdata.io/connections/database-connections/azuretable) [`bigquery`](https://docs.slingdata.io/connections/database-connections/bigquery) [`bigtable`](https://docs.slingdata.io/connections/database-connections/bigtable) [`clickhouse`](https://docs.slingdata.io/connections/database-connections/clickhouse) [`d1`](https://docs.slingdata.io/connections/database-connections/d1) [`databricks`](https://docs.slingdata.io/connections/database-connections/databricks) [`duckdb`](https://docs.slingdata.io/connections/database-connections/duckdb) [`elasticsearch`](https://docs.slingdata.io/connections/database-connections/elasticsearch) [`exasol`](https://docs.slingdata.io/connections/database-connections/exasol) [`fabric`](https://docs.slingdata.io/connections/database-connections/fabric) [`mariadb`](https://docs.slingdata.io/connections/database-connections/mariadb) [`mongodb`](https://docs.slingdata.io/connections/database-connections/mongodb) [`motherduck`](https://docs.slingdata.io/connections/database-connections/motherduck) [`mysql`](https://docs.slingdata.io/connections/database-connections/mysql) [`odbc`](https://docs.slingdata.io/connections/database-connections/odbc) [`oracle`](https://docs.slingdata.io/connections/database-connections/oracle) [`postgres`](https://docs.slingdata.io/connections/database-connections/postgres) [`prometheus`](https://docs.slingdata.io/connections/database-connections/prometheus) [`proton`](https://docs.slingdata.io/connections/database-connections/proton) [`redshift`](https://docs.slingdata.io/connections/database-connections/redshift) [`snowflake`](https://docs.slingdata.io/connections/database-connections/snowflake) [`sqlite`](https://docs.slingdata.io/connections/database-connections/sqlite) [`sqlserver`](https://docs.slingdata.io/connections/database-connections/sqlserver) [`starrocks`](https://docs.slingdata.io/connections/database-connections/starrocks) [`trino`](https://docs.slingdata.io/connections/database-connections/trino)
6466

6567
- **Data Lakes**:[`athena`](https://docs.slingdata.io/connections/datalake-connections/athena) [`Ducklake`](https://docs.slingdata.io/connections/datalake-connections/ducklake) [`iceberg`](https://docs.slingdata.io/connections/datalake-connections/iceberg) [`S3 Tables`](https://docs.slingdata.io/connections/datalake-connections/iceberg)
6668

cmd/sling/sling_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ var connMap = map[dbio.Type]connTest{
9494
dbio.TypeDbSQLite: {name: "sqlite", schema: "main"},
9595
dbio.TypeDbD1: {name: "d1", schema: "main"},
9696
dbio.TypeDbSQLServer: {name: "mssql", schema: "dbo", useBulk: g.Bool(false)},
97-
dbio.Type("sqlserver_bcp"): {name: "mssql", schema: "dbo", useBulk: g.Bool(true), adjustCol: g.Bool(false)},
97+
dbio.Type("sqlserver_bcp"): {name: "mssql2022", schema: "dbo", useBulk: g.Bool(true), adjustCol: g.Bool(false)},
98+
dbio.Type("azure_sql"): {name: "azure_sql", schema: "dbo", useBulk: g.Bool(true), adjustCol: g.Bool(false)},
9899
dbio.Type("sqlserver_adbc"): {name: "mssql_adbc", schema: "dbo"},
99100
dbio.Type("sqlserver_odbc"): {name: "mssql_odbc", schema: "dbo", useBulk: g.Bool(false)},
100101
dbio.TypeDbStarRocks: {name: "starrocks"},
@@ -1180,6 +1181,7 @@ func TestSuiteDatabaseSQLServer(t *testing.T) {
11801181
}
11811182
testSuite(t, dbio.Type("sqlserver_adbc"))
11821183
testSuite(t, dbio.Type("sqlserver_odbc"))
1184+
testSuite(t, dbio.Type("azure_sql"))
11831185
}
11841186

11851187
func TestSuiteDatabaseFabric(t *testing.T) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
source: MSSQL
2+
target: AZURE_SQL
3+
4+
hooks:
5+
start:
6+
# Create source tables
7+
- type: query
8+
connection: '{source.name}'
9+
query: |
10+
IF OBJECT_ID('dbo.merge_sc_src1', 'U') IS NOT NULL DROP TABLE dbo.merge_sc_src1;
11+
IF OBJECT_ID('dbo.merge_sc_src2', 'U') IS NOT NULL DROP TABLE dbo.merge_sc_src2;
12+
13+
CREATE TABLE dbo.merge_sc_src1 (
14+
id INT PRIMARY KEY,
15+
name NVARCHAR(100),
16+
status NVARCHAR(50),
17+
update_dt DATETIME2 DEFAULT GETDATE()
18+
);
19+
20+
CREATE TABLE dbo.merge_sc_src2 (
21+
id INT PRIMARY KEY,
22+
name NVARCHAR(100),
23+
status NVARCHAR(50),
24+
update_dt DATETIME2 DEFAULT GETDATE()
25+
);
26+
27+
INSERT INTO dbo.merge_sc_src1 VALUES
28+
(1, N'Alice', N'active', '2024-01-01'),
29+
(2, N'Bob', N'inactive', '2024-01-02'),
30+
(3, N'Charlie', N'active', '2024-01-03'),
31+
(4, N'Diana', N'pending', '2024-01-04'),
32+
(5, N'Eve', N'active', '2024-01-05');
33+
34+
-- src2 has updates for existing rows + new rows (all newer than src1)
35+
INSERT INTO dbo.merge_sc_src2 VALUES
36+
(3, N'Charlie', N'inactive', '2024-02-01'),
37+
(5, N'Eve', N'inactive', '2024-02-02'),
38+
(6, N'Frank', N'active', '2024-02-03'),
39+
(7, N'Grace', N'active', '2024-02-04');
40+
41+
# Drop target table if exists
42+
- type: query
43+
connection: '{target.name}'
44+
query: |
45+
IF OBJECT_ID('dbo.merge_test', 'U') IS NOT NULL DROP TABLE dbo.merge_test;
46+
47+
end:
48+
- type: check
49+
check: execution.status.error == 0
50+
on_failure: break
51+
52+
# Verify target has 7 rows (5 from full-refresh + 2 new from incremental merge)
53+
- type: query
54+
connection: '{target.name}'
55+
query: SELECT COUNT(*) as cnt FROM dbo.merge_test
56+
into: result
57+
58+
- type: check
59+
check: int_parse(store.result[0].cnt) == 7
60+
failure_message: "Row count mismatch: expected 7, got {store.result[0].cnt}"
61+
62+
# Clean up
63+
- type: query
64+
connection: '{source.name}'
65+
query: |
66+
IF OBJECT_ID('dbo.merge_sc_src1', 'U') IS NOT NULL DROP TABLE dbo.merge_sc_src1;
67+
IF OBJECT_ID('dbo.merge_sc_src2', 'U') IS NOT NULL DROP TABLE dbo.merge_sc_src2;
68+
69+
- type: query
70+
connection: '{target.name}'
71+
query: |
72+
IF OBJECT_ID('dbo.merge_test', 'U') IS NOT NULL DROP TABLE dbo.merge_test;
73+
74+
defaults:
75+
mode: incremental
76+
update_key: update_dt
77+
primary_key: id
78+
object: dbo.merge_test
79+
target_options:
80+
use_bulk: true
81+
table_keys:
82+
primary: [id]
83+
84+
streams:
85+
dbo.merge_sc_src1:
86+
mode: full-refresh
87+
88+
dbo.merge_sc_src2:

cmd/sling/tests/suite.cli.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1930,4 +1930,11 @@
19301930
output_contains:
19311931
- 'no new data found in source stream'
19321932
- 'execution succeeded'
1933-
- 'SUCCESS: backfill with 0 rows did not error'
1933+
- 'SUCCESS: backfill with 0 rows did not error'
1934+
1935+
- id: 200
1936+
name: Test MSSQL to AZURE_SQL merge semicolon (full-refresh + incremental)
1937+
run: 'sling run -d -r cmd/sling/tests/replications/r.100.mssql_merge_semicolon.yaml'
1938+
streams: 2
1939+
output_contains:
1940+
- 'execution succeeded'

core/dbio/filesys/fs_s3.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package filesys
22

33
import (
4+
"bufio"
45
"context"
56
"errors"
67
"fmt"
@@ -513,8 +514,12 @@ func (fs *S3FileSysClient) Write(uri string, reader io.Reader) (bw int64, err er
513514
uploader := manager.NewUploader(svc)
514515
uploader.Concurrency = fs.Context().Wg.Limit
515516

516-
// Create pipe to get bytes written
517+
// Create pipe to get bytes written.
518+
// Wrap with a buffered reader (10MB) to prevent deadlocks when the S3
519+
// multipart uploader blocks trying to fill a 5MB chunk while the upstream
520+
// pipeline (CSV → gzip → pipe) is waiting for the pipe to drain.
517521
pr, pw := io.Pipe()
522+
br := bufio.NewReaderSize(pr, 10*1024*1024)
518523
fs.Context().Wg.Write.Add()
519524
go func() {
520525
defer fs.Context().Wg.Write.Done()
@@ -530,7 +535,7 @@ func (fs *S3FileSysClient) Write(uri string, reader io.Reader) (bw int64, err er
530535
_, err = uploader.Upload(fs.Context().Ctx, &s3.PutObjectInput{
531536
Bucket: aws.String(fs.bucket),
532537
Key: aws.String(key),
533-
Body: pr,
538+
Body: br,
534539
ServerSideEncryption: ServerSideEncryption,
535540
SSEKMSKeyId: SSEKMSKeyId,
536541
})

core/dbio/templates/azuresql.yaml

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
core:
2-
drop_table: drop table if exists {table}
3-
drop_view: drop view if exists {view}
2+
drop_table: IF OBJECT_ID(N'{table}', N'U') IS NOT NULL DROP TABLE {table}
3+
drop_view: IF OBJECT_ID(N'{view}', N'V') IS NOT NULL DROP VIEW {view}
4+
drop_index: |
5+
if exists (
6+
select name
7+
from sys.indexes
8+
where name = '{name}' and object_id = OBJECT_ID('{table}')
9+
) drop index {index} on {table}
410
replace: insert into {table} ({fields}) values ({values}) on conflict ({pk_fields}) do update set {set_fields}
511
replace_temp: |
612
insert into {table} ({names})
@@ -9,6 +15,12 @@ core:
915
update {table} as t1 set {set_fields2}
1016
from (select * from {temp_table}) as t2
1117
where {pk_fields_equal}
18+
limit: select top {limit} {fields} from {table}{where_clause}
19+
limit_offset: select top {limit} * from ( select {fields} from {table}{where_clause} order by 1 offset {offset} rows) as t
20+
limit_sql: select top {limit} * from ( {sql} ) as t
21+
incremental_select_limit: select top {limit} {fields} from {table} where ({incremental_where_cond}){where_and} order by {update_key} asc
22+
incremental_select_limit_offset: select top {limit} * from ( select {fields} from {table} where ({incremental_where_cond}){where_and} order by {update_key} asc offset {offset} rows) as t
23+
incremental_select: select {fields} from {table} where ({incremental_where_cond}){where_and}
1224
insert: insert into {table} ({fields}) values ({values})
1325
insert_temp: insert into {table} ({fields}) select {cols} from {temp_table}
1426
insert_ignore: insert into {table} ({fields}) values ({values}) on conflict ({pk_fields}) do nothing
@@ -20,11 +32,6 @@ core:
2032
sample: select {fields} from {table} TABLESAMPLE SYSTEM (50) limit {n}
2133
rename_table: ALTER TABLE {table} RENAME TO {new_table}
2234
rename_column: EXEC sp_rename '{table}.{column}', '{new_column}', 'COLUMN'
23-
limit: select top {limit} {fields} from {table}{where_clause}
24-
limit_offset: select top {limit} * from ( select {fields} from {table}{where_clause} order by 1 offset {offset} rows) as t
25-
limit_sql: select top {limit} * from ( {sql} ) as t
26-
incremental_select_limit: select top {limit} {fields} from {table} where ({incremental_where_cond}){where_and} order by {update_key} asc
27-
incremental_select_limit_offset: select top {limit} * from ( select {fields} from {table} where ({incremental_where_cond}){where_and} order by {update_key} asc offset {offset} rows) as t
2835
bulk_insert: |
2936
BULK INSERT {table}
3037
from '/dev/stdin'
@@ -37,6 +44,36 @@ core:
3744
)
3845
column_names: '{sql}'
3946
add_column: alter table {table} add {column} {type}
47+
alter_columns: alter table {table} alter column {col_ddl}
48+
modify_column: '{column} {type}'
49+
50+
# SQL Server supports all 4 merge strategies
51+
merge_insert: |
52+
INSERT INTO {tgt_table} ({insert_fields})
53+
SELECT {src_fields} FROM {src_table} src
54+
55+
merge_update: |
56+
UPDATE tgt
57+
SET {set_fields}
58+
FROM {tgt_table} tgt
59+
INNER JOIN {src_table} src
60+
ON {src_tgt_pk_equal}
61+
62+
merge_update_insert: |
63+
MERGE INTO {tgt_table} tgt
64+
USING (SELECT {src_fields} FROM {src_table}) src
65+
ON ({src_tgt_pk_equal})
66+
WHEN MATCHED THEN UPDATE SET {set_fields}
67+
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields});
68+
69+
merge_delete_insert: |
70+
DELETE tgt FROM {tgt_table} tgt
71+
WHERE EXISTS (
72+
SELECT 1 FROM {src_table} src
73+
WHERE {src_tgt_pk_equal}
74+
);
75+
INSERT INTO {tgt_table} ({insert_fields})
76+
SELECT {src_fields} FROM {src_table} src
4077
4178
4279
metadata:
@@ -323,8 +360,10 @@ variable:
323360
timestampz_layout: "2006-01-02 15:04:05.0000000', '-07:00"
324361

325362
bind_string: "@p{c}"
326-
bool_as: string
327363
batch_rows: 200
364+
batch_values: 1000
365+
bool_as: string
366+
error_filter_table_exists: already
328367
max_string_type: nvarchar(max)
329368
max_string_length: 4000
330369
max_column_length: 128
@@ -352,6 +391,7 @@ native_type_map:
352391
smallint: smallint
353392
smallmoney: decimal
354393
sql_variant: text
394+
text: text
355395
time: time
356396
timestamp: binary
357397
tinyint: smallint
@@ -377,3 +417,36 @@ general_type_map:
377417
timestampz: datetimeoffset
378418
timez: "varchar()"
379419
uuid: uniqueidentifier
420+
421+
# Schema migration: default value translation between native and generalized forms
422+
default_value_map:
423+
to_general:
424+
"getdate()": "current_timestamp"
425+
"(getdate())": "current_timestamp"
426+
"GETDATE()": "current_timestamp"
427+
"(GETDATE())": "current_timestamp"
428+
"sysdatetime()": "current_timestamp"
429+
"(sysdatetime())": "current_timestamp"
430+
"getutcdate()": "current_timestamp_utc"
431+
"(getutcdate())": "current_timestamp_utc"
432+
"GETUTCDATE()": "current_timestamp_utc"
433+
"(GETUTCDATE())": "current_timestamp_utc"
434+
"newid()": "uuid()"
435+
"(newid())": "uuid()"
436+
"NEWID()": "uuid()"
437+
"(NEWID())": "uuid()"
438+
"newsequentialid()": "uuid()"
439+
"(newsequentialid())": "uuid()"
440+
"((1))": "true"
441+
"((0))": "false"
442+
"(1)": "true"
443+
"(0)": "false"
444+
from_general:
445+
"current_timestamp": "GETDATE()"
446+
"current_timestamp_utc": "GETUTCDATE()"
447+
"current_date": "CAST(GETDATE() AS DATE)"
448+
"current_time": "CAST(GETDATE() AS TIME)"
449+
"uuid()": "NEWID()"
450+
"true": "1"
451+
"false": "0"
452+
"null": "NULL"

core/dbio/templates/fabric.yaml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,33 @@ core:
6464
FILE_TYPE = 'PARQUET'{credential_expr}
6565
)
6666
67+
# SQL Server supports all 4 merge strategies
68+
merge_insert: |
69+
INSERT INTO {tgt_table} ({insert_fields})
70+
SELECT {src_fields} FROM {src_table} src
71+
72+
merge_update: |
73+
UPDATE tgt
74+
SET {set_fields}
75+
FROM {tgt_table} tgt
76+
INNER JOIN {src_table} src
77+
ON {src_tgt_pk_equal}
78+
79+
merge_update_insert: |
80+
MERGE INTO {tgt_table} tgt
81+
USING (SELECT {src_fields} FROM {src_table}) src
82+
ON ({src_tgt_pk_equal})
83+
WHEN MATCHED THEN UPDATE SET {set_fields}
84+
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields});
85+
86+
merge_delete_insert: |
87+
DELETE tgt FROM {tgt_table} tgt
88+
WHERE EXISTS (
89+
SELECT 1 FROM {src_table} src
90+
WHERE {src_tgt_pk_equal}
91+
);
92+
INSERT INTO {tgt_table} ({insert_fields})
93+
SELECT {src_fields} FROM {src_table} src
6794
6895
metadata:
6996
databases: select db_name() as name
@@ -444,3 +471,36 @@ general_type_map:
444471
timestampz: "datetime2(6)"
445472
timez: "varchar()"
446473
uuid: uniqueidentifier
474+
475+
# Schema migration: default value translation between native and generalized forms
476+
default_value_map:
477+
to_general:
478+
"getdate()": "current_timestamp"
479+
"(getdate())": "current_timestamp"
480+
"GETDATE()": "current_timestamp"
481+
"(GETDATE())": "current_timestamp"
482+
"sysdatetime()": "current_timestamp"
483+
"(sysdatetime())": "current_timestamp"
484+
"getutcdate()": "current_timestamp_utc"
485+
"(getutcdate())": "current_timestamp_utc"
486+
"GETUTCDATE()": "current_timestamp_utc"
487+
"(GETUTCDATE())": "current_timestamp_utc"
488+
"newid()": "uuid()"
489+
"(newid())": "uuid()"
490+
"NEWID()": "uuid()"
491+
"(NEWID())": "uuid()"
492+
"newsequentialid()": "uuid()"
493+
"(newsequentialid())": "uuid()"
494+
"((1))": "true"
495+
"((0))": "false"
496+
"(1)": "true"
497+
"(0)": "false"
498+
from_general:
499+
"current_timestamp": "GETDATE()"
500+
"current_timestamp_utc": "GETUTCDATE()"
501+
"current_date": "CAST(GETDATE() AS DATE)"
502+
"current_time": "CAST(GETDATE() AS TIME)"
503+
"uuid()": "NEWID()"
504+
"true": "1"
505+
"false": "0"
506+
"null": "NULL"

0 commit comments

Comments
 (0)