Error code is 1006
I updated the byte content in request based on exchange segment list in Annexure - DhanHQ Ver 1.0 / API Document. But the call is not coming in onMessage() now.
@Override
public void onClose(int code, String reason, boolean remote) {
LOGGER.info("Connection closed with exit code {}. Additional info: {}", code, reason);
}
prints
Connection closed with exit code 1006. Additional info:
Complete code:
package com.tradingbot.dhan.component;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.tradingbot.dhan.dto.Instrument;
import com.tradingbot.dhan.dto.MarketDepth;
import com.tradingbot.dhan.dto.MarketDepthData;
import com.tradingbot.dhan.dto.OIData;
import com.tradingbot.dhan.dto.PreviousCloseData;
import com.tradingbot.dhan.dto.QuoteData;
import com.tradingbot.dhan.dto.ResponseHeader;
import com.tradingbot.dhan.dto.TickerData;
@Component
public class DhanWebSocketClient extends WebSocketClient {
private static final Logger LOGGER = LoggerFactory.getLogger(DhanWebSocketClient.class);
private static final String SOCKET_URI = "wss://api-feed.dhan.co";
private static final String CLIENT_ID = "xxxxxxxxx";
private static final String API_ACCESS_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
public DhanWebSocketClient() throws Exception {
super(new URI(SOCKET_URI));
}
@PostConstruct
public void start() {
this.connect();
}
@Override
public void onOpen(ServerHandshake handshakedata) {
LOGGER.info("Connection opened");
sendAuthorizationPacket();
subscribeInstruments();
}
@Override
public void onMessage(String message) {
LOGGER.info("Received message: {}", message);
}
@Override
public void onMessage(ByteBuffer bytes) {
ResponseHeader header = new ResponseHeader();
header.setFeedResponseCode(bytes.get());
header.setMessageLength(bytes.getShort());
header.setExchangeSegment(bytes.get());
header.setSecurityId(bytes.getInt());
LOGGER.info("Feed Response Code: {}", header.getFeedResponseCode());
switch (header.getFeedResponseCode()) {
case 50:
handleDisconnection(bytes.getShort());
break;
case 2:
TickerData tickerData = parseTickerData(header, bytes);
LOGGER.info("Ticker Data: {}", tickerData);
break;
case 4:
QuoteData quoteData = parseQuoteData(header, bytes);
LOGGER.info("Quote Data: {}", quoteData);
break;
case 5:
OIData oiData = parseOIData(header, bytes);
LOGGER.info("OI Data: {}", oiData);
break;
case 3:
MarketDepthData marketDepthData = parseMarketDepthData(header, bytes);
LOGGER.info("Market Depth Data: {}", marketDepthData);
break;
case 6:
PreviousCloseData previousCloseData = parsePreviousCloseData(header, bytes);
LOGGER.info("Previous Close Data: {}", previousCloseData);
break;
case 7:
LOGGER.info("Market Status: Open/Close notification");
break;
default:
LOGGER.warn("Unknown response code: {}", header.getFeedResponseCode());
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
LOGGER.info("Connection closed with exit code {}. Additional info: {}", code, reason);
}
@Override
public void onError(Exception ex) {
LOGGER.error("WebSocket error: ", ex);
}
/**
* Sends the authorization packet to establish connection with DhanHQ WebSocket.
* The authorization packet consists of a binary message with header and API access token.
*/
private void sendAuthorizationPacket() {
ByteBuffer buffer = ByteBuffer.allocate(585);
// Header
buffer.put((byte) 11); // Feed Request Code to connect new feed
buffer.putShort((short) 585); // Message Length
// Client ID
putPaddedBytes(buffer, CLIENT_ID.getBytes(), 30);
// Dhan Auth - passed as zero
buffer.put(new byte[50]);
// API Access Token
putPaddedBytes(buffer, API_ACCESS_TOKEN.getBytes(), 500);
// Authentication Type - 2P by default
buffer.putShort((short) 2);
buffer.flip();
send(buffer);
}
/**
* Subscribes to market data for a list of instruments.
* A bulk subscribe request is sent with a header and up to 100 instruments.
*/
private void subscribeInstruments() {
List<Instrument> instruments = createInstrumentsList();
if (instruments.size() > 100) {
throw new IllegalArgumentException("Cannot subscribe more than 100 instruments in a single request.");
}
ByteBuffer buffer = ByteBuffer.allocate(2188);
// Header
buffer.put((byte) 15); // Subscription message code
buffer.putShort((short) 2188); // Message length
putPaddedBytes(buffer, CLIENT_ID.getBytes(), 30);
buffer.put(new byte[50]); // Dhan Auth - passed as zero
buffer.putInt(instruments.size()); // Number of instruments
// Instruments
for (Instrument instrument : instruments) {
buffer.put(mapExchangeSegmentToByte(instrument.getExchangeSegment()));
putPaddedBytes(buffer, instrument.getSecurityId().getBytes(), 20);
}
// Padding
buffer.put(new byte[2100 - instruments.size() * 21]);
// Logging buffer content
logBufferContent(buffer);
buffer.flip();
send(buffer);
}
/**
* Logs the content of the buffer for debugging purposes.
* Converts the buffer content to a hex string representation.
*/
private void logBufferContent(ByteBuffer buffer) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < buffer.position(); i++) {
sb.append(String.format("%02X ", buffer.get(i)));
}
LOGGER.info("Buffer contents before sending: {}", sb.toString());
}
/**
* Maps exchange segment strings to corresponding byte values.
*/
private byte mapExchangeSegmentToByte(String exchangeSegment) {
switch (exchangeSegment) {
case "IDX_I":
return 0;
case "NSE_EQ":
return 1;
case "NSE_FNO":
return 2;
case "NSE_CURRENCY":
return 3;
case "BSE_EQ":
return 4;
case "MCX_COMM":
return 5;
case "BSE_CURRENCY":
return 7;
case "BSE_FNO":
return 8;
default:
throw new IllegalArgumentException("Unknown exchange segment: " + exchangeSegment);
}
}
/**
* Adds padding bytes to the buffer.
* Ensures that the byte array fits the specified length by adding zero padding if necessary.
*/
private void putPaddedBytes(ByteBuffer buffer, byte[] bytes, int length) {
byte[] paddedBytes = new byte[length];
System.arraycopy(bytes, 0, paddedBytes, 0, Math.min(bytes.length, length));
buffer.put(paddedBytes);
}
/**
* Handles disconnection messages received from the server.
* Logs the disconnection reason based on the disconnection message code.
*/
private void handleDisconnection(short disconnectionMessageCode) {
String reason;
switch (disconnectionMessageCode) {
case 805:
reason = "Connection limit exceeded";
break;
case 806:
reason = "Data APIs not subscribed";
break;
case 807:
reason = "Access token is expired";
break;
case 808:
reason = "Authentication Failed - Check Client ID";
break;
case 809:
reason = "Access token is invalid";
break;
default:
reason = "Unknown reason code " + disconnectionMessageCode;
}
LOGGER.info("Disconnection Reason: {}", reason);
}
/**
* Parses ticker data from the received binary message.
* Constructs a TickerData object with the extracted information.
*/
private TickerData parseTickerData(ResponseHeader header, ByteBuffer bytes) {
TickerData tickerData = new TickerData();
tickerData.setHeader(header);
tickerData.setLastTradedPrice(bytes.getFloat());
tickerData.setLastTradeTime(bytes.getInt());
return tickerData;
}
/**
* Parses quote data from the received binary message.
* Constructs a QuoteData object with the extracted information.
*/
private QuoteData parseQuoteData(ResponseHeader header, ByteBuffer bytes) {
QuoteData quoteData = new QuoteData();
quoteData.setHeader(header);
quoteData.setLatestTradedPrice(bytes.getFloat());
quoteData.setLastTradedQuantity(bytes.getShort());
quoteData.setLastTradeTime(bytes.getInt());
quoteData.setAverageTradePrice(bytes.getFloat());
quoteData.setVolume(bytes.getInt());
// quoteData.setValue(bytes.getFloat());
// quoteData.setOpen(bytes.getFloat());
// quoteData.setHigh(bytes.getFloat());
// quoteData.setLow(bytes.getFloat());
// quoteData.setClose(bytes.getFloat());
return quoteData;
}
/**
* Parses OI data from the received binary message.
* Constructs an OIData object with the extracted information.
*/
private OIData parseOIData(ResponseHeader header, ByteBuffer bytes) {
OIData oiData = new OIData();
oiData.setHeader(header);
// oiData.setLatestTradedPrice(bytes.getFloat());
// oiData.setLastTradedQuantity(bytes.getShort());
// oiData.setLastTradeTime(bytes.getInt());
// oiData.setAverageTradePrice(bytes.getFloat());
// oiData.setVolume(bytes.getInt());
// oiData.setValue(bytes.getFloat());
// oiData.setOpen(bytes.getFloat());
// oiData.setHigh(bytes.getFloat());
// oiData.setLow(bytes.getFloat());
// oiData.setClose(bytes.getFloat());
// oiData.setOi(bytes.getFloat());
return oiData;
}
/**
* Parses market depth data from the received binary message.
* Constructs a MarketDepthData object with the extracted information.
*/
private MarketDepthData parseMarketDepthData(ResponseHeader header, ByteBuffer bytes) {
MarketDepthData marketDepthData = new MarketDepthData();
marketDepthData.setHeader(header);
List<MarketDepth> buyMarketDepth = new ArrayList<>();
List<MarketDepth> sellMarketDepth = new ArrayList<>();
for (int i = 0; i < 5; i++) {
MarketDepth buyDepth = new MarketDepth();
// buyDepth.setPrice(bytes.getFloat());
// buyDepth.setQuantity(bytes.getInt());
// buyDepth.setOrder(bytes.getInt());
buyMarketDepth.add(buyDepth);
}
for (int i = 0; i < 5; i++) {
MarketDepth sellDepth = new MarketDepth();
// sellDepth.setPrice(bytes.getFloat());
// sellDepth.setQuantity(bytes.getInt());
// sellDepth.setOrder(bytes.getInt());
sellMarketDepth.add(sellDepth);
}
// marketDepthData.setBuyMarketDepth(buyMarketDepth);
// marketDepthData.setSellMarketDepth(sellMarketDepth);
return marketDepthData;
}
/**
* Parses previous close data from the received binary message.
* Constructs a PreviousCloseData object with the extracted information.
*/
private PreviousCloseData parsePreviousCloseData(ResponseHeader header, ByteBuffer bytes) {
PreviousCloseData previousCloseData = new PreviousCloseData();
previousCloseData.setHeader(header);
// previousCloseData.setClosePrice(bytes.getFloat());
return previousCloseData;
}
/**
* Creates a list of Instrument objects to subscribe to market data.
* Replace this with actual instrument data in a real-world scenario.
*/
private List<Instrument> createInstrumentsList() {
List<Instrument> instruments = new ArrayList<>();
// Add instruments to the list
instruments.add(new Instrument("IDX_I", "2"));//NIFTY
instruments.add(new Instrument("NSE_EQ", "10666"));//PNB
// Add more instruments as needed
return instruments;
}
}