# WebSocket issue connecting with sqlite3

import datetime
import warnings
import sqlite3
import pandas as pd
import time
import os
from dhanhq import marketfeed

# Suppress every Python warning for the lifetime of the process.
warnings.filterwarnings("ignore")

# Database setup

# Path of the SQLite database file used by every function in this script.
DB_FILE = "market_data.db"

def setup_database():
    """Create the ``market_data`` and ``watchlist`` tables if they don't exist.

    Opens the SQLite database at ``DB_FILE``, runs two
    ``CREATE TABLE IF NOT EXISTS`` statements, commits, and closes the
    connection again.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # ``with conn`` only commits (or rolls back on error); it does NOT
        # close the connection, hence the explicit close() in ``finally``.
        with conn:
            # Table holding the latest quote per security (one row each).
            conn.execute("""
                CREATE TABLE IF NOT EXISTS market_data (
                    security_id TEXT PRIMARY KEY,
                    ltp REAL,
                    avg_price REAL,
                    volume INTEGER,
                    total_sell_quantity INTEGER,
                    open REAL,
                    close REAL,
                    high REAL,
                    low REAL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Table holding the symbols to subscribe to.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS watchlist (
                    script_name TEXT PRIMARY KEY,
                    exchange TEXT
                )
            """)
    finally:
        conn.close()

# Initialize database (runs at import time so the tables exist before use)

setup_database()

_INSTRUMENT_DIR = "Dependencies"
_SCRIP_MASTER_URL = "https://images.dhan.co/api-data/api-scrip-master.csv"


def _download_instrument_file(target_path):
    """Download the Dhan scrip master CSV, cache it at *target_path*, return it."""
    frame = pd.read_csv(_SCRIP_MASTER_URL, low_memory=False)
    # index=False keeps the cached file's columns identical to the download;
    # otherwise a later read would gain an extra "Unnamed: 0" column.
    frame.to_csv(target_path, index=False)
    return frame


def get_instrument_file():
    """Return today's instrument master as a DataFrame, using a daily cache.

    The cache lives in the ``Dependencies`` directory as
    ``all_instrument <YYYY-MM-DD>.csv``.  Dated cache files from other days
    are deleted.  If today's file exists it is read; if it is missing or
    cannot be parsed, the file is downloaded from Dhan and cached.

    Side effect: the result is also stored in the module-level
    ``instrument_df`` global.

    Returns:
        pandas.DataFrame: the instrument master.
    """
    global instrument_df
    current_date = time.strftime("%Y-%m-%d")
    expected_file = f"all_instrument {current_date}.csv"
    expected_path = os.path.join(_INSTRUMENT_DIR, expected_file)

    # The directory may not exist on a first run; listdir would raise.
    os.makedirs(_INSTRUMENT_DIR, exist_ok=True)

    for item in os.listdir(_INSTRUMENT_DIR):
        if not item.startswith("all_instrument"):
            continue
        parts = item.split(" ")
        # A name with no date component cannot be judged stale; leave it
        # alone instead of crashing on parts[1].
        if len(parts) < 2 or current_date in parts[1]:
            continue
        path = os.path.join(_INSTRUMENT_DIR, item)
        if os.path.isfile(path):
            os.remove(path)

    if os.path.isfile(expected_path):
        try:
            print(f"Reading existing file {expected_file}")
            instrument_df = pd.read_csv(expected_path, low_memory=False)
        except Exception:
            # Deliberately broad: any unreadable cache (truncated download,
            # empty file, bad encoding) falls back to a fresh download.
            print("Instrument file is incomplete, fetching a new file from Dhan...")
            instrument_df = _download_instrument_file(expected_path)
    else:
        print("Fetching new instrument file from Dhan...")
        instrument_df = _download_instrument_file(expected_path)

    return instrument_df

# Dhan API credentials
# Both values can be overridden through the DHAN_CLIENT_ID and
# DHAN_ACCESS_TOKEN environment variables, so the access token does not have
# to be written into this file.  The defaults are the original literals.

client_id = os.environ.get("DHAN_CLIENT_ID", "1100831340")
access_token = os.environ.get("DHAN_ACCESS_TOKEN", "")

# Fetch instrument file (runs at import time; may read the cache or download)

instrument_df = get_instrument_file()

def create_instruments():
    """Build the feed subscription list from the ``watchlist`` table.

    Each ``(script_name, exchange)`` row of the watchlist is looked up in the
    module-level ``instrument_df``: the symbol must match either
    ``SEM_TRADING_SYMBOL`` or ``SEM_CUSTOM_SYMBOL`` and the exchange must match
    ``SEM_EXM_EXCH_ID``.  When several rows match, the last one is used.
    Symbols that cannot be resolved are reported on stdout and skipped.

    Returns:
        tuple: ``(instruments, rows)`` where ``instruments`` is a list of
        ``(exchange_segment, str(security_id), marketfeed.Quote)`` tuples and
        ``rows`` maps each security id (as found in the DataFrame, not
        stringified) to its trading symbol.
    """
    instruments = []
    rows = {}

    # Watchlist exchange code -> exchange id used in the instrument file.
    instrument_exchange = {
        'NSE': "NSE", 'BSE': "BSE", 'NFO': 'NSE', 'BFO': 'BSE',
        'MCX': 'MCX', 'CUR': 'NSE', 'BSE_IDX': 'BSE', 'NSE_IDX': 'NSE'
    }
    # Watchlist exchange code -> marketfeed segment constant.
    # NOTE(review): 'CUR' is present above but has no entry here, so currency
    # symbols end up in the error branch below -- confirm whether intended.
    exchange_id = {
        'NSE': marketfeed.NSE, 'BSE': marketfeed.BSE, 'MCX': marketfeed.MCX,
        'NFO': marketfeed.NSE_FNO, 'BFO': marketfeed.BSE_FNO, 'IDX': marketfeed.IDX,
        'BSE_IDX': marketfeed.IDX, 'NSE_IDX': marketfeed.IDX
    }

    conn = sqlite3.connect(DB_FILE)
    try:
        watchlist = conn.execute(
            "SELECT script_name, exchange FROM watchlist"
        ).fetchall()
    finally:
        # sqlite3's ``with`` form would not close the connection.
        conn.close()

    for tradingsymbol, exchange_ in watchlist:
        try:
            exchange = instrument_exchange[exchange_]
            symbol_matches = (
                (instrument_df['SEM_TRADING_SYMBOL'] == tradingsymbol) |
                (instrument_df['SEM_CUSTOM_SYMBOL'] == tradingsymbol)
            )
            exchange_matches = instrument_df['SEM_EXM_EXCH_ID'] == exchange
            # iloc[-1] raises IndexError when nothing matches; handled below.
            security_id = instrument_df[
                symbol_matches & exchange_matches
            ].iloc[-1]['SEM_SMST_SECURITY_ID']

            exchange_segment = exchange_id[exchange_]
            instruments.append((exchange_segment, str(security_id), marketfeed.Quote))
            rows[security_id] = tradingsymbol  # Store mapping in dict

        except Exception as e:
            # Best effort per symbol: one bad watchlist entry must not stop
            # the remaining symbols from being subscribed.
            print(f"Error: {e} for {tradingsymbol}")
            continue

    return instruments, rows

# Response keys written to ``market_data``, in the column order of the INSERT.
_QUOTE_FIELDS = (
    'security_id', 'LTP', 'avg_price', 'volume', 'total_sell_quantity',
    'open', 'close', 'high', 'low',
)

# Insert a quote, or overwrite the existing row for the same security.
_UPSERT_QUOTE_SQL = """
    INSERT INTO market_data (security_id, ltp, avg_price, volume,
                            total_sell_quantity, open, close, high, low, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    ON CONFLICT(security_id) DO UPDATE SET
        ltp = excluded.ltp,
        avg_price = excluded.avg_price,
        volume = excluded.volume,
        total_sell_quantity = excluded.total_sell_quantity,
        open = excluded.open,
        close = excluded.close,
        high = excluded.high,
        low = excluded.low,
        updated_at = CURRENT_TIMESTAMP
"""


def run_feed(client_id, access_token, instruments, retry_delay=5):
    """Receive live market data and upsert each quote into SQLite.

    Runs until interrupted.  On any error the feed object and the database
    connection are discarded, the function waits ``retry_delay`` seconds and
    starts over.  Retrying happens in a loop rather than by recursion, so a
    long outage cannot exhaust the call stack.

    Args:
        client_id: Dhan client id passed to ``marketfeed.DhanFeed``.
        access_token: Dhan access token passed to ``marketfeed.DhanFeed``.
        instruments: subscription list passed to ``marketfeed.DhanFeed``.
        retry_delay: seconds to wait before reconnecting after a failure.
    """
    while True:
        conn = None
        try:
            data = marketfeed.DhanFeed(client_id, access_token, instruments)
            # NOTE(review): some dhanhq releases appear to require calling
            # data.run_forever() before get_data() -- confirm against the
            # installed version.

            # One connection per feed session instead of one per tick.
            conn = sqlite3.connect(DB_FILE)

            while True:
                response = data.get_data()
                if not response:
                    continue

                print(f"{datetime.datetime.now().time()}: LTP Received")
                if 'LTP' not in response:
                    continue

                # A packet can carry LTP without the other quote fields; skip
                # it rather than let a KeyError be reported as a connection
                # failure and force a reconnect.
                missing = [field for field in _QUOTE_FIELDS if field not in response]
                if missing:
                    print(f"Skipping packet without fields: {missing}")
                    continue

                # ``with conn`` commits the upsert (or rolls back on error).
                with conn:
                    conn.execute(
                        _UPSERT_QUOTE_SQL,
                        tuple(response[field] for field in _QUOTE_FIELDS),
                    )

        except Exception as e:
            # Top-level boundary of the feed: report and reconnect.
            print(f"WebSocket connection error: {e}")
            print("Reconnecting...")
        finally:
            if conn is not None:
                conn.close()

        # Back off so a persistent failure does not become a busy loop.
        time.sleep(retry_delay)

def main_loop():
    """Build the instrument list from the watchlist and start the live feed."""
    # The symbol mapping returned alongside the instruments is not used here.
    instruments, _ = create_instruments()
    run_feed(client_id, access_token, instruments)

# Start the feed only when executed as a script, not when imported.
if __name__ == "__main__":
    main_loop()