Stream stops receiving messages

Hello there. I am trying to build two different Node.js services and make them able to communicate over XMTP with each other and reply other XMTP accounts outside of this group as well. Each of them listens the messages via client.conversations.streamAllMessages() with an argument of message handler function.

Still cannot figure it out the source of the issue, but sometimes one service doesn’t receive the message sent by the other one. I can confirm that the message is sent from one service but on the other side, the handler/callback function of .streamAllMessages() is not triggered. Since the code is built on top of my abstraction layer it would be hard to copy and paste it as a whole. That’s why I’ll give some portions of it to provide context:

This is the listener method where the .streamAllMessages() is called:

private async listen() {
  await this.client!.conversations.sync();

  const stream = await this.client!.conversations.streamAllMessages(
    (err, message) => {
      if (err) {
        console.log(
          `XMTP v3 error while streaming the messages: ${err?.stack || err}`
        );
        return;
      }

      this.handleMessage(message);
    }
  );
  // ...
}

And methods to send a message:


type DMConversation = Awaited<ReturnType<XMTPv3Client["findOrCreateDm"]>>;

/**
 * Finds inboxId for the given EOA address
*/
private async addressToInboxId(address: Address | string) {
  const targetInboxId = await this.client!.getInboxIdByIdentifier({
    identifierKind: IdentifierKind.Ethereum,
    identifier: address,
  });

  if (targetInboxId === null) {
    throw new Error("Target is not available");
  }

  return targetInboxId;
}

/**
 * Finds or creates a new DM for the given inbox ID
 */
private async findOrCreateDm(peerInboxId: string) {
  const dm = this.client!.conversations.getDmByInboxId(peerInboxId);

  // Use existing DM there is
  if (dm !== undefined) {
    return dm;
  }

  const newDm = await this.client!.conversations.newDm(peerInboxId);

  return newDm;
}

private async sendMessage(to: string | DMConversation, content: unknown) {
  this.checkInit();

  let dm: DMConversation;

  if (typeof to === "string") {
    const inboxId = await this.addressToInboxId(to);
    dm = await this.findOrCreateDm(inboxId);
  } else {
    dm = to;
  }

  return await dm.send(JSON.stringify(content))
}


private async function send(to: string, req: any): Promise<any> {
  const peerInboxId = await this.addressToInboxId(to);
  const dm = await this.findOrCreateDm(peerInboxId);
  const requestId = uuidv7();

  return await new Promise<any>((res, rej) => {
    const clear = () => {
      this.removeListener("response", responseHandler);
      clearTimeout(timeout);
    };

    const timeout = setTimeout(() => {
      clear();
      rej(new Error("XMTP v3 request"));
    }, req.timeout || 30_000);

    const responseHandler = (response: any | undefined) => {
      // If the response is undefined that means the listener has stopped
      if (response === undefined) {
        return rej(new Error("Connection closed before receiving the response"));
      }

      // If this response doesn't belong to the request that we sent, ignore it
      if (response.id !== requestId) {
        return;
      }

      clear();
      res(response);
    };

    // Add event listener for handling the response
    // this event will be triggered by `this.handleMessage` from `listen()` method
    this.addListener("response", responseHandler);

    const errorHandler = (e: unknown) => {
      clear();
      rej(e);
    };

    this.sendMessage(dm, {
      ...req,

      id: requestId,
      requester: this.account.address,
    })
      .then(() =>
        console.log(`XMTP v3: Message sent to ${to} with id ${requestId}`)
      )
      .catch(errorHandler);
  });
}
  • All of them located in a class called XMTPv3Client
  • This class is used as both listener and sender
  • this.handleMessage takes and parse the content of the message that is received and sends back a response via this.sendMessage
  • this.client refers to the XMTP Client instance which is created via Client.create.
  • After the client creation, I call this.client!.conversations.syncAll([ConsentState.Allowed, ConsentState.Denied, ConsentState.Unknown]) only once.
  • this.account refers to the Account object created via privateKeyToAccount from viem package
  • I am using @xmtp/node-sdk 2.1.0
  • The purposes of those two services are:
  • Communicate with each other and to some tasks based on the messages that are being sent.
  • Communicate with any XMTP account and provide information that asked by them
  • But the issue (stream stops receiving messages) only happens with the communication between two services

Please let me know if the context is not enough. Thanks!

1 Like

hey @mkaramuk, thanks for the detailed writeup!

i’m not sure i fully understand your issue. are you saying that messages from other users are coming through the streamAllMessages callback, but not service-to-service messages?

1 Like

Hey @rygine, thanks for the reply!

Yeah, that’s right. Since I couldn’t figure out the source of this behavior, I am not sure about that but, sometimes I also experience it with other users as well. But primarily on service-to-service communication. By the way once before I was using .streamAllDmMessages and facing with the same issue too. I think there is no significant difference between those two methods right?