Hello there. I am trying to build two different Node.js services that communicate with each other over XMTP and also reply to other XMTP accounts outside of this pair. Each of them listens for messages via client.conversations.streamAllMessages() with a message handler function as its argument.
I still cannot figure out the source of the issue, but sometimes one service doesn't receive a message sent by the other one. I can confirm that the message is sent from one service, but on the other side the handler/callback function of .streamAllMessages() is not triggered. Since the code is built on top of my abstraction layer, it would be hard to copy and paste it as a whole. That's why I'll give some portions of it to provide context:
This is the listener method where .streamAllMessages() is called:
private async listen() {
  await this.client!.conversations.sync();
  const stream = await this.client!.conversations.streamAllMessages(
    (err, message) => {
      if (err) {
        console.log(
          `XMTP v3 error while streaming the messages: ${err?.stack || err}`
        );
        return;
      }
      this.handleMessage(message);
    }
  );
  // ...
}
And these are the methods used to send a message:
type DMConversation = Awaited<ReturnType<XMTPv3Client["findOrCreateDm"]>>;

/**
 * Finds inboxId for the given EOA address
 */
private async addressToInboxId(address: Address | string) {
  const targetInboxId = await this.client!.getInboxIdByIdentifier({
    identifierKind: IdentifierKind.Ethereum,
    identifier: address,
  });
  if (targetInboxId === null) {
    throw new Error("Target is not available");
  }
  return targetInboxId;
}

/**
 * Finds or creates a new DM for the given inbox ID
 */
private async findOrCreateDm(peerInboxId: string) {
  const dm = this.client!.conversations.getDmByInboxId(peerInboxId);
  // Use the existing DM if there is one
  if (dm !== undefined) {
    return dm;
  }
  const newDm = await this.client!.conversations.newDm(peerInboxId);
  return newDm;
}

private async sendMessage(to: string | DMConversation, content: unknown) {
  this.checkInit();
  let dm: DMConversation;
  if (typeof to === "string") {
    const inboxId = await this.addressToInboxId(to);
    dm = await this.findOrCreateDm(inboxId);
  } else {
    dm = to;
  }
  return await dm.send(JSON.stringify(content));
}

private async send(to: string, req: any): Promise<any> {
  const peerInboxId = await this.addressToInboxId(to);
  const dm = await this.findOrCreateDm(peerInboxId);
  const requestId = uuidv7();
  return await new Promise<any>((res, rej) => {
    const clear = () => {
      this.removeListener("response", responseHandler);
      clearTimeout(timeout);
    };
    const timeout = setTimeout(() => {
      clear();
      rej(new Error("XMTP v3 request timed out"));
    }, req.timeout || 30_000);
    const responseHandler = (response: any | undefined) => {
      // If the response is undefined that means the listener has stopped
      if (response === undefined) {
        return rej(new Error("Connection closed before receiving the response"));
      }
      // If this response doesn't belong to the request that we sent, ignore it
      if (response.id !== requestId) {
        return;
      }
      clear();
      res(response);
    };
    // Add event listener for handling the response
    // this event will be triggered by `this.handleMessage` from `listen()` method
    this.addListener("response", responseHandler);
    const errorHandler = (e: unknown) => {
      clear();
      rej(e);
    };
    this.sendMessage(dm, {
      ...req,
      id: requestId,
      requester: this.account.address,
    })
      .then(() =>
        console.log(`XMTP v3: Message sent to ${to} with id ${requestId}`)
      )
      .catch(errorHandler);
  });
}
- All of them are located in a class called XMTPv3Client
- This class is used as both listener and sender
- this.handleMessage takes and parses the content of the received message and sends back a response via this.sendMessage
- this.client refers to the XMTP Client instance, which is created via Client.create
- After the client creation, I call this.client!.conversations.syncAll([ConsentState.Allowed, ConsentState.Denied, ConsentState.Unknown]) only once
- this.account refers to the Account object created via privateKeyToAccount from the viem package
- I am using @xmtp/node-sdk 2.1.0
- The purposes of those two services are:
  - Communicate with each other and do some tasks based on the messages that are being sent.
  - Communicate with any XMTP account and provide the information they ask for
- But the issue (stream stops receiving messages) only happens with the communication between the two services
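
To make the request/response flow easier to reason about outside my abstraction layer, here is a minimal, XMTP-free sketch of the same correlation pattern: a request is sent with a generated id, and the promise resolves only when a parsed message with the same id is emitted as a "response" event. The names RequestChannel, request, Envelope, and the transport callback are hypothetical stand-ins for my class, send, and dm.send; this is not my actual code and it does not call the XMTP SDK.

```typescript
import { EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";

// Hypothetical message shape: every request/response carries an `id`.
interface Envelope {
  id: string;
  [key: string]: unknown;
}

// Hypothetical stand-in for the real class. `transport` replaces dm.send.
class RequestChannel extends EventEmitter {
  private readonly transport: (raw: string) => void;

  constructor(transport: (raw: string) => void) {
    super();
    this.transport = transport;
  }

  // Would be called from the stream callback for every incoming message.
  handleMessage(raw: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return; // Ignore content that is not JSON
    }
    if (
      typeof parsed === "object" &&
      parsed !== null &&
      typeof (parsed as Envelope).id === "string"
    ) {
      this.emit("response", parsed as Envelope);
    }
  }

  // Sends a request and waits for the response with the same id.
  request(payload: Record<string, unknown>, timeoutMs = 30_000): Promise<Envelope> {
    const id = randomUUID();
    return new Promise<Envelope>((resolve, reject) => {
      const onResponse = (response: Envelope) => {
        if (response.id !== id) return; // Belongs to another request
        cleanup();
        resolve(response);
      };
      const timer = setTimeout(() => {
        cleanup();
        reject(new Error(`Request ${id} timed out`));
      }, timeoutMs);
      const cleanup = () => {
        clearTimeout(timer);
        this.removeListener("response", onResponse);
      };
      // Register the listener before sending so a fast reply is not missed
      this.on("response", onResponse);
      this.transport(JSON.stringify({ ...payload, id }));
    });
  }
}
```

In this sketch, if handleMessage is never invoked for the reply (which is what I observe when the stream callback does not fire), the only possible outcome is the timeout rejection.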
Please let me know if the context is not enough. Thanks!