+
LinkedIn MCP Server
+
Enter the server password to authorize this connection.
+ {error_html}
+
+
+
+"""
diff --git a/linkedin_mcp_server/authentication.py b/linkedin_mcp_server/authentication.py
new file mode 100644
index 00000000..bdd8c30d
--- /dev/null
+++ b/linkedin_mcp_server/authentication.py
@@ -0,0 +1,86 @@
+"""
+Authentication logic for LinkedIn MCP Server.
+
+Handles LinkedIn session management with persistent browser profile.
+"""
+
+import logging
+import shutil
+from pathlib import Path
+
+from linkedin_mcp_server.session_state import (
+ clear_auth_state as clear_all_auth_state,
+ get_source_profile_dir,
+ portable_cookie_path,
+ profile_exists,
+ source_state_path,
+ load_source_state,
+)
+from linkedin_mcp_server.exceptions import CredentialsNotFoundError
+
+logger = logging.getLogger(__name__)
+
+
+def get_authentication_source() -> bool:
+    """
+    Verify that a complete LinkedIn source session exists on disk.
+
+    The session is treated as usable only when the source profile exists,
+    the portable cookie file exists, and the source-state metadata loads to
+    a truthy value.
+
+    Returns:
+        True when all three artifacts are present.
+
+    Raises:
+        CredentialsNotFoundError: If only part of the session is present
+            (the message lists the expected metadata and cookie paths), or
+            if neither a profile nor cookies are found (the message lists
+            the available setup options).
+    """
+    source_dir = get_source_profile_dir()
+    cookie_file = portable_cookie_path(source_dir)
+    metadata = load_source_state(source_dir)
+
+    session_complete = (
+        profile_exists(source_dir) and cookie_file.exists() and metadata
+    )
+    if session_complete:
+        logger.info("Using source profile from %s", source_dir)
+        return True
+
+    # Neither a profile nor a cookie file: report that no session was found.
+    if not (profile_exists(source_dir) or cookie_file.exists()):
+        raise CredentialsNotFoundError(
+            "No LinkedIn source session found.\n\n"
+            "Options:\n"
+            " 1. Run with --login to create a source browser profile (recommended)\n"
+            " 2. Run with --no-headless to login interactively\n\n"
+            "For Docker users:\n"
+            " Create profile on host first: uv run -m linkedin_mcp_server --login\n"
+            " Then mount into Docker: -v ~/.linkedin-mcp:/home/pwuser/.linkedin-mcp"
+        )
+
+    # At least one artifact exists, but the set is incomplete.
+    raise CredentialsNotFoundError(
+        "LinkedIn source session metadata is missing or incomplete.\n\n"
+        f"Expected source metadata: {source_state_path(source_dir)}\n"
+        f"Expected portable cookies: {cookie_file}\n\n"
+        "Run with --login to create a fresh source session generation."
+    )
+
+
+def clear_profile(profile_dir: Path | None = None) -> bool:
+    """
+    Remove the stored browser profile directory.
+
+    Args:
+        profile_dir: Directory to delete. When omitted, the source profile
+            directory is used.
+
+    Returns:
+        True if the directory was removed or did not exist; False if the
+        removal failed with an OSError.
+    """
+    target = get_source_profile_dir() if profile_dir is None else profile_dir
+
+    # Nothing on disk means there is nothing to remove.
+    if not target.exists():
+        return True
+
+    try:
+        shutil.rmtree(target)
+        logger.info(f"Profile cleared from {target}")
+    except OSError as e:
+        logger.warning(f"Could not clear profile: {e}")
+        return False
+    return True
+
+
+def clear_auth_state(profile_dir: Path | None = None) -> bool:
+    """Clear source session artifacts and all derived runtime sessions.
+
+    Args:
+        profile_dir: Source profile directory; falls back to the configured
+            source profile directory when falsy.
+
+    Returns:
+        The result reported by the session-state clearing helper.
+    """
+    resolved_dir = profile_dir or get_source_profile_dir()
+    return clear_all_auth_state(resolved_dir)
diff --git a/linkedin_mcp_server/callbacks.py b/linkedin_mcp_server/callbacks.py
new file mode 100644
index 00000000..be087a85
--- /dev/null
+++ b/linkedin_mcp_server/callbacks.py
@@ -0,0 +1,51 @@
+"""
+Progress callbacks for MCP tools.
+
+Provides callback implementations that report progress for LinkedIn scraping
+operations to MCP clients via FastMCP Context.
+"""
+
+from typing import Any
+
+from fastmcp import Context
+
+
+class ProgressCallback:
+    """No-op base class for progress tracking; subclasses override the hooks."""
+
+    async def on_start(self, scraper_type: str, url: str) -> None:
+        """Hook invoked when a scrape starts; does nothing by default."""
+        return None
+
+    async def on_progress(self, message: str, percent: int) -> None:
+        """Hook invoked with a progress message and percentage; no-op."""
+        return None
+
+    async def on_complete(self, scraper_type: str, result: Any) -> None:
+        """Hook invoked when a scrape finishes; does nothing by default."""
+        return None
+
+    async def on_error(self, error: Exception) -> None:
+        """Hook invoked when a scrape fails; does nothing by default."""
+        return None
+
+
+class MCPContextProgressCallback(ProgressCallback):
+    """Forward progress events to an MCP client through a FastMCP Context."""
+
+    def __init__(self, ctx: Context):
+        self.ctx = ctx
+
+    async def _report(self, percent: int, message: str) -> None:
+        """Send one progress update on a fixed 0-100 scale."""
+        await self.ctx.report_progress(progress=percent, total=100, message=message)
+
+    async def on_start(self, scraper_type: str, url: str) -> None:
+        """Report start to MCP client (progress 0)."""
+        await self._report(0, f"Starting {scraper_type}")
+
+    async def on_progress(self, message: str, percent: int) -> None:
+        """Report progress to MCP client using the caller's percentage."""
+        await self._report(percent, message)
+
+    async def on_complete(self, scraper_type: str, result: Any) -> None:
+        """Report completion to MCP client (progress 100)."""
+        await self._report(100, "Complete")
+
+    async def on_error(self, error: Exception) -> None:
+        """Report error to MCP client; progress is reset to 0."""
+        await self._report(0, f"Error: {error}")
diff --git a/linkedin_mcp_server/cli_main.py b/linkedin_mcp_server/cli_main.py
new file mode 100644
index 00000000..ca3e9a02
--- /dev/null
+++ b/linkedin_mcp_server/cli_main.py
@@ -0,0 +1,426 @@
+"""
+LinkedIn MCP Server - Main CLI application entry point.
+
+Implements a simplified two-phase startup:
+1. Authentication Check - Verify browser profile is available
+2. Server Runtime - MCP server startup with transport selection
+"""
+
+import asyncio
+import logging
+import sys
+from typing import Literal
+
+import inquirer
+
+from linkedin_mcp_server.core import AuthenticationError, RateLimitError
+
+from linkedin_mcp_server.authentication import (
+ clear_auth_state,
+ get_authentication_source,
+)
+from linkedin_mcp_server.config import get_config
+from linkedin_mcp_server.drivers.browser import (
+ experimental_persist_derived_runtime,
+ close_browser,
+ get_or_create_browser,
+ get_profile_dir,
+ profile_exists,
+ set_headless,
+)
+from linkedin_mcp_server.debug_trace import should_keep_traces
+from linkedin_mcp_server.exceptions import CredentialsNotFoundError
+from linkedin_mcp_server.logging_config import configure_logging, teardown_trace_logging
+from linkedin_mcp_server.session_state import (
+ get_runtime_id,
+ load_runtime_state,
+ load_source_state,
+ portable_cookie_path,
+ runtime_profile_dir,
+ runtime_storage_state_path,
+ source_state_path,
+)
+from linkedin_mcp_server.server import create_mcp_server
+from linkedin_mcp_server.setup import run_interactive_setup, run_profile_creation
+
+logger = logging.getLogger(__name__)
+
+
+def choose_transport_interactive() -> Literal["stdio", "streamable-http"]:
+    """Ask the user to pick a transport mode via an inquirer list prompt.
+
+    Returns:
+        The selected transport value.
+
+    Raises:
+        KeyboardInterrupt: If the prompt yields no answers, which is treated
+            as the user cancelling the selection.
+    """
+    transport_question = inquirer.List(
+        "transport",
+        message="Choose mcp transport mode",
+        choices=[
+            ("stdio (Default CLI mode)", "stdio"),
+            ("streamable-http (HTTP server mode)", "streamable-http"),
+        ],
+        default="stdio",
+    )
+    answers = inquirer.prompt([transport_question])
+
+    if answers:
+        return answers["transport"]
+    raise KeyboardInterrupt("Transport selection cancelled by user")
+
+
+def clear_profile_and_exit() -> None:
+    """Clear stored LinkedIn authentication state after confirmation, then exit.
+
+    Exit codes:
+        0: nothing to clear, the user declined or cancelled, or clearing
+           succeeded.
+        1: clearing the authentication state failed.
+    """
+    config = get_config()
+
+    configure_logging(
+        log_level=config.server.log_level,
+        json_format=not config.is_interactive and config.server.log_level != "DEBUG",
+    )
+
+    version = get_version()
+    logger.info(f"LinkedIn MCP Server v{version} - Profile Clear mode")
+
+    profile_dir = get_profile_dir()
+    auth_root = profile_dir.parent
+
+    # Any one of profile, portable cookies, or source metadata counts as
+    # existing authentication state.
+    if not (
+        profile_exists(profile_dir)
+        or portable_cookie_path(profile_dir).exists()
+        or source_state_path(profile_dir).exists()
+    ):
+        print("ℹ️ No authentication state found")
+        print("Nothing to clear.")
+        sys.exit(0)
+
+    # NOTE(review): the leading "đ" is a mis-decoded emoji whose original code
+    # point cannot be recovered from this text; it is left untouched.
+    print(f"đ Clear LinkedIn authentication state from {auth_root}?")
+
+    try:
+        confirmation = (
+            input("Are you sure you want to clear the profile? (y/N): ").strip().lower()
+        )
+        if confirmation not in ("y", "yes"):
+            print("❌ Operation cancelled")
+            sys.exit(0)
+    except (KeyboardInterrupt, EOFError):
+        # EOFError: stdin was closed before an answer arrived; treat it the
+        # same as an explicit cancel rather than crashing with a traceback.
+        print("\n❌ Operation cancelled")
+        sys.exit(0)
+
+    if clear_auth_state(profile_dir):
+        print("✅ LinkedIn authentication state cleared successfully!")
+    else:
+        print("❌ Failed to clear authentication state")
+        sys.exit(1)
+
+    sys.exit(0)
+
+
+def get_profile_and_exit() -> None:
+    """Run interactive profile creation and exit with its outcome.
+
+    Exits 0 when profile creation reports success, 1 otherwise.
+    """
+    config = get_config()
+
+    use_json_logs = not config.is_interactive and config.server.log_level != "DEBUG"
+    configure_logging(log_level=config.server.log_level, json_format=use_json_logs)
+
+    version = get_version()
+    logger.info(f"LinkedIn MCP Server v{version} - Session Creation mode")
+
+    created = run_profile_creation(config.browser.user_data_dir)
+    if created:
+        sys.exit(0)
+    sys.exit(1)
+
+
+def profile_info_and_exit() -> None:
+ """Check profile validity and display info, then exit.
+
+ Prints the current runtime id, the source runtime id and the login
+ generation, then describes which profile mode applies. Exits 1 when no
+ valid source session is found or the session cannot be validated.
+ """
+ # NOTE(review): this block keeps the single-space layout of the
+ # surrounding text because the nesting of some statements (the
+ # "Storage snapshot" print, the bridge-mode sys.exit) cannot be
+ # determined from it - confirm against the original file.
+ config = get_config()
+
+ configure_logging(
+ log_level=config.server.log_level,
+ json_format=not config.is_interactive and config.server.log_level != "DEBUG",
+ )
+
+ version = get_version()
+ logger.info(f"LinkedIn MCP Server v{version} - Session Info mode")
+
+ profile_dir = get_profile_dir()
+ cookies_path = portable_cookie_path(profile_dir)
+ source_state = load_source_state(profile_dir)
+ current_runtime = get_runtime_id()
+
+ # All three source artifacts are required before anything is reported.
+ if not source_state or not profile_exists(profile_dir) or not cookies_path.exists():
+ print(f"❌ No valid source session found at {profile_dir}")
+ print(" Run with --login to create a source session")
+ sys.exit(1)
+
+ print(f"Current runtime: {current_runtime}")
+ print(f"Source runtime: {source_state.source_runtime_id}")
+ print(f"Login generation: {source_state.login_generation}")
+
+ runtime_state = None
+ runtime_profile = None
+ runtime_storage_state = None
+ bridge_required = False
+
+ if current_runtime == source_state.source_runtime_id:
+ print(f"Profile mode: source ({profile_dir})")
+ else:
+ runtime_state = load_runtime_state(current_runtime, profile_dir)
+ runtime_profile = runtime_profile_dir(current_runtime, profile_dir)
+ runtime_storage_state = runtime_storage_state_path(current_runtime, profile_dir)
+ if not experimental_persist_derived_runtime():
+ bridge_required = True
+ print("Profile mode: foreign runtime (fresh bridge each startup)")
+ if runtime_profile.exists():
+ print(
+ f"Derived runtime cache present but ignored by default: {runtime_profile}"
+ )
+ else:
+ # A derived profile is current only if its recorded login
+ # generation matches the source and both the profile and the
+ # storage snapshot exist.
+ if (
+ runtime_state
+ and runtime_state.source_login_generation
+ == source_state.login_generation
+ and profile_exists(runtime_profile)
+ and runtime_storage_state.exists()
+ ):
+ print(
+ f"Profile mode: derived (committed, current generation) ({runtime_profile})"
+ )
+ else:
+ bridge_required = True
+ state = "stale generation" if runtime_state else "missing"
+ print(f"Profile mode: derived ({state})")
+ print(
+ "Storage snapshot: "
+ f"{runtime_storage_state if runtime_storage_state and runtime_storage_state.exists() else 'missing'}"
+ )
+
+ async def check_session() -> bool:
+ # Returns the browser's is_authenticated flag; an AuthenticationError
+ # counts as "not authenticated", anything else is logged and re-raised.
+ try:
+ set_headless(True) # Always check headless
+ browser = await get_or_create_browser()
+ return browser.is_authenticated
+ except AuthenticationError:
+ return False
+ except Exception as e:
+ logger.exception(f"Unexpected error checking session: {e}")
+ raise
+ finally:
+ await close_browser()
+
+ if bridge_required:
+ if experimental_persist_derived_runtime():
+ print(
+ "ℹ️ A derived runtime profile will be created and checkpoint-committed on the next server startup."
+ )
+ else:
+ print(
+ "ℹ️ A fresh bridged foreign-runtime session will be created on the next server startup."
+ )
+ print(
+ "ℹ️ Source cookie validity is not verified in this mode. Run the server to test the bridge end-to-end."
+ )
+ sys.exit(0)
+
+ try:
+ valid = asyncio.run(check_session())
+ except Exception as e:
+ print(f"❌ Could not validate session: {e}")
+ print(" Check logs and browser configuration.")
+ sys.exit(1)
+
+ active_profile = profile_dir if runtime_profile is None else runtime_profile
+ if valid:
+ print(f"✅ Session is valid (profile: {active_profile})")
+ sys.exit(0)
+
+ print(f"❌ Session expired or invalid (profile: {active_profile})")
+ print(" Run with --login to re-authenticate")
+ sys.exit(1)
+
+
+def ensure_authentication_ready() -> None:
+    """
+    Phase 1: Ensure authentication is ready.
+
+    Checks for an existing source session. If none is usable, runs the
+    interactive setup when the process is interactive.
+
+    Raises:
+        CredentialsNotFoundError: If no session is available and the process
+            is non-interactive (the original detection error is attached as
+            the cause), or if interactive setup is cancelled or fails.
+    """
+    config = get_config()
+
+    # Check for existing profile
+    try:
+        get_authentication_source()
+    except CredentialsNotFoundError as err:
+        # Keep the detailed reason (missing vs. incomplete session) so it is
+        # not lost when a shorter message is raised below.
+        detection_error = err
+        logger.debug("Source session check failed: %s", err)
+    else:
+        return
+
+    # No authentication found - try interactive setup if possible
+    if not config.is_interactive:
+        raise CredentialsNotFoundError(
+            "No LinkedIn profile found.\n"
+            "Options:\n"
+            " 1. Run with --login to create a profile\n"
+            " 2. Run with --no-headless to login interactively"
+        ) from detection_error
+
+    # Run interactive setup
+    logger.info("No authentication found, starting interactive setup...")
+    if not run_interactive_setup():
+        raise CredentialsNotFoundError("Interactive setup was cancelled or failed")
+
+
+def get_version() -> str:
+    """Return the package version, or "unknown" if it cannot be determined.
+
+    Installed distribution metadata is tried first (under two candidate
+    distribution names); failing that, the version is read from the
+    pyproject.toml located one directory above this module's directory.
+    """
+    try:
+        from importlib import metadata
+
+        for dist_name in ("linkedin-scraper-mcp", "linkedin-mcp-server"):
+            try:
+                return metadata.version(dist_name)
+            except metadata.PackageNotFoundError:
+                continue
+    except Exception:
+        # Any other metadata failure falls through to the source fallback.
+        pass
+
+    try:
+        import os
+        import tomllib
+
+        package_dir = os.path.dirname(__file__)
+        project_root = os.path.dirname(package_dir)
+        with open(os.path.join(project_root, "pyproject.toml"), "rb") as f:
+            project_data = tomllib.load(f)
+        return project_data["project"]["version"]
+    except Exception:
+        return "unknown"
+
+
+def main() -> None:
+    """Main application entry point.
+
+    Configures logging, dispatches the one-shot --logout / --login /
+    --status modes, then runs the two startup phases: the authentication
+    check and the MCP server runtime. Trace logging is torn down when the
+    main try block is left.
+    """
+    config = get_config()
+
+    # Configure logging
+    configure_logging(
+        log_level=config.server.log_level,
+        json_format=not config.is_interactive and config.server.log_level != "DEBUG",
+    )
+
+    version = get_version()
+
+    # Print banner in interactive mode
+    if config.is_interactive:
+        # NOTE(review): "đ" here and in the messages below is a mis-decoded
+        # emoji whose original code point cannot be recovered from this
+        # text; those strings are left untouched.
+        print(f"đ LinkedIn MCP Server v{version} đ")
+        print("=" * 40)
+
+    logger.info(f"LinkedIn MCP Server v{version}")
+
+    try:
+        # Set headless mode from config
+        set_headless(config.browser.headless)
+
+        # Handle --logout flag
+        if config.server.logout:
+            clear_profile_and_exit()
+
+        # Handle --login flag
+        if config.server.login:
+            get_profile_and_exit()
+
+        # Handle --status flag
+        if config.server.status:
+            profile_info_and_exit()
+
+        logger.debug(f"Server configuration: {config}")
+
+        # Phase 1: Ensure Authentication is Ready
+        try:
+            ensure_authentication_ready()
+            if config.is_interactive:
+                print("✅ Authentication ready")
+            logger.info("Authentication ready")
+
+        except CredentialsNotFoundError as e:
+            logger.error(f"Authentication setup failed: {e}")
+            if config.is_interactive:
+                print("\n❌ Authentication required")
+                print(str(e))
+            sys.exit(1)
+
+        except KeyboardInterrupt:
+            if config.is_interactive:
+                print("\n\nđ Setup cancelled by user")
+            sys.exit(0)
+
+        except (AuthenticationError, RateLimitError) as e:
+            logger.error(f"LinkedIn error during setup: {e}")
+            if config.is_interactive:
+                print(f"\n❌ {str(e)}")
+            sys.exit(1)
+
+        except Exception as e:
+            logger.exception(f"Unexpected error during authentication setup: {e}")
+            if config.is_interactive:
+                print(f"\n❌ Setup failed: {e}")
+            sys.exit(1)
+
+        # Phase 2: Server Runtime
+        try:
+            transport = config.server.transport
+
+            # Prompt for transport in interactive mode if not explicitly set
+            if config.is_interactive and not config.server.transport_explicitly_set:
+                print("\nđ Server ready! Choose transport mode:")
+                transport = choose_transport_interactive()
+
+            # Create and run the MCP server
+            mcp = create_mcp_server(oauth_config=config.server.oauth)
+
+            if transport == "streamable-http":
+                mcp.run(
+                    transport=transport,
+                    host=config.server.host,
+                    port=config.server.port,
+                    path=config.server.path,
+                )
+            else:
+                mcp.run(transport=transport)
+
+        except KeyboardInterrupt:
+            exit_gracefully(0)
+
+        except Exception as e:
+            logger.exception(f"Server runtime error: {e}")
+            if config.is_interactive:
+                print(f"\n❌ Server error: {e}")
+            exit_gracefully(1)
+    finally:
+        # Runs for normal return and for the SystemExit raised above.
+        teardown_trace_logging(keep_traces=should_keep_traces())
+
+
+def exit_gracefully(exit_code: int = 0) -> None:
+    """Attempt to close the browser, then exit with *exit_code*.
+
+    Cleanup is best effort: any Exception raised while closing the browser
+    is ignored so the process still exits with the requested code.
+    """
+    try:
+        shutdown = close_browser()
+        asyncio.run(shutdown)
+    except Exception:
+        # Deliberately ignored - cleanup must not change the exit status.
+        pass
+    sys.exit(exit_code)
+
+
+# Script entry point: run main() and route interrupts and unexpected
+# failures through exit_gracefully so browser cleanup is attempted.
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        exit_gracefully(0)
+    except Exception as e:
+        # The exception type and message are also attached as extra log fields.
+        logger.exception(
+            f"Error running MCP server: {e}",
+            extra={"exception_type": type(e).__name__, "exception_message": str(e)},
+        )
+        exit_gracefully(1)
diff --git a/linkedin_mcp_server/common_utils.py b/linkedin_mcp_server/common_utils.py
new file mode 100644
index 00000000..91c486fb
--- /dev/null
+++ b/linkedin_mcp_server/common_utils.py
@@ -0,0 +1,16 @@
+"""Small shared helpers used across diagnostics and session-state modules."""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime
+import re
+
+
+def slugify_fragment(value: str) -> str:
+    """Return a lowercase URL/file-safe fragment.
+
+    Every run of characters outside ``a-z0-9`` (after lowercasing) becomes a
+    single hyphen, and leading/trailing hyphens are removed.
+    """
+    lowered = value.lower()
+    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
+    return hyphenated.strip("-")
+
+
+def utcnow_iso() -> str:
+    """Return the current UTC time as ISO-8601 with second precision.
+
+    The "+00:00" offset is rendered as a trailing "Z".
+    """
+    now_utc = datetime.now(UTC).replace(microsecond=0)
+    stamp = now_utc.isoformat()
+    return stamp.replace("+00:00", "Z")
diff --git a/linkedin_mcp_server/config/__init__.py b/linkedin_mcp_server/config/__init__.py
new file mode 100644
index 00000000..d82b3b77
--- /dev/null
+++ b/linkedin_mcp_server/config/__init__.py
@@ -0,0 +1,42 @@
+"""
+Configuration system for LinkedIn MCP Server.
+
+Provides a singleton pattern for configuration management with
+loading from CLI arguments and environment variables.
+"""
+
+import logging
+
+from .loaders import load_config
+from .schema import AppConfig, BrowserConfig, OAuthConfig, ServerConfig
+
+logger = logging.getLogger(__name__)
+
+# Singleton pattern for configuration
+_config: AppConfig | None = None
+
+
+def get_config() -> AppConfig:
+    """Return the shared application configuration, loading it on first use."""
+    global _config
+    if _config is not None:
+        return _config
+
+    _config = load_config()
+    logger.debug("Configuration loaded")
+    return _config
+
+
+def reset_config() -> None:
+    """Drop the cached configuration so the next get_config() call reloads it."""
+    global _config
+    logger.debug("Configuration reset") if (_config := None) is None else None
+
+
+__all__ = [
+ "AppConfig",
+ "BrowserConfig",
+ "OAuthConfig",
+ "ServerConfig",
+ "get_config",
+ "reset_config",
+]
diff --git a/linkedin_mcp_server/config/loaders.py b/linkedin_mcp_server/config/loaders.py
new file mode 100644
index 00000000..ce7dfd90
--- /dev/null
+++ b/linkedin_mcp_server/config/loaders.py
@@ -0,0 +1,406 @@
+"""
+Configuration loading and argument parsing for LinkedIn MCP Server.
+
+Loads settings from CLI arguments and environment variables.
+"""
+
+import argparse
+import logging
+import os
+import sys
+from typing import Literal, cast
+
+from dotenv import load_dotenv
+
+from .schema import AppConfig, ConfigurationError
+
+# Load .env file if present
+load_dotenv()
+
+logger = logging.getLogger(__name__)
+
+# Boolean value mappings for environment variable parsing
+TRUTHY_VALUES = ("1", "true", "True", "yes", "Yes")
+FALSY_VALUES = ("0", "false", "False", "no", "No")
+
+
+def positive_int(value: str) -> int:
+    """Argparse type callable that accepts only integers greater than zero.
+
+    Raises:
+        argparse.ArgumentTypeError: If the parsed integer is zero or negative.
+        ValueError: If *value* is not an integer literal (propagated from int()).
+    """
+    parsed = int(value)
+    if parsed > 0:
+        return parsed
+    raise argparse.ArgumentTypeError(f"must be positive, got {value}")
+
+
+class EnvironmentKeys:
+    """Environment variable names used by the application.
+
+    Each attribute holds the literal name looked up in os.environ by
+    load_from_env().
+    """
+
+    # Browser / logging / transport selection
+    HEADLESS = "HEADLESS"
+    LOG_LEVEL = "LOG_LEVEL"
+    TRANSPORT = "TRANSPORT"
+    TIMEOUT = "TIMEOUT"
+    USER_AGENT = "USER_AGENT"
+    # HTTP server host, port and path
+    HOST = "HOST"
+    PORT = "PORT"
+    HTTP_PATH = "HTTP_PATH"
+    # Additional browser settings
+    SLOW_MO = "SLOW_MO"
+    VIEWPORT = "VIEWPORT"
+    CHROME_PATH = "CHROME_PATH"
+    USER_DATA_DIR = "USER_DATA_DIR"
+    # OAuth authentication
+    AUTH = "AUTH"
+    OAUTH_BASE_URL = "OAUTH_BASE_URL"
+    OAUTH_PASSWORD = "OAUTH_PASSWORD"
+
+
+def is_interactive_environment() -> bool:
+    """
+    Report whether the process is attached to an interactive terminal.
+
+    Returns:
+        True only when both stdin and stdout are TTY devices; False if either
+        is not, or if querying them raises AttributeError or OSError.
+    """
+    try:
+        if not sys.stdin.isatty():
+            return False
+        return sys.stdout.isatty()
+    except (AttributeError, OSError):
+        return False
+
+
+def _int_from_env(name: str) -> int | None:
+    """Return environment variable *name* parsed as an int, or None if unset or empty."""
+    raw = os.environ.get(name)
+    if not raw:
+        return None
+    try:
+        return int(raw)
+    except ValueError as err:
+        raise ConfigurationError(
+            f"Invalid {name}: '{raw}'. Must be an integer."
+        ) from err
+
+
+def load_from_env(config: AppConfig) -> AppConfig:
+    """Load configuration from environment variables.
+
+    Only variables that are set (and non-empty) override the values already
+    present on *config*.
+
+    Args:
+        config: Configuration object to update in place.
+
+    Returns:
+        The same *config* object.
+
+    Raises:
+        ConfigurationError: If TRANSPORT or AUTH has an unsupported value,
+            if TIMEOUT, PORT or SLOW_MO is not an integer, or if VIEWPORT is
+            not in WxH form.
+    """
+    env = os.environ
+
+    # Log level (values outside the supported set are ignored)
+    if log_level_env := env.get(EnvironmentKeys.LOG_LEVEL):
+        log_level_upper = log_level_env.upper()
+        if log_level_upper in ("DEBUG", "INFO", "WARNING", "ERROR"):
+            config.server.log_level = cast(
+                Literal["DEBUG", "INFO", "WARNING", "ERROR"], log_level_upper
+            )
+
+    # Headless mode. Matching is case-insensitive so that spellings such as
+    # "FALSE" or "NO" are honoured instead of being silently ignored.
+    headless_env = env.get(EnvironmentKeys.HEADLESS)
+    if headless_env is not None:
+        headless_flag = headless_env.strip().lower()
+        if headless_flag in {value.lower() for value in FALSY_VALUES}:
+            config.browser.headless = False
+        elif headless_flag in {value.lower() for value in TRUTHY_VALUES}:
+            config.browser.headless = True
+
+    # Transport mode
+    if transport_env := env.get(EnvironmentKeys.TRANSPORT):
+        config.server.transport_explicitly_set = True
+        if transport_env == "stdio":
+            config.server.transport = "stdio"
+        elif transport_env == "streamable-http":
+            config.server.transport = "streamable-http"
+        else:
+            raise ConfigurationError(
+                f"Invalid TRANSPORT: '{transport_env}'. Must be 'stdio' or 'streamable-http'."
+            )
+
+    # Persistent browser profile directory
+    if user_data_dir := env.get(EnvironmentKeys.USER_DATA_DIR):
+        config.browser.user_data_dir = user_data_dir
+
+    # Timeout for page operations (validated in BrowserConfig.validate())
+    if (timeout := _int_from_env(EnvironmentKeys.TIMEOUT)) is not None:
+        config.browser.default_timeout = timeout
+
+    # Custom user agent
+    if user_agent_env := env.get(EnvironmentKeys.USER_AGENT):
+        config.browser.user_agent = user_agent_env
+
+    # HTTP server host
+    if host_env := env.get(EnvironmentKeys.HOST):
+        config.server.host = host_env
+
+    # HTTP server port (validated in AppConfig.validate())
+    if (port := _int_from_env(EnvironmentKeys.PORT)) is not None:
+        config.server.port = port
+
+    # HTTP server path
+    if path_env := env.get(EnvironmentKeys.HTTP_PATH):
+        config.server.path = path_env
+
+    # Slow motion delay for debugging (validated in BrowserConfig.validate())
+    if (slow_mo := _int_from_env(EnvironmentKeys.SLOW_MO)) is not None:
+        config.browser.slow_mo = slow_mo
+
+    # Browser viewport (validated in BrowserConfig.validate())
+    if viewport_env := env.get(EnvironmentKeys.VIEWPORT):
+        try:
+            # Unpacking fails with ValueError unless there are exactly two parts.
+            width, height = viewport_env.lower().split("x")
+            config.browser.viewport_width = int(width)
+            config.browser.viewport_height = int(height)
+        except ValueError as err:
+            raise ConfigurationError(
+                f"Invalid VIEWPORT: '{viewport_env}'. Must be in format WxH (e.g., 1280x720)."
+            ) from err
+
+    # Custom Chrome/Chromium executable path
+    if chrome_path_env := env.get(EnvironmentKeys.CHROME_PATH):
+        config.browser.chrome_path = chrome_path_env
+
+    # OAuth authentication
+    if auth_env := env.get(EnvironmentKeys.AUTH):
+        if auth_env == "oauth":
+            config.server.oauth.enabled = True
+        else:
+            raise ConfigurationError(f"Invalid AUTH: '{auth_env}'. Must be 'oauth'.")
+
+    if oauth_base_url := env.get(EnvironmentKeys.OAUTH_BASE_URL):
+        config.server.oauth.base_url = oauth_base_url
+
+    if oauth_password := env.get(EnvironmentKeys.OAUTH_PASSWORD):
+        config.server.oauth.password = oauth_password
+
+    return config
+
+
+def load_from_args(config: AppConfig) -> AppConfig:
+    """Load configuration from command line arguments.
+
+    Only options that were actually supplied override the values already
+    present on *config*. Numeric options are compared against None rather
+    than tested for truthiness, so an explicit ``0`` is applied (and then
+    judged by the later validation step) instead of being dropped.
+
+    Args:
+        config: Configuration object to update in place.
+
+    Returns:
+        The same *config* object.
+
+    Raises:
+        ConfigurationError: If --viewport is not in WxH form.
+    """
+    parser = argparse.ArgumentParser(
+        description="LinkedIn MCP Server - A Model Context Protocol server for LinkedIn integration"
+    )
+
+    parser.add_argument(
+        "--no-headless",
+        action="store_true",
+        help="Run browser with a visible window (useful for login and debugging)",
+    )
+
+    parser.add_argument(
+        "--log-level",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+        help="Set logging level (default: WARNING)",
+    )
+
+    parser.add_argument(
+        "--transport",
+        choices=["stdio", "streamable-http"],
+        default=None,
+        help="Specify the transport mode (stdio or streamable-http)",
+    )
+
+    parser.add_argument(
+        "--host",
+        type=str,
+        default=None,
+        help="HTTP server host (default: 127.0.0.1)",
+    )
+
+    parser.add_argument(
+        "--port",
+        type=int,
+        default=None,
+        help="HTTP server port (default: 8000)",
+    )
+
+    parser.add_argument(
+        "--path",
+        type=str,
+        default=None,
+        help="HTTP server path (default: /mcp)",
+    )
+
+    # Browser configuration
+    parser.add_argument(
+        "--slow-mo",
+        type=int,
+        # None (not 0) so that "option absent" and "--slow-mo 0" differ.
+        default=None,
+        metavar="MS",
+        help="Slow down browser actions by N milliseconds (debugging)",
+    )
+
+    parser.add_argument(
+        "--user-agent",
+        type=str,
+        default=None,
+        help="Custom browser user agent",
+    )
+
+    parser.add_argument(
+        "--viewport",
+        type=str,
+        default=None,
+        metavar="WxH",
+        help="Browser viewport size (default: 1280x720)",
+    )
+
+    parser.add_argument(
+        "--timeout",
+        type=positive_int,
+        default=None,
+        metavar="MS",
+        help="Browser timeout for page operations in milliseconds (default: 5000)",
+    )
+
+    parser.add_argument(
+        "--chrome-path",
+        type=str,
+        default=None,
+        metavar="PATH",
+        help="Path to Chrome/Chromium executable (for custom browser installations)",
+    )
+
+    # Session management
+    parser.add_argument(
+        "--login",
+        action="store_true",
+        help="Login interactively via browser and save persistent profile",
+    )
+
+    parser.add_argument(
+        "--status",
+        action="store_true",
+        help="Check if current session is valid and exit",
+    )
+
+    parser.add_argument(
+        "--logout",
+        action="store_true",
+        help="Clear stored LinkedIn browser profile",
+    )
+
+    parser.add_argument(
+        "--user-data-dir",
+        type=str,
+        default=None,
+        metavar="PATH",
+        help="Path to persistent browser profile directory (default: ~/.linkedin-mcp/profile)",
+    )
+
+    # OAuth authentication
+    parser.add_argument(
+        "--auth",
+        choices=["oauth"],
+        default=None,
+        help="Enable authentication (oauth for OAuth 2.1)",
+    )
+
+    parser.add_argument(
+        "--oauth-base-url",
+        type=str,
+        default=None,
+        metavar="URL",
+        help="Public URL of this server for OAuth (e.g. https://my-mcp.example.com)",
+    )
+
+    parser.add_argument(
+        "--oauth-password",
+        type=str,
+        default=None,
+        metavar="PASSWORD",
+        help="Password for the OAuth login page (visible in process list; prefer OAUTH_PASSWORD env var)",
+    )
+
+    args = parser.parse_args()
+
+    # Update configuration with parsed arguments
+    if args.no_headless:
+        config.browser.headless = False
+
+    if args.log_level:
+        config.server.log_level = args.log_level
+
+    if args.transport:
+        config.server.transport = args.transport
+        config.server.transport_explicitly_set = True
+
+    if args.host:
+        config.server.host = args.host
+
+    # An explicit --port 0 must reach validation rather than be ignored.
+    if args.port is not None:
+        config.server.port = args.port
+
+    if args.path:
+        config.server.path = args.path
+
+    # Browser configuration
+    # An explicit --slow-mo 0 must be able to override an environment value.
+    if args.slow_mo is not None:
+        config.browser.slow_mo = args.slow_mo
+
+    if args.user_agent:
+        config.browser.user_agent = args.user_agent
+
+    # Viewport (validated in BrowserConfig.validate())
+    if args.viewport:
+        try:
+            # Unpacking fails with ValueError unless there are exactly two parts.
+            width, height = args.viewport.lower().split("x")
+            config.browser.viewport_width = int(width)
+            config.browser.viewport_height = int(height)
+        except ValueError as err:
+            raise ConfigurationError(
+                f"Invalid --viewport: '{args.viewport}'. Must be in format WxH (e.g., 1280x720)."
+            ) from err
+
+    if args.timeout is not None:
+        config.browser.default_timeout = args.timeout
+
+    if args.chrome_path:
+        config.browser.chrome_path = args.chrome_path
+
+    # Session management
+    if args.login:
+        config.server.login = True
+
+    if args.status:
+        config.server.status = True
+
+    if args.logout:
+        config.server.logout = True
+
+    if args.user_data_dir:
+        config.browser.user_data_dir = args.user_data_dir
+
+    # OAuth authentication
+    if args.auth == "oauth":
+        config.server.oauth.enabled = True
+
+    if args.oauth_base_url:
+        config.server.oauth.base_url = args.oauth_base_url
+
+    if args.oauth_password:
+        config.server.oauth.password = args.oauth_password
+
+    return config
+
+
+def load_config() -> AppConfig:
+    """
+    Build the application configuration from all sources.
+
+    Sources are applied from lowest to highest priority, so later ones
+    override earlier ones:
+    1. Defaults
+    2. Environment variables
+    3. Command line arguments
+
+    Returns:
+        The validated application settings.
+    """
+    settings = AppConfig()
+
+    # Record whether the process is attached to a terminal.
+    settings.is_interactive = is_interactive_environment()
+    logger.debug(f"Interactive mode: {settings.is_interactive}")
+
+    # Environment first, then CLI arguments on top of it.
+    settings = load_from_env(settings)
+    settings = load_from_args(settings)
+
+    settings.validate()
+    return settings
diff --git a/linkedin_mcp_server/config/schema.py b/linkedin_mcp_server/config/schema.py
new file mode 100644
index 00000000..82a4152a
--- /dev/null
+++ b/linkedin_mcp_server/config/schema.py
@@ -0,0 +1,158 @@
+"""
+Configuration schema definitions for LinkedIn MCP Server.
+
+Defines the dataclass schemas that represent the application's configuration
+structure with type-safe configuration objects and default values.
+"""
+
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Literal
+from urllib.parse import urlparse
+
+
+class ConfigurationError(Exception):
+    """Raised when configuration validation fails.
+
+    Raised by the validate() methods of the configuration dataclasses in
+    this module.
+    """
+
+
+@dataclass
+class BrowserConfig:
+    """Browser launch and page settings, with validation."""
+
+    headless: bool = True
+    slow_mo: int = 0  # Milliseconds between browser actions (debugging)
+    user_agent: str | None = None  # Custom browser user agent
+    viewport_width: int = 1280
+    viewport_height: int = 720
+    default_timeout: int = 5000  # Milliseconds for page operations
+    chrome_path: str | None = None  # Path to Chrome/Chromium executable
+    user_data_dir: str = "~/.linkedin-mcp/profile"  # Persistent browser profile
+
+    def validate(self) -> None:
+        """Check numeric ranges and, if set, that chrome_path is an existing file.
+
+        Raises:
+            ConfigurationError: On the first value that fails its check.
+        """
+        if self.slow_mo < 0:
+            raise ConfigurationError(
+                f"slow_mo must be non-negative, got {self.slow_mo}"
+            )
+
+        if self.default_timeout <= 0:
+            raise ConfigurationError(
+                f"default_timeout must be positive, got {self.default_timeout}"
+            )
+
+        width, height = self.viewport_width, self.viewport_height
+        if not (width > 0 and height > 0):
+            raise ConfigurationError(
+                f"viewport dimensions must be positive, got {self.viewport_width}x{self.viewport_height}"
+            )
+
+        # The executable path is optional; skip the filesystem checks if unset.
+        if not self.chrome_path:
+            return
+
+        browser_binary = Path(self.chrome_path)
+        if not browser_binary.exists():
+            raise ConfigurationError(
+                f"chrome_path '{self.chrome_path}' does not exist"
+            )
+        if not browser_binary.is_file():
+            raise ConfigurationError(
+                f"chrome_path '{self.chrome_path}' is not a file"
+            )
+
+
+@dataclass
+class OAuthConfig:
+    """OAuth 2.1 authentication configuration for remote deployments."""
+
+    # Set to True by AUTH=oauth or --auth oauth.
+    enabled: bool = False
+    base_url: str | None = (
+        None  # Public URL of this server (e.g. https://my-mcp.example.com)
+    )
+    password: str | None = None  # Password for the OAuth login page
+
+
+@dataclass
+class ServerConfig:
+    """MCP server configuration."""
+
+    transport: Literal["stdio", "streamable-http"] = "stdio"
+    # Records that the transport value was supplied rather than defaulted.
+    transport_explicitly_set: bool = False
+    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = "WARNING"
+    # One-shot command modes
+    login: bool = False
+    status: bool = False  # Check session validity and exit
+    logout: bool = False
+    # HTTP transport configuration
+    host: str = "127.0.0.1"
+    port: int = 8000
+    path: str = "/mcp"
+    # OAuth authentication
+    oauth: OAuthConfig = field(default_factory=OAuthConfig)
+
+
+@dataclass
+class AppConfig:
+ """Main application configuration."""
+
+ # NOTE(review): this block keeps the single-space layout of the
+ # surrounding text because the extent of the transport-specific branch in
+ # validate() cannot be determined from it - confirm against the original.
+ browser: BrowserConfig = field(default_factory=BrowserConfig)
+ server: ServerConfig = field(default_factory=ServerConfig)
+ is_interactive: bool = field(default=False)
+
+ def validate(self) -> None:
+ """Validate all configuration values. Call after modifying config."""
+ self.browser.validate()
+ if self.server.transport == "streamable-http":
+ self._validate_transport_config()
+ self._validate_path_format()
+ self._validate_port_range()
+ self._validate_oauth()
+
+ def _validate_transport_config(self) -> None:
+ """Validate transport configuration is consistent."""
+ if not self.server.host:
+ raise ConfigurationError("HTTP transport requires a valid host")
+ if not self.server.port:
+ raise ConfigurationError("HTTP transport requires a valid port")
+
+ def _validate_port_range(self) -> None:
+ """Validate port is in valid range."""
+ if not (1 <= self.server.port <= 65535):
+ raise ConfigurationError(
+ f"Port {self.server.port} is not in valid range (1-65535)"
+ )
+
+ def _validate_path_format(self) -> None:
+ """Validate path format for HTTP transport."""
+ if not self.server.path.startswith("/"):
+ raise ConfigurationError(
+ f"HTTP path '{self.server.path}' must start with '/'"
+ )
+ if len(self.server.path) < 2:
+ raise ConfigurationError(
+ f"HTTP path '{self.server.path}' must be at least 2 characters"
+ )
+
+ def _validate_oauth(self) -> None:
+ """Validate OAuth configuration when enabled.
+
+ Skipped for command-only modes (--login, --status, --logout) that exit
+ before starting the server, so AUTH=oauth in the environment doesn't
+ break maintenance commands.
+
+ Raises:
+ ConfigurationError: If OAuth is enabled without the streamable-http
+ transport, without an HTTPS base URL that names a host and has no
+ path component, or without a password.
+ """
+ if not self.server.oauth.enabled:
+ return
+ if self.server.login or self.server.status or self.server.logout:
+ return
+ if self.server.transport != "streamable-http":
+ raise ConfigurationError("OAuth requires --transport streamable-http")
+ if not self.server.oauth.base_url:
+ raise ConfigurationError(
+ "OAuth requires OAUTH_BASE_URL (the public URL of this server)"
+ )
+ if not self.server.oauth.base_url.startswith("https://"):
+ raise ConfigurationError(
+ "OAuth requires OAUTH_BASE_URL to use HTTPS (e.g. https://my-mcp.example.com)"
+ )
+ parsed = urlparse(self.server.oauth.base_url)
+ # A bare scheme such as "https://" passes the prefix check above but
+ # names no server, so it is rejected here.
+ if not parsed.hostname:
+ raise ConfigurationError(
+ "OAuth base URL must include a host (e.g. https://my-mcp.example.com)"
+ )
+ if parsed.path not in ("", "/"):
+ raise ConfigurationError(
+ "OAuth base URL must not contain a path component "
+ "(e.g. https://my-mcp.example.com, not https://my-mcp.example.com/api)"
+ )
+ if not self.server.oauth.password:
+ raise ConfigurationError(
+ "OAuth requires OAUTH_PASSWORD (password for the login page)"
+ )
diff --git a/linkedin_mcp_server/constants.py b/linkedin_mcp_server/constants.py
new file mode 100644
index 00000000..5f366d45
--- /dev/null
+++ b/linkedin_mcp_server/constants.py
@@ -0,0 +1,3 @@
+"""Project-wide constants."""
+
+TOOL_TIMEOUT_SECONDS: float = 90.0
diff --git a/linkedin_mcp_server/core/__init__.py b/linkedin_mcp_server/core/__init__.py
new file mode 100644
index 00000000..aba9ff76
--- /dev/null
+++ b/linkedin_mcp_server/core/__init__.py
@@ -0,0 +1,41 @@
+"""Core browser management, authentication, and scraping utilities."""
+
+from .auth import (
+ detect_auth_barrier,
+ detect_auth_barrier_quick,
+ is_logged_in,
+ resolve_remember_me_prompt,
+ wait_for_manual_login,
+ warm_up_browser,
+)
+from .browser import BrowserManager
+from .exceptions import (
+ AuthenticationError,
+ ElementNotFoundError,
+ LinkedInScraperException,
+ NetworkError,
+ ProfileNotFoundError,
+ RateLimitError,
+ ScrapingError,
+)
+from .utils import detect_rate_limit, handle_modal_close, scroll_to_bottom
+
+__all__ = [
+ "AuthenticationError",
+ "BrowserManager",
+ "detect_auth_barrier",
+ "detect_auth_barrier_quick",
+ "ElementNotFoundError",
+ "LinkedInScraperException",
+ "NetworkError",
+ "ProfileNotFoundError",
+ "RateLimitError",
+ "ScrapingError",
+ "detect_rate_limit",
+ "handle_modal_close",
+ "is_logged_in",
+ "resolve_remember_me_prompt",
+ "scroll_to_bottom",
+ "wait_for_manual_login",
+ "warm_up_browser",
+]
diff --git a/linkedin_mcp_server/core/auth.py b/linkedin_mcp_server/core/auth.py
new file mode 100644
index 00000000..08eb2b9e
--- /dev/null
+++ b/linkedin_mcp_server/core/auth.py
@@ -0,0 +1,298 @@
+"""Authentication functions for LinkedIn."""
+
+import asyncio
+import logging
+import re
+from urllib.parse import urlparse
+
+from patchright.async_api import Page, TimeoutError as PlaywrightTimeoutError
+
+from .exceptions import AuthenticationError
+
+logger = logging.getLogger(__name__)
+
+_AUTH_BLOCKER_URL_PATTERNS = (
+ "/login",
+ "/authwall",
+ "/checkpoint",
+ "/challenge",
+ "/uas/login",
+ "/uas/consumer-email-challenge",
+)
+_LOGIN_TITLE_PATTERNS = (
+ "linkedin login",
+ "sign in | linkedin",
+)
+_AUTH_BARRIER_TEXT_MARKERS = (
+ ("welcome back", "sign in using another account"),
+ ("welcome back", "join now"),
+ ("choose an account", "sign in using another account"),
+ ("continue as", "sign in using another account"),
+)
+_REMEMBER_ME_CONTAINER_SELECTOR = "#rememberme-div"
+_REMEMBER_ME_BUTTON_SELECTOR = "#rememberme-div button"
+
+
+async def warm_up_browser(page: Page) -> None:
+ """Visit normal sites to appear more human-like before LinkedIn access."""
+ sites = [
+ "https://www.google.com",
+ "https://www.wikipedia.org",
+ "https://www.github.com",
+ ]
+
+ logger.info("Warming up browser by visiting normal sites...")
+
+ failures = 0
+ for site in sites:
+ try:
+ await page.goto(site, wait_until="domcontentloaded", timeout=10000)
+ await asyncio.sleep(1)
+ logger.debug("Visited %s", site)
+ except Exception as e:
+ failures += 1
+ logger.debug("Could not visit %s: %s", site, e)
+ continue
+
+ if failures == len(sites):
+ logger.warning("Browser warm-up failed: none of %d sites reachable", len(sites))
+ else:
+ logger.info("Browser warm-up complete")
+
+
+async def is_logged_in(page: Page) -> bool:
+ """Check if currently logged in to LinkedIn.
+
+ Uses a three-tier strategy:
+ 1. Fail-fast on auth blocker URLs
+ 2. Check for navigation elements (primary)
+ 3. URL-based fallback for authenticated-only pages
+ """
+ try:
+ current_url = page.url
+
+ # Step 1: Fail-fast on auth blockers
+ if _is_auth_blocker_url(current_url):
+ return False
+
+ # Step 2: Selector check (PRIMARY)
+ old_selectors = '.global-nav__primary-link, [data-control-name="nav.settings"]'
+ old_count = await page.locator(old_selectors).count()
+
+ new_selectors = 'nav a[href*="/feed"], nav button:has-text("Home"), nav a[href*="/mynetwork"]'
+ new_count = await page.locator(new_selectors).count()
+
+ has_nav_elements = old_count > 0 or new_count > 0
+
+ # Step 3: URL fallback
+ authenticated_only_pages = [
+ "/feed",
+ "/mynetwork",
+ "/messaging",
+ "/notifications",
+ ]
+ is_authenticated_page = any(
+ pattern in current_url for pattern in authenticated_only_pages
+ )
+
+ if not is_authenticated_page:
+ return has_nav_elements
+
+ if has_nav_elements:
+ return True
+
+ # Empty authenticated-only pages are a false positive during cookie
+ # bridge recovery. Require some real page content before trusting URL.
+ body_text = await page.evaluate("() => document.body?.innerText || ''")
+ if not isinstance(body_text, str):
+ return False
+
+ return bool(body_text.strip())
+ except PlaywrightTimeoutError:
+ logger.warning(
+ "Timeout checking login status on %s — treating as not logged in",
+ page.url,
+ )
+ return False
+ except Exception:
+ logger.error("Unexpected error checking login status", exc_info=True)
+ raise
+
+
+async def detect_auth_barrier(page: Page) -> str | None:
+ """Detect LinkedIn auth/account-picker barriers on the current page."""
+ return await _detect_auth_barrier(page, include_body_text=True)
+
+
+async def _detect_auth_barrier(
+ page: Page,
+ *,
+ include_body_text: bool,
+) -> str | None:
+ """Detect LinkedIn auth/account-picker barriers on the current page."""
+ try:
+ current_url = page.url
+ if _is_auth_blocker_url(current_url):
+ return f"auth blocker URL: {current_url}"
+
+ try:
+ title = (await page.title()).strip().lower()
+ except Exception:
+ title = ""
+ if any(pattern in title for pattern in _LOGIN_TITLE_PATTERNS):
+ return f"login title: {title}"
+
+ if not include_body_text:
+ return None
+
+ try:
+ body_text = await page.evaluate("() => document.body?.innerText || ''")
+ except Exception:
+ body_text = ""
+ if not isinstance(body_text, str):
+ body_text = ""
+
+ normalized = re.sub(r"\s+", " ", body_text).strip().lower()
+ for marker_group in _AUTH_BARRIER_TEXT_MARKERS:
+ if all(marker in normalized for marker in marker_group):
+ return f"auth barrier text: {' + '.join(marker_group)}"
+
+ return None
+ except PlaywrightTimeoutError:
+ logger.warning(
+ "Timeout checking auth barrier on %s — continuing without barrier detection",
+ page.url,
+ )
+ return None
+ except Exception:
+ logger.error("Unexpected error checking auth barrier", exc_info=True)
+ return None
+
+
+async def detect_auth_barrier_quick(page: Page) -> str | None:
+ """Cheap auth-barrier check for normal navigations.
+
+ Uses URL and title only, avoiding a full body-text fetch on healthy pages.
+ """
+ return await _detect_auth_barrier(page, include_body_text=False)
+
+
+async def resolve_remember_me_prompt(page: Page) -> bool:
+ """Click through LinkedIn's saved-account chooser when it appears."""
+ try:
+ logger.debug("Checking remember-me prompt on %s", page.url)
+ try:
+ await page.wait_for_selector(_REMEMBER_ME_CONTAINER_SELECTOR, timeout=3000)
+ logger.debug("Remember-me container appeared")
+ except PlaywrightTimeoutError:
+ logger.debug("Remember-me container did not appear in time")
+ return False
+
+ target_locator = page.locator(_REMEMBER_ME_BUTTON_SELECTOR)
+ target = target_locator.first
+ try:
+ target_count = await target_locator.count()
+ except Exception:
+ logger.debug(
+ "Could not count remember-me buttons; continuing with first match",
+ exc_info=True,
+ )
+ target_count = -1
+ logger.debug(
+ "Remember-me target count for %s: %d",
+ _REMEMBER_ME_BUTTON_SELECTOR,
+ target_count,
+ )
+ if target_count == 0:
+ logger.debug(
+ "Remember-me container appeared without any matching button selector"
+ )
+ return False
+ try:
+ await target.wait_for(state="visible", timeout=3000)
+ logger.debug("Remember-me button became visible")
+ except PlaywrightTimeoutError:
+ logger.debug(
+ "Remember-me prompt container appeared without a visible login button"
+ )
+ return False
+
+ logger.info("Clicking LinkedIn saved-account chooser to resume session")
+ try:
+ await target.scroll_into_view_if_needed(timeout=3000)
+ except PlaywrightTimeoutError:
+ logger.debug("Remember-me button did not scroll into view in time")
+
+ try:
+ await target.click(timeout=5000)
+ logger.debug("Remember-me button click succeeded")
+ except PlaywrightTimeoutError:
+ logger.debug("Retrying remember-me prompt click with force=True")
+ await target.click(timeout=5000, force=True)
+ logger.debug("Remember-me button force-click succeeded")
+ try:
+ await page.wait_for_load_state("domcontentloaded", timeout=10000)
+ except PlaywrightTimeoutError:
+ logger.debug("Remember-me prompt click did not finish loading in time")
+ await asyncio.sleep(1)
+ return True
+ except PlaywrightTimeoutError:
+ logger.debug("Remember-me prompt was present but not clickable in time")
+ return False
+ except Exception:
+ logger.debug("Failed to resolve remember-me prompt", exc_info=True)
+ return False
+
+
+def _is_auth_blocker_url(url: str) -> bool:
+ """Return True only for real auth routes, not arbitrary slug substrings."""
+ path = urlparse(url).path or "/"
+
+ if path in _AUTH_BLOCKER_URL_PATTERNS:
+ return True
+
+ return any(
+ path == f"{pattern}/" or path.startswith(f"{pattern}/")
+ for pattern in _AUTH_BLOCKER_URL_PATTERNS
+ )
+
+
+async def wait_for_manual_login(page: Page, timeout: int = 300000) -> None:
+ """Wait for user to manually complete login.
+
+ Args:
+ page: Patchright page object
+ timeout: Timeout in milliseconds (default: 5 minutes)
+
+ Raises:
+ AuthenticationError: If timeout or login not completed
+ """
+ logger.info(
+ "Please complete the login process manually in the browser. "
+ "Waiting up to 5 minutes..."
+ )
+
+ loop = asyncio.get_running_loop()
+ start_time = loop.time()
+
+ while True:
+ if await resolve_remember_me_prompt(page):
+ logger.info("Resolved saved-account chooser during manual login flow")
+ elapsed = (loop.time() - start_time) * 1000
+ if elapsed > timeout:
+ raise AuthenticationError(
+ "Manual login timeout. Please try again and complete login faster."
+ )
+ continue
+
+ if await is_logged_in(page):
+ logger.info("Manual login completed successfully")
+ return
+
+ elapsed = (loop.time() - start_time) * 1000
+ if elapsed > timeout:
+ raise AuthenticationError(
+ "Manual login timeout. Please try again and complete login faster."
+ )
+
+ await asyncio.sleep(1)
diff --git a/linkedin_mcp_server/core/browser.py b/linkedin_mcp_server/core/browser.py
new file mode 100644
index 00000000..19df362d
--- /dev/null
+++ b/linkedin_mcp_server/core/browser.py
@@ -0,0 +1,330 @@
+"""Browser lifecycle management using Patchright with persistent context."""
+
+import json
+import logging
+import os
+from pathlib import Path
+from typing import Any
+
+from patchright.async_api import (
+ BrowserContext,
+ Page,
+ Playwright,
+ async_playwright,
+)
+
+from .exceptions import NetworkError
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_USER_DATA_DIR = Path.home() / ".linkedin-mcp" / "profile"
+
+
+class BrowserManager:
+ """Async context manager for Patchright browser with persistent profile.
+
+ Session persistence is handled automatically by the persistent browser
+ context -- all cookies, localStorage, and session state are retained in
+ the ``user_data_dir`` between runs.
+ """
+
+ def __init__(
+ self,
+ user_data_dir: str | Path = _DEFAULT_USER_DATA_DIR,
+ headless: bool = True,
+ slow_mo: int = 0,
+ viewport: dict[str, int] | None = None,
+ user_agent: str | None = None,
+ **launch_options: Any,
+ ):
+ self.user_data_dir = str(Path(user_data_dir).expanduser())
+ self.headless = headless
+ self.slow_mo = slow_mo
+ self.viewport = viewport or {"width": 1280, "height": 720}
+ self.user_agent = user_agent
+ self.launch_options = launch_options
+
+ self._playwright: Playwright | None = None
+ self._context: BrowserContext | None = None
+ self._page: Page | None = None
+ self._is_authenticated = False
+
+ async def __aenter__(self) -> "BrowserManager":
+ await self.start()
+ return self
+
+ async def __aexit__(
+ self, exc_type: object, exc_val: object, exc_tb: object
+ ) -> None:
+ await self.close()
+
+ async def start(self) -> None:
+ """Start Patchright and launch persistent browser context."""
+ if self._context is not None:
+ raise RuntimeError("Browser already started. Call close() first.")
+ try:
+ self._playwright = await async_playwright().start()
+
+ Path(self.user_data_dir).mkdir(parents=True, exist_ok=True)
+
+ context_options: dict[str, Any] = {
+ "headless": self.headless,
+ "slow_mo": self.slow_mo,
+ "viewport": self.viewport,
+ **self.launch_options,
+ }
+
+ if self.user_agent:
+ context_options["user_agent"] = self.user_agent
+
+ self._context = await self._playwright.chromium.launch_persistent_context(
+ self.user_data_dir,
+ **context_options,
+ )
+
+ logger.info(
+ "Persistent browser launched (headless=%s, user_data_dir=%s)",
+ self.headless,
+ self.user_data_dir,
+ )
+
+ if self._context.pages:
+ self._page = self._context.pages[0]
+ else:
+ self._page = await self._context.new_page()
+
+ logger.info("Browser context and page ready")
+
+ except Exception as e:
+ await self.close()
+ raise NetworkError(f"Failed to start browser: {e}") from e
+
+ async def close(self) -> None:
+ """Close persistent context and cleanup resources."""
+ context = self._context
+ playwright = self._playwright
+ self._context = None
+ self._page = None
+ self._playwright = None
+
+ if context is None and playwright is None:
+ return
+
+ if context is not None:
+ try:
+ await context.close()
+ except Exception as exc:
+ logger.error("Error closing browser context: %s", exc)
+
+ if playwright is not None:
+ try:
+ await playwright.stop()
+ except Exception as exc:
+ logger.error("Error stopping playwright: %s", exc)
+
+ logger.info("Browser closed")
+
+ @property
+ def page(self) -> Page:
+ if not self._page:
+ raise RuntimeError(
+ "Browser not started. Use async context manager or call start()."
+ )
+ return self._page
+
+ @property
+ def context(self) -> BrowserContext:
+ if not self._context:
+ raise RuntimeError("Browser context not initialized.")
+ return self._context
+
+ async def set_cookie(
+ self, name: str, value: str, domain: str = ".linkedin.com"
+ ) -> None:
+ if not self._context:
+ raise RuntimeError("No browser context")
+
+ await self._context.add_cookies(
+ [{"name": name, "value": value, "domain": domain, "path": "/"}]
+ )
+ logger.debug("Cookie set: %s", name)
+
+ @property
+ def is_authenticated(self) -> bool:
+ return self._is_authenticated
+
+ @is_authenticated.setter
+ def is_authenticated(self, value: bool) -> None:
+ self._is_authenticated = value
+
+ def _default_cookie_path(self) -> Path:
+ return Path(self.user_data_dir).parent / "cookies.json"
+
+ @staticmethod
+ def _normalize_cookie_domain(cookie: Any) -> dict[str, Any]:
+ """Normalize cookie domain for cross-platform compatibility.
+
+ Playwright reports some LinkedIn cookies with ``.www.linkedin.com``
+ domain, but Chromium's internal store uses ``.linkedin.com``.
+ """
+ domain = cookie.get("domain", "")
+ if domain in (".www.linkedin.com", "www.linkedin.com"):
+ cookie = {**cookie, "domain": ".linkedin.com"}
+ return cookie
+
+ async def export_cookies(self, cookie_path: str | Path | None = None) -> bool:
+ """Export LinkedIn cookies to a portable JSON file."""
+ if not self._context:
+ logger.warning("Cannot export cookies: no browser context")
+ return False
+
+ path = Path(cookie_path) if cookie_path else self._default_cookie_path()
+ try:
+ all_cookies = await self._context.cookies()
+ cookies = [
+ self._normalize_cookie_domain(c)
+ for c in all_cookies
+ if "linkedin.com" in c.get("domain", "")
+ ]
+ path.write_text(json.dumps(cookies, indent=2))
+ logger.info("Exported %d LinkedIn cookies to %s", len(cookies), path)
+ return True
+ except Exception:
+ logger.exception("Failed to export cookies")
+ return False
+
+ async def export_storage_state(
+ self, path: str | Path, *, indexed_db: bool = True
+ ) -> bool:
+ """Export the current browser storage state for diagnostics and recovery."""
+ if not self._context:
+ logger.warning("Cannot export storage state: no browser context")
+ return False
+
+ storage_path = Path(path)
+ storage_path.parent.mkdir(parents=True, exist_ok=True)
+ try:
+ await self._context.storage_state(
+ path=storage_path,
+ indexed_db=indexed_db,
+ )
+ logger.info(
+ "Exported runtime storage snapshot to %s (indexed_db=%s)",
+ storage_path,
+ indexed_db,
+ )
+ return True
+ except Exception:
+ logger.exception("Failed to export storage state to %s", storage_path)
+ return False
+
+ _BRIDGE_COOKIE_PRESETS = {
+ "bridge_core": frozenset(
+ {
+ "li_at",
+ "li_rm",
+ "JSESSIONID",
+ "bcookie",
+ "bscookie",
+ "liap",
+ "lidc",
+ "li_gc",
+ "lang",
+ "timezone",
+ "li_mc",
+ }
+ ),
+ "auth_minimal": frozenset(
+ {
+ "li_at",
+ "JSESSIONID",
+ "bcookie",
+ "bscookie",
+ "lidc",
+ }
+ ),
+ }
+
+ @classmethod
+ def _bridge_cookie_names(
+ cls, preset_name: str | None = None
+ ) -> tuple[str, frozenset[str]]:
+ preset_name = (
+ preset_name
+ or os.getenv(
+ "LINKEDIN_DEBUG_BRIDGE_COOKIE_SET",
+ "auth_minimal",
+ ).strip()
+ or "auth_minimal"
+ )
+ preset = cls._BRIDGE_COOKIE_PRESETS.get(preset_name)
+ if preset is None:
+ logger.warning(
+ "Unknown LINKEDIN_DEBUG_BRIDGE_COOKIE_SET=%r, falling back to auth_minimal",
+ preset_name,
+ )
+ preset_name = "auth_minimal"
+ preset = cls._BRIDGE_COOKIE_PRESETS[preset_name]
+ return preset_name, preset
+
+ async def import_cookies(
+ self,
+ cookie_path: str | Path | None = None,
+ *,
+ preset_name: str | None = None,
+ ) -> bool:
+ """Import the portable LinkedIn bridge cookie subset.
+
+ Fresh browser-side cookies are preserved. The imported subset is the
+ smallest known set that can reconstruct a usable authenticated page in
+ a fresh profile.
+ """
+ if not self._context:
+ logger.warning("Cannot import cookies: no browser context")
+ return False
+
+ path = Path(cookie_path) if cookie_path else self._default_cookie_path()
+ if not path.exists():
+ logger.debug("No portable cookie file at %s", path)
+ return False
+
+ try:
+ all_cookies = json.loads(path.read_text())
+ if not all_cookies:
+ logger.debug("Cookie file is empty")
+ return False
+
+ resolved_preset_name, bridge_cookie_names = self._bridge_cookie_names(
+ preset_name
+ )
+
+ cookies = [
+ self._normalize_cookie_domain(c)
+ for c in all_cookies
+ if "linkedin.com" in c.get("domain", "")
+ and c.get("name") in bridge_cookie_names
+ ]
+
+ has_li_at = any(c.get("name") == "li_at" for c in cookies)
+ if not has_li_at:
+ logger.warning("No li_at cookie found in %s", path)
+ return False
+
+ await self._context.add_cookies(cookies) # type: ignore[arg-type]
+ logger.info(
+ "Imported %d LinkedIn bridge cookies from %s (preset=%s, li_at=%s): %s",
+ len(cookies),
+ path,
+ resolved_preset_name,
+ has_li_at,
+ ", ".join(c["name"] for c in cookies),
+ )
+ return True
+ except Exception:
+ logger.exception("Failed to import cookies from %s", path)
+ return False
+
+ def cookie_file_exists(self, cookie_path: str | Path | None = None) -> bool:
+ """Check if a portable cookie file exists."""
+ path = Path(cookie_path) if cookie_path else self._default_cookie_path()
+ return path.exists()
diff --git a/linkedin_mcp_server/core/exceptions.py b/linkedin_mcp_server/core/exceptions.py
new file mode 100644
index 00000000..0186c8df
--- /dev/null
+++ b/linkedin_mcp_server/core/exceptions.py
@@ -0,0 +1,45 @@
+"""Custom exceptions for LinkedIn scraping operations."""
+
+
+class LinkedInScraperException(Exception):
+ """Base exception for LinkedIn scraper."""
+
+ pass
+
+
+class AuthenticationError(LinkedInScraperException):
+ """Raised when authentication fails."""
+
+ pass
+
+
+class RateLimitError(LinkedInScraperException):
+ """Raised when rate limiting is detected."""
+
+ def __init__(self, message: str, suggested_wait_time: int = 300):
+ super().__init__(message)
+ self.suggested_wait_time = suggested_wait_time
+
+
+class ElementNotFoundError(LinkedInScraperException):
+ """Raised when an expected element is not found."""
+
+ pass
+
+
+class ProfileNotFoundError(LinkedInScraperException):
+ """Raised when a profile/page returns 404."""
+
+ pass
+
+
+class NetworkError(LinkedInScraperException):
+ """Raised when network-related issues occur."""
+
+ pass
+
+
+class ScrapingError(LinkedInScraperException):
+ """Raised when scraping fails for various reasons."""
+
+ pass
diff --git a/linkedin_mcp_server/core/utils.py b/linkedin_mcp_server/core/utils.py
new file mode 100644
index 00000000..7f3a3ebe
--- /dev/null
+++ b/linkedin_mcp_server/core/utils.py
@@ -0,0 +1,194 @@
+"""Utility functions for scraping operations."""
+
+import asyncio
+import logging
+
+from patchright.async_api import Page, TimeoutError as PlaywrightTimeoutError
+
+from .exceptions import RateLimitError
+
+logger = logging.getLogger(__name__)
+
+
+async def detect_rate_limit(page: Page) -> None:
+ """Detect if LinkedIn has rate-limited or security-challenged the session.
+
+ Checks (in order):
+ 1. URL contains /checkpoint or /authwall (security challenge)
+ 2. Page contains CAPTCHA iframe (bot detection)
+ 3. Body text contains rate-limit phrases on error-shaped pages (throttling)
+
+ The body-text heuristic only runs on pages without a ``