Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,23 @@ By adhering to the DB API 2.0 specification, the mssql-python module ensures com

### Support for Microsoft Entra ID Authentication

The Microsoft mssql-python driver enables Python applications to connect to Microsoft SQL Server, Azure SQL Database, or Azure SQL Managed Instance using Microsoft Entra ID identities. It supports various authentication methods, including username and password, Microsoft Entra managed identity, and Integrated Windows Authentication in a federated, domain-joined environment. Additionally, the driver supports Microsoft Entra interactive authentication and Microsoft Entra managed identity authentication for both system-assigned and user-assigned managed identities.
The Microsoft mssql-python driver enables Python applications to connect to Microsoft SQL Server, Azure SQL Database, or Azure SQL Managed Instance using Microsoft Entra ID identities. It supports a variety of authentication methods, including username and password, Microsoft Entra managed identity (system-assigned and user-assigned), Integrated Windows Authentication in a federated, domain-joined environment, interactive authentication via browser, device code flow for environments without browser access, and the default authentication method based on environment and configuration. This flexibility allows developers to choose the most suitable authentication approach for their deployment scenario.

EntraID authentication is now fully supported on MacOS and Linux but with certain limitations as mentioned in the table:

| Authentication Method | Windows Support | macOS/Linux Support | Notes |
|----------------------|----------------|---------------------|-------|
| ActiveDirectoryPassword | ✅ Yes | ✅ Yes | Username/password-based authentication |
| ActiveDirectoryInteractive | ✅ Yes | ❌ No | Only works on Windows |
| ActiveDirectoryInteractive | ✅ Yes | ✅ Yes | Interactive login via browser; requires user interaction |
| ActiveDirectoryMSI (Managed Identity) | ✅ Yes | ✅ Yes | For Azure VMs/containers with managed identity |
| ActiveDirectoryServicePrincipal | ✅ Yes | ✅ Yes | Use client ID and secret or certificate |
| ActiveDirectoryIntegrated | ✅ Yes | ❌ No | Only works on Windows (requires Kerberos/SSPI) |
| ActiveDirectoryDeviceCode | ✅ Yes | ✅ Yes | Device code flow for authentication; suitable for environments without browser access |
| ActiveDirectoryDefault | ✅ Yes | ✅ Yes | Uses default authentication method based on environment and configuration |

**NOTE**: For using Access Token, the connection string *must not* contain `UID`, `PWD`, `Authentication`, or `Trusted_Connection` keywords.

**NOTE**: For using ActiveDirectoryDeviceCode, make sure to specify a `Connect Timeout` that provides enough time to go through the device code flow authentication process.

### Enhanced Pythonic Features

Expand Down
172 changes: 172 additions & 0 deletions mssql_python/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
This module handles authentication for the mssql_python package.
"""

import platform
import struct
from typing import Tuple, Dict, Optional, Union
from mssql_python.logging_config import get_logger, ENABLE_LOGGING

logger = get_logger()

class AuthType:
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated
"""Constants for authentication types"""
INTERACTIVE = "activedirectoryinteractive"
DEVICE_CODE = "activedirectorydevicecode"
DEFAULT = "activedirectorydefault"

class AADAuth:
"""Handles Azure Active Directory authentication"""

@staticmethod
def get_token_struct(token: str) -> bytes:
"""Convert token to SQL Server compatible format"""
token_bytes = token.encode("UTF-16-LE")
return struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)

@staticmethod
def get_default_token() -> bytes:
"""Get token using DefaultAzureCredential"""
from azure.identity import DefaultAzureCredential

try:
# DefaultAzureCredential will automatically use the best available method
# based on the environment (e.g., managed identity, environment variables)
credential = DefaultAzureCredential()
token = credential.get_token("https://database.windows.net/.default").token
return AADAuth.get_token_struct(token)
except Exception as e:
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated
raise RuntimeError(f"Failed to create DefaultAzureCredential: {e}")
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated

@staticmethod
def get_device_code_token() -> bytes:
"""Get token using DeviceCodeCredential"""
from azure.identity import DeviceCodeCredential

try:
credential = DeviceCodeCredential()
token = credential.get_token("https://database.windows.net/.default").token
return AADAuth.get_token_struct(token)
except Exception as e:
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated
raise RuntimeError(f"Failed to create DeviceCodeCredential: {e}")

@staticmethod
def get_interactive_token() -> bytes:
"""Get token using InteractiveBrowserCredential"""
from azure.identity import InteractiveBrowserCredential

try:
credential = InteractiveBrowserCredential()
token = credential.get_token("https://database.windows.net/.default").token
return AADAuth.get_token_struct(token)
except Exception as e:
Comment thread
jahnvi480 marked this conversation as resolved.
raise RuntimeError(f"Failed to create InteractiveBrowserCredential: {e}")

def process_auth_parameters(parameters: list) -> Tuple[list, Optional[str]]:
Comment thread
jahnvi480 marked this conversation as resolved.
"""
Process connection parameters and extract authentication type.

