import datetime
import warnings
import sqlite3
import pandas as pd
import time
import os
from dhanhq import marketfeed
warnings.filterwarnings(“ignore”)
Database setup
DB_FILE = “market_data.db”
def setup_database():
“”“Create necessary tables if they don’t exist”“”
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
# Create table for storing market data
cursor.execute("""
CREATE TABLE IF NOT EXISTS market_data (
security_id TEXT PRIMARY KEY,
ltp REAL,
avg_price REAL,
volume INTEGER,
total_sell_quantity INTEGER,
open REAL,
close REAL,
high REAL,
low REAL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create table for storing watchlist
cursor.execute("""
CREATE TABLE IF NOT EXISTS watchlist (
script_name TEXT PRIMARY KEY,
exchange TEXT
)
""")
conn.commit()
Initialize database
setup_database()
def get_instrument_file():
“”“Fetch instrument file from Dhan if needed”“”
global instrument_df
current_date = time.strftime(“%Y-%m-%d”)
expected_file = f’all_instrument {current_date}.csv’
for item in os.listdir("Dependencies"):
path = os.path.join("Dependencies", item)
if item.startswith("all_instrument") and current_date not in item.split(" ")[1]:
if os.path.isfile(path):
os.remove(path)
if expected_file in os.listdir("Dependencies"):
try:
print(f"Reading existing file {expected_file}")
instrument_df = pd.read_csv(f"Dependencies/{expected_file}", low_memory=False)
except Exception:
print("Instrument file is incomplete, fetching a new file from Dhan...")
instrument_df = pd.read_csv("https://images.dhan.co/api-data/api-scrip-master.csv", low_memory=False)
instrument_df.to_csv(f"Dependencies/{expected_file}")
else:
print("Fetching new instrument file from Dhan...")
instrument_df = pd.read_csv("https://images.dhan.co/api-data/api-scrip-master.csv", low_memory=False)
instrument_df.to_csv(f"Dependencies/{expected_file}")
return instrument_df
Dhan API credentials
client_id = “1100831340”
access_token = “”
Fetch instrument file
instrument_df = get_instrument_file()
def create_instruments():
“”“Fetch watchlist from database and create instrument list”“”
instruments =
rows = {}
instrument_exchange = {
'NSE': "NSE", 'BSE': "BSE", 'NFO': 'NSE', 'BFO': 'BSE',
'MCX': 'MCX', 'CUR': 'NSE', 'BSE_IDX': 'BSE', 'NSE_IDX': 'NSE'
}
exchange_id = {
'NSE': marketfeed.NSE, 'BSE': marketfeed.BSE, 'MCX': marketfeed.MCX,
'NFO': marketfeed.NSE_FNO, 'BFO': marketfeed.BSE_FNO, 'IDX': marketfeed.IDX,
'BSE_IDX': marketfeed.IDX, 'NSE_IDX': marketfeed.IDX
}
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("SELECT script_name, exchange FROM watchlist")
watchlist = cursor.fetchall()
for tradingsymbol, exchange_ in watchlist:
try:
exchange = instrument_exchange[exchange_]
security_id = instrument_df[
((instrument_df['SEM_TRADING_SYMBOL'] == tradingsymbol) |
(instrument_df['SEM_CUSTOM_SYMBOL'] == tradingsymbol)) &
(instrument_df['SEM_EXM_EXCH_ID'] == exchange)
].iloc[-1]['SEM_SMST_SECURITY_ID']
exchange_segment = exchange_id[exchange_]
instruments.append((exchange_segment, str(security_id), marketfeed.Quote))
rows[security_id] = tradingsymbol # Store mapping in dict
except Exception as e:
print(f"Error: {e} for {tradingsymbol}")
continue
return instruments, rows
def run_feed(client_id, access_token, instruments):
“”“WebSocket feed to receive live market data and store in SQLite”“”
try:
data = marketfeed.DhanFeed(client_id, access_token, instruments)
rows = {}
while True:
response = data.get_data()
if response:
print(f"{datetime.datetime.now().time()}: LTP Received")
if 'LTP' in response.keys():
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO market_data (security_id, ltp, avg_price, volume,
total_sell_quantity, open, close, high, low, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(security_id) DO UPDATE SET
ltp = excluded.ltp,
avg_price = excluded.avg_price,
volume = excluded.volume,
total_sell_quantity = excluded.total_sell_quantity,
open = excluded.open,
close = excluded.close,
high = excluded.high,
low = excluded.low,
updated_at = CURRENT_TIMESTAMP
""", (
response['security_id'], response['LTP'], response['avg_price'],
response['volume'], response['total_sell_quantity'], response['open'],
response['close'], response['high'], response['low']
))
conn.commit()
except Exception as e:
print(f"WebSocket connection error: {e}")
print("Reconnecting...")
run_feed(client_id, access_token, instruments) # Retry connection
def main_loop():
“”“Main function to fetch instruments and start WebSocket feed”“”
instruments, rows = create_instruments()
run_feed(client_id, access_token, instruments)
if name == “main”:
main_loop()