Update for API Traders: New Changes in DhanHQ API Authentication Process and Updates

@Dev_Goswami2 You can use this sample code. It works as follows:

  • It first checks whether a saved token (dhan_token.json) exists and is valid for at least 7 more hours; if so, it reuses it.
  • If not, it calls Dhan’s /app/generate-consent API to get a consentAppId.
  • Using this, it prints a login URL (from https://auth.dhan.co/login/consentApp-login?...) which you must open in your browser and log in with your Dhan account + 2FA.
  • After login, Dhan redirects you to your configured redirect URL (e.g., http://localhost:3000) with a tokenId in the query string; copy this tokenId and paste it into the script.
  • The script then exchanges this tokenId for a valid accessToken + expiry time, saves them in dhan_token.json, and returns them for your API calls.
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
from typing import Optional, Tuple

########################################################################
# ----------------------
# Config & Constants
# ----------------------
# Credentials sent to the auth endpoints as the app_id / app_secret headers and
# the client_id query parameter. Blank here; presumably filled in before running.
API_KEY = ""
API_SECRET = ""
DHAN_CLIENT_ID = ""

# Service endpoints.
AUTH_BASE = "https://auth.dhan.co"
API_BASE = "https://api.dhan.co"
# Whatever redirect URL was configured when generating the API key/secret.
REDIRECT_URL = "http://localhost:3000"

# The token cache (dhan_token.json) sits in the same directory as this script.
TOKEN_FILE: Path = Path(__file__).resolve().with_name("dhan_token.json")
# Minimum remaining validity a cached token must have in order to be reused.
REUSE_BUFFER = timedelta(hours=7)


# Timestamped INFO-level logging for the whole script.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
#########################################################################


# ----------------------
# Helpers: Token IO
# ----------------------
def _parse_expiry(expiry_str: str) -> datetime:
    """
    Convert the expiry string from the API into a timezone-aware datetime.

    The string is parsed with ``datetime.fromisoformat`` (e.g.
    '2025-09-23T12:37:23'). A value that carries its own UTC offset keeps it;
    a naive value is stamped as UTC.

    NOTE(review): the UTC assumption for naive values is this script's choice;
    confirm against the API which timezone the expiry is actually reported in.

    Raises:
        ValueError: if the string is not a valid ISO-8601 date/time.
    """
    parsed = datetime.fromisoformat(expiry_str)
    # Only naive timestamps get a timezone attached; aware ones pass through.
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)


def _load_existing_token() -> Optional[Tuple[str, datetime]]:
    """
    Read the cached token from TOKEN_FILE.

    The file is expected to hold a JSON object with exactly one entry, mapping
    the expiry string to the access token.

    Returns:
        (access_token, expiry_dt) when the file exists and has that shape;
        None when the file is missing, malformed, or unreadable. Every failure
        is logged and none is raised.
    """
    if not TOKEN_FILE.exists():
        logging.info("Token file not found at %s", TOKEN_FILE)
        return None

    try:
        payload = json.loads(TOKEN_FILE.read_text(encoding="utf-8"))

        if not (isinstance(payload, dict) and len(payload) == 1):
            logging.warning("Token file format invalid; expected single {expiry: token} entry.")
            return None

        # Exactly one item is guaranteed by the length check above.
        ((raw_expiry, saved_token),) = payload.items()
        if not (isinstance(raw_expiry, str) and isinstance(saved_token, str)):
            logging.warning("Token file content types invalid.")
            return None

        return saved_token, _parse_expiry(raw_expiry)

    except Exception as err:
        # Best effort: any read or parse problem just means "no usable cached token".
        logging.error("Failed to read/parse token file: %s", err)
        return None



def _save_token(expiry_str: str, access_token: str) -> None:
    """
    Save token as { "expiry": "token" } overwriting any previous content.

    The file holds a live bearer token, so it is restricted to owner-only
    access (mode 0o600) before the secret is written. Restricting permissions
    is best effort: if it fails, a warning is logged and the token is still
    written.

    Args:
        expiry_str: Expiry timestamp as returned by the API; used as the JSON key.
        access_token: The access token; stored as the JSON value.

    Saving is best effort as a whole: any failure is logged, never raised.
    """
    try:
        try:
            # touch() creates a missing file with the restricted mode up front;
            # chmod() tightens a file left over from an earlier run.
            TOKEN_FILE.touch(mode=0o600, exist_ok=True)
            TOKEN_FILE.chmod(0o600)
        except OSError as perm_err:
            logging.warning(
                "Could not restrict permissions on %s: %s", TOKEN_FILE, perm_err
            )

        with TOKEN_FILE.open("w", encoding="utf-8") as f:
            json.dump({expiry_str: access_token}, f, indent=2)
        logging.info("Token saved to %s (expires at %s).", TOKEN_FILE, expiry_str)
    except Exception as e:
        # Deliberately not re-raised: the caller still has the token in memory.
        logging.error("Failed to save token: %s", e)


# ----------------------
# Dhan Auth API Calls
# ----------------------
def _generate_consent() -> str:
    """
    Step 1 of the auth flow: request a consent id for the configured client.

    Sends POST {AUTH_BASE}/app/generate-consent with ``client_id`` as a query
    parameter and the ``app_id`` / ``app_secret`` headers.

    Returns:
        The ``consentAppId`` value from the JSON response.

    Raises:
        requests.HTTPError: on a non-success HTTP status (via raise_for_status).
        KeyError: if the response JSON has no ``consentAppId`` key.
    """
    logging.info("Requesting consent for client_id %s ...", DHAN_CLIENT_ID)
    response = requests.post(
        f"{AUTH_BASE}/app/generate-consent",
        headers={"app_id": API_KEY, "app_secret": API_SECRET},
        params={"client_id": DHAN_CLIENT_ID},
        timeout=30,
    )
    response.raise_for_status()

    consent_app_id = response.json()["consentAppId"]
    logging.info("Consent generated. consentAppId=%s", consent_app_id)
    return consent_app_id


def _prompt_user_for_token_id(consent_app_id: str) -> str:
    """
    Step 2 (user in browser): Open login URL, complete login/2FA.
    Copy tokenId from your redirect URL and paste here.

    The pasted value may be either the bare tokenId or the whole redirect URL
    (or just its query string) containing ``tokenId=...``. In the latter case
    the tokenId is extracted from the query, so pasting the address bar
    contents works too.

    Args:
        consent_app_id: The consentAppId used to build the login URL.

    Returns:
        The tokenId, stripped of surrounding whitespace.

    Raises:
        ValueError: if nothing was entered, or the pasted URL carries an
            empty/missing tokenId.
    """
    # Imported locally so this block does not depend on a file-level import.
    from urllib.parse import parse_qs, urlsplit

    login_url = f"{AUTH_BASE}/login/consentApp-login?consentAppId={consent_app_id}"
    logging.info("Open this URL in your browser and complete login/2FA:")
    logging.info(login_url)
    pasted = input("Paste tokenId from your redirect URL here: ").strip()

    if "tokenId=" in pasted:
        # A full URL exposes its query via urlsplit; a bare "tokenId=..."
        # fragment has no '?', so fall back to treating the input as the query.
        query = urlsplit(pasted).query or pasted
        values = parse_qs(query).get("tokenId", [])
        token_id = values[0].strip() if values else ""
    else:
        token_id = pasted

    if not token_id:
        raise ValueError("Empty tokenId provided.")
    logging.info("tokenId received.")
    return token_id


def _consume_consent(token_id: str) -> Tuple[str, str]:
    """
    Step 3 of the auth flow: exchange the tokenId for an access token.

    Sends POST {AUTH_BASE}/app/consumeApp-consent with ``tokenId`` as a query
    parameter and the ``app_id`` / ``app_secret`` headers.

    Args:
        token_id: The tokenId obtained from the redirect after login.

    Returns:
        (accessToken, expiryTime) exactly as found in the JSON response.

    Raises:
        requests.HTTPError: on a non-success HTTP status (via raise_for_status).
        KeyError: if the response lacks ``accessToken`` or ``expiryTime``.
    """
    logging.info("Consuming consent to obtain access token...")
    response = requests.post(
        f"{AUTH_BASE}/app/consumeApp-consent",
        headers={"app_id": API_KEY, "app_secret": API_SECRET},
        params={"tokenId": token_id},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()

    # Both fields must be present before anything is returned.
    if not ("accessToken" in payload and "expiryTime" in payload):
        raise KeyError("Response missing 'accessToken' or 'expiryTime'.")

    logging.info("Access token obtained successfully.")
    return payload["accessToken"], payload["expiryTime"]


# ----------------------
# Public: Get Valid Token
# ----------------------
def get_dhan_token():
    """
    Return a usable Dhan access token together with its expiry.

    A cached token is reused when its remaining lifetime is at least
    REUSE_BUFFER. Otherwise the consent flow runs (generate consent, prompt
    for tokenId, consume consent) and the new token is saved.

    Returns:
        (access_token, expiry) where expiry is a string. On the reuse path it
        is ``str()`` of the parsed datetime; on the fresh path it is the raw
        expiry string returned by the API, so the two formats can differ.

    Raises:
        Whatever the consent-flow helpers raise (HTTP errors, KeyError,
        ValueError for an empty tokenId).
    """
    cached = _load_existing_token()
    now_utc = datetime.now(timezone.utc)

    if cached is not None:
        token, expiry_dt = cached
        remaining = expiry_dt - now_utc
        if remaining < REUSE_BUFFER:
            logging.info(
                "Existing token expires soon (remaining: %s < buffer: %s). Generating new token.",
                remaining, REUSE_BUFFER
            )
        else:
            logging.info(
                "Reusing existing token (remaining: %s >= buffer: %s).",
                remaining, REUSE_BUFFER
            )
            return token, str(expiry_dt)

    # No cached token, or it is too close to expiry: run the full flow.
    access_token, expiry_str = _consume_consent(
        _prompt_user_for_token_id(_generate_consent())
    )

    # Persist for the next run.
    _save_token(expiry_str, access_token)

    return access_token, expiry_str


def get_access_token() -> Tuple[str, str]:
    """
    Obtain a Dhan access token without ever raising.

    Wraps get_dhan_token(). On success the token is logged in masked form
    only: the full bearer token is a credential and must not end up in log
    output. The unmasked token is still returned to the caller.

    Returns:
        (token, expiry_str) on success; ('', '') if anything fails, in which
        case the exception and traceback are logged.
    """
    try:
        token, expiry_str = get_dhan_token()
        # Show just enough of the token to tell two tokens apart.
        shown = str(token)
        masked = f"{shown[:4]}...{shown[-4:]}" if len(shown) > 8 else "****"
        logging.info("Access Token: %s and expiry %s...", masked, expiry_str)
        return token, expiry_str
    except Exception as e:
        # Top-level boundary: log with traceback and return empty values.
        logging.exception("Failed to obtain Dhan access token: %s", e)
        return '', ''

# Script entry point: the token flow runs immediately when this module is
# executed, and also when it is imported, because there is no
# `if __name__ == "__main__":` guard. The returned values are discarded here.
get_access_token()