Stream stops receiving messages

Hello there. I am trying to build two different Node.js services that communicate with each other over XMTP and also reply to other XMTP accounts outside of this group. Each of them listens for messages via client.conversations.streamAllMessages(), passing a message handler function as the argument.

I still cannot figure out the source of the issue, but sometimes one service doesn’t receive the message sent by the other one. I can confirm that the message is sent by one service, but on the other side, the handler/callback function passed to .streamAllMessages() is not triggered. Since the code is built on top of my abstraction layer, it would be hard to copy and paste it as a whole. That’s why I’ll share some portions of it to provide context:

This is the listener method where .streamAllMessages() is called:

private async listen() {
  await this.client!.conversations.sync();

  const stream = await this.client!.conversations.streamAllMessages(
    (err, message) => {
      if (err) {
        console.log(
          `XMTP v3 error while streaming the messages: ${err?.stack || err}`
        );
        return;
      }

      this.handleMessage(message);
    }
  );
  // ...
}

And methods to send a message:


type DMConversation = Awaited<ReturnType<XMTPv3Client["findOrCreateDm"]>>;

/**
 * Finds the inbox ID for the given EOA address
 */
private async addressToInboxId(address: Address | string) {
  const targetInboxId = await this.client!.getInboxIdByIdentifier({
    identifierKind: IdentifierKind.Ethereum,
    identifier: address,
  });

  if (targetInboxId === null) {
    throw new Error("Target is not available");
  }

  return targetInboxId;
}

/**
 * Finds or creates a new DM for the given inbox ID
 */
private async findOrCreateDm(peerInboxId: string) {
  const dm = this.client!.conversations.getDmByInboxId(peerInboxId);

  // Use the existing DM if there is one
  if (dm !== undefined) {
    return dm;
  }

  const newDm = await this.client!.conversations.newDm(peerInboxId);

  return newDm;
}

private async sendMessage(to: string | DMConversation, content: unknown) {
  this.checkInit();

  let dm: DMConversation;

  if (typeof to === "string") {
    const inboxId = await this.addressToInboxId(to);
    dm = await this.findOrCreateDm(inboxId);
  } else {
    dm = to;
  }

  return await dm.send(JSON.stringify(content));
}


private async send(to: string, req: any): Promise<any> {
  const peerInboxId = await this.addressToInboxId(to);
  const dm = await this.findOrCreateDm(peerInboxId);
  const requestId = uuidv7();

  return await new Promise<any>((res, rej) => {
    const clear = () => {
      this.removeListener("response", responseHandler);
      clearTimeout(timeout);
    };

    const timeout = setTimeout(() => {
      clear();
      rej(new Error("XMTP v3 request timed out"));
    }, req.timeout || 30_000);

    const responseHandler = (response: any | undefined) => {
      // If the response is undefined that means the listener has stopped
      if (response === undefined) {
        clear();
        return rej(new Error("Connection closed before receiving the response"));
      }

      // If this response doesn't belong to the request that we sent, ignore it
      if (response.id !== requestId) {
        return;
      }

      clear();
      res(response);
    };

    // Add event listener for handling the response
    // this event will be triggered by `this.handleMessage` from `listen()` method
    this.addListener("response", responseHandler);

    const errorHandler = (e: unknown) => {
      clear();
      rej(e);
    };

    this.sendMessage(dm, {
      ...req,

      id: requestId,
      requester: this.account.address,
    })
      .then(() =>
        console.log(`XMTP v3: Message sent to ${to} with id ${requestId}`)
      )
      .catch(errorHandler);
  });
}
  • All of them are located in a class called XMTPv3Client
  • This class is used as both listener and sender
  • this.handleMessage takes and parses the content of the received message and sends back a response via this.sendMessage
  • this.client refers to the XMTP Client instance, which is created via Client.create.
  • After the client is created, I call this.client!.conversations.syncAll([ConsentState.Allowed, ConsentState.Denied, ConsentState.Unknown]) only once.
  • this.account refers to the Account object created via privateKeyToAccount from the viem package
  • I am using @xmtp/node-sdk 2.1.0
  • The purposes of those two services are:
      • Communicate with each other and do some tasks based on the messages that are being sent.
      • Communicate with any XMTP account and provide the information they ask for
  • But the issue (stream stops receiving messages) only happens in the communication between the two services
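One thing worth checking in the listener above: the reproduction script later in this thread skips messages whose senderInboxId equals the client's own inbox ID, which suggests the stream also yields the client's own messages, whereas listen() hands every message to this.handleMessage. A minimal sketch of a handleMessage-style router, assuming JSON payloads with an `id` plus a hypothetical `kind` field (the original only shows `id` and `requester`), that ignores its own messages and resolves pending requests from a Map instead of adding one event listener per request. `RequestRouter`, `Payload`, and `IncomingMessage` are illustrative names, not part of @xmtp/node-sdk:

```typescript
// Hypothetical payload shape; `kind` is an assumption for this sketch.
interface Payload {
  id: string;
  kind?: "request" | "response";
  body?: unknown;
}

// Local stand-in for the fields the post uses on a streamed message.
interface IncomingMessage {
  senderInboxId: string;
  content: unknown;
}

type Pending = {
  resolve: (p: Payload) => void;
  reject: (e: Error) => void;
  timer: ReturnType<typeof setTimeout>;
};

class RequestRouter {
  private pending = new Map<string, Pending>();

  constructor(
    private readonly ownInboxId: string,
    private readonly onRequest: (p: Payload, from: string) => void,
  ) {}

  // Register a request id; the promise settles when its response arrives or times out.
  waitFor(id: string, timeoutMs: number): Promise<Payload> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(id);
        reject(new Error(`request ${id} timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(id, { resolve, reject, timer });
    });
  }

  // Call from the stream callback for every message.
  handle(msg: IncomingMessage): "ignored" | "response" | "request" {
    // Skip messages this client sent itself.
    if (msg.senderInboxId === this.ownInboxId) return "ignored";
    let parsed: unknown;
    try {
      parsed = JSON.parse(String(msg.content));
    } catch {
      return "ignored"; // not one of our JSON payloads
    }
    if (typeof parsed !== "object" || parsed === null || !("id" in parsed)) return "ignored";
    const payload = parsed as Payload;
    if (payload.kind === "response") {
      const entry = this.pending.get(payload.id);
      if (!entry) return "ignored"; // late or unknown response
      clearTimeout(entry.timer);
      this.pending.delete(payload.id);
      entry.resolve(payload);
      return "response";
    }
    this.onRequest(payload, msg.senderInboxId);
    return "request";
  }

  // Reject everything still waiting, e.g. when the stream has stopped.
  closeAll(reason: string): void {
    for (const [id, entry] of this.pending) {
      clearTimeout(entry.timer);
      entry.reject(new Error(`${reason} (request ${id})`));
    }
    this.pending.clear();
  }
}
```

In listen(), the stream callback would call `router.handle(message)`; in send(), `router.waitFor(requestId, req.timeout || 30_000)` would replace the `addListener("response", ...)` wiring, and `closeAll(...)` covers the "listener has stopped" case.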

Please let me know if the context is not enough. Thanks!


hey @mkaramuk, thanks for the detailed writeup!

i’m not sure i fully understand your issue. are you saying that messages from other users are coming through the streamAllMessages callback, but not service-to-service messages?


Hey @rygine, thanks for the reply!

Yeah, that’s right. Since I couldn’t figure out the source of this behavior, I’m not completely sure, but sometimes I also experience it with other users. It happens primarily in service-to-service communication, though. By the way, I was previously using .streamAllDmMessages and faced the same issue. I think there is no significant difference between those two methods, right?

@rygine I tried to write a complete example that reproduces the issue. I hope it helps you understand it better, please have a look:

import { Client, IdentifierKind, Signer } from "@xmtp/node-sdk";
import { Address, Hex, hexToBytes } from "viem";
import { privateKeyToAccount } from "viem/accounts";

function generateEncryptionKey(input: string) {
  // Make sure the input is at least 32 characters long
  if (input.length < 32) {
    input = input.repeat(Math.ceil(32 / input.length));
  }

  const encoder = new TextEncoder();
  const encoded = encoder.encode(input);
  const fixedLengthArray = new Uint8Array(32);
  fixedLengthArray.set(encoded.slice(0, 32));

  return fixedLengthArray;
}
async function main() {
  const key = process.argv[2]! as Hex;
  const target = process.argv[3]! as Address;
  const account = privateKeyToAccount(key);
  console.log("account:", account.address);

  const signed: Signer = {
    type: "EOA",
    getIdentifier: async () => ({
      identifier: account.address,
      identifierKind: IdentifierKind.Ethereum,
    }),
    signMessage: async (message: string) => {
      return hexToBytes(await account.signMessage({ message }));
    },
  };

  const client = await Client.create(signed, {
    dbEncryptionKey: generateEncryptionKey(key),
    env: "production",
    dbPath: `./xmtp-db-${account.address}.db`,
  });

  console.log("Syncing all");
  await client.conversations.syncAll();

  const stream = await client.conversations.streamAllMessages();
  process.on("SIGINT", () => {
    stream.end();
    clearInterval(interval);
    console.log(new Date(), "stream end");
  });

  const interval = setInterval(async () => {
    if (stream.isDone) return;

    const inboxId = await client.getInboxIdByIdentifier({
      identifierKind: IdentifierKind.Ethereum,
      identifier: target as Address,
    });
    if (!inboxId) {
      console.error(new Date(), "Target inbox not found");
      return;
    }

    const dm =
      client.conversations.getDmByInboxId(inboxId) ||
      (await client.conversations.newDm(inboxId));

    const timestamp = Date.now();
    const data = Math.floor(Math.random() * 1000).toString();
    await dm.send(
      JSON.stringify({
        timestamp,
        message: `Data: ${data}`,
        data,
      })
    );
    console.log(new Date(), "data sent    ", data);
  }, 1_000);

  for await (const msg of stream) {
    if (msg === undefined || msg.senderInboxId === client.inboxId) continue;
    console.log(
      new Date(),
      "data received",
      JSON.parse(msg.content?.toString() || "{}").data
    );
  }

  console.log(new Date(), "over");
}

main();

I started two processes of this script with different parameters: the first one uses the second one’s account address as the target, and the second one uses the first one’s. I left them running for a while.

Eventually, I hit a rate limit error, which makes sense since the interval is 1 second. But this doesn’t explain why, at that point, they stopped receiving messages from each other, because the processes kept running after the rate limit error.
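Since the repro loop keeps firing every second even after a rate-limit error, one option is to back off after failures. A minimal sketch, assuming each send is wrapped in a function that rejects when it fails; the exact error @xmtp/node-sdk raises for rate limits is not shown in this thread, so the sketch retries on any error. `backoffDelay` and `retryWithBackoff` are hypothetical helpers:

```typescript
// Exponential backoff with a cap: baseMs * 2^attempt, never above maxMs.
function backoffDelay(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Retry an async operation (e.g. a wrapper around dm.send) with exponential
// backoff. `sleep` is injectable so the policy can be tested without waiting.
async function retryWithBackoff<T>(
  op: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms)),
  baseMs = 1_000,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await op();
    } catch (e) {
      lastError = e;
      // Wait longer after each failure, except after the final attempt.
      if (attempt < maxAttempts - 1) await sleep(backoffDelay(attempt, baseMs));
    }
  }
  throw lastError;
}
```

In the repro, the interval body could call `await retryWithBackoff(() => dm.send(...))` inside a try/catch, so a final failure is logged instead of surfacing as an unhandled rejection.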

Then I restarted them, but the issue persisted (they weren’t receiving messages from each other). [At this point I forgot to test whether they were receiving messages from another account. My assumption is that they weren’t.] Then I deleted the databases and tried again a couple of times, and now they are working, but I don’t know when they are going to stop.

Edit: I changed the interval to 3 seconds, and about 30 minutes after the process started, it stopped receiving messages from the other service. But it still receives messages from another account.
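Because the stall is per peer (messages from another account still arrive), a small per-peer watchdog can timestamp exactly when one peer goes quiet and how many messages got through before that. A minimal sketch; `PeerWatchdog` is a hypothetical helper and the clock is injectable for testing:

```typescript
type StaleReport = { peer: string; silentForMs: number; received: number };

// Per-peer liveness tracker for a message stream.
class PeerWatchdog {
  private lastSeen = new Map<string, number>();
  private counts = new Map<string, number>();

  constructor(
    private readonly staleMs: number,
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Call from the stream loop for every message received from a peer.
  record(peerInboxId: string): void {
    this.lastSeen.set(peerInboxId, this.now());
    this.counts.set(peerInboxId, (this.counts.get(peerInboxId) ?? 0) + 1);
  }

  // Peers silent for longer than staleMs, with how many messages had arrived
  // from them before they went quiet.
  stalePeers(): StaleReport[] {
    const t = this.now();
    const reports: StaleReport[] = [];
    for (const [peer, seenAt] of this.lastSeen) {
      const silentForMs = t - seenAt;
      if (silentForMs > this.staleMs) {
        reports.push({ peer, silentForMs, received: this.counts.get(peer) ?? 0 });
      }
    }
    return reports;
  }
}
```

In the repro, `watchdog.record(msg.senderInboxId)` would go right after the own-message check, with a periodic `console.log(new Date(), watchdog.stalePeers())`. With a 3-second send interval, a `staleMs` of around 15 seconds would flag the stall quickly.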


Hey @rygine, any updates on this?

Honestly, this issue has been bothering us for a long time, and we have started thinking about dropping XMTP and implementing another protocol. So I would appreciate it if you could tell me when this issue might be solved, or how we can iterate on it to move forward. Thanks!


@mkaramuk are you experiencing this issue on a local node?

No, both in dev and production environments

ok, if it’s working as intended on a local node, it may have something to do with the rate limiting. have you tried implementing a queue instead of sending a message every second?
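A queue like the one suggested could serialize sends and enforce a minimum gap between them. A minimal sketch, assuming each send is passed in as a function (for example `() => dm.send(...)`); `SendQueue` and `minGapMs` are hypothetical, and the clock and sleep are injectable for testing:

```typescript
// Serializes sends and enforces a minimum gap between the start of consecutive sends.
class SendQueue {
  private tail: Promise<void> = Promise.resolve();
  private lastSentAt = -Infinity;

  constructor(
    private readonly minGapMs: number,
    private readonly now: () => number = () => Date.now(),
    private readonly sleep: (ms: number) => Promise<void> = (ms) =>
      new Promise((r) => setTimeout(r, ms)),
  ) {}

  enqueue<T>(send: () => Promise<T>): Promise<T> {
    const result = this.tail.then(async () => {
      // Wait until minGapMs has passed since the previous send started.
      const wait = this.lastSentAt + this.minGapMs - this.now();
      if (wait > 0) await this.sleep(wait);
      this.lastSentAt = this.now();
      return send();
    });
    // Keep the chain going even if this send fails.
    this.tail = result.then(() => undefined, () => undefined);
    return result;
  }
}
```

Each caller still gets its own promise (so errors reach the caller), while sends leave the process one at a time.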

I haven’t tried using a local node; I’ll check it. I just need to follow the instructions, right?

About the rate limiting: in my actual implementation, it doesn’t send messages every second. It has a workflow like the one below:

  1. Service A: An event happens (mostly an interval tick)
  2. Service A: Sends a message to Service B and waits for a corresponding reply
  3. Service B: Receives the message, starts a background job, and replies (sends back) another message to inform Service A.
  4. Service A: Waits a couple of seconds, then enters a while loop:
    4.1 Service A: Sends a message asking for the status of the job and waits at most 10 seconds for a reply from Service B. If no reply is received within that time, this step is repeated.
    4.2 Service B: Receives the message from Service A and replies with the job status.
    4.3 Service A: If the job status is not done, goes back to 4.1
    4.4 Service A: If the job status is done, breaks the loop and continues its work.
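Steps 4.1 to 4.4 can be sketched as a polling loop with a per-attempt timeout. This assumes a hypothetical `request` function that sends a status query to Service B and rejects if no reply arrives within the timeout. The `maxAttempts` bound is an addition (the workflow above loops until done), and the sleep is injectable for testing:

```typescript
// Hypothetical transport: sends a payload and resolves with the reply,
// or rejects if none arrives within timeoutMs.
type StatusRequest = (
  payload: { type: string; jobId: string },
  timeoutMs: number,
) => Promise<{ status: string }>;

// Returns the number of status requests it took for the job to be done.
async function waitForJob(
  request: StatusRequest,
  jobId: string,
  opts: { perAttemptTimeoutMs?: number; pollIntervalMs?: number; maxAttempts?: number } = {},
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms)),
): Promise<number> {
  const { perAttemptTimeoutMs = 10_000, pollIntervalMs = 3_000, maxAttempts = 100 } = opts;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // Step 4.1: ask for the status, waiting at most perAttemptTimeoutMs.
      const reply = await request({ type: "status", jobId }, perAttemptTimeoutMs);
      // Step 4.4: the job is done, leave the loop.
      if (reply.status === "done") return attempt;
      // Step 4.3: not done yet, ask again after the poll interval.
    } catch {
      // No reply within the timeout: repeat step 4.1.
    }
    await sleep(pollIntervalMs);
  }
  throw new Error(`job ${jobId} not done after ${maxAttempts} status requests`);
}
```

Without an upper bound, a stalled stream would keep Service A in step 4.1 indefinitely. With one, the stall surfaces as an error that can be logged or used to trigger a stream restart.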

I think there is enough sleep time between the steps. For example, Service A waits until the XMTP network delivers the first message to Service B and then delivers Service B’s response back.

But even so, it doesn’t receive messages in the second iteration of the workflow. If that is related to rate limiting, shouldn’t it start transferring messages again?

we set up this repo to help with running a local node on your machine. let me know if you have any issues.


I’ve tested it with a local node but the result is the same.

They have started working and exchanging messages:

and stopped at this point (about an hour later):

The message send interval was 3 seconds (they were sending messages to each other every three seconds).

Note: The keys are random keys, so it’s fine to include them in the screenshots.

I don’t know whether it is helpful, but I also noticed something:

If I made the interval very short (like 50 or 100 ms), the issue occurred faster, within a few minutes (or 30-40 seconds; it has been a while since I tested this).

So I thought that maybe it is related to the data size or something? It seems like it happens after a certain number of messages have been exchanged. Or at least that is my theory.

This part is not unexpected, but stream.end() and client.send() were also stuck, and the process wasn’t able to exit after the issue happened. Probably something just got messed up on the daemon side because of the concurrent requests.
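To keep a hung stream.end() or send() from blocking shutdown forever, the call can be raced against a timer. A minimal sketch; `withTimeout` is a hypothetical helper. It does not cancel the underlying operation, it only stops waiting for it:

```typescript
// Races a promise against a timer so a call that never settles cannot block forever.
function withTimeout<T>(p: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} did not settle within ${ms} ms`)), ms);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([p, timeout]).finally(() => clearTimeout(timer));
}
```

For shutdown, something like `await withTimeout(Promise.resolve(stream.end()), 5_000, "stream.end()")` followed by an explicit exit would work whether `end()` returns a promise or not. Wrapping it in `Promise.resolve` avoids depending on its exact return type.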

Isn’t the stream just hanging? Did you find a real fix for this? I ran into the same issue: I cannot keep a stream alive, and after some time messages won’t come through. I ‘patched’ it (not fixed) by restarting my full code every couple of minutes (it’s easier to drop the whole process and restart than to clean up connections and clients), but I would really love for the stream to just keep streaming, or to have some restart mechanism under the hood so I don’t have to know about it.


Hey @Javier_SL! Unfortunately, no, I couldn’t find a solution and am still waiting for a fix/workaround from the XMTP team.

You mentioned that you are restarting the process(es) as a patch, but in my case I cannot do that because my services initialize a bunch of stuff at startup.

And regardless of that, if I remember correctly, I had also tried ending the stream and starting another one (simply calling .end() then .streamAllMessages() again) at a regular interval. But then, the messages sent between ending the old stream and starting the new one weren’t received.
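A common way to avoid losing messages across a stream restart is to open the new stream first, then fetch the backlog since the last message seen, and deduplicate by message ID. A minimal sketch under these assumptions: each message carries a unique `id` and a sent timestamp (the SDK's exact field names and units should be checked, since a nanosecond bigint would need converting), and `openStream` / `fetchSince` are hypothetical adapters, for example around `streamAllMessages` and a sync followed by listing each conversation's messages:

```typescript
// Local message shape for the sketch; field names are assumptions.
interface Msg {
  id: string;
  sentAtMs: number;
  content: string;
}

// Delivers each message at most once across stream restarts and backlog fetches.
// Note: `seen` grows without bound; a long-running service would prune it.
class Deduper {
  private seen = new Set<string>();
  private lastSentAtMs = 0;

  constructor(private readonly deliver: (m: Msg) => void) {}

  push(m: Msg): boolean {
    if (this.seen.has(m.id)) return false;
    this.seen.add(m.id);
    this.lastSentAtMs = Math.max(this.lastSentAtMs, m.sentAtMs);
    this.deliver(m);
    return true;
  }

  // Timestamp a backlog fetch should start from after a restart.
  get resumeFromMs(): number {
    return this.lastSentAtMs;
  }
}

// Open the new stream before replaying the backlog so nothing sent during the gap
// is missed. fetchSince should be inclusive (sent at or after sinceMs); duplicates
// are dropped by id. Delivery order across the two sources is not strictly chronological.
async function restart(
  dedupe: Deduper,
  openStream: (onMessage: (m: Msg) => void) => Promise<void>,
  fetchSince: (sinceMs: number) => Promise<Msg[]>,
): Promise<void> {
  const since = dedupe.resumeFromMs;
  await openStream((m) => dedupe.push(m));
  for (const m of await fetchSince(since)) dedupe.push(m);
}
```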

@rygine do you think the behavior I mentioned above is expected, or did I do something wrong?