1- import csv
21import glob
32import os
3+ import re
44from datetime import datetime
5+ from pathlib import Path
6+ from typing import Any
57
68from sqlcompare .config import get_tests_folder , load_test_runs
79from sqlcompare .log import log
810from sqlcompare .utils .format import format_table
911
12+ SUMMARY_TAB_LIMIT = 200
13+ XLSX_MAX_ROWS = 1_048_576
14+ XLSX_MAX_COLUMNS = 16_384
15+
1016
1117def find_diff_file (diff_id : str ) -> str | None :
1218 tests_folder = get_tests_folder ()
@@ -45,7 +51,6 @@ def _display(
4551 rows : list [tuple ],
4652 column : str | None ,
4753 limit : int ,
48- save : bool ,
4954 diff_id : str ,
5055 * ,
5156 is_stats : bool = False ,
@@ -106,12 +111,6 @@ def _display(
106111 log .info (format_table (columns , display_rows ))
107112 if len (filtered_rows ) > limit :
108113 log .info (f"💡 Use --limit { len (filtered_rows )} to see all results" )
109- if save :
110- timestamp = datetime .now ().strftime ("%Y%m%d_%H%M%S" )
111- column_suffix = f"_{ column } " if column else ""
112- output_file = f"analysis_{ diff_id } { column_suffix } _{ timestamp } .csv"
113- _write_csv (output_file , columns , filtered_rows )
114- log .info (f"💾 Results saved to: { output_file } " )
115114
116115
117116def _column_index (columns : list [str ], name : str ) -> int | None :
@@ -122,11 +121,87 @@ def _column_index(columns: list[str], name: str) -> int | None:
122121 return None
123122
124123
125- def _write_csv (path : str , columns : list [str ], rows : list [tuple ]) -> None :
126- with open (path , "w" , newline = "" , encoding = "utf-8" ) as handle :
127- writer = csv .writer (handle )
128- writer .writerow (columns )
129- writer .writerows (rows )
124+ def resolve_report_path (
125+ diff_id : str , save_mode : str , file_path : str | None = None
126+ ) -> Path :
127+ if file_path :
128+ path = Path (file_path ).expanduser ()
129+ else :
130+ timestamp = datetime .now ().strftime ("%Y%m%d_%H%M%S" )
131+ path = Path .cwd () / f"inspect_report_{ diff_id } _{ save_mode } _{ timestamp } .xlsx"
132+
133+ if path .suffix == "" :
134+ path = path .with_suffix (".xlsx" )
135+ if path .suffix .lower () != ".xlsx" :
136+ raise ValueError ("Inspect report file must use the .xlsx extension." )
137+ path .parent .mkdir (parents = True , exist_ok = True )
138+ return path
139+
140+
141+ def write_inspect_report_xlsx (
142+ output_path : Path ,
143+ overview_rows : list [tuple [Any , Any , Any ]],
144+ column_diffs : dict [str , tuple [list [str ], list [tuple [Any , ...]]]],
145+ sql_reference_rows : list [tuple [str , str , str ]],
146+ ) -> dict [str , int ]:
147+ try :
148+ from openpyxl import Workbook
149+ except ImportError as exc :
150+ raise RuntimeError (
151+ "openpyxl is required for inspect report export. Install sqlcompare with openpyxl available."
152+ ) from exc
153+
154+ workbook = Workbook ()
155+ default_sheet = workbook .active
156+ workbook .remove (default_sheet )
157+
158+ truncated_rows = 0
159+
160+ overview_sheet = workbook .create_sheet (title = "Overview" )
161+ overview_sheet .append (["Section" , "Name" , "Value" ])
162+ for row in overview_rows :
163+ overview_sheet .append (list (row [:3 ]))
164+
165+ used_sheet_names = {"Overview" , "SQL Reference" }
166+ for column_name , (headers , rows ) in column_diffs .items ():
167+ sheet_name = _unique_sheet_name (column_name , used_sheet_names )
168+ used_sheet_names .add (sheet_name )
169+ sheet = workbook .create_sheet (title = sheet_name )
170+
171+ clipped_headers = [str (h ) for h in headers [:XLSX_MAX_COLUMNS ]]
172+ sheet .append (clipped_headers )
173+
174+ max_data_rows = XLSX_MAX_ROWS - 1
175+ clipped_rows = rows [:max_data_rows ]
176+ for row in clipped_rows :
177+ sheet .append (list (row [:XLSX_MAX_COLUMNS ]))
178+
179+ if len (rows ) > max_data_rows :
180+ truncated_rows += len (rows ) - max_data_rows
181+
182+ sql_sheet = workbook .create_sheet (title = "SQL Reference" )
183+ sql_sheet .append (["Category" , "Name" , "SQL" ])
184+ for row in sql_reference_rows :
185+ sql_sheet .append (list (row [:3 ]))
186+
187+ workbook .save (output_path )
188+ return {"truncated_rows" : truncated_rows }
189+
190+
191+ def _unique_sheet_name (raw_name : str , used : set [str ]) -> str :
192+ cleaned = re .sub (r"[\[\]\*\:/\\\?]" , "_" , raw_name ).strip () or "Sheet"
193+ cleaned = cleaned [:31 ]
194+ if cleaned not in used :
195+ return cleaned
196+
197+ suffix = 1
198+ while True :
199+ candidate_suffix = f"_{ suffix } "
200+ base = cleaned [: 31 - len (candidate_suffix )]
201+ candidate = f"{ base } { candidate_suffix } "
202+ if candidate not in used :
203+ return candidate
204+ suffix += 1
130205
131206
132207def list_available_diffs ():
0 commit comments