66from seclab_taskflow_agent .mcp_servers .codeql .client import run_query , _debug_log
77
88from pydantic import Field
9- #from mcp.server.fastmcp import FastMCP, Context
10- from fastmcp import FastMCP # use FastMCP 2.0
9+
10+ # from mcp.server.fastmcp import FastMCP, Context
11+ from fastmcp import FastMCP # use FastMCP 2.0
1112from pathlib import Path
1213import os
1314import csv
2324
2425logging .basicConfig (
2526 level = logging .DEBUG ,
26- format = ' %(asctime)s - %(levelname)s - %(message)s' ,
27- filename = log_file_name (' mcp_codeql_python.log' ),
28- filemode = 'a'
27+ format = " %(asctime)s - %(levelname)s - %(message)s" ,
28+ filename = log_file_name (" mcp_codeql_python.log" ),
29+ filemode = "a" ,
2930)
3031
31- MEMORY = mcp_data_dir (' seclab-taskflows' , ' codeql' , ' DATA_DIR' )
32- CODEQL_DBS_BASE_PATH = mcp_data_dir (' seclab-taskflows' , ' codeql' , ' CODEQL_DBS_BASE_PATH' )
32+ MEMORY = mcp_data_dir (" seclab-taskflows" , " codeql" , " DATA_DIR" )
33+ CODEQL_DBS_BASE_PATH = mcp_data_dir (" seclab-taskflows" , " codeql" , " CODEQL_DBS_BASE_PATH" )
3334
3435mcp = FastMCP ("CodeQL-Python" )
3536
3637# tool name -> templated query lookup for supported languages
3738TEMPLATED_QUERY_PATHS = {
3839 # to add a language, port the templated query pack and add its definition here
39- 'python' : {
40- 'remote_sources' : 'queries/mcp-python/remote_sources.ql'
41- }
40+ "python" : {"remote_sources" : "queries/mcp-python/remote_sources.ql" }
4241}
4342
4443
@@ -49,9 +48,10 @@ def source_to_dict(result):
4948 "source_location" : result .source_location ,
5049 "line" : result .line ,
5150 "source_type" : result .source_type ,
52- "notes" : result .notes
51+ "notes" : result .notes ,
5352 }
5453
54+
5555def _resolve_query_path (language : str , query : str ) -> Path :
5656 global TEMPLATED_QUERY_PATHS
5757 if language not in TEMPLATED_QUERY_PATHS :
@@ -66,7 +66,7 @@ def _resolve_db_path(relative_db_path: str | Path):
6666 global CODEQL_DBS_BASE_PATH
6767 # path joins will return "/B" if "/A" / "////B" etc. as well
6868 # not windows compatible and probably needs additional hardening
69- relative_db_path = str (relative_db_path ).strip ().lstrip ('/' )
69+ relative_db_path = str (relative_db_path ).strip ().lstrip ("/" )
7070 relative_db_path = Path (relative_db_path )
7171 absolute_path = (CODEQL_DBS_BASE_PATH / relative_db_path ).resolve ()
7272 if not absolute_path .is_relative_to (CODEQL_DBS_BASE_PATH .resolve ()):
@@ -76,36 +76,38 @@ def _resolve_db_path(relative_db_path: str | Path):
7676 raise RuntimeError (f"Error: Database not found at { absolute_path } !" )
7777 return str (absolute_path )
7878
79+
7980# This sqlite database is specifically made for CodeQL for Python MCP.
8081class CodeqlSqliteBackend :
8182 def __init__ (self , memcache_state_dir : str ):
8283 self .memcache_state_dir = memcache_state_dir
8384 if not Path (self .memcache_state_dir ).exists ():
84- db_dir = ' sqlite://'
85+ db_dir = " sqlite://"
8586 else :
86- db_dir = f' sqlite:///{ self .memcache_state_dir } /codeql_sqlite.db'
87+ db_dir = f" sqlite:///{ self .memcache_state_dir } /codeql_sqlite.db"
8788 self .engine = create_engine (db_dir , echo = False )
8889 Base .metadata .create_all (self .engine , tables = [Source .__table__ ])
8990
90-
91- def store_new_source (self , repo , source_location , line , source_type , notes , update = False ):
91+ def store_new_source (self , repo , source_location , line , source_type , notes , update = False ):
9292 with Session (self .engine ) as session :
93- existing = session .query (Source ).filter_by (repo = repo , source_location = source_location , line = line ).first ()
93+ existing = session .query (Source ).filter_by (repo = repo , source_location = source_location , line = line ).first ()
9494 if existing :
9595 existing .notes = (existing .notes or "" ) + notes
9696 session .commit ()
9797 return f"Updated notes for source at { source_location } , line { line } in { repo } ."
9898 else :
9999 if update :
100100 return f"No source exists at repo { repo } , location { source_location } , line { line } to update."
101- new_source = Source (repo = repo , source_location = source_location , line = line , source_type = source_type , notes = notes )
101+ new_source = Source (
102+ repo = repo , source_location = source_location , line = line , source_type = source_type , notes = notes
103+ )
102104 session .add (new_source )
103105 session .commit ()
104106 return f"Added new source for { source_location } in { repo } ."
105107
106108 def get_sources (self , repo ):
107109 with Session (self .engine ) as session :
108- results = session .query (Source ).filter_by (repo = repo ).all ()
110+ results = session .query (Source ).filter_by (repo = repo ).all ()
109111 sources = [source_to_dict (source ) for source in results ]
110112 return sources
111113
@@ -119,8 +121,8 @@ def _csv_parse(raw):
119121 if i == 0 :
120122 continue
121123 # col1 has what we care about, but offer flexibility
122- keys = row [1 ].split (',' )
123- this_obj = {' description' : row [0 ].format (* row [2 :])}
124+ keys = row [1 ].split ("," )
125+ this_obj = {" description" : row [0 ].format (* row [2 :])}
124126 for j , k in enumerate (keys ):
125127 this_obj [k .strip ()] = row [j + 2 ]
126128 results .append (this_obj )
@@ -141,27 +143,32 @@ def _run_query(query_name: str, database_path: str, language: str, template_valu
141143 except RuntimeError :
142144 return f"The query { query_name } is not supported for language: { language } "
143145 try :
144- csv = run_query (Path (__file__ ).parent .resolve () /
145- query_path ,
146- database_path ,
147- fmt = 'csv' ,
148- template_values = template_values ,
149- log_stderr = True )
146+ csv = run_query (
147+ Path (__file__ ).parent .resolve () / query_path ,
148+ database_path ,
149+ fmt = "csv" ,
150+ template_values = template_values ,
151+ log_stderr = True ,
152+ )
150153 return _csv_parse (csv )
151154 except Exception as e :
152155 return f"The query { query_name } encountered an error: { e } "
153156
157+
154158backend = CodeqlSqliteBackend (MEMORY )
155159
160+
156161@mcp .tool ()
157- def remote_sources (owner : str = Field (description = "The owner of the GitHub repository" ),
158- repo : str = Field (description = "The name of the GitHub repository" ),
159- database_path : str = Field (description = "The CodeQL database path." ),
160- language : str = Field (description = "The language used for the CodeQL database." )):
162+ def remote_sources (
163+ owner : str = Field (description = "The owner of the GitHub repository" ),
164+ repo : str = Field (description = "The name of the GitHub repository" ),
165+ database_path : str = Field (description = "The CodeQL database path." ),
166+ language : str = Field (description = "The language used for the CodeQL database." ),
167+ ):
161168 """List all remote sources and their locations in a CodeQL database, then store the results in a database."""
162169
163170 repo = process_repo (owner , repo )
164- results = _run_query (' remote_sources' , database_path , language , {})
171+ results = _run_query (" remote_sources" , database_path , language , {})
165172
166173 # Check if results is an error (list of strings) or valid data (list of dicts)
167174 if isinstance (results , str ):
@@ -172,53 +179,67 @@ def remote_sources(owner: str = Field(description="The owner of the GitHub repos
172179 for result in results :
173180 backend .store_new_source (
174181 repo = repo ,
175- source_location = result .get (' location' , '' ),
176- source_type = result .get (' source' , '' ),
177- line = int (result .get (' line' , '0' )),
178- notes = None , # result.get('description', ''),
179- update = False
182+ source_location = result .get (" location" , "" ),
183+ source_type = result .get (" source" , "" ),
184+ line = int (result .get (" line" , "0" )),
185+ notes = None , # result.get('description', ''),
186+ update = False ,
180187 )
181188 stored_count += 1
182189
183190 return f"Stored { stored_count } remote sources in { repo } ."
184191
192+
185193@mcp .tool ()
186- def fetch_sources (owner : str = Field (description = "The owner of the GitHub repository" ),
187- repo : str = Field (description = "The name of the GitHub repository" )):
194+ def fetch_sources (
195+ owner : str = Field (description = "The owner of the GitHub repository" ),
196+ repo : str = Field (description = "The name of the GitHub repository" ),
197+ ):
188198 """
189199 Fetch all sources from the repo
190200 """
191201 repo = process_repo (owner , repo )
192202 return json .dumps (backend .get_sources (repo ))
193203
204+
194205@mcp .tool ()
195- def add_source_notes (owner : str = Field (description = "The owner of the GitHub repository" ),
196- repo : str = Field (description = "The name of the GitHub repository" ),
197- source_location : str = Field (description = "The path to the file" ),
198- line : int = Field (description = "The line number of the source" ),
199- notes : str = Field (description = "The notes to append to this source" )):
206+ def add_source_notes (
207+ owner : str = Field (description = "The owner of the GitHub repository" ),
208+ repo : str = Field (description = "The name of the GitHub repository" ),
209+ source_location : str = Field (description = "The path to the file" ),
210+ line : int = Field (description = "The line number of the source" ),
211+ notes : str = Field (description = "The notes to append to this source" ),
212+ ):
200213 """
201214 Add new notes to an existing source. The notes will be appended to any existing notes.
202215 """
203216 repo = process_repo (owner , repo )
204- return backend .store_new_source (repo = repo , source_location = source_location , line = line , source_type = "" , notes = notes , update = True )
217+ return backend .store_new_source (
218+ repo = repo , source_location = source_location , line = line , source_type = "" , notes = notes , update = True
219+ )
220+
205221
206222@mcp .tool ()
207- def clear_codeql_repo (owner : str = Field (description = "The owner of the GitHub repository" ),
208- repo : str = Field (description = "The name of the GitHub repository" )):
223+ def clear_codeql_repo (
224+ owner : str = Field (description = "The owner of the GitHub repository" ),
225+ repo : str = Field (description = "The name of the GitHub repository" ),
226+ ):
209227 """
210228 Clear all data for a given repo from the database
211229 """
212230 repo = process_repo (owner , repo )
213231 with Session (backend .engine ) as session :
214- deleted_sources = session .query (Source ).filter_by (repo = repo ).delete ()
232+ deleted_sources = session .query (Source ).filter_by (repo = repo ).delete ()
215233 session .commit ()
216234 return f"Cleared { deleted_sources } sources from repo { repo } ."
217235
236+
218237if __name__ == "__main__" :
219238 # Check if codeql/python-all pack is installed, if not install it
220- if not os .path .isdir ('/.codeql/packages/codeql/python-all' ):
221- pack_path = importlib .resources .files ('seclab_taskflows.mcp_servers.codeql_python.queries' ).joinpath ('mcp-python' )
239+ if not os .path .isdir ("/.codeql/packages/codeql/python-all" ):
240+ pack_path = importlib .resources .files ("seclab_taskflows.mcp_servers.codeql_python.queries" ).joinpath (
241+ "mcp-python"
242+ )
222243 print (f"Installing CodeQL pack from { pack_path } " )
223244 subprocess .run (["codeql" , "pack" , "install" , pack_path ])
224245 mcp .run (show_banner = False , transport = "http" , host = "127.0.0.1" , port = 9998 )
0 commit comments