@Dev_Goswami2 You can use this sample code. It works as this
- First checks if a saved token (
dhan_token.json) exists and is valid for at least 7 more hours; if yes, it reuses it.
- If not, it calls Dhanās
/generate-consent API to get a consentAppId.
- Using this, it prints a login URL (from
https://auth.dhan.co/login/consentApp-login?...) which you must open in your browser and log in with your Dhan account + 2FA.
- After login, Dhan redirects you to your configured redirect URL (e.g.,
http://localhost:3000) with a tokenId in the query - you copy this tokenId and paste it into the script.
- The script then exchanges this
tokenId for a valid accessToken + expiry time, saves them in dhan_token.json, and returns them for your API calls.
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
from typing import Optional, Tuple
########################################################################
# ----------------------
# Config & Constants
# ----------------------
API_KEY = ""
API_SECRET = ""
DHAN_CLIENT_ID = ''
AUTH_BASE = "https://auth.dhan.co"
REDIRECT_URL = "http://localhost:3000" # whatever you configured when generating the API key/secret
API_BASE = "https://api.dhan.co"
# Save token JSON alongside this script
TOKEN_FILE: Path = Path(__file__).resolve().parent / "dhan_token.json"
# How much validity time (buffer) we require to reuse the token
REUSE_BUFFER = timedelta(hours=7)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
#########################################################################
# ----------------------
# Helpers: Token IO
# ----------------------
def _parse_expiry(expiry_str: str) -> datetime:
"""
Parse expiry returned by API. Dhan usually returns ISO like '2025-09-23T12:37:23'.
Treat it as UTC if tzinfo is missing.
"""
dt = datetime.fromisoformat(expiry_str)
if dt.tzinfo is None:
# Assume UTC if tz not provided
dt = dt.replace(tzinfo=timezone.utc)
return dt
def _load_existing_token() -> Optional[Tuple[str, datetime]]:
"""
Returns (access_token, expiry_dt) if file exists and is valid JSON with one key.
Otherwise returns None.
"""
if not TOKEN_FILE.exists():
logging.info("Token file not found at %s", TOKEN_FILE)
return None
try:
with TOKEN_FILE.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict) or len(data) != 1:
logging.warning("Token file format invalid; expected single {expiry: token} entry.")
return None
expiry_str, access_token = next(iter(data.items()))
if not isinstance(expiry_str, str) or not isinstance(access_token, str):
logging.warning("Token file content types invalid.")
return None
expiry_dt = _parse_expiry(expiry_str)
return access_token, expiry_dt
except Exception as e:
logging.error("Failed to read/parse token file: %s", e)
return None
def _save_token(expiry_str: str, access_token: str) -> None:
"""
Save token as { "expiry": "token" } overwriting any previous content.
"""
try:
with TOKEN_FILE.open("w", encoding="utf-8") as f:
json.dump({expiry_str: access_token}, f, indent=2)
logging.info("Token saved to %s (expires at %s).", TOKEN_FILE, expiry_str)
except Exception as e:
logging.error("Failed to save token: %s", e)
# ----------------------
# Dhan Auth API Calls
# ----------------------
def _generate_consent() -> str:
"""
Step 1: POST /app/generate-consent?client_id={dhanClientId}
Headers: app_id, app_secret
Returns consentAppId
"""
url = f"{AUTH_BASE}/app/generate-consent"
headers = {"app_id": API_KEY, "app_secret": API_SECRET}
params = {"client_id": DHAN_CLIENT_ID}
logging.info("Requesting consent for client_id %s ...", DHAN_CLIENT_ID)
resp = requests.post(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
consent_app_id = data["consentAppId"]
logging.info("Consent generated. consentAppId=%s", consent_app_id)
return consent_app_id
def _prompt_user_for_token_id(consent_app_id: str) -> str:
"""
Step 2 (user in browser): Open login URL, complete login/2FA.
Copy tokenId from your redirect URL and paste here.
"""
login_url = f"{AUTH_BASE}/login/consentApp-login?consentAppId={consent_app_id}"
logging.info("Open this URL in your browser and complete login/2FA:")
logging.info(login_url)
token_id = input("Paste tokenId from your redirect URL here: ").strip()
if not token_id:
raise ValueError("Empty tokenId provided.")
logging.info("tokenId received.")
return token_id
def _consume_consent(token_id: str) -> Tuple[str, str]:
"""
Step 3: POST /app/consumeApp-consent?tokenId={tokenId}
Headers: app_id, app_secret
Returns (accessToken, expiryTimeStr)
"""
url = f"{AUTH_BASE}/app/consumeApp-consent"
headers = {"app_id": API_KEY, "app_secret": API_SECRET}
params = {"tokenId": token_id}
logging.info("Consuming consent to obtain access token...")
resp = requests.post(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
# Validate presence
if "accessToken" not in data or "expiryTime" not in data:
raise KeyError("Response missing 'accessToken' or 'expiryTime'.")
logging.info("Access token obtained successfully.")
return data["accessToken"], data["expiryTime"]
# ----------------------
# Public: Get Valid Token
# ----------------------
def get_dhan_token():
"""
Returns a valid Dhan access token.
- If a saved token exists and has >= 7 hours remaining, reuse it.
- Otherwise, runs the 3-step flow and saves the new token.
"""
# Try reuse
loaded = _load_existing_token()
now_utc = datetime.now(timezone.utc)
if loaded:
token, expiry_dt = loaded
remaining = expiry_dt - now_utc
if remaining >= REUSE_BUFFER:
logging.info(
"Reusing existing token (remaining: %s >= buffer: %s).",
remaining, REUSE_BUFFER
)
return token, str(expiry_dt)
else:
logging.info(
"Existing token expires soon (remaining: %s < buffer: %s). Generating new token.",
remaining, REUSE_BUFFER
)
# Fresh flow
consent_app_id = _generate_consent()
token_id = _prompt_user_for_token_id(consent_app_id)
access_token, expiry_str = _consume_consent(token_id)
# Persist
_save_token(expiry_str, access_token)
return access_token, expiry_str
def get_access_token():
try:
token, expiry_str = get_dhan_token()
logging.info(f"Access Token: {token} and expiry {expiry_str}...")
return token, expiry_str
except Exception as e:
logging.exception("Failed to obtain Dhan access token: %s", e)
return '', ''
get_access_token()