Skip to content
54 changes: 8 additions & 46 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import argparse
import copy
import importlib.util
import json
import os
import shutil
Expand Down Expand Up @@ -51,51 +50,14 @@ def _generate_module_name(abs_path):


def load_custom_operators(paths):
    """Dynamically load custom operator modules or packages in the specified path.

    This is a thin re-export kept here for backward compatibility: the actual
    loading logic now lives in ``data_juicer.utils.custom_op``. Callers that
    import ``load_custom_operators`` from ``data_juicer.config.config``
    continue to work unchanged.

    :param paths: iterable of filesystem paths, each pointing to a single
        ``.py`` module file or a package directory to load.
    :raises RuntimeError: propagated from the implementation when a module or
        package fails to load or conflicts with one already loaded.
    :raises ValueError: propagated when a path is invalid (e.g. a package
        directory without ``__init__.py``).
    """
    # Imported lazily inside the function to avoid a circular import between
    # the config and utils packages at module-import time.
    from data_juicer.utils.custom_op import load_custom_operators as _impl

    _impl(paths)


def build_base_parser() -> ArgumentParser:
Expand Down
45 changes: 28 additions & 17 deletions data_juicer/ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ def timing_context(description):

# yapf: disable
with timing_context('Importing operator modules'):
# 1. Built-in operators (registered via @OPERATORS.register_module decorators
# that fire as each sub-package is imported)
# 2. Persistent custom operators (loaded from ~/.data_juicer/custom_op.json;
# no-op when the registry file does not exist)
from data_juicer.utils.custom_op import (
load_persistent_custom_ops as _load_persistent,
)

from . import aggregator, deduplicator, filter, grouper, mapper, pipeline, selector
_load_persistent()
del _load_persistent

from .base_op import (
ATTRIBUTION_FILTERS,
NON_STATS_FILTERS,
Expand All @@ -39,21 +50,21 @@ def timing_context(description):
)

# Public API of data_juicer.ops; `from data_juicer.ops import *` exposes
# exactly these names.
__all__ = [
    "load_ops",
    "Filter",
    "Mapper",
    "Deduplicator",
    "Selector",
    "Grouper",
    "Aggregator",
    "UNFORKABLE",
    "NON_STATS_FILTERS",
    "OPERATORS",
    "TAGGING_OPS",
    "Pipeline",
    "OPEnvSpec",
    "op_requirements_to_op_env_spec",
    "OPEnvManager",
    "analyze_lazy_loaded_requirements",
    "analyze_lazy_loaded_requirements_for_code_file",
]
48 changes: 42 additions & 6 deletions data_juicer/tools/DJ_mcp_granular_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import inspect
import os
import sys
from typing import Annotated, Optional
from typing import Annotated, Optional, get_type_hints

from pydantic import Field

Expand All @@ -13,6 +13,30 @@
fastmcp = LazyLoader("mcp.server.fastmcp", "mcp[cli]")


def resolve_signature_annotations(func, sig: inspect.Signature) -> inspect.Signature:
    """Resolve postponed/string annotations into real runtime types.

    Under ``from __future__ import annotations`` every annotation is stored
    as a string. This helper runs ``typing.get_type_hints`` against the
    callable's defining-module namespace to recover the actual type objects,
    then rebuilds ``sig`` with those resolved annotations. If resolution
    fails for any reason, the annotations already present on ``sig`` are
    kept as-is.
    """
    try:
        owner = sys.modules.get(getattr(func, "__module__", None), None)
        namespace = owner.__dict__ if owner else {}
        hints = get_type_hints(func, globalns=namespace, localns=namespace)
    except Exception:
        # Best effort: fall back to the original (possibly string) annotations.
        hints = {}

    resolved_params = [
        param.replace(annotation=hints.get(name, param.annotation))
        for name, param in sig.parameters.items()
    ]
    return sig.replace(
        parameters=resolved_params,
        return_annotation=hints.get("return", sig.return_annotation),
    )


# Dynamic MCP Tool Creation
def process_parameter(name: str, param: inspect.Parameter) -> inspect.Parameter:
"""
Expand All @@ -31,13 +55,18 @@ def create_operator_function(op, mcp):
This function dynamically creates a function that can be registered as an MCP tool,
with proper signature and documentation based on the operator's __init__ method.
"""
sig = op["sig"]
raw_sig = op["sig"]
init_func = op.get("init_func")
if init_func is not None:
sig = resolve_signature_annotations(init_func, raw_sig)
else:
sig = raw_sig
docstring = op["desc"]
param_docstring = op["param_desc"]

# Create new function signature with dataset_path as first parameter
# Consider adding other common parameters later, such as export_psth
new_parameters = [
# Consider adding other common parameters later, such as export_path
fixed_params = [
inspect.Parameter("dataset_path", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str),
inspect.Parameter(
"export_path",
Expand All @@ -51,11 +80,18 @@ def create_operator_function(op, mcp):
annotation=Optional[int],
default=None,
),
] + [
]
op_params = [
process_parameter(name, param)
for name, param in sig.parameters.items()
if name not in ("args", "kwargs", "self")
]
# Merge all params, then reorder: required (no default) first,
# optional (with default) second, to satisfy Python's signature rule.
all_params = fixed_params + op_params
required_params = [p for p in all_params if p.default is inspect.Parameter.empty]
optional_params = [p for p in all_params if p.default is not inspect.Parameter.empty]
new_parameters = required_params + optional_params
new_signature = sig.replace(parameters=new_parameters, return_annotation=str)

def func(*args, **kwargs):
Expand All @@ -66,7 +102,7 @@ def func(*args, **kwargs):
export_path = bound_arguments.arguments.pop("export_path")
dataset_path = bound_arguments.arguments.pop("dataset_path")
np = bound_arguments.arguments.pop("np")
args_dict = {k: v for k, v in bound_arguments.arguments.items() if v}
args_dict = {k: v for k, v in bound_arguments.arguments.items() if v is not None}

dj_cfg = {
"dataset_path": dataset_path,
Expand Down
Loading
Loading