Args:
parameters: List of connection string parameters

Returns:
Tuple[list, Optional[str]]: Modified parameters and authentication type

Raises:
ValueError: If an invalid authentication type is provided
"""
modified_parameters = []
auth_type = None

for param in parameters:
param = param.strip()
if not param:
continue

if "=" not in param:
modified_parameters.append(param)
continue

key, value = param.split("=", 1)
key_lower = key.lower()
value_lower = value.lower()

Comment thread
jahnvi480 marked this conversation as resolved.
if key_lower == "authentication":
if value_lower == AuthType.INTERACTIVE:
auth_type = "interactive"
if platform.system().lower() != "windows":
modified_parameters.append(param)
elif value_lower == AuthType.DEVICE_CODE:
auth_type = "devicecode"
Comment thread
jahnvi480 marked this conversation as resolved.
elif value_lower == AuthType.DEFAULT:
auth_type = "default"
else:
raise ValueError(f"Invalid authentication type: {value}. "
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated
f"Supported types are: {AuthType.INTERACTIVE}, "
f"{AuthType.DEVICE_CODE}, {AuthType.DEFAULT}")
else:
modified_parameters.append(param)

return modified_parameters, auth_type

def remove_sensitive_params(parameters: list) -> list:
"""Remove sensitive parameters from connection string"""
exclude_keys = [
"uid=", "pwd=", "encrypt=", "trustservercertificate=", "authentication="
]
return [
param for param in parameters
if not any(param.lower().startswith(exclude) for exclude in exclude_keys)
]

def get_auth_token(auth_type: str) -> Optional[bytes]:
"""Get authentication token based on auth type"""
if not auth_type:
return None

if auth_type == "default":
return AADAuth.get_default_token()
elif auth_type == "devicecode":
return AADAuth.get_device_code_token()
elif auth_type == "interactive" and platform.system().lower() != "windows":
Comment thread
jahnvi480 marked this conversation as resolved.
Outdated
return AADAuth.get_interactive_token()
return None

def process_connection_string(connection_string: str) -> Tuple[str, Optional[Dict]]:
"""
Process connection string and handle authentication.

Args:
connection_string: The connection string to process

Returns:
Tuple[str, Optional[Dict]]: Processed connection string and attrs_before dict if needed

Raises:
ValueError: If the connection string is invalid or empty
"""
# Check type first
if not isinstance(connection_string, str):
raise ValueError("Connection string must be a string")

# Then check if empty
if not connection_string:
raise ValueError("Connection string cannot be empty")

parameters = connection_string.split(";")
Comment thread
jahnvi480 marked this conversation as resolved.

# Validate that there's at least one valid parameter
if not any('=' in param for param in parameters):
raise ValueError("Invalid connection string format")

modified_parameters, auth_type = process_auth_parameters(parameters)

if auth_type:
modified_parameters = remove_sensitive_params(modified_parameters)
token_struct = get_auth_token(auth_type)
if token_struct:
return ";".join(modified_parameters) + ";", {1256: token_struct}

return ";".join(modified_parameters) + ";", None
8 changes: 8 additions & 0 deletions mssql_python/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
- Cursors are also cleaned up automatically when no longer referenced, to prevent memory leaks.
"""
import weakref
import re
from mssql_python.cursor import Cursor
from mssql_python.logging_config import get_logger, ENABLE_LOGGING
from mssql_python.constants import ConstantsDDBC as ddbc_sql_const
from mssql_python.helpers import add_driver_to_connection_str, check_error
from mssql_python import ddbc_bindings
from mssql_python.pooling import PoolingManager
from mssql_python.exceptions import DatabaseError, InterfaceError
from mssql_python.auth import process_connection_string

logger = get_logger()

Expand Down Expand Up @@ -64,6 +66,12 @@ def __init__(self, connection_str: str = "", autocommit: bool = False, attrs_bef
connection_str, **kwargs
)
self._attrs_before = attrs_before or {}
if re.search(r"authentication", self.connection_str, re.IGNORECASE):
connection_result = process_connection_string(self.connection_str)
self.connection_str = connection_result[0]
if connection_result[1]:
Comment thread
jahnvi480 marked this conversation as resolved.
self._attrs_before.update(connection_result[1])

self._closed = False

# Using WeakSet which automatically removes cursors when they are no longer in use
Expand Down