Hey guys, I’m trying to get market depth for one of my instruments. could you please help me with the standard example code. Or could you please let me know why below code is not working.
class DhanFeed:
def init(self, client_id, access_token, instruments, subscription_code, on_connect=None, on_message=None, on_close=None):
self.client_id = client_id
self.access_token = access_token
self.instruments = instruments
self.subscription_code = subscription_code
self.ws = None
self.on_connect = on_connect
self.on_message = on_message
self.on_close = on_close
self.loop = asyncio.get_event_loop()
self.is_authorized = False
async def run_forever(self):
await self.connect()
async def connect(self):
print("Connecting to WebSocket...")
if not self.ws or self.ws.closed:
self.ws = await websockets.connect(WSS_URL)
helper = DhanSDKHelper(self)
await helper.on_connection_established(self.ws)
await self.authorize()
await self.subscribe_instruments()
while True:
try:
response = await self.ws.recv()
print("Raw message received:", response)
self.data = self.process_data(response)
print("Processed message:", self.data)
await helper.on_message_received(self.data)
except websockets.exceptions.ConnectionClosed:
print("Connection has been closed")
break
except Exception as e:
print(f"Error while receiving message: {e}")
async def authorize(self):
try:
print("Authorizing...")
api_access_token = self.access_token.encode('utf-8')
api_access_token = self.pad_with_zeros(api_access_token, 500)
authentication_type = "2P".encode('utf-8')
payload = api_access_token + authentication_type
feed_request_code = 11
message_length = 83 + len(api_access_token) + len(authentication_type)
client_id = self.client_id.encode('utf-8')
client_id = self.pad_with_zeros(client_id, 30)
dhan_auth = b"\0" * 50
header = struct.pack('<bH30s50s', feed_request_code, message_length, client_id, dhan_auth)
authorization_packet = header + payload
await self.ws.send(authorization_packet)
self.is_authorized = True
print("Authorization successful")
except Exception as e:
print(f"Authorization failed: {e}")
self.is_authorized = False
async def subscribe_instruments(self):
if not self.is_authorized:
print("Not authorized. Please authorize first.")
return
print("Subscribing to instruments...")
subscription_packet = self.create_subscription_packet(self.instruments, self.subscription_code)
await self.ws.send(subscription_packet)
print("Subscription successful")
def process_data(self, data):
first_byte = struct.unpack('<B', data[0:1])[0]
if first_byte == 2:
return self.process_ticker(data)
elif first_byte == 3:
return self.process_market_depth(data)
elif first_byte == 4:
return self.process_quote(data)
elif first_byte == 5:
return self.process_oi(data)
elif first_byte == 6:
return self.process_prev_close(data)
elif first_byte == 7:
return self.process_status(data)
elif first_byte == 50:
return self.server_disconnection(data)
def process_market_depth(self, data):
market_data = [struct.unpack('<BHBIf100s', data[0:112])]
market_data_list = list(market_data[0][0:5])
market_depth_binary = market_data[0][5]
packet_format = '<IIHHff'
for i in range(5):
start_idx = i * struct.calcsize(packet_format)
end_idx = start_idx + struct.calcsize(packet_format)
current_packet = struct.unpack(packet_format, market_depth_binary[start_idx:end_idx])
market_data_list.append(current_packet)
exchange_segment = market_data_list[2]
sec_id = market_data_list[3]
ltp = market_data_list[4]
depth1 = market_data_list[5]
depth2 = market_data_list[6]
depth3 = market_data_list[7]
depth4 = market_data_list[8]
depth5 = market_data_list[9]
depth = [{"bid_quantity": depth1[0], "ask_quantity": depth1[1], "bid_orders": depth1[2], "ask_orders": depth1[3], "bid_price": depth1[4], "ask_price": depth1[5]},
{"bid_quantity": depth2[0], "ask_quantity": depth2[1], "bid_orders": depth2[2], "ask_orders": depth2[3], "bid_price": depth2[4], "ask_price": depth2[5]},
{"bid_quantity": depth3[0], "ask_quantity": depth3[1], "bid_orders": depth3[2], "ask_orders": depth3[3], "bid_price": depth3[4], "ask_price": depth3[5]},
{"bid_quantity": depth4[0], "ask_quantity": depth4[1], "bid_orders": depth4[2], "ask_orders": depth4[3], "bid_price": depth4[4], "ask_price": depth4[5]},
{"bid_quantity": depth5[0], "ask_quantity": depth5[1], "bid_orders": depth5[2], "ask_orders": depth5[3], "bid_price": depth5[4], "ask_price": depth5[5]}]
market_depth = {
"type": 'Market Depth',
"exchange_segment": exchange_segment,
"security_id": sec_id,
"LTP": ltp,
"depth": depth
}
return market_depth
def pad_with_zeros(self, data, length):
data = data.ljust(length, b'\0')
return data
def create_header(self, feed_request_code, message_length, client_id):
dhan_auth = b"\0" * 50
header = struct.pack('<bH30s50s', feed_request_code, message_length, client_id.encode('utf-8'), dhan_auth)
return header
def create_subscription_packet(self, instruments, feed_request_code):
num_instruments = len(instruments)
header = self.create_header(feed_request_code=feed_request_code, message_length=83 + 4 + num_instruments * 21, client_id=self.client_id)
num_instruments_bytes = struct.pack('<I', num_instruments)
instrument_info = b""
for exchange_segment, security_id in instruments:
instrument_info += struct.pack('<B20s', exchange_segment, security_id.encode('utf-8'))
subscription_packet = header + num_instruments_bytes + instrument_info
return subscription_packet
class DhanSDKHelper:
def init(self, sdk_instance):
self.sdk_instance = sdk_instance
async def on_connection_established(self, websocket):
print("WebSocket connection established")
if self.sdk_instance.on_connect:
await self.sdk_instance.on_connect(self.sdk_instance)
async def on_message_received(self, response):
print("Message received")
if self.sdk_instance.on_message:
await self.sdk_instance.on_message(self.sdk_instance, response)
async def on_close(self, websocket, close_status=None, close_msg=None):
print(f"WebSocket closed with status {close_status}: {close_msg}")
await websocket.close()
self.sdk_instance.ws = None
Function to process market depth messages
def process_market_depth_message(message):
try:
if message[‘type’] == ‘Market Depth’:
print(“Market Depth message received:”, message)
security_id = message[‘security_id’]
ltp = message[‘LTP’]
depth = message[‘depth’]
# Verify the structure of the depth data
if depth and isinstance(depth, list) and all(isinstance(d, dict) for d in depth):
columns = ['bid_quantity', 'bid_price', 'bid_orders', 'ask_quantity', 'ask_price', 'ask_orders']
df = pd.DataFrame(depth, columns=columns)
df['Security ID'] = security_id
df['Latest Traded Price'] = ltp
print(df)
for row in df.itertuples(index=False):
sheet.append_row(list(row))
else:
print("Invalid market depth data structure:", depth)
else:
print("Unknown message type:", message['type'])
except Exception as e:
print(f"Error processing market depth message: {e}")
Function to handle incoming messages
async def on_message(instance, message):
print(“Received:”, message)
process_market_depth_message(message)
Main function
async def main():
print(“Subscription code:”, subscription_code)
feed = DhanFeed(client_id, access_token, instruments, subscription_code, on_connect=on_connect, on_message=on_message)
await feed.run_forever()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("Stopping the feed...")
Callback function for connection established
async def on_connect(instance):
print(“Connected to websocket”)
if name == “main”:
asyncio.run(main